"""All counters are derived from the `Counter` class. Counters implement the `Counter.pick_up` method, which defines what should happen when the agent wants to pick something up from the counter. On the other side, the `Counter.drop_off` method receives the item what should be put on the counter. Before that the `Counter.can_drop_off` method checked if the item can be put on the counter. The progress on Counters or on objects on the counters are handled via the Counters. They have the task to delegate the progress call via the `progress` method, e.g., the `CuttingBoard.progress`. On which type of counter the progress method is called is currently defined in the environment class. Inside the item_info.yaml, equipment needs to be defined. It includes counters that are part of the interaction/requirements for the interaction. CuttingBoard: type: Equipment Sink: type: Equipment Stove: type: Equipment The defined counter classes are: - `Counter` - `CuttingBoard` - `ServingWindow` - `Dispenser` - `PlateDispenser` - `Trashcan` - `CookingCounter` - `Sink` - `SinkAddon` ## Code Documentation """ from __future__ import annotations import dataclasses import logging from collections import deque from datetime import datetime, timedelta from typing import TYPE_CHECKING, Optional, Callable, TypedDict if TYPE_CHECKING: from overcooked_simulator.overcooked_environment import ( OrderAndScoreManager, ) import numpy as np import numpy.typing as npt from overcooked_simulator.game_items import ( Item, CookingEquipment, Plate, ItemInfo, ) log = logging.getLogger(__name__) class TransitionsValueDict(TypedDict): """The values in the transitions dicts of the `CookingEquipment`.""" seconds: int | float """The needed seconds to progress for the transition.""" needs: list[str] """The names of the needed items for the transition.""" info: ItemInfo | str """The ItemInfo of the resulting item.""" class TransitionsValueByNameDict(TypedDict): """The values in the transitions dicts of the `CuttingBoard` and the `Sink`.""" seconds: int | float """The needed seconds to progress for the transition.""" result: str """The new name of the item after the transition.""" class Counter: """Simple class for a counter at a specified position (center of counter). Can hold things on top. The character `#` in the `layout` file represents the standard Counter. """ def __init__(self, pos: npt.NDArray[float], occupied_by: Optional[Item] = None): """Constructor setting the arguments as attributes. Args: pos: Position of the counter in the environment. 2-element vector. occupied_by: The item on top of the counter. """ self.pos: npt.NDArray[float] = pos self.occupied_by: Optional[Item] = occupied_by @property def occupied(self): return self.occupied_by is not None def pick_up(self, on_hands: bool = True) -> Item | None: """Gets called upon a player performing the pickup action. If the counter can give something to the player, it does so. In the standard counter this is when an item is on the counter. Args: on_hands: Will the item be put on empty hands or on a cooking equipment. Returns: The item which the counter is occupied by. None if nothing is there. """ if on_hands: if self.occupied_by: occupied_by = self.occupied_by self.occupied_by = None return occupied_by return None if self.occupied_by and isinstance(self.occupied_by, CookingEquipment): return self.occupied_by.release() occupied_by = self.occupied_by self.occupied_by = None return occupied_by def can_drop_off(self, item: Item) -> bool: """Checks whether an item by the player can be dropped of. More relevant for example with ingredient dispensers, which should always be occupied and cannot take an item. Args: item: The item for which to check, if it can be placed on the counter. Returns: True if the item can be placed on the counter, False if not. """ return self.occupied_by is None or self.occupied_by.can_combine(item) def drop_off(self, item: Item) -> Item | None: """Takes the thing dropped of by the player. Args: item: The item to be placed on the counter. Returns: Item or None what should be put back on the players hand, e.g., the cooking equipment. """ if self.occupied_by is None: self.occupied_by = item elif self.occupied_by.can_combine(item): return self.occupied_by.combine(item) return None def interact_start(self): """Starts an interaction by the player. Nothing happens for the standard counter.""" pass def interact_stop(self): """Stops an interaction by the player. Nothing happens for the standard counter.""" pass def __repr__(self): return ( f"{self.__class__.__name__}(pos={self.pos},occupied_by={self.occupied_by})" ) class CuttingBoard(Counter): """Cutting ingredients on. The requirement in a new object could look like ```yaml ChoppedTomato: type: Ingredient needs: [ Tomato ] seconds: 4.0 equipment: CuttingBoard ``` The character `C` in the `layout` file represents the CuttingBoard. """ def __init__( self, pos: np.ndarray, transitions: dict[str, TransitionsValueByNameDict] ): self.progressing = False self.transitions = transitions super().__init__(pos=pos) def progress(self, passed_time: timedelta, now: datetime): """Called by environment step function for time progression. Args: passed_time: the time passed since the last progress call now: the current env time. **Not the same as `datetime.now`**. Checks if the item on the board is in the allowed transitions via a Cutting board. Pass the progress call to the item on the board. If the progress on the item reaches 100% it changes the name of the item based on the "goal" name in the transition definition. """ if ( self.occupied and self.progressing and self.occupied_by.name in self.transitions ): percent = ( passed_time.total_seconds() / self.transitions[self.occupied_by.name]["seconds"] ) self.occupied_by.progress( equipment=self.__class__.__name__, percent=percent ) if self.occupied_by.progress_percentage == 1.0: self.occupied_by.reset() self.occupied_by.name = self.transitions[self.occupied_by.name][ "result" ] def start_progress(self): """Starts the cutting process.""" self.progressing = True def pause_progress(self): """Pauses the cutting process""" self.progressing = False def interact_start(self): """Handles player interaction, starting to hold key down.""" self.start_progress() def interact_stop(self): """Handles player interaction, stopping to hold key down.""" self.pause_progress() class ServingWindow(Counter): """The orders and scores are updated based on completed and dropped off meals. The plate dispenser is pinged for the info about a plate outside of the kitchen. All items in the `item_info.yml` with the type meal are considered to be servable, if they are ordered. Not ordered meals can also be served, if a `serving_not_ordered_meals` function is set in the `environment_config.yml`. The plate dispenser will put after some time a dirty plate on itself after a meal was served. The character `W` in the `layout` file represents the ServingWindow. """ def __init__( self, pos: npt.NDArray[float], order_and_score: OrderAndScoreManager, meals: set[str], env_time_func: Callable[[], datetime], plate_dispenser: PlateDispenser = None, ): self.order_and_score = order_and_score self.plate_dispenser = plate_dispenser self.meals = meals self.env_time_func = env_time_func super().__init__(pos=pos) def drop_off(self, item) -> Item | None: env_time = self.env_time_func() if self.order_and_score.serve_meal(item=item, env_time=env_time): if self.plate_dispenser is not None: self.plate_dispenser.update_plate_out_of_kitchen(env_time=env_time) return None return item def can_drop_off(self, item: Item) -> bool: return isinstance(item, CookingEquipment) and ( (item.content_ready is not None and item.content_ready.name in self.meals) or (len(item.content_list) == 1 and item.content_list[0].name in self.meals) ) def pick_up(self, on_hands: bool = True) -> Item | None: pass def add_plate_dispenser(self, plate_dispenser): self.plate_dispenser = plate_dispenser class Dispenser(Counter): """The class for all dispensers except plate dispenser. Here ingredients can be grabbed from the player/agent. At the moment all ingredients have an unlimited stock. The character for each dispenser in the `layout` file is currently hard coded in the environment class: ```yaml T: Tomato L: Lettuce N: Onion # N for oNioN B: Bun M: Meat ``` The plan is to put the info also in the config. In the implementation, an instance of the item to dispense is always on top of the dispenser. Which also is easier for the visualization of the dispenser. """ def __init__(self, pos: npt.NDArray[float], dispensing: ItemInfo): self.dispensing = dispensing super().__init__( pos=pos, occupied_by=self.create_item(), ) def pick_up(self, on_hands: bool = True) -> Item | None: return_this = self.occupied_by self.occupied_by = self.create_item() return return_this def drop_off(self, item: Item) -> Item | None: if self.occupied_by.can_combine(item): return self.occupied_by.combine(item) def can_drop_off(self, item: Item) -> bool: return self.occupied_by.can_combine(item) def __repr__(self): return f"{self.dispensing.name}Dispenser" def create_item(self) -> Item: kwargs = { "name": self.dispensing.name, "item_info": self.dispensing, } return Item(**kwargs) @dataclasses.dataclass class PlateConfig: """Configure the initial and behavior of the plates in the environment.""" clean_plates: int = 0 """clean plates at the start.""" dirty_plates: int = 3 """dirty plates at the start.""" plate_delay: list[int, int] = dataclasses.field(default_factory=lambda: [5, 10]) """The uniform sampling range for the plate delay between serving and return in seconds.""" class PlateDispenser(Counter): """At the moment, one and only one plate dispenser must exist in an environment, because only at one place the dirty plates should arrive. How many plates should exist at the start of the level on the plate dispenser is defined in the `environment_config.yml`: ```yaml plates: clean_plates: 1 dirty_plates: 2 plate_delay: [ 5, 10 ] # seconds until the dirty plate arrives. ``` The character `P` in the `layout` file represents the PlateDispenser. """ def __init__( self, pos: npt.NDArray[float], dispensing: ItemInfo, plate_config: PlateConfig, plate_transitions: dict, **kwargs, ) -> None: super().__init__(pos=pos, **kwargs) self.dispensing = dispensing self.occupied_by = deque() self.out_of_kitchen_timer = [] self.plate_config = plate_config self.next_plate_time = datetime.max self.plate_transitions: dict[str, TransitionsValueDict] = plate_transitions self.setup_plates() def pick_up(self, on_hands: bool = True) -> Item | None: if self.occupied_by: return self.occupied_by.pop() def can_drop_off(self, item: Item) -> bool: return not self.occupied_by or self.occupied_by[-1].can_combine(item) def drop_off(self, item: Item) -> Item | None: """At the moment items can be put on the top of the plate dispenser or the top plate if it is clean and can be put on a plate.""" if not self.occupied_by: self.occupied_by.append(item) elif self.occupied_by[-1].can_combine(item): return self.occupied_by[-1].combine(item) return None def add_dirty_plate(self): self.occupied_by.appendleft(self.create_item()) def update_plate_out_of_kitchen(self, env_time: datetime): """Is called from the serving window to add a plate out of kitchen.""" # not perfect identical to datetime.now but based on framerate enough. time_plate_to_add = env_time + timedelta( seconds=np.random.uniform( low=self.plate_config.plate_delay[0], high=self.plate_config.plate_delay[1], ) ) log.debug(f"New plate out of kitchen until {time_plate_to_add}") self.out_of_kitchen_timer.append(time_plate_to_add) if time_plate_to_add < self.next_plate_time: self.next_plate_time = time_plate_to_add def setup_plates(self): """Create plates based on the config. Clean and dirty ones.""" if self.plate_config.dirty_plates > 0: log.info(f"Setup {self.plate_config.dirty_plates} dirty plates.") self.occupied_by.extend( [self.create_item() for _ in range(self.plate_config.dirty_plates)] ) if self.plate_config.clean_plates > 0: log.info(f"Setup {self.plate_config.dirty_plates} clean plates.") self.occupied_by.extend( [ self.create_item(clean=True) for _ in range(self.plate_config.clean_plates) ] ) def progress(self, passed_time: timedelta, now: datetime): """Check if plates arrive from outside the kitchen and add a dirty plate accordingly""" if self.next_plate_time < now: idx_delete = [] for i, times in enumerate(self.out_of_kitchen_timer): if times < now: idx_delete.append(i) log.debug("Add dirty plate") self.add_dirty_plate() for idx in reversed(idx_delete): self.out_of_kitchen_timer.pop(idx) self.next_plate_time = ( min(self.out_of_kitchen_timer) if self.out_of_kitchen_timer else datetime.max ) def __repr__(self): return "PlateReturn" def create_item(self, clean: bool = False) -> Plate: kwargs = { "clean": clean, "transitions": self.plate_transitions, "item_info": self.dispensing, } return Plate(**kwargs) class Trashcan(Counter): """Ingredients and content on a cooking equipment can be removed from the environment via the trash. The character `X` in the `layout` file represents the Trashcan. """ def pick_up(self, on_hands: bool = True) -> Item | None: pass def drop_off(self, item: Item) -> Item | None: if isinstance(item, CookingEquipment): item.reset_content() return item return None def can_drop_off(self, item: Item) -> bool: return True class CookingCounter(Counter): """Cooking machine. Currently, the stove which can have a pot and pan on top. In the future one class for stove, deep fryer, and oven. The character depends on the cooking equipment on top of it: ```yaml U: Stove with a pot Q: Stove with a pan ``` """ def __init__( self, name: str, cooking_counter_equipments: dict[str, list[str]], **kwargs, ): self.name = name self.cooking_counter_equipments = cooking_counter_equipments super().__init__(**kwargs) def can_drop_off(self, item: Item) -> bool: if self.occupied_by is None: return ( isinstance(item, CookingEquipment) and item.name in self.cooking_counter_equipments[self.name] ) else: return self.occupied_by.can_combine(item) def progress(self, passed_time: timedelta, now: datetime): """Called by environment step function for time progression""" if ( self.occupied_by and isinstance(self.occupied_by, CookingEquipment) and self.occupied_by.name in self.cooking_counter_equipments[self.name] and self.occupied_by.can_progress() ): self.occupied_by.progress(passed_time, now) def __repr__(self): return f"{self.name}(pos={self.pos},occupied_by={self.occupied_by})" class Sink(Counter): """The counter in which the dirty plates can be washed to clean plates. Needs a `SinkAddon`. The closest is calculated during initialisation, should not be seperated by each other (needs to touch the sink). The logic is similar to the CuttingBoard because there is no additional cooking equipment between the object to progress and the counter. When the progress on the dirty plate is done, it is set to clean and is passed to the `SinkAddon`. The character `S` in the `layout` file represents the Sink. """ def __init__( self, pos: npt.NDArray[float], transitions: dict[str, TransitionsValueByNameDict], sink_addon: SinkAddon = None, ): super().__init__(pos=pos) self.progressing = False self.sink_addon: SinkAddon = sink_addon """The connected sink addon which will receive the clean plates""" self.occupied_by = deque() """The queue of dirty plates. Only the one on the top is progressed.""" self.transitions = transitions """The allowed transitions for the items in the sink. Here only clean plates transfer from dirty plates.""" @property def occupied(self): return len(self.occupied_by) != 0 def progress(self, passed_time: timedelta, now: datetime): """Called by environment step function for time progression""" if ( self.occupied and self.progressing and self.occupied_by[-1].name in self.transitions ): percent = ( passed_time.total_seconds() / self.transitions[self.occupied_by[-1].name]["seconds"] ) self.occupied_by[-1].progress( equipment=self.__class__.__name__, percent=percent ) if self.occupied_by[-1].progress_percentage == 1.0: self.occupied_by[-1].reset() self.occupied_by[-1].name = self.transitions[self.occupied_by[-1].name][ "result" ] plate = self.occupied_by.pop() plate.clean = True self.sink_addon.add_clean_plate(plate) def start_progress(self): """Starts the cutting process.""" self.progressing = True def pause_progress(self): """Pauses the cutting process""" self.progressing = False def interact_start(self): """Handles player interaction, starting to hold key down.""" self.start_progress() def interact_stop(self): """Handles player interaction, stopping to hold key down.""" self.pause_progress() def can_drop_off(self, item: Item) -> bool: return isinstance(item, Plate) and not item.clean def drop_off(self, item: Item) -> Item | None: self.occupied_by.appendleft(item) return None def pick_up(self, on_hands: bool = True) -> Item | None: return None def set_addon(self, sink_addon: SinkAddon): self.sink_addon = sink_addon class SinkAddon(Counter): """The counter on which the clean plates appear after cleaning them in the `Sink` It needs to be set close to/touching the `Sink`. The character `+` in the `layout` file represents the SinkAddon. """ def __init__(self, pos: npt.NDArray[float], occupied_by=None): super().__init__(pos=pos) # maybe check if occupied by is already a list or deque? self.occupied_by = deque([occupied_by]) if occupied_by else deque() def can_drop_off(self, item: Item) -> bool: return self.occupied_by and self.occupied_by[-1].can_combine(item) def drop_off(self, item: Item) -> Item | None: return self.occupied_by[-1].combine(item) def add_clean_plate(self, plate: Plate): self.occupied_by.appendleft(plate) def pick_up(self, on_hands: bool = True) -> Item | None: if self.occupied_by: return self.occupied_by.pop()