from __future__ import annotations import logging from collections import deque from datetime import datetime, timedelta from typing import TYPE_CHECKING, Optional from overcooked_simulator.utils import create_init_env_time if TYPE_CHECKING: from overcooked_simulator.overcooked_environment import ( GameScore, ) import numpy as np import numpy.typing as npt from overcooked_simulator.game_items import ( CuttableItem, Item, CookingEquipment, Meal, Plate, ) log = logging.getLogger(__name__) class Counter: """Simple class for a counter at a specified position (center of counter). Can hold things on top.""" def __init__(self, pos: npt.NDArray[float], occupied_by: Optional[Item] = None): self.pos: npt.NDArray[float] = pos self.occupied_by: Optional[Item] = occupied_by def pick_up(self, on_hands: bool = True): """Gets called upon a player performing the pickup action. If the counter can give something to the player, it does so. In the standard counter this is when an item is on the counter. Returns: The item which the counter is occupied by. None if nothing is there. """ if on_hands: if self.occupied_by: occupied_by = self.occupied_by self.occupied_by = None return occupied_by return None if self.occupied_by and isinstance(self.occupied_by, CookingEquipment): return self.occupied_by.release() occupied_by = self.occupied_by self.occupied_by = None return occupied_by def can_drop_off(self, item: Item) -> bool: """Checks whether an item by the player can be dropped of. More relevant for example with ingredient dispensers, which should always be occupied and cannot take an item. Args: item: The item for which to check, if it can be placed on the counter. Returns: True if the item can be placed on the counter, False if not. """ return self.occupied_by is None or self.occupied_by.can_combine(item) def drop_off(self, item: Item) -> Item | None: """Takes the thing dropped of by the player. Args: item: The item to be placed on the counter. Returns: TODO Return information, whether the score is affected (Serving Window?) """ if self.occupied_by is None: self.occupied_by = item elif self.occupied_by.can_combine(item): return self.occupied_by.combine(item) return None def interact_start(self): """Starts an interaction by the player. Nothing happens for the standard counter.""" pass def interact_stop(self): """Stops an interaction by the player. Nothing happens for the standard counter.""" pass def __repr__(self): return ( f"{self.__class__.__name__}(pos={self.pos},occupied_by={self.occupied_by})" ) class CuttingBoard(Counter): def __init__(self, pos: np.ndarray): self.progressing = False super().__init__(pos) def progress(self, passed_time: timedelta, now: datetime): """Called by environment step function for time progression""" if self.progressing: if isinstance(self.occupied_by, CuttableItem): self.occupied_by.progress() def start_progress(self): """Starts the cutting process.""" self.progressing = True def pause_progress(self): """Pauses the cutting process""" self.progressing = False def interact_start(self): """Handles player interaction, starting to hold key down.""" self.start_progress() def interact_stop(self): """Handles player interaction, stopping to hold key down.""" self.pause_progress() class ServingWindow(Counter): def __init__( self, pos, game_score: GameScore, plate_dispenser: PlateDispenser = None ): self.game_score = game_score self.plate_dispenser = plate_dispenser super().__init__(pos) def drop_off(self, item) -> Item | None: reward = 5 log.debug(f"Drop off item {item}") # TODO define rewards self.game_score.increment_score(reward) if self.plate_dispenser is not None: self.plate_dispenser.update_plate_out_of_kitchen() return None def can_score(self, item): if ( isinstance(item, CookingEquipment) and "Plate" in item.name and item.content is not None ): if isinstance(item.content, Meal) and item.content.progressed_steps: return item.content.finished if not item.content.item_info.steps_needed and len( item.content.item_info.needs ) == len(item.content.parts): return True return False def can_drop_off(self, item: Item) -> bool: return self.can_score(item) def pick_up(self, on_hands: bool = True): pass def add_plate_dispenser(self, plate_dispenser): self.plate_dispenser = plate_dispenser class Dispenser(Counter): def __init__(self, pos, dispensing): self.dispensing = dispensing super().__init__( pos, self.dispensing.create_item(), ) def pick_up(self, on_hands: bool = True): return_this = self.occupied_by self.occupied_by = self.dispensing.create_item() return return_this def drop_off(self, item: Item) -> Item | None: if self.occupied_by.can_combine(item): return self.occupied_by.combine(item) def can_drop_off(self, item: Item) -> bool: return self.occupied_by.can_combine(item) def __repr__(self): return f"{self.dispensing.name}Dispenser" class PlateDispenser(Counter): def __init__(self, pos, dispensing, plate_config): self.dispensing = dispensing super().__init__(pos) self.occupied_by = deque() self.out_of_kitchen_timer = [] self.plate_config = {"plate_delay": [5, 10]} self.plate_config.update(plate_config) self.next_plate_time = datetime.max self.env_time = create_init_env_time() # is overwritten in progress anyway self.setup_plates() def pick_up(self, on_hands: bool = True): if self.occupied_by: return self.occupied_by.pop() def can_drop_off(self, item: Item) -> bool: return not self.occupied_by or self.occupied_by[-1].can_combine(item) def drop_off(self, item: Item) -> Item | None: """Takes the thing dropped of by the player. Args: item: The item to be placed on the counter. Returns: TODO Return information, whether the score is affected (Serving Window?) """ if not self.occupied_by: self.occupied_by.append(item) elif self.occupied_by[-1].can_combine(item): return self.occupied_by[-1].combine(item) return None def add_dirty_plate(self): self.occupied_by.appendleft(self.dispensing.create_item()) def update_plate_out_of_kitchen(self): """Is called from the serving window to add a plate out of kitchen.""" # not perfect identical to datetime.now but based on framerate enough. time_plate_to_add = self.env_time + timedelta( seconds=np.random.uniform( low=self.plate_config["plate_delay"][0], high=self.plate_config["plate_delay"][1], ) ) log.debug(f"New plate out of kitchen until {time_plate_to_add}") self.out_of_kitchen_timer.append(time_plate_to_add) if time_plate_to_add < self.next_plate_time: self.next_plate_time = time_plate_to_add def setup_plates(self): """Create plates based on the config. Clean and dirty ones.""" if "dirty_plates" in self.plate_config: self.occupied_by.extend( [ self.dispensing.create_item() for _ in range(self.plate_config["dirty_plates"]) ] ) if "clean_plates" in self.plate_config: self.occupied_by.extend( [ self.dispensing.create_item(clean_plate=True) for _ in range(self.plate_config["clean_plates"]) ] ) def progress(self, passed_time: timedelta, now: datetime): """Check if plates arrive from outside the kitchen and add a dirty plate accordingly""" self.env_time = now if self.next_plate_time < now: idx_delete = [] for i, times in enumerate(self.out_of_kitchen_timer): if times < now: idx_delete.append(i) log.debug("Add dirty plate") self.add_dirty_plate() for idx in reversed(idx_delete): self.out_of_kitchen_timer.pop(idx) self.next_plate_time = ( min(self.out_of_kitchen_timer) if self.out_of_kitchen_timer else datetime.max ) def __repr__(self): return "PlateReturn" class Trash(Counter): def pick_up(self, on_hands: bool = True): pass def drop_off(self, item: Item) -> Item | None: if isinstance(item, CookingEquipment): item.content = None return item return None def can_drop_off(self, item: Item) -> bool: return True class Stove(Counter): def can_drop_off(self, item: Item) -> bool: if self.occupied_by is None: return isinstance(item, CookingEquipment) and item.name in ["Pot", "Pan"] else: return self.occupied_by.can_combine(item) def progress(self, passed_time: timedelta, now: datetime): """Called by environment step function for time progression""" if ( self.occupied_by and isinstance(self.occupied_by, CookingEquipment) and self.occupied_by.can_progress() ): self.occupied_by.progress() class Sink(Counter): def __init__(self, pos, sink_addon=None): super().__init__(pos) self.progressing = False self.sink_addon: SinkAddon = sink_addon self.occupied_by = deque() def progress(self, passed_time: timedelta, now: datetime): """Called by environment step function for time progression""" if self.progressing: if self.occupied_by: self.occupied_by[-1].progress() if self.occupied_by[-1].finished: plate = self.occupied_by.pop() if not self.occupied_by: self.pause_progress() plate.finished_call() self.sink_addon.add_clean_plate(plate) def start_progress(self): """Starts the cutting process.""" self.progressing = True def pause_progress(self): """Pauses the cutting process""" self.progressing = False def interact_start(self): """Handles player interaction, starting to hold key down.""" self.start_progress() def interact_stop(self): """Handles player interaction, stopping to hold key down.""" self.pause_progress() def can_drop_off(self, item: Item) -> bool: return isinstance(item, Plate) and not item.clean def drop_off(self, item: Item) -> Item | None: self.occupied_by.appendleft(item) return None def pick_up(self, on_hands: bool = True): return def set_addon(self, sink_addon): self.sink_addon = sink_addon class SinkAddon(Counter): def __init__(self, pos, occupied_by=None): super().__init__(pos) self.occupied_by = deque(occupied_by) if occupied_by else deque() def can_drop_off(self, item: Item) -> bool: return self.occupied_by and self.occupied_by[-1].can_combine(item) def drop_off(self, item: Item) -> Item | None: """Takes the thing dropped of by the player. Args: item: The item to be placed on the counter. Returns: """ return self.occupied_by[-1].combine(item) def add_clean_plate(self, plate: Plate): self.occupied_by.appendleft(plate) def pick_up(self, on_hands: bool = True): if self.occupied_by: return self.occupied_by.pop()