Skip to content
Snippets Groups Projects
counters.py 13.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • import logging
    from collections import deque
    from datetime import datetime, timedelta
    
    from typing import TYPE_CHECKING, Optional, Callable
    
        from overcooked_simulator.overcooked_environment import (
    
            OrderAndScoreManager,
    
    import numpy.typing as npt
    
    from overcooked_simulator.game_items import (
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
        """Simple class for a counter at a specified position (center of counter). Can hold things on top."""
    
    
        def __init__(self, pos: npt.NDArray[float], occupied_by: Optional[Item] = None):
    
            self.pos: npt.NDArray[float] = pos
    
            self.occupied_by: Optional[Item] = occupied_by
    
        @property
        def occupied(self):
            return self.occupied_by is not None
    
    
            """Gets called upon a player performing the pickup action. If the counter can give something to
            the player, it does so. In the standard counter this is when an item is on the counter.
    
            Returns: The item which the counter is occupied by. None if nothing is there.
    
            """
    
            if on_hands:
                if self.occupied_by:
                    occupied_by = self.occupied_by
                    self.occupied_by = None
                    return occupied_by
                return None
            if self.occupied_by and isinstance(self.occupied_by, CookingEquipment):
                return self.occupied_by.release()
            occupied_by = self.occupied_by
    
        def can_drop_off(self, item: Item) -> bool:
    
            """Checks whether an item by the player can be dropped of. More relevant for example with
            ingredient dispensers, which should always be occupied and cannot take an item.
    
                item: The item for which to check, if it can be placed on the counter.
    
            Returns: True if the item can be placed on the counter, False if not.
    
            return self.occupied_by is None or self.occupied_by.can_combine(item)
    
        def drop_off(self, item: Item) -> Item | None:
    
            Returns: TODO Return information, whether the score is affected (Serving Window?)
    
            if self.occupied_by is None:
                self.occupied_by = item
    
            elif self.occupied_by.can_combine(item):
    
    
        def interact_start(self):
            """Starts an interaction by the player. Nothing happens for the standard counter."""
            pass
    
        def interact_stop(self):
            """Stops an interaction by the player. Nothing happens for the standard counter."""
            pass
    
    
            return (
                f"{self.__class__.__name__}(pos={self.pos},occupied_by={self.occupied_by})"
            )
    
    
    
    class CuttingBoard(Counter):
    
        def __init__(self, pos: np.ndarray, transitions: dict):
    
            self.progressing = False
    
            self.transitions = transitions
    
            super().__init__(pos)
    
    
        def progress(self, passed_time: timedelta, now: datetime):
    
            """Called by environment step function for time progression"""
    
            if (
                self.occupied
                and self.progressing
                and self.occupied_by.name in self.transitions
            ):
                percent = (
                    passed_time.total_seconds()
                    / self.transitions[self.occupied_by.name]["seconds"]
                )
                self.occupied_by.progress(
                    equipment=self.__class__.__name__, percent=percent
                )
                if self.occupied_by.progress_percentage == 1.0:
                    self.occupied_by.reset()
                    self.occupied_by.name = self.transitions[self.occupied_by.name][
                        "result"
                    ]
    
    
        def start_progress(self):
    
            """Starts the cutting process."""
    
            self.progressing = True
    
        def pause_progress(self):
    
            """Pauses the cutting process"""
    
            self.progressing = False
    
        def interact_start(self):
    
            """Handles player interaction, starting to hold key down."""
    
            self.start_progress()
    
        def interact_stop(self):
    
            """Handles player interaction, stopping to hold key down."""
    
            self.pause_progress()
    
    
            order_and_score: OrderAndScoreManager,
    
    Florian Schröder's avatar
    Florian Schröder committed
            meals: set[str],
    
            env_time_func: Callable[[], datetime],
    
            plate_dispenser: PlateDispenser = None,
    
            self.order_and_score = order_and_score
    
            self.env_time_func = env_time_func
    
        def drop_off(self, item) -> Item | None:
    
            env_time = self.env_time_func()
            if self.order_and_score.serve_meal(item=item, env_time=env_time):
                if self.plate_dispenser is not None:
                    self.plate_dispenser.update_plate_out_of_kitchen(env_time=env_time)
                return None
            return item
    
        def can_drop_off(self, item: Item) -> bool:
    
            return isinstance(item, CookingEquipment) and (
                (item.content_ready is not None and item.content_ready.name in self.meals)
                or (len(item.content_list) == 1 and item.content_list[0].name in self.meals)
            )
    
        def add_plate_dispenser(self, plate_dispenser):
            self.plate_dispenser = plate_dispenser
    
    
    class Dispenser(Counter):
    
            self.dispensing = dispensing
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
            super().__init__(
                pos,
    
    Fabian Heinrich's avatar
    Fabian Heinrich committed
            )
    
        def drop_off(self, item: Item) -> Item | None:
    
            if self.occupied_by.can_combine(item):
                return self.occupied_by.combine(item)
    
        def can_drop_off(self, item: Item) -> bool:
    
            return self.occupied_by.can_combine(item)
    
        def __repr__(self):
    
            return f"{self.dispensing.name}Dispenser"
    
        def create_item(self):
            kwargs = {
                "name": self.dispensing.name,
                "item_info": self.dispensing,
            }
            return Item(**kwargs)
    
    
        def __init__(
            self, pos, dispensing, plate_config, plate_transitions, **kwargs
        ) -> None:
            super().__init__(pos, **kwargs)
    
            self.occupied_by = deque()
            self.out_of_kitchen_timer = []
            self.plate_config = {"plate_delay": [5, 10]}
            self.plate_config.update(plate_config)
            self.next_plate_time = datetime.max
    
        def can_drop_off(self, item: Item) -> bool:
    
            return not self.occupied_by or self.occupied_by[-1].can_combine(item)
    
    
        def drop_off(self, item: Item) -> Item | None:
            """Takes the thing dropped of by the player.
    
            Args:
                item: The item to be placed on the counter.
    
            Returns: TODO Return information, whether the score is affected (Serving Window?)
    
            """
            if not self.occupied_by:
                self.occupied_by.append(item)
            elif self.occupied_by[-1].can_combine(item):
                return self.occupied_by[-1].combine(item)
            return None
    
        def add_dirty_plate(self):
    
            self.occupied_by.appendleft(self.create_item())
    
        def update_plate_out_of_kitchen(self, env_time: datetime):
    
            """Is called from the serving window to add a plate out of kitchen."""
    
            # not perfect identical to datetime.now but based on framerate enough.
    
            time_plate_to_add = env_time + timedelta(
    
                seconds=np.random.uniform(
                    low=self.plate_config["plate_delay"][0],
                    high=self.plate_config["plate_delay"][1],
                )
            )
            log.debug(f"New plate out of kitchen until {time_plate_to_add}")
            self.out_of_kitchen_timer.append(time_plate_to_add)
            if time_plate_to_add < self.next_plate_time:
                self.next_plate_time = time_plate_to_add
    
        def setup_plates(self):
            """Create plates based on the config. Clean and dirty ones."""
            if "dirty_plates" in self.plate_config:
                self.occupied_by.extend(
    
                    [self.create_item() for _ in range(self.plate_config["dirty_plates"])]
    
                )
            if "clean_plates" in self.plate_config:
                self.occupied_by.extend(
                    [
    
        def progress(self, passed_time: timedelta, now: datetime):
    
            """Check if plates arrive from outside the kitchen and add a dirty plate accordingly"""
            if self.next_plate_time < now:
                idx_delete = []
                for i, times in enumerate(self.out_of_kitchen_timer):
                    if times < now:
                        idx_delete.append(i)
                        log.debug("Add dirty plate")
                        self.add_dirty_plate()
                for idx in reversed(idx_delete):
                    self.out_of_kitchen_timer.pop(idx)
                self.next_plate_time = (
                    min(self.out_of_kitchen_timer)
                    if self.out_of_kitchen_timer
                    else datetime.max
                )
    
        def create_item(self, clean: bool = False):
            kwargs = {
                "clean": clean,
                "transitions": self.plate_transitions,
                "item_info": self.dispensing,
            }
            return Plate(**kwargs)
    
    
        def drop_off(self, item: Item) -> Item | None:
    
                item.reset_content()
    
        def can_drop_off(self, item: Item) -> bool:
    
    
    class Stove(Counter):
    
        def can_drop_off(self, item: Item) -> bool:
            if self.occupied_by is None:
                return isinstance(item, CookingEquipment) and item.name in ["Pot", "Pan"]
            else:
                return self.occupied_by.can_combine(item)
    
    
        def progress(self, passed_time: timedelta, now: datetime):
    
            """Called by environment step function for time progression"""
    
            if (
                self.occupied_by
                and isinstance(self.occupied_by, CookingEquipment)
                and self.occupied_by.can_progress()
            ):
    
                self.occupied_by.progress(passed_time, now)
    
        def __init__(self, pos, transitions, sink_addon=None):
    
            self.sink_addon: SinkAddon = sink_addon
            self.occupied_by = deque()
    
            self.transitions = transitions
    
        @property
        def occupied(self):
            return len(self.occupied_by) != 0
    
        def progress(self, passed_time: timedelta, now: datetime):
    
            """Called by environment step function for time progression"""
    
            if (
                self.occupied
                and self.progressing
                and self.occupied_by[-1].name in self.transitions
            ):
                percent = (
                    passed_time.total_seconds()
                    / self.transitions[self.occupied_by[-1].name]["seconds"]
                )
                self.occupied_by[-1].progress(
                    equipment=self.__class__.__name__, percent=percent
                )
                if self.occupied_by[-1].progress_percentage == 1.0:
                    self.occupied_by[-1].reset()
    
                    print(self.transitions[self.occupied_by[-1].name]["result"])
    
                    self.occupied_by[-1].name = self.transitions[self.occupied_by[-1].name][
                        "result"
                    ]
                    plate = self.occupied_by.pop()
    
    
        def start_progress(self):
            """Starts the cutting process."""
            self.progressing = True
    
        def pause_progress(self):
            """Pauses the cutting process"""
            self.progressing = False
    
        def interact_start(self):
            """Handles player interaction, starting to hold key down."""
            self.start_progress()
    
        def interact_stop(self):
            """Handles player interaction, stopping to hold key down."""
            self.pause_progress()
    
        def can_drop_off(self, item: Item) -> bool:
            return isinstance(item, Plate) and not item.clean
    
    
        def drop_off(self, item: Item) -> Item | None:
            self.occupied_by.appendleft(item)
            return None
    
        def pick_up(self, on_hands: bool = True):
            return
    
        def set_addon(self, sink_addon):
            self.sink_addon = sink_addon
    
    
        def __init__(self, pos, occupied_by=None):
    
            self.occupied_by = deque([occupied_by]) if occupied_by else deque()
    
            return self.occupied_by and self.occupied_by[-1].can_combine(item)
    
        def drop_off(self, item: Item) -> Item | None:
            """Takes the thing dropped of by the player.
    
            Args:
                item: The item to be placed on the counter.
    
    
            return self.occupied_by[-1].combine(item)
    
    
        def add_clean_plate(self, plate: Plate):
            self.occupied_by.appendleft(plate)
    
            if self.occupied_by:
                return self.occupied_by.pop()