-
Florian Schröder authoredFlorian Schröder authored
counters.py 12.29 KiB
from __future__ import annotations
import logging
from collections import deque
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Optional
from overcooked_simulator.utils import create_init_env_time
if TYPE_CHECKING:
from overcooked_simulator.overcooked_environment import (
GameScore,
)
import numpy as np
import numpy.typing as npt
from overcooked_simulator.game_items import (
CuttableItem,
Item,
CookingEquipment,
Meal,
Plate,
)
log = logging.getLogger(__name__)
class Counter:
"""Simple class for a counter at a specified position (center of counter). Can hold things on top."""
def __init__(self, pos: npt.NDArray[float], occupied_by: Optional[Item] = None):
self.pos: npt.NDArray[float] = pos
self.occupied_by: Optional[Item] = occupied_by
def pick_up(self, on_hands: bool = True):
"""Gets called upon a player performing the pickup action. If the counter can give something to
the player, it does so. In the standard counter this is when an item is on the counter.
Returns: The item which the counter is occupied by. None if nothing is there.
"""
if on_hands:
if self.occupied_by:
occupied_by = self.occupied_by
self.occupied_by = None
return occupied_by
return None
if self.occupied_by and isinstance(self.occupied_by, CookingEquipment):
return self.occupied_by.release()
occupied_by = self.occupied_by
self.occupied_by = None
return occupied_by
def can_drop_off(self, item: Item) -> bool:
"""Checks whether an item by the player can be dropped of. More relevant for example with
ingredient dispensers, which should always be occupied and cannot take an item.
Args:
item: The item for which to check, if it can be placed on the counter.
Returns: True if the item can be placed on the counter, False if not.
"""
return self.occupied_by is None or self.occupied_by.can_combine(item)
def drop_off(self, item: Item) -> Item | None:
"""Takes the thing dropped of by the player.
Args:
item: The item to be placed on the counter.
Returns: TODO Return information, whether the score is affected (Serving Window?)
"""
if self.occupied_by is None:
self.occupied_by = item
elif self.occupied_by.can_combine(item):
return self.occupied_by.combine(item)
return None
def interact_start(self):
"""Starts an interaction by the player. Nothing happens for the standard counter."""
pass
def interact_stop(self):
"""Stops an interaction by the player. Nothing happens for the standard counter."""
pass
def __repr__(self):
return (
f"{self.__class__.__name__}(pos={self.pos},occupied_by={self.occupied_by})"
)
class CuttingBoard(Counter):
def __init__(self, pos: np.ndarray):
self.progressing = False
super().__init__(pos)
def progress(self, passed_time: timedelta, now: datetime):
"""Called by environment step function for time progression"""
if self.progressing:
if isinstance(self.occupied_by, CuttableItem):
self.occupied_by.progress()
def start_progress(self):
"""Starts the cutting process."""
self.progressing = True
def pause_progress(self):
"""Pauses the cutting process"""
self.progressing = False
def interact_start(self):
"""Handles player interaction, starting to hold key down."""
self.start_progress()
def interact_stop(self):
"""Handles player interaction, stopping to hold key down."""
self.pause_progress()
class ServingWindow(Counter):
def __init__(
self, pos, game_score: GameScore, plate_dispenser: PlateDispenser = None
):
self.game_score = game_score
self.plate_dispenser = plate_dispenser
super().__init__(pos)
def drop_off(self, item) -> Item | None:
reward = 5
log.debug(f"Drop off item {item}")
# TODO define rewards
self.game_score.increment_score(reward)
if self.plate_dispenser is not None:
self.plate_dispenser.update_plate_out_of_kitchen()
return None
def can_score(self, item):
if (
isinstance(item, CookingEquipment)
and "Plate" in item.name
and item.content is not None
):
if isinstance(item.content, Meal) and item.content.progressed_steps:
return item.content.finished
if not item.content.item_info.steps_needed and len(
item.content.item_info.needs
) == len(item.content.parts):
return True
return False
def can_drop_off(self, item: Item) -> bool:
return self.can_score(item)
def pick_up(self, on_hands: bool = True):
pass
def add_plate_dispenser(self, plate_dispenser):
self.plate_dispenser = plate_dispenser
class Dispenser(Counter):
def __init__(self, pos, dispensing):
self.dispensing = dispensing
super().__init__(
pos,
self.dispensing.create_item(),
)
def pick_up(self, on_hands: bool = True):
return_this = self.occupied_by
self.occupied_by = self.dispensing.create_item()
return return_this
def drop_off(self, item: Item) -> Item | None:
if self.occupied_by.can_combine(item):
return self.occupied_by.combine(item)
def can_drop_off(self, item: Item) -> bool:
return self.occupied_by.can_combine(item)
def __repr__(self):
return f"{self.dispensing.name}Dispenser"
class PlateDispenser(Counter):
def __init__(self, pos, dispensing, plate_config):
self.dispensing = dispensing
super().__init__(pos)
self.occupied_by = deque()
self.out_of_kitchen_timer = []
self.plate_config = {"plate_delay": [5, 10]}
self.plate_config.update(plate_config)
self.next_plate_time = datetime.max
self.env_time = create_init_env_time() # is overwritten in progress anyway
self.setup_plates()
def pick_up(self, on_hands: bool = True):
if self.occupied_by:
return self.occupied_by.pop()
def can_drop_off(self, item: Item) -> bool:
return not self.occupied_by or self.occupied_by[-1].can_combine(item)
def drop_off(self, item: Item) -> Item | None:
"""Takes the thing dropped of by the player.
Args:
item: The item to be placed on the counter.
Returns: TODO Return information, whether the score is affected (Serving Window?)
"""
if not self.occupied_by:
self.occupied_by.append(item)
elif self.occupied_by[-1].can_combine(item):
return self.occupied_by[-1].combine(item)
return None
def add_dirty_plate(self):
self.occupied_by.appendleft(self.dispensing.create_item())
def update_plate_out_of_kitchen(self):
"""Is called from the serving window to add a plate out of kitchen."""
# not perfect identical to datetime.now but based on framerate enough.
time_plate_to_add = self.env_time + timedelta(
seconds=np.random.uniform(
low=self.plate_config["plate_delay"][0],
high=self.plate_config["plate_delay"][1],
)
)
log.debug(f"New plate out of kitchen until {time_plate_to_add}")
self.out_of_kitchen_timer.append(time_plate_to_add)
if time_plate_to_add < self.next_plate_time:
self.next_plate_time = time_plate_to_add
def setup_plates(self):
"""Create plates based on the config. Clean and dirty ones."""
if "dirty_plates" in self.plate_config:
self.occupied_by.extend(
[
self.dispensing.create_item()
for _ in range(self.plate_config["dirty_plates"])
]
)
if "clean_plates" in self.plate_config:
self.occupied_by.extend(
[
self.dispensing.create_item(clean_plate=True)
for _ in range(self.plate_config["clean_plates"])
]
)
def progress(self, passed_time: timedelta, now: datetime):
"""Check if plates arrive from outside the kitchen and add a dirty plate accordingly"""
self.env_time = now
if self.next_plate_time < now:
idx_delete = []
for i, times in enumerate(self.out_of_kitchen_timer):
if times < now:
idx_delete.append(i)
log.debug("Add dirty plate")
self.add_dirty_plate()
for idx in reversed(idx_delete):
self.out_of_kitchen_timer.pop(idx)
self.next_plate_time = (
min(self.out_of_kitchen_timer)
if self.out_of_kitchen_timer
else datetime.max
)
def __repr__(self):
return "PlateReturn"
class Trash(Counter):
def pick_up(self, on_hands: bool = True):
pass
def drop_off(self, item: Item) -> Item | None:
if isinstance(item, CookingEquipment):
item.content = None
return item
return None
def can_drop_off(self, item: Item) -> bool:
return True
class Stove(Counter):
def can_drop_off(self, item: Item) -> bool:
if self.occupied_by is None:
return isinstance(item, CookingEquipment) and item.name in ["Pot", "Pan"]
else:
return self.occupied_by.can_combine(item)
def progress(self, passed_time: timedelta, now: datetime):
"""Called by environment step function for time progression"""
if (
self.occupied_by
and isinstance(self.occupied_by, CookingEquipment)
and self.occupied_by.can_progress()
):
self.occupied_by.progress()
class Sink(Counter):
def __init__(self, pos, sink_addon=None):
super().__init__(pos)
self.progressing = False
self.sink_addon: SinkAddon = sink_addon
self.occupied_by = deque()
def progress(self, passed_time: timedelta, now: datetime):
"""Called by environment step function for time progression"""
if self.progressing:
if self.occupied_by:
self.occupied_by[-1].progress()
if self.occupied_by[-1].finished:
plate = self.occupied_by.pop()
if not self.occupied_by:
self.pause_progress()
plate.finished_call()
self.sink_addon.add_clean_plate(plate)
def start_progress(self):
"""Starts the cutting process."""
self.progressing = True
def pause_progress(self):
"""Pauses the cutting process"""
self.progressing = False
def interact_start(self):
"""Handles player interaction, starting to hold key down."""
self.start_progress()
def interact_stop(self):
"""Handles player interaction, stopping to hold key down."""
self.pause_progress()
def can_drop_off(self, item: Item) -> bool:
return isinstance(item, Plate) and not item.clean
def drop_off(self, item: Item) -> Item | None:
self.occupied_by.appendleft(item)
return None
def pick_up(self, on_hands: bool = True):
return
def set_addon(self, sink_addon):
self.sink_addon = sink_addon
class SinkAddon(Counter):
def __init__(self, pos, occupied_by=None):
super().__init__(pos)
self.occupied_by = deque(occupied_by) if occupied_by else deque()
def can_drop_off(self, item: Item) -> bool:
return self.occupied_by and self.occupied_by[-1].can_combine(item)
def drop_off(self, item: Item) -> Item | None:
"""Takes the thing dropped of by the player.
Args:
item: The item to be placed on the counter.
Returns:
"""
return self.occupied_by[-1].combine(item)
def add_clean_plate(self, plate: Plate):
self.occupied_by.appendleft(plate)
def pick_up(self, on_hands: bool = True):
if self.occupied_by:
return self.occupied_by.pop()