Newer
Older

Fabian Heinrich
committed
from __future__ import annotations

Florian Schröder
committed
import logging
from collections import deque
from datetime import datetime, timedelta

Florian Schröder
committed
from typing import TYPE_CHECKING, Optional

Fabian Heinrich
committed
if TYPE_CHECKING:

Fabian Heinrich
committed
from overcooked_simulator.overcooked_environment import (
GameScore,
)

Fabian Heinrich
committed
from overcooked_simulator.game_items import (
CuttableItem,

Florian Schröder
committed
CookingEquipment,

Fabian Heinrich
committed
Plate,
)

Florian Schröder
committed
log = logging.getLogger(__name__)
"""Simple class for a counter at a specified position (center of counter). Can hold things on top."""
def __init__(self, pos: npt.NDArray[float], occupied_by: Optional[Item] = None):
self.pos: npt.NDArray[float] = pos
self.occupied_by: Optional[Item] = occupied_by

Florian Schröder
committed
def pick_up(self, on_hands: bool = True):

Fabian Heinrich
committed
"""Gets called upon a player performing the pickup action. If the counter can give something to
the player, it does so. In the standard counter this is when an item is on the counter.
Returns: The item which the counter is occupied by. None if nothing is there.
"""

Florian Schröder
committed
if on_hands:
if self.occupied_by:
occupied_by = self.occupied_by
self.occupied_by = None
return occupied_by
return None
if self.occupied_by and isinstance(self.occupied_by, CookingEquipment):
return self.occupied_by.release()
occupied_by = self.occupied_by

Fabian Heinrich
committed
self.occupied_by = None

Florian Schröder
committed
return occupied_by

Fabian Heinrich
committed
def can_drop_off(self, item: Item) -> bool:

Fabian Heinrich
committed
"""Checks whether an item by the player can be dropped of. More relevant for example with
ingredient dispensers, which should always be occupied and cannot take an item.

Fabian Heinrich
committed
Args:

Fabian Heinrich
committed
item: The item for which to check, if it can be placed on the counter.

Fabian Heinrich
committed

Fabian Heinrich
committed
Returns: True if the item can be placed on the counter, False if not.

Fabian Heinrich
committed
"""

Fabian Heinrich
committed
return self.occupied_by is None or self.occupied_by.can_combine(item)

Fabian Heinrich
committed
def drop_off(self, item: Item) -> Item | None:

Fabian Heinrich
committed
"""Takes the thing dropped of by the player.

Fabian Heinrich
committed
Args:

Fabian Heinrich
committed
item: The item to be placed on the counter.

Fabian Heinrich
committed
Returns: TODO Return information, whether the score is affected (Serving Window?)

Fabian Heinrich
committed
"""

Fabian Heinrich
committed
if self.occupied_by is None:
self.occupied_by = item
elif self.occupied_by.can_combine(item):

Florian Schröder
committed
return self.occupied_by.combine(item)

Florian Schröder
committed
return None

Fabian Heinrich
committed
def interact_start(self):
"""Starts an interaction by the player. Nothing happens for the standard counter."""
pass
def interact_stop(self):
"""Stops an interaction by the player. Nothing happens for the standard counter."""
pass
return (
f"{self.__class__.__name__}(pos={self.pos},occupied_by={self.occupied_by})"
)

Fabian Heinrich
committed
def __init__(self, pos: np.ndarray):
self.progressing = False
super().__init__(pos)
def progress(self):
"""Called by environment step function for time progression"""
if self.progressing:
if isinstance(self.occupied_by, CuttableItem):
self.occupied_by.progress()
def start_progress(self):
self.progressing = True
def pause_progress(self):
self.progressing = False
def interact_start(self):
"""Handles player interaction, starting to hold key down."""
self.start_progress()
def interact_stop(self):
"""Handles player interaction, stopping to hold key down."""
class ServingWindow(Counter):

Florian Schröder
committed
def __init__(
self, pos, game_score: GameScore, plate_dispenser: PlateDispenser = None
):

