Skip to content
Snippets Groups Projects
counters.py 25.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • """All counters are derived from the `Counter` class. Counters implement the `Counter.pick_up` method, which defines
    what should happen when the agent wants to pick something up from the counter. On the other side,
    the `Counter.drop_off` method receives the item that should be put on the counter. Before that, the
    `Counter.can_drop_off` method checks if the item can be put on the counter. The progress on Counters or on objects
    
    on the counters are handled via the Counters. They have the task to delegate the progress call via the `progress`
    
    method, e.g., the `CuttingBoard.progress`. The environment class detects which classes in this module have the
    `progress` method defined and on instances of these classes the progress will be delegated.
    
    Inside the item_info.yaml, equipment needs to be defined. It includes counters that are part of the
    interaction/requirements for the interaction.
    
    ```yaml
    CuttingBoard:
      type: Equipment
    
    Sink:
      type: Equipment
    
    Stove:
      type: Equipment
    ```
    
    
    The defined counter classes are:
    - `Counter`
    - `CuttingBoard`
    - `ServingWindow`
    - `Dispenser`
    - `PlateDispenser`
    - `Trashcan`
    
    Florian Schröder's avatar
    Florian Schröder committed
    """
    
    from collections.abc import Iterable
    
    from typing import TYPE_CHECKING, Optional, Callable, Set
    
        from overcooked_simulator.overcooked_environment import (
    
            OrderAndScoreManager,
    
    import numpy.typing as npt
    
    from overcooked_simulator.game_items import (
    
    """The logger for this module."""
    
    """The string for the `category` value in the json state representation for all counters."""
    
        """Simple class for a counter at a specified position (center of counter). Can hold things on top.
    
        The character `#` in the `layout` file represents the standard Counter.
        """
    
        def __init__(
            self,
            pos: npt.NDArray[float],
            occupied_by: Optional[Item] = None,
            uid: hex = None,
    
            """Constructor setting the arguments as attributes.
    
            Args:
                pos: Position of the counter in the environment. 2-element vector.
                occupied_by: The item on top of the counter.
            """
    
            # Keep a caller-supplied id; only generate a fresh one when none was given.
            self.uuid: str = uuid.uuid4().hex if uid is None else uid
            """A unique id for better tracking in GUIs with assets which instance moved or changed."""
    
            self.pos: npt.NDArray[float] = pos
    
            """The position of the counter."""
    
            self.occupied_by: Optional[Item] = occupied_by
    
            """What is on top of the counter, e.g., `Item`s."""
    
        def occupied(self) -> bool:
            """Tell whether the counter currently holds something."""
            nothing_on_top = self.occupied_by is None
            return not nothing_on_top
    
    
        def pick_up(self, on_hands: bool = True) -> Item | None:
            """Gets called upon a player performing the pickup action. If the counter can give something to
            the player, it does so. In the standard counter this is when an item is on the counter.

            Args:
                on_hands: Will the item be put on empty hands or on a cooking equipment.

            Returns:
                The item which the counter is occupied by. None if nothing is there. If `on_hands` is
                False and a `CookingEquipment` is on the counter, the result of its `release()` call is
                returned instead and the equipment stays on the counter.
            """
            if on_hands:
                if self.occupied_by:
                    occupied_by = self.occupied_by
                    self.occupied_by = None
                    return occupied_by
                return None
            if self.occupied_by and isinstance(self.occupied_by, CookingEquipment):
                return self.occupied_by.release()
            # Hand over whatever is on the counter and leave the counter empty.
            occupied_by = self.occupied_by
            self.occupied_by = None
            return occupied_by
    
        def can_drop_off(self, item: Item) -> bool:
            """Checks whether an item held by the player can be dropped off. More relevant for example with
            ingredient dispensers, which should always be occupied and cannot take an item.

            Args:
                item: The item for which to check, if it can be placed on the counter.

            Returns: True if the item can be placed on the counter, False if not.
            """
            return self.occupied_by is None or self.occupied_by.can_combine(item)
    
        def drop_off(self, item: Item) -> Item | None:
    
            Returns:
                Item or None what should be put back on the players hand, e.g., the cooking equipment.
    
            elif self.occupied_by.can_combine(item):
    
    
        def interact_start(self):
            """Begin a player interaction; the standard counter has nothing to do."""
            return None
    
        def interact_stop(self):
            """End a player interaction; the standard counter has nothing to do."""
            return None
    
    
            return (
                f"{self.__class__.__name__}(pos={self.pos},occupied_by={self.occupied_by})"
            )
    
            """For the state representation. Only the relevant attributes are put into the dict."""
    
            return {
                "id": self.uuid,
                "category": COUNTER_CATEGORY,
                "type": self.__class__.__name__,
                "pos": self.pos.tolist(),
                "occupied_by": None
                if self.occupied_by is None
                else (
                    [o.to_dict() for o in self.occupied_by]
                    if isinstance(self.occupied_by, Iterable)
                    else self.occupied_by.to_dict()
                ),
            }
    
    
    
    class CuttingBoard(Counter):
    
    Florian Schröder's avatar
    Florian Schröder committed
        """Cutting ingredients on. The requirement in a new object could look like
    
        ```yaml
        ChoppedTomato:
          type: Ingredient
          needs: [ Tomato ]
          seconds: 4.0
          equipment: CuttingBoard
        ```
    
        The character `C` in the `layout` file represents the CuttingBoard.
    
        def __init__(self, pos: np.ndarray, transitions: dict[str, ItemInfo], **kwargs):
            """Store the cutting state and transitions, then run the parent constructor.

            Args:
                pos: Passed on to the parent constructor as `pos`.
                transitions: Allowed transitions, keyed by the name of the resulting item.
                **kwargs: Passed on unchanged to the parent constructor.
            """
            self.progressing: bool = False
            """Is a player progressing/cutting on the board."""

            self.transitions: dict[str, ItemInfo] = transitions
            """The allowed transitions to a new item. Keys are the resulting items and the `ItemInfo` (value) contains 
            the needed items in the `need` attribute."""

            by_first_need: dict[str, ItemInfo] = {}
            for transition in self.transitions.values():
                by_first_need[transition.needs[0]] = transition
            self.inverted_transition_dict: dict[str, ItemInfo] = by_first_need
            """For faster accessing the needed item. Keys are the ingredients that the player can put and chop on the 
            board."""

            super().__init__(pos=pos, **kwargs)
    
        def progress(self, passed_time: timedelta, now: datetime):
            """Called by environment step function for time progression.

            Checks if the item on the board is in the allowed transitions via a Cutting board. Pass the progress call to
            the item on the board. If the progress on the item reaches 100% it changes the name of the item based on the
            "goal" name in the transition definition.

            Args:
                passed_time: the time passed since the last progress call
                now: the current env time. **Not the same as `datetime.now`**.
            """
            item = self.occupied_by
            # Check the occupant itself so that an empty board is a no-op instead of
            # failing on `None.name`.
            if item is None or not self.progressing:
                return
            if item.name not in self.inverted_transition_dict:
                return

            transition = self.inverted_transition_dict[item.name]
            percent = passed_time.total_seconds() / transition.seconds
            item.progress(equipment=self.__class__.__name__, percent=percent)
            if item.progress_percentage == 1.0:
                item.reset()
                # Look the transition up again after the reset, like the original code did.
                item.name = self.inverted_transition_dict[item.name].name
    
    
        def start_progress(self):
            """Mark the board as being cut on."""
            self.progressing = True
    
        def pause_progress(self):
            """Mark the board as no longer being cut on."""
            self.progressing = False
    
        def interact_start(self):
            """React to the player starting to hold the interaction key: begin progressing."""
            self.start_progress()
    
        def interact_stop(self):
            """React to the player releasing the interaction key: pause progressing."""
            self.pause_progress()
    
    
        def to_dict(self) -> dict:
            """Return the parent's state dict extended by the `progressing` flag."""
            state = super().to_dict()
            state["progressing"] = self.progressing
            return state
    
    
        """The orders and scores are updated based on completed and dropped off meals. The plate dispenser is pinged for
    
        the info about a plate outside the kitchen.
    
    Florian Schröder's avatar
    Florian Schröder committed
    
        All items in the `item_info.yml` with the type meal are considered to be servable, if they are ordered. Not
        ordered meals can also be served, if a `serving_not_ordered_meals` function is set in the `environment_config.yml`.
    
        The plate dispenser will put after some time a dirty plate on itself after a meal was served.
    
    
        The character `W` in the `layout` file represents the ServingWindow.
    
            pos: npt.NDArray[float],
    
            order_and_score: OrderAndScoreManager,
    
    Florian Schröder's avatar
    Florian Schröder committed
            meals: set[str],
    
            env_time_func: Callable[[], datetime],
    
            plate_dispenser: PlateDispenser = None,
    
            self.order_and_score: OrderAndScoreManager = order_and_score
            """Reference to the OrderAndScoreManager class. It determines which meals can be served and it manages the 
            score."""
            self.plate_dispenser: PlateDispenser = plate_dispenser
            """Served meals are mentioned on the plate dispenser. So that the plate dispenser can add a dirty plate after 
            some time."""
            self.meals: set[str] = meals
            """All allowed meals by the `environment_config.yml`."""
            self.env_time_func: Callable[[], datetime] = env_time_func
            """Reference to get the current env time by calling the `env_time_func`."""
    
            super().__init__(pos=pos, **kwargs)
    
        def drop_off(self, item) -> Item | None:
            """Try to serve `item` and ping the plate dispenser when that succeeds.

            Args:
                item: What the player wants to serve.

            Returns:
                None if `serve_meal` accepted the item, otherwise the item itself.
            """
            served_at = self.env_time_func()
            accepted = self.order_and_score.serve_meal(item=item, env_time=served_at)
            if not accepted:
                return item

            dispenser = self.plate_dispenser
            if dispenser is not None:
                dispenser.update_plate_out_of_kitchen(env_time=served_at)
            return None
    
        def can_drop_off(self, item: Item) -> bool:
            """Check whether `item` is cooking equipment carrying one of the allowed meals.

            The meal is either the equipment's `content_ready` or the single entry of its `content_list`.
            """
            if not isinstance(item, CookingEquipment):
                return False
            if item.content_ready is not None and item.content_ready.name in self.meals:
                return True
            return len(item.content_list) == 1 and item.content_list[0].name in self.meals
    
        def pick_up(self, on_hands: bool = True) -> Item | None:
    
        def add_plate_dispenser(self, plate_dispenser):
            """Remember the plate dispenser on this counter (replaces any previous one)."""
            self.plate_dispenser = plate_dispenser
    
    
    class Dispenser(Counter):
    
        """The class for all dispensers except plate dispenser. Here ingredients can be grabbed by the player/agent.
    
    Florian Schröder's avatar
    Florian Schröder committed
    
        At the moment all ingredients have an unlimited stock.
    
        The character for each dispenser in the `layout` file is currently hard coded in the environment class:
        ```yaml
        T: Tomato
        L: Lettuce
        N: Onion  # N for oNioN
        B: Bun
        M: Meat
        ```
        The plan is to put the info also in the config.
    
        In the implementation, an instance of the item to dispense is always on top of the dispenser.
        Which also is easier for the visualization of the dispenser.
        """
    
    
        def __init__(self, pos: npt.NDArray[float], dispensing: ItemInfo, **kwargs):
            """Create the dispenser with one ready item on top.

            Args:
                pos: Passed on to the parent constructor as `pos`.
                dispensing: `ItemInfo` of the item this dispenser hands out.
                **kwargs: Passed on unchanged to the parent constructor.
            """
            self.dispensing: ItemInfo = dispensing
            """`ItemInfo` of what the Dispenser is dispensing. One ready object is always on top of the counter."""

            # `dispensing` has to be set before this call because `create_item` reads it.
            super().__init__(
                pos=pos,
                occupied_by=self.create_item(),
                **kwargs,
            )
    
        def pick_up(self, on_hands: bool = True) -> Item | None:
    
        def drop_off(self, item: Item) -> Item | None:
            """Combine `item` with the item on the dispenser, if they can be combined.

            Returns:
                The result of `combine(item)` on the item on top, or None if it cannot be combined.
            """
            on_top = self.occupied_by
            if not on_top.can_combine(item):
                return None
            return on_top.combine(item)
    
        def can_drop_off(self, item: Item) -> bool:
            """Return what `can_combine(item)` of the item on the dispenser returns."""
            on_top = self.occupied_by
            return on_top.can_combine(item)
    
        def __repr__(self):
            """Return the name of the dispensed item followed by `Dispenser`."""
            return "{}Dispenser".format(self.dispensing.name)
    
        def create_item(self) -> Item:
            """Build a fresh `Item` from the dispenser's `dispensing` info.

            Returns:
                An `Item` named after `self.dispensing.name` with `self.dispensing` as its `item_info`.
            """
            info = self.dispensing
            return Item(name=info.name, item_info=info)
    
    
        def to_dict(self) -> dict:
            """Return the parent's state dict with `type` replaced by this dispenser's repr string."""
            state = super().to_dict()
            state["type"] = self.__repr__()
            return state
    
    
    @dataclasses.dataclass
    class PlateConfig:
        """Configure the initial number and the return behavior of the plates in the environment."""

        clean_plates: int = 0
        """clean plates at the start."""
        dirty_plates: int = 3
        """dirty plates at the start."""
        # `list` takes a single type parameter; the two entries are the lower and upper bound.
        plate_delay: list[int] = dataclasses.field(default_factory=lambda: [5, 10])
        """The uniform sampling range `[low, high]` for the plate delay between serving and return in seconds."""
    
    Florian Schröder's avatar
    Florian Schröder committed
        """At the moment, one and only one plate dispenser must exist in an environment, because only at one place the dirty
        plates should arrive.
    
        How many plates should exist at the start of the level on the plate dispenser is defined in the `environment_config.yml`:
        ```yaml
        plates:
          clean_plates: 1
          dirty_plates: 2
          plate_delay: [ 5, 10 ]
          # seconds until the dirty plate arrives.
        ```
    
        The character `P` in the `layout` file represents the PlateDispenser.
        """
    
    
            self,
            pos: npt.NDArray[float],
            dispensing: ItemInfo,
            plate_config: PlateConfig,
    
            plate_transitions: dict[str, ItemInfo],
    
            **kwargs,
    
            super().__init__(pos=pos, **kwargs)
    
            self.dispensing: ItemInfo = dispensing
            """Plate ItemInfo."""
            self.occupied_by: deque = deque()
            """The queue of plates. New dirty ones are put at the end and therefore under the current plates."""
            self.out_of_kitchen_timer: list[datetime] = []
            """Internal timer for how many plates are out of kitchen and how long."""
            self.plate_config: PlateConfig = plate_config
            """The config how many plates are present in the kitchen at the beginning (and in total) and the config for 
            the random "out of kitchen" timer."""
            self.next_plate_time: datetime = datetime.max
            """For efficient checking if dirty plates should be created, instead of looping through the 
            `out_of_kitchen_timer` list every frame."""
            self.plate_transitions: dict[str, ItemInfo] = plate_transitions
            """Transitions for the plates. Relevant for the sink, because a plate can become a clean one there."""
    
        def pick_up(self, on_hands: bool = True) -> Item | None:
    
        def can_drop_off(self, item: Item) -> bool:
            """Accept anything on an empty dispenser, otherwise ask the last plate in the queue."""
            if not self.occupied_by:
                return True
            last_plate = self.occupied_by[-1]
            return last_plate.can_combine(item)
    
    
        def drop_off(self, item: Item) -> Item | None:
            """Put `item` on an empty dispenser or combine it with the last plate in the queue.

            Returns:
                The result of `combine(item)` on the last plate if they can be combined, otherwise None.
            """
            plates = self.occupied_by
            if not plates:
                plates.append(item)
                return None

            last_plate = plates[-1]
            return last_plate.combine(item) if last_plate.can_combine(item) else None
    
        def add_dirty_plate(self):
            """Create a new item via `create_item()` and put it at the left end of the queue."""
            new_plate = self.create_item()
            self.occupied_by.appendleft(new_plate)
    
        def update_plate_out_of_kitchen(self, env_time: datetime):
    
            """Is called from the serving window to add a plate out of kitchen."""
    
            # Not perfectly identical to datetime.now, but close enough given the frame rate.
    
            time_plate_to_add = env_time + timedelta(
    
                    low=self.plate_config.plate_delay[0],
                    high=self.plate_config.plate_delay[1],
    
                )
            )
            log.debug(f"New plate out of kitchen until {time_plate_to_add}")
            self.out_of_kitchen_timer.append(time_plate_to_add)
            if time_plate_to_add < self.next_plate_time:
                self.next_plate_time = time_plate_to_add
    
        def setup_plates(self):
            """Create plates based on the config. Clean and dirty ones."""
    
            if self.plate_config.dirty_plates > 0:
                log.info(f"Setup {self.plate_config.dirty_plates} dirty plates.")
    
                    [self.create_item() for _ in range(self.plate_config.dirty_plates)]
    
            if self.plate_config.clean_plates > 0:
                # Report the clean plate count here, not the dirty one.
                log.info(f"Setup {self.plate_config.clean_plates} clean plates.")
    
                        for _ in range(self.plate_config.clean_plates)
    
        def progress(self, passed_time: timedelta, now: datetime):
            """Add a dirty plate for every "out of kitchen" timer that lies before `now`.

            Args:
                passed_time: Not used here.
                now: The time the stored timers are compared against.
            """
            if not (self.next_plate_time < now):
                return

            still_out = []
            for arrival in self.out_of_kitchen_timer:
                if arrival < now:
                    log.debug("Add dirty plate")
                    self.add_dirty_plate()
                else:
                    still_out.append(arrival)
            # Update the list in place so that existing references to it stay valid.
            self.out_of_kitchen_timer[:] = still_out
            self.next_plate_time = min(self.out_of_kitchen_timer, default=datetime.max)
    
        def create_item(self, clean: bool = False) -> Plate:
            """Build a new `Plate` with this dispenser's transitions and item info.

            Args:
                clean: Whether the plate is clean or dirty.

            Returns:
                The newly created `Plate`.
            """
            return Plate(
                clean=clean,
                transitions=self.plate_transitions,
                item_info=self.dispensing,
            )
    
    
    Florian Schröder's avatar
    Florian Schröder committed
    class Trashcan(Counter):
        """Ingredients and content on a cooking equipment can be removed from the environment via the trash.
    
        The character `X` in the `layout` file represents the Trashcan.
        """
    
    
        def __init__(
            self,
            order_and_score: OrderAndScoreManager,
            pos: npt.NDArray[float],
            **kwargs,
        ):
            """Run the parent constructor and keep the order and score manager.

            Args:
                order_and_score: Stored on the instance as `order_and_score`.
                pos: Passed on positionally to the parent constructor.
                **kwargs: Passed on unchanged to the parent constructor.
            """
            super().__init__(pos, **kwargs)

            self.order_and_score: OrderAndScoreManager = order_and_score
            """Reference to the `OrderAndScoreManager`, because unnecessary removed items can affect the score."""
    
        def pick_up(self, on_hands: bool = True) -> Item | None:
    
        def drop_off(self, item: Item) -> Item | None:
    
                self.order_and_score.apply_penalty_for_using_trash(item.content_list)
    
                item.reset_content()
    
            else:
                self.order_and_score.apply_penalty_for_using_trash(item)
    
        def can_drop_off(self, item: Item) -> bool:
    
        """Cooking machine. Class for the stove, deep fryer, and oven.
    
    Florian Schröder's avatar
    Florian Schröder committed
    
        The character depends on the cooking equipment on top of it:
        ```yaml
        U: Stove with a pot
        Q: Stove with a pan
    
        O: Oven with a (pizza) peel
        F: DeepFryer with a basket
    
            equipments: set[str],
    
            self.name: str = name
            """The type/name of the cooking counter, e.g., Stove, DeepFryer, Oven."""
            self.equipments: set[str] = equipments
            """The valid equipment for the cooking counter, e.g., for a Stove {'Pot','Pan'}."""
    
        def can_drop_off(self, item: Item) -> bool:
            """Check whether `item` may be placed on this cooking counter.

            An occupied counter defers to `can_combine` of the occupant. An empty counter only takes
            `CookingEquipment` whose name is listed in `self.equipments`.
            """
            on_top = self.occupied_by
            if on_top is not None:
                return on_top.can_combine(item)
            return isinstance(item, CookingEquipment) and item.name in self.equipments
    
    
        def progress(self, passed_time: timedelta, now: datetime):
            """Called by environment step function for time progression.

            The call is forwarded to the item on the counter, but only if that item is a
            `CookingEquipment` whose name is listed in `self.equipments`.

            Args:
                passed_time: Forwarded unchanged to the equipment's `progress`.
                now: Forwarded unchanged to the equipment's `progress`.
            """
            if (
                self.occupied_by
                and isinstance(self.occupied_by, CookingEquipment)
                and self.occupied_by.name in self.equipments
            ):
                self.occupied_by.progress(passed_time, now)
    
        def __repr__(self):
            """Return `<name>(pos=...,occupied_by=...)` using the counter's configured name."""
            return "{}(pos={},occupied_by={})".format(self.name, self.pos, self.occupied_by)
    
    
        def to_dict(self) -> dict:
            """Return the parent's state dict with `type` replaced by the counter's configured name."""
            state = super().to_dict()
            state["type"] = self.name
            return state
    
    
        """The counter in which the dirty plates can be washed to clean plates.
    
    Florian Schröder's avatar
    Florian Schröder committed
    
        Needs a `SinkAddon`. The closest is calculated during initialisation; the two should not be separated from each
        other (the addon needs to touch the sink).
    
        The logic is similar to the CuttingBoard because there is no additional cooking equipment between the object to
        progress and the counter. When the progress on the dirty plate is done, it is set to clean and is passed to the
        `SinkAddon`.
    
        The character `S` in the `layout` file represents the Sink.
        """
    
    
        def __init__(
            self,
            pos: npt.NDArray[float],
    
            transitions: dict[str, ItemInfo],
    
            sink_addon: SinkAddon = None,
    
            super().__init__(pos=pos, **kwargs)
    
            self.progressing: bool = False
            """If a player currently cleans a plate."""
    
            self.sink_addon: SinkAddon = sink_addon
    
    Florian Schröder's avatar
    Florian Schröder committed
            """The connected sink addon which will receive the clean plates"""
    
            self.occupied_by: deque[Plate] = deque()
    
    Florian Schröder's avatar
    Florian Schröder committed
            """The queue of dirty plates. Only the one on the top is progressed."""
    
            """The allowed transitions for the items in the sink. Here only clean plates transfer from dirty plates."""
    
            self.transition_needs: Set[str] = set()
            """Set of all first needs of the transition item info."""
    
            for name, info in transitions.items():
                assert (
                    len(info.needs) >= 1
                ), "transitions in a Sink need at least one item need."
                self.transition_needs.update([info.needs[0]])
    
        def occupied(self) -> bool:
            """If there is a plate in the sink.

            Returns:
                True if the queue of plates is not empty, False otherwise.
            """
            return len(self.occupied_by) != 0
    
        def progress(self, passed_time: timedelta, now: datetime):
    
            """Called by environment step function for time progression"""
    
                and self.occupied_by[-1].name in self.transition_needs
    
                for name, info in self.transitions.items():
                    if info.needs[0] == self.occupied_by[-1].name:
                        percent = passed_time.total_seconds() / info.seconds
                        self.occupied_by[-1].progress(
                            equipment=self.__class__.__name__, percent=percent
                        )
                        if self.occupied_by[-1].progress_percentage == 1.0:
                            self.occupied_by[-1].reset()
                            self.occupied_by[-1].name = name
                            plate = self.occupied_by.pop()
                            plate.clean = True
                            self.sink_addon.add_clean_plate(plate)
                        break
    
    
        def start_progress(self):
            """Mark this counter as actively progressing."""
            self.progressing = True
    
        def pause_progress(self):
            """Mark this counter as no longer progressing."""
            self.progressing = False
    
        def interact_start(self):
            """React to the player starting to hold the interaction key: begin progressing."""
            self.start_progress()
    
        def interact_stop(self):
            """React to the player releasing the interaction key: pause progressing."""
            self.pause_progress()
    
        def can_drop_off(self, item: Item) -> bool:
            """Accept only `Plate` items whose `clean` flag is falsy."""
            if not isinstance(item, Plate):
                return False
            return not item.clean
    
    
        def drop_off(self, item: Item) -> Item | None:
            """Put `item` at the left end of the queue; nothing is handed back to the player."""
            queue = self.occupied_by
            queue.appendleft(item)
            return None
    
    
        def pick_up(self, on_hands: bool = True) -> Item | None:
            """Never hand anything out: this counter always answers a pickup with None."""
            nothing = None
            return nothing
    
        def set_addon(self, sink_addon: SinkAddon):
            """Set the closest addon in post_setup.

            Args:
                sink_addon: The addon to store as `self.sink_addon`.
            """
            self.sink_addon = sink_addon
    
        def to_dict(self) -> dict:
            """Return the parent's state dict extended by the `progressing` flag."""
            state = super().to_dict()
            state["progressing"] = self.progressing
            return state
    
    
    Florian Schröder's avatar
    Florian Schröder committed
        """The counter on which the clean plates appear after cleaning them in the `Sink`
    
        It needs to be set close to/touching the `Sink`.
    
        The character `+` in the `layout` file represents the SinkAddon.
        """
    
    
        def __init__(self, pos: npt.NDArray[float], occupied_by=None):
            """Run the parent constructor and start the stack of plates.

            Args:
                pos: Passed on to the parent constructor as `pos`.
                occupied_by: Optional first element of the stack; a falsy value gives an empty stack.
            """
            super().__init__(pos=pos)
            # maybe check if occupied by is already a list or deque?
            initial = [occupied_by] if occupied_by else []
            self.occupied_by: deque = deque(initial)
            """The stack of clean plates."""
    
            return self.occupied_by and self.occupied_by[-1].can_combine(item)
    
        def drop_off(self, item: Item) -> Item | None:
            """Combine `item` with the last plate of the stack and return the result of `combine`."""
            last_plate = self.occupied_by[-1]
            return last_plate.combine(item)
    
    
        def add_clean_plate(self, plate: Plate):
            """Put `plate` at the left end of the stack.

            Called from the `Sink` after a plate is cleaned / the progress is complete.
            """
            stack = self.occupied_by
            stack.appendleft(plate)
    
        def pick_up(self, on_hands: bool = True) -> Item | None:
    
            if self.occupied_by:
                return self.occupied_by.pop()