Newer
Older
"""
Type hint classes for the representation of the json state.
"""
from datetime import datetime
import networkx
import numpy as np
import numpy.typing as npt
from networkx import Graph
from pydantic import BaseModel
from typing_extensions import Literal, TypedDict
class OrderState(TypedDict):
id: str
"""UUID of the order."""
category: Literal["Order"]
"""Object category of env output, should be "Order"."""
meal: str
start_time: datetime | str # isoformat str
max_duration: float
"""Maximum duration of the order until it should be served."""
class EffectState(TypedDict):
"""Format of the state representation of an effect (fire)."""
"""UUID of th effect."""
progress_percentage: float | int
"""Display the "inverse" of the percentage: 1 - progress_percentage."""
class ItemState(TypedDict):
id: str
"""UUID of the item."""
category: Literal["Item"] | Literal["ItemCookingEquipment"]
"""Object category of env output, should be "Item" or "ItemCookingEquipment"."""
type: str
"""Type of the item, "Tomato", ..."""
progress_percentage: float | int
"""Display the "inverse" of the percentage: 1 - progress_percentage."""
active_effects: list[EffectState]
"""Active effects of the item, e.g., fire."""
# add ItemType Meal ?
class CookingEquipmentState(ItemState):
"""Format of the state representation of cooking equipment."""
content_list: list[ItemState]
"""Items in/on the the cooking equipment."""
content_ready: None | ItemState
"""Utility attribute, to show meals that would be ready to be served. But could also be transformed to other
meals. (Therefore, keeping the `content_list`.) If not None, you can display the `content_ready` meal instead of
the `content_list`"""
class CounterState(TypedDict):
id: str
"""UUID of the counter."""
category: Literal["Counter"]
"""Object category of env output, should be "Counter"."""
type: str
pos: list[float]
"""2D Position of the counter."""
"""Orientation / facing direction of the counter, currently only visual."""
occupied_by: None | list[
ItemState | CookingEquipmentState
] | ItemState | CookingEquipmentState
"""What is occupying the counter, what is lying/standing on it."""
active_effects: list[EffectState]
# list[ItemState] -> type in ["Sink", "PlateDispenser"]
class PlayerState(TypedDict):
id: str
pos: list[float]
"""2D Position of the player."""
facing_direction: list[float]
holding: ItemState | CookingEquipmentState | None
current_nearest_counter_pos: list[float] | None
"""2D Position of the currently nearest counter to the player."""
current_nearest_counter_id: str | None
class KitchenInfo(BaseModel):
"""Format of the state representation of basic information of the kitchen."""
width: float
"""Width of the kitchen. One counter counts as one unit of measurement."""
height: float
"""Height of the kitchen. One counter counts as one unit of measurement."""
class ViewRestriction(BaseModel):
"""Format of the state representation of a view restriction from the players perspectives.
Currently, as a view cone, like a flashlight in the dark."""
direction: list[float]
"""2D Direction of what the player can see."""
position: list[float]
"""2D Starting position of the view cone."""
angle: int # degrees
counter_mask: None | list[bool]
"""Mask of which counters are in view. Currently unused."""
range: float | None
"""Level of importance of the messages displayed to the players."""
"""Standard text, maybe black font color."""
"""Warning text, should display emergency. Maybe red font color."""
"""Something was good. Maybe green font color."""
class InfoMsg(TypedDict):
"""End time of the message, when the message will disappear."""
class StateRepresentation(BaseModel):
"""The format of the returned state representation."""
players: list[PlayerState]
counters: list[CounterState]
kitchen: KitchenInfo
"""General information about the kitchen: Width, Height."""
score: float | int
"""The total score achieved by the players."""
orders: list[OrderState]
ended: bool
env_time: datetime # isoformat str
"""Current time of the environment. Each environment start time is 2000-01-01T00:00:00. See
`cooperative_cuisine.utils.create_init_env_time`."""
remaining_time: float
"""Remaining seconds for the players to act in the environment."""
view_restrictions: None | list[ViewRestriction]
served_meals: list[tuple[str, str]]
info_msg: list[tuple[str, str]]
"""Added by the game server, indicate if all players are ready and actions are passed to the environment."""
def astar_heuristic(x, y):
"""Heuristic distance function used in astar algorithm."""
return np.linalg.norm(np.array(x) - np.array(y))
def create_movement_graph(state: StateRepresentation, diagonal=True) -> Graph:
"""
Creates a graph which represents the connections of empty kitchen tiles and such
possible coarse movements of an agent.
Args:
state: State representation to determine the graph to.
diagonal: if True use 8 way connection, i.e. diagonal connections between the spaces.
Returns: Graph representing the connections between empty kitchen tiles.
"""
width, height = state["kitchen"]["width"], state["kitchen"]["height"]
free_space = np.ones((width, height), dtype=bool)
for counter in state["counters"]:
grid_idx = np.array(counter["pos"]).round().astype(int)

Fabian Heinrich
committed
free_space[grid_idx[0], grid_idx[1]] = False
graph = networkx.Graph()
for i in range(width):
for j in range(height):
if free_space[i, j]:
graph.add_node((i, j))
if diagonal:
for di in range(-1, 2):
for dj in range(-1, 2):
x, y = i + di, j + dj
if (
and 0 < y < height
and free_space[x, y]
and (di, dj) != (0, 0)
):
if np.sum(np.abs(np.array([di, dj]))) == 2:
if free_space[i + di, j] and free_space[i, j + dj]:
graph.add_edge(
(i, j),
(x, y),
weight=np.linalg.norm(
np.array([i - x, j - y])
),
)
else:
graph.add_edge(
(i, j),
(x, y),
weight=np.linalg.norm(np.array([i - x, j - y])),
)
else:
for x, y in [(i - 1, j), (i + 1, j), (i, j - 1), (i, j + 1)]:
if 0 <= x < width and 0 <= y < height and free_space[x, y]:
graph.add_edge(
(i, j),
(x, y),
weight=1,
)
return graph
def restrict_movement_graph(
graph: Graph,
player_positions: list[tuple[float, float] | list[float]] | npt.NDArray[float],
) -> Graph:
"""Modifies a given movement graph. Removed the nodes of spaces on which players stand.
Args:
graph: The graph to modify.
player_positions: Positions of players.
Returns: The modified graph without nodes where players stand.
"""
for pos in player_positions:
tup = tuple(np.array(pos).round().astype(int))
if tup in copied.nodes.keys():
copied.remove_node(tup)
return copied
def create_json_schema() -> dict[str, Any]:
"""Create a json scheme of the state representation of an environment."""
return StateRepresentation.model_json_schema()
if __name__ == "__main__":