Florian Schröder
committed
self.plate_dispenser = plate_dispenser

Fabian Heinrich
committed
super().__init__(pos)

Fabian Heinrich
committed
reward = 5

Fabian Heinrich
committed
print(item)

Florian Schröder
committed
# TODO define rewards

Fabian Heinrich
committed
self.game_score.increment_score(reward)

Florian Schröder
committed
if self.plate_dispenser is not None:
self.plate_dispenser.update_plate_out_of_kitchen()

Florian Schröder
committed
return None
def can_score(self, item):
if (
isinstance(item, CookingEquipment)
and "Plate" in item.name
and item.content is not None
):
if isinstance(item.content, Meal) and item.content.progressed_steps:
if not item.item_info.steps_needed and len(
item.content.item_info.needs
) == len(item.content.parts):
def can_drop_off(self, item: Item) -> bool:
return self.can_score(item)

Florian Schröder
committed
def pick_up(self, on_hands: bool = True):
pass

Florian Schröder
committed
def add_plate_dispenser(self, plate_dispenser):
self.plate_dispenser = plate_dispenser
class Dispenser(Counter):
def __init__(self, pos, dispensing):
self.dispensing = dispensing
self.dispensing.create_item(),

Florian Schröder
committed
def pick_up(self, on_hands: bool = True):
return_this = self.occupied_by
self.occupied_by = self.dispensing.create_item()
return return_this
def drop_off(self, item: Item) -> Item | None:
if self.occupied_by.can_combine(item):
return self.occupied_by.combine(item)
def can_drop_off(self, item: Item) -> bool:
return self.occupied_by.can_combine(item)
return f"{self.dispensing.name}Dispenser"

Florian Schröder
committed
class PlateDispenser(Counter):
def __init__(self, pos, dispensing, plate_config):

Fabian Heinrich
committed
self.dispensing = dispensing
super().__init__(pos)

Florian Schröder
committed
self.occupied_by = deque()
self.out_of_kitchen_timer = []
self.plate_config = {"plate_delay": [5, 10]}
self.plate_config.update(plate_config)
self.next_plate_time = datetime.max
self.setup_plates()

Fabian Heinrich
committed
def pick_up(self, on_hands: bool = True):

Florian Schröder
committed
if self.occupied_by:
return self.occupied_by.pop()

Fabian Heinrich
committed
def can_drop_off(self, item: Item) -> bool:
return not self.occupied_by or self.occupied_by[-1].can_combine(item)

Florian Schröder
committed
def drop_off(self, item: Item) -> Item | None:
"""Takes the thing dropped of by the player.

Fabian Heinrich
committed

Florian Schröder
committed
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
Args:
item: The item to be placed on the counter.
Returns: TODO Return information, whether the score is affected (Serving Window?)
"""
if not self.occupied_by:
self.occupied_by.append(item)
elif self.occupied_by[-1].can_combine(item):
return self.occupied_by[-1].combine(item)
return None
def add_dirty_plate(self):
self.occupied_by.appendleft(self.dispensing.create_item())
def update_plate_out_of_kitchen(self):
"""Is called from the serving window to add a plate out of kitchen."""
time_plate_to_add = datetime.now() + timedelta(
seconds=np.random.uniform(
low=self.plate_config["plate_delay"][0],
high=self.plate_config["plate_delay"][1],
)
)
log.debug(f"New plate out of kitchen until {time_plate_to_add}")
self.out_of_kitchen_timer.append(time_plate_to_add)
if time_plate_to_add < self.next_plate_time:
self.next_plate_time = time_plate_to_add
def setup_plates(self):
"""Create plates based on the config. Clean and dirty ones."""
if "dirty_plates" in self.plate_config:
self.occupied_by.extend(
[
self.dispensing.create_item()
for _ in range(self.plate_config["dirty_plates"])
]
)
if "clean_plates" in self.plate_config:
self.occupied_by.extend(
[
self.dispensing.create_item(clean_plate=True)
for _ in range(self.plate_config["clean_plates"])
]
)
def progress(self):
"""Check if plates arrive from outside the kitchen and add a dirty plate accordingly"""
now = datetime.now()
if self.next_plate_time < now:
idx_delete = []
for i, times in enumerate(self.out_of_kitchen_timer):
if times < now:
idx_delete.append(i)
log.debug("Add dirty plate")
self.add_dirty_plate()
for idx in reversed(idx_delete):
self.out_of_kitchen_timer.pop(idx)
self.next_plate_time = (
min(self.out_of_kitchen_timer)
if self.out_of_kitchen_timer
else datetime.max
)

