Skip to content
Snippets Groups Projects
state_representation.py 9.93 KiB
Newer Older
  • Learn to ignore specific revisions
  • """
    Type hint classes for the representation of the json state.
    """
    
    from enum import Enum
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
    from typing import Any
    
    import networkx
    import numpy as np
    import numpy.typing as npt
    from networkx import Graph
    
    from pydantic import BaseModel
    from typing_extensions import Literal, TypedDict
    
    
    class OrderState(TypedDict):
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Format of the state representation of an order."""
    
    
        """Object category of env output, should be "Order"."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Name of the ordered meal."""
    
        start_time: datetime | str  # isoformat str
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Time of the creation of the order."""
    
        """Maximum duration of the order until it should be served."""
    
    class EffectState(TypedDict):
    
        """Format of the state representation of an effect (fire)."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
    
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Type of the effect."""
    
        progress_percentage: float | int
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Progressed percentage of the effect."""
    
        inverse_progress: bool
    
        """Display the "inverse" of the percentage: 1 - progress_percentage."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Format of the state representation of an item."""
    
    
        category: Literal["Item"] | Literal["ItemCookingEquipment"]
    
        """Object category of env output, should be "Item" or "ItemCookingEquipment"."""
    
        """Type of the item, "Tomato", ..."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Progressed percentage of the item."""
    
        inverse_progress: bool
    
        """Display the "inverse" of the percentage: 1 - progress_percentage."""
    
        active_effects: list[EffectState]
    
        """Active effects of the item, e.g., fire."""
    
    class CookingEquipmentState(ItemState):
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Format of the state representation of cooking equipment."""
    
    
        """Items in/on the the cooking equipment."""
    
        """Utility attribute, to show meals that would be ready to be served. But could also be transformed to other 
        meals. (Therefore, keeping the `content_list`.) If not None, you can display the `content_ready` meal instead of 
        the `content_list`"""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Format of the state representation of a counter."""
    
    
        """Object category of env output, should be "Counter"."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Type of the counter."""
    
        """2D Position of the counter."""
    
        orientation: list[float]
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Orientation / facing direction of the counter, currently only visual."""
    
        occupied_by: None | list[
            ItemState | CookingEquipmentState
        ] | ItemState | CookingEquipmentState
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """What is occupying the counter, what is lying/standing on it."""
    
        active_effects: list[EffectState]
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Active effects of the counter."""
    
        # list[ItemState] -> type in ["Sink", "PlateDispenser"]
    
    
    class PlayerState(TypedDict):
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Format of the state representation of a player."""
    
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """ID of the player."""
    
        """2D Position of the player."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """The direction the player is facing."""
    
        holding: ItemState | CookingEquipmentState | None
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """What the player is currently holding."""
    
        current_nearest_counter_pos: list[float] | None
    
        """2D Position of the currently nearest counter to the player."""
    
        current_nearest_counter_id: str | None
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """ID of the currently nearest counter to the player."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Format of the state representation of basic information of the kitchen."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Width of the kitchen. One counter counts as one unit of measurement."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Height of the kitchen. One counter counts as one unit of measurement."""
    
    class ViewRestriction(BaseModel):
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Format of the state representation of a view restriction from the players perspectives.
    
        Currently, as a view cone, like a flashlight in the dark."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
    
    
        """2D Direction of what the player can see."""
    
        """2D Starting position of the view cone."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Angle of the view cone."""
    
        counter_mask: None | list[bool]
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Mask of which counters are in view. Currently unused."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Range of the view."""
    
    class InfoMsgLevel(Enum):
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Level of importance of the messages displayed to the players."""
    
    
        """Standard text, maybe black font color."""
    
        Warning = "Warning"
    
        """Warning text, should display emergency. Maybe red font color."""
    
        Success = "Success"
    
        """Something was good. Maybe green font color."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Info messages for the players to be displayed."""
    
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """The actual message for the player."""
    
        start_time: datetime
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Start time of the message."""
    
        end_time: datetime
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """End time of the message, when the message will disappear."""
    
        level: InfoMsgLevel
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Importance level of the message."""
    
    class StateRepresentation(BaseModel):
    
        """The format of the returned state representation."""
    
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Information about the players in the environment."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Information about the counters in the environment."""
    
        """General information about the kitchen: Width, Height."""
    
        """The total score achieved by the players."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Current orders in the environment."""
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """If the environment has ended."""
    
        """Current time of the environment. Each environment start time is 2000-01-01T00:00:00. See 
        `cooperative_cuisine.utils.create_init_env_time`."""
    
        """Remaining seconds for the players to act in the environment."""
    
        view_restrictions: None | list[ViewRestriction]
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Restriction of the view for the players."""
    
        served_meals: list[tuple[str, str]]
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """List of already served meals in the environment."""
    
        info_msg: list[tuple[str, str]]
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Info messages for the players to be displayed."""
    
        # all_players_ready: bool
    
        """Added by the game server, indicate if all players are ready and actions are passed to the environment."""
    
    def astar_heuristic(x, y):
    
        """Heuristic distance function used in astar algorithm."""
    
        return np.linalg.norm(np.array(x) - np.array(y))
    
    
    
    def create_movement_graph(state: StateRepresentation, diagonal=True) -> Graph:
        """
    
        Creates a graph which represents the connections of empty kitchen tiles and such
        possible coarse movements of an agent.
    
        Args:
            state: State representation to determine the graph to.
    
            diagonal: if True use 8 way connection, i.e. diagonal connections between the spaces.
    
    
        Returns: Graph representing the connections between empty kitchen tiles.
        """
        width, height = state["kitchen"]["width"], state["kitchen"]["height"]
        free_space = np.ones((width, height), dtype=bool)
        for counter in state["counters"]:
            grid_idx = np.array(counter["pos"]).round().astype(int)
    
            free_space[grid_idx[0], grid_idx[1]] = False
    
    
        graph = networkx.Graph()
        for i in range(width):
            for j in range(height):
                if free_space[i, j]:
                    graph.add_node((i, j))
    
                    if diagonal:
                        for di in range(-1, 2):
                            for dj in range(-1, 2):
                                x, y = i + di, j + dj
                                if (
    
                                    and 0 < y < height
                                    and free_space[x, y]
                                    and (di, dj) != (0, 0)
                                ):
                                    if np.sum(np.abs(np.array([di, dj]))) == 2:
                                        if free_space[i + di, j] and free_space[i, j + dj]:
                                            graph.add_edge(
                                                (i, j),
                                                (x, y),
                                                weight=np.linalg.norm(
                                                    np.array([i - x, j - y])
                                                ),
                                            )
                                    else:
                                        graph.add_edge(
                                            (i, j),
                                            (x, y),
                                            weight=np.linalg.norm(np.array([i - x, j - y])),
                                        )
                    else:
                        for x, y in [(i - 1, j), (i + 1, j), (i, j - 1), (i, j + 1)]:
    
                            if 0 <= x < width and 0 <= y < height and free_space[x, y]:
    
                                graph.add_edge(
                                    (i, j),
                                    (x, y),
                                    weight=1,
                                )
        return graph
    
    
    def restrict_movement_graph(
        graph: Graph,
        player_positions: list[tuple[float, float] | list[float]] | npt.NDArray[float],
    ) -> Graph:
    
        """Modifies a given movement graph. Removed the nodes of spaces on which players stand.
    
    
        Args:
            graph: The graph to modify.
            player_positions: Positions of players.
    
        Returns: The modified graph without nodes where players stand.
    
        """
    
        copied = graph.copy()
    
        for pos in player_positions:
            tup = tuple(np.array(pos).round().astype(int))
    
            if tup in copied.nodes.keys():
                copied.remove_node(tup)
        return copied
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
    def create_json_schema() -> dict[str, Any]:
        """Create a json scheme of the state representation of an environment."""
    
        return StateRepresentation.model_json_schema()
    
    
    if __name__ == "__main__":
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        print(json.dumps(create_json_schema()))