Fabian Heinrich
committed
def __repr__(self):
return "PlateReturn"
class Trash(Counter):

Florian Schröder
committed
def pick_up(self, on_hands: bool = True):
pass
def drop_off(self, item: Item) -> Item | None:

Florian Schröder
committed
if isinstance(item, CookingEquipment):
item.content = None
return item

Florian Schröder
committed
return None
def can_drop_off(self, item: Item) -> bool:
return True
def can_drop_off(self, item: Item) -> bool:
if self.occupied_by is None:
return isinstance(item, CookingEquipment) and item.name in ["Pot", "Pan"]
else:
return self.occupied_by.can_combine(item)

Florian Schröder
committed
def progress(self):
"""Called by environment step function for time progression"""

Florian Schröder
committed
if (
self.occupied_by
and isinstance(self.occupied_by, CookingEquipment)
and self.occupied_by.can_progress()
):

Florian Schröder
committed
self.occupied_by.progress()

Fabian Heinrich
committed
class Sink(Counter):
def __init__(self, pos, sink_addon=None):

Fabian Heinrich
committed
super().__init__(pos)
self.progressing = False
self.sink_addon: SinkAddon = sink_addon
self.occupied_by = deque()

Fabian Heinrich
committed
def progress(self):
"""Called by environment step function for time progression"""
if self.progressing:
if self.occupied_by:
self.occupied_by[-1].progress()
if self.occupied_by[-1].finished:
plate = self.occupied_by.pop()
if not self.occupied_by:
self.pause_progress()
plate.finished_call()
self.sink_addon.add_clean_plate(plate)

Fabian Heinrich
committed
def start_progress(self):
"""Starts the cutting process."""
self.progressing = True
def pause_progress(self):
"""Pauses the cutting process"""
self.progressing = False
def interact_start(self):
"""Handles player interaction, starting to hold key down."""
print(self.occupied_by)
if isinstance(self.occupied_by, Plate):
print(self.occupied_by.can_progress())
self.start_progress()
def interact_stop(self):
"""Handles player interaction, stopping to hold key down."""
self.pause_progress()
def can_drop_off(self, item: Item) -> bool:
return isinstance(item, Plate) and not item.clean
def drop_off(self, item: Item) -> Item | None:
self.occupied_by.appendleft(item)
return None
def pick_up(self, on_hands: bool = True):
return
def set_addon(self, sink_addon):
self.sink_addon = sink_addon

Fabian Heinrich
committed
class SinkAddon(Counter):
def __init__(self, pos, occupied_by=None):

Fabian Heinrich
committed
super().__init__(pos)
self.occupied_by = deque(occupied_by) if occupied_by else deque()

Fabian Heinrich
committed
def can_drop_off(self, item: Item) -> bool:
return not self.occupied_by or self.occupied_by[-1].can_combine(item)

Fabian Heinrich
committed
def drop_off(self, item: Item) -> Item | None:
"""Takes the thing dropped of by the player.
Args:
item: The item to be placed on the counter.
Returns: TODO Return information, whether the score is affected (Serving Window?)
"""
if not self.occupied_by:
self.occupied_by.append(item)
elif self.occupied_by[-1].can_combine(item):
return self.occupied_by[-1].combine(item)
return None
def add_clean_plate(self, plate: Plate):
self.occupied_by.appendleft(plate)

Fabian Heinrich
committed
def pick_up(self, on_hands: bool = True):
if self.occupied_by:
return self.occupied_by.pop()