Skip to content
Snippets Groups Projects
Commit 38af7c1a authored by Fabian Heinrich's avatar Fabian Heinrich
Browse files

Merge branch '50-plate-washing-cycle-2' into 'main'

Resolve "Plate washing cycle"

Closes #50

See merge request scs/cocosy/overcooked-simulator!20
parents 234bab1c 3a97e072
No related branches found
No related tags found
1 merge request!20Resolve "Plate washing cycle"
Pipeline #42406 passed
Showing
with 371 additions and 30 deletions
from __future__ import annotations
import logging
from collections import deque
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from overcooked_simulator.overcooked_environment import GameScore
from overcooked_simulator.overcooked_environment import (
GameScore,
)
import numpy as np
import numpy.typing as npt
......@@ -14,8 +18,10 @@ from overcooked_simulator.game_items import (
Item,
CookingEquipment,
Meal,
Plate,
)
log = logging.getLogger(__name__)
......@@ -115,15 +121,20 @@ class CuttingBoard(Counter):
class ServingWindow(Counter):
def __init__(self, pos, game_score: GameScore):
def __init__(
self, pos, game_score: GameScore, plate_dispenser: PlateDispenser = None
):
self.game_score = game_score
self.plate_dispenser = plate_dispenser
super().__init__(pos)
def drop_off(self, item) -> Item | None:
reward = 5
log.debug(f"Drop off {item}")
log.debug(f"Drop off item {item}")
# TODO define rewards
self.game_score.increment_score(reward)
if self.plate_dispenser is not None:
self.plate_dispenser.update_plate_out_of_kitchen()
return None
def can_score(self, item):
......@@ -146,6 +157,9 @@ class ServingWindow(Counter):
def pick_up(self, on_hands: bool = True):
pass
def add_plate_dispenser(self, plate_dispenser):
self.plate_dispenser = plate_dispenser
class Dispenser(Counter):
def __init__(self, pos, dispensing):
......@@ -171,6 +185,94 @@ class Dispenser(Counter):
return f"{self.dispensing.name}Dispenser"
class PlateDispenser(Counter):
def __init__(self, pos, dispensing, plate_config):
self.dispensing = dispensing
super().__init__(pos)
self.occupied_by = deque()
self.out_of_kitchen_timer = []
self.plate_config = {"plate_delay": [5, 10]}
self.plate_config.update(plate_config)
self.next_plate_time = datetime.max
self.setup_plates()
def pick_up(self, on_hands: bool = True):
if self.occupied_by:
return self.occupied_by.pop()
def can_drop_off(self, item: Item) -> bool:
return not self.occupied_by or self.occupied_by[-1].can_combine(item)
def drop_off(self, item: Item) -> Item | None:
"""Takes the thing dropped of by the player.
Args:
item: The item to be placed on the counter.
Returns: TODO Return information, whether the score is affected (Serving Window?)
"""
if not self.occupied_by:
self.occupied_by.append(item)
elif self.occupied_by[-1].can_combine(item):
return self.occupied_by[-1].combine(item)
return None
def add_dirty_plate(self):
self.occupied_by.appendleft(self.dispensing.create_item())
def update_plate_out_of_kitchen(self):
"""Is called from the serving window to add a plate out of kitchen."""
time_plate_to_add = datetime.now() + timedelta(
seconds=np.random.uniform(
low=self.plate_config["plate_delay"][0],
high=self.plate_config["plate_delay"][1],
)
)
log.debug(f"New plate out of kitchen until {time_plate_to_add}")
self.out_of_kitchen_timer.append(time_plate_to_add)
if time_plate_to_add < self.next_plate_time:
self.next_plate_time = time_plate_to_add
def setup_plates(self):
"""Create plates based on the config. Clean and dirty ones."""
if "dirty_plates" in self.plate_config:
self.occupied_by.extend(
[
self.dispensing.create_item()
for _ in range(self.plate_config["dirty_plates"])
]
)
if "clean_plates" in self.plate_config:
self.occupied_by.extend(
[
self.dispensing.create_item(clean_plate=True)
for _ in range(self.plate_config["clean_plates"])
]
)
def progress(self):
"""Check if plates arrive from outside the kitchen and add a dirty plate accordingly"""
now = datetime.now()
if self.next_plate_time < now:
idx_delete = []
for i, times in enumerate(self.out_of_kitchen_timer):
if times < now:
idx_delete.append(i)
log.debug("Add dirty plate")
self.add_dirty_plate()
for idx in reversed(idx_delete):
self.out_of_kitchen_timer.pop(idx)
self.next_plate_time = (
min(self.out_of_kitchen_timer)
if self.out_of_kitchen_timer
else datetime.max
)
def __repr__(self):
return "PlateReturn"
class Trash(Counter):
def pick_up(self, on_hands: bool = True):
pass
......@@ -200,3 +302,79 @@ class Stove(Counter):
and self.occupied_by.can_progress()
):
self.occupied_by.progress()
class Sink(Counter):
def __init__(self, pos, sink_addon=None):
super().__init__(pos)
self.progressing = False
self.sink_addon: SinkAddon = sink_addon
self.occupied_by = deque()
def progress(self):
"""Called by environment step function for time progression"""
if self.progressing:
if self.occupied_by:
self.occupied_by[-1].progress()
if self.occupied_by[-1].finished:
plate = self.occupied_by.pop()
if not self.occupied_by:
self.pause_progress()
plate.finished_call()
self.sink_addon.add_clean_plate(plate)
def start_progress(self):
"""Starts the cutting process."""
self.progressing = True
def pause_progress(self):
"""Pauses the cutting process"""
self.progressing = False
def interact_start(self):
"""Handles player interaction, starting to hold key down."""
self.start_progress()
def interact_stop(self):
"""Handles player interaction, stopping to hold key down."""
self.pause_progress()
def can_drop_off(self, item: Item) -> bool:
return isinstance(item, Plate) and not item.clean
def drop_off(self, item: Item) -> Item | None:
self.occupied_by.appendleft(item)
return None
def pick_up(self, on_hands: bool = True):
return
def set_addon(self, sink_addon):
self.sink_addon = sink_addon
class SinkAddon(Counter):
def __init__(self, pos, occupied_by=None):
super().__init__(pos)
self.occupied_by = deque(occupied_by) if occupied_by else deque()
def can_drop_off(self, item: Item) -> bool:
return self.occupied_by and self.occupied_by[-1].can_combine(item)
def drop_off(self, item: Item) -> Item | None:
"""Takes the thing dropped of by the player.
Args:
item: The item to be placed on the counter.
Returns:
"""
return self.occupied_by[-1].combine(item)
def add_clean_plate(self, plate: Plate):
self.occupied_by.appendleft(plate)
def pick_up(self, on_hands: bool = True):
if self.occupied_by:
return self.occupied_by.pop()
counter_side_length: 40
world_width: 800
world_height: 600
plates:
clean_plates: 3
dirty_plates: 2
plate_delay: [ 5, 10 ]
\ No newline at end of file
......@@ -35,14 +35,14 @@ Burger:
Salad:
type: Meal
needs: [ ChoppedLettuce, ChoppedTomato ]
needs: [ ChoppedLettuce, Tomato ]
equipment: Plate
TomatoSoup:
type: Meal
finished_progress_name: TomatoSoup
steps_needed: 500
needs: [ ChoppedTomato, ChoppedTomato, ChoppedTomato ]
needs: [ Tomato, Tomato, Tomato ]
equipment: Pot
OnionSoup:
......@@ -54,6 +54,8 @@ OnionSoup:
Plate:
type: Equipment
is_cuttable: True
steps_needed: 200
Pot:
type: Equipment
......
......@@ -4,8 +4,8 @@ _#_______M_______
_#_______#_______
_W_______________
_#__A__A_________
_P_______________
_#_______#_______
_C_______________
_C_______#_______
_#_______X_______
_##CC#####_______
_#P#S+#S+#_______
_________________
_
\ No newline at end of file
_____
_____
____P
\ No newline at end of file
from __future__ import annotations
import dataclasses
import logging
from enum import Enum
log = logging.getLogger(__name__)
class ItemType(Enum):
Ingredient = "Ingredient"
......@@ -32,7 +35,7 @@ class ItemInfo:
return self.finished_progress_name.replace("*", self.name)
return self.name
def create_item(self, parts=None) -> Item:
def create_item(self, clean_plate=False, parts=None) -> Item:
match self.type:
case ItemType.Ingredient:
if self.is_cuttable:
......@@ -45,7 +48,16 @@ class ItemInfo:
)
return Item(name=self.name, item_info=self)
case ItemType.Equipment:
return CookingEquipment(name=self.name, item_info=self)
if "Plate" in self.name:
return Plate(
name=self.name,
steps_needed=self.steps_needed,
finished=False,
item_info=self,
clean=clean_plate,
)
else:
return CookingEquipment(name=self.name, item_info=self)
case ItemType.Meal:
return Meal(
name=self.name,
......@@ -165,7 +177,7 @@ class CookingEquipment(Item):
if isinstance(other, CookingEquipment):
return other.can_release_content()
# TODO check other is start of a meal, create meal
if isinstance(other, Meal) and self.name == "Plate":
if isinstance(other, Meal) and "Plate" in self.name:
return not other.steps_needed or other.finished
return self.item_info.can_start_meal([other])
if self.content.can_combine(other):
......@@ -182,7 +194,7 @@ class CookingEquipment(Item):
if isinstance(other, CookingEquipment):
self.content = other.release()
return other
if isinstance(other, Meal) and self.name == "Plate":
if isinstance(other, Meal) and "Plate" in self.name:
self.content = other
return
# find starting meal for other
......@@ -272,3 +284,39 @@ class Meal(ProgressibleItem):
@property
def extra_repr(self):
return self.parts
class Plate(CookingEquipment):
def __init__(
self, clean, steps_needed, finished, content: Meal = None, *args, **kwargs
):
super().__init__(content, *args, **kwargs)
self.clean = clean
self.name = self.create_name()
self.steps_needed = steps_needed
self.finished = finished
self.progressed_steps = steps_needed if finished else 0
def finished_call(self):
self.clean = True
self.name = self.create_name()
def progress(self):
"""Progresses the item process as long as it is not finished."""
if self.progressed_steps >= self.steps_needed:
self.finished = True
self.finished_call()
if not self.finished:
self.progressed_steps += 1
def can_progress(self, counter_type="Sink") -> bool:
return not self.clean
def can_combine(self, other):
return self.clean and super().can_combine(other)
def combine(self, other):
return super().combine(other)
def create_name(self):
return "CleanPlate" if self.clean else "DirtyPlate"
......@@ -3,6 +3,7 @@ from __future__ import annotations
import logging
import random
from pathlib import Path
from threading import Lock
import numpy as np
import numpy.typing as npt
......@@ -16,9 +17,11 @@ from overcooked_simulator.counters import (
Dispenser,
ServingWindow,
Stove,
Sink,
PlateDispenser,
SinkAddon,
)
from overcooked_simulator.game_items import ItemInfo, ItemType
# if TYPE_CHECKING:
from overcooked_simulator.player import Player
log = logging.getLogger(__name__)
......@@ -53,6 +56,18 @@ class Action:
return f"Action({self.player},{self.act_type},{self.action})"
class GameScore:
def __init__(self):
self.score = 0
def increment_score(self, score: int):
self.score += score
log.debug(f"Score: {self.score}")
def read_score(self):
return self.score
class Environment:
"""Environment class which handles the game logic for the overcooked-inspired environment.
......@@ -61,6 +76,7 @@ class Environment:
"""
def __init__(self, env_config_path: Path, layout_path, item_info_path: Path):
self.lock = Lock()
self.players: dict[str, Player] = {}
with open(env_config_path, "r") as file:
......@@ -72,6 +88,7 @@ class Environment:
self.item_info = self.load_item_info()
self.game_score = GameScore()
self.SYMBOL_TO_CHARACTER_MAP = {
"#": Counter,
"C": CuttingBoard,
......@@ -79,7 +96,11 @@ class Environment:
"W": lambda pos: ServingWindow(pos, self.game_score),
"T": lambda pos: Dispenser(pos, self.item_info["Tomato"]),
"L": lambda pos: Dispenser(pos, self.item_info["Lettuce"]),
"P": lambda pos: Dispenser(pos, self.item_info["Plate"]),
"P": lambda pos: PlateDispenser(
pos,
self.item_info["Plate"],
environment_config["plates"] if "plates" in environment_config else {},
),
"N": lambda pos: Dispenser(pos, self.item_info["Onion"]), # N for oNioN
"_": "Free",
"A": "Agent",
......@@ -93,6 +114,8 @@ class Environment:
), # Stove with pot: U because it looks like a pot
"B": lambda pos: Dispenser(pos, self.item_info["Bun"]),
"M": lambda pos: Dispenser(pos, self.item_info["Meat"]),
"S": lambda pos: Sink(pos),
"+": SinkAddon,
}
(
......@@ -101,6 +124,8 @@ class Environment:
self.free_positions,
) = self.parse_layout_file(self.layout_path)
self.init_counters()
self.score: int = 0
self.world_width: int = environment_config["world_width"]
......@@ -405,9 +430,10 @@ class Environment:
"""Performs a step of the environment. Affects time based events such as cooking or cutting things, orders
and time limits.
"""
for counter in self.counters:
if isinstance(counter, (CuttingBoard, Stove)):
counter.progress()
with self.lock:
for counter in self.counters:
if isinstance(counter, (CuttingBoard, Stove, Sink, PlateDispenser)):
counter.progress()
def get_state(self):
"""Get the current state of the game environment. The state here is accessible by the current python objects.
......@@ -424,3 +450,34 @@ class Environment:
"""
pass
def init_counters(self):
plate_dispenser = self.get_counter_of_type(PlateDispenser)
assert len(plate_dispenser) > 0, "No Plate Return in the environment"
sink_addons = self.get_counter_of_type(SinkAddon)
for counter in self.counters:
match counter:
case ServingWindow():
counter.add_plate_dispenser(plate_dispenser[0])
case Sink(pos=pos):
assert len(sink_addons) > 0, "No SinkAddon but normal Sink"
closest_addon = self.get_closest(pos, sink_addons)
assert self.counter_side_length - (
self.counter_side_length * 0.05
) <= np.linalg.norm(
closest_addon.pos - pos
), f"No SinkAddon connected to Sink at pos {pos}"
counter.set_addon(closest_addon)
pass
@staticmethod
def get_closest(pos: npt.NDArray[float], counter: list[Counter]):
return min(counter, key=lambda c: np.linalg.norm(c.pos - pos))
def get_counter_of_type(self, counter_type) -> list[Counter]:
return list(
filter(lambda counter: isinstance(counter, counter_type), self.counters)
)
import logging
from collections import deque
from pathlib import Path
from typing import Optional
......@@ -107,7 +108,9 @@ class Player:
elif counter.can_drop_off(self.holding):
self.holding = counter.drop_off(self.holding)
elif self.holding.can_combine(counter.occupied_by):
elif not isinstance(
counter.occupied_by, (list, deque)
) and self.holding.can_combine(counter.occupied_by):
returned_by_counter = counter.pick_up(on_hands=False)
self.holding.combine(returned_by_counter)
......
overcooked_simulator/pygame_gui/images/plate_clean.png

334 KiB

overcooked_simulator/pygame_gui/images/plate_dirty.png

269 KiB

......@@ -2,6 +2,7 @@ import colorsys
import logging
import math
import sys
from collections import deque
import numpy as np
import numpy.typing as npt
......@@ -15,13 +16,14 @@ from overcooked_simulator.game_items import (
Item,
CookingEquipment,
Meal,
Plate,
)
from overcooked_simulator.overcooked_environment import Action
from overcooked_simulator.pygame_gui.game_colors import BLUE
from overcooked_simulator.pygame_gui.game_colors import colors, Color
from overcooked_simulator.simulation_runner import Simulator
USE_PLAYER_COOK_SPRITES = False
USE_PLAYER_COOK_SPRITES = True
SHOW_INTERACTION_RANGE = False
......@@ -324,7 +326,7 @@ class PyGameGUI:
pos, self.visualization_config[item.name]["parts"], scale=scale
)
if isinstance(item, ProgressibleItem) and not item.finished:
if isinstance(item, (ProgressibleItem, Plate)) and not item.finished:
self.draw_progress_bar(pos, item.progressed_steps, item.steps_needed)
if isinstance(item, CookingEquipment) and item.content:
......@@ -374,11 +376,12 @@ class PyGameGUI:
if counter.occupied_by is not None:
# Multiple plates on plate return:
if isinstance(counter.occupied_by, list):
for i, o in enumerate(counter.occupied_by):
self.draw_item(
np.abs([counter.pos[0], counter.pos[1] - (i * 3)]), o
)
if isinstance(counter.occupied_by, (list, deque)):
with self.simulator.env.lock:
for i, o in enumerate(counter.occupied_by):
self.draw_item(
np.abs([counter.pos[0], counter.pos[1] - (i * 3)]), o
)
# All other items:
else:
self.draw_item(counter.pos, counter.occupied_by)
......
......@@ -92,6 +92,37 @@ Stove:
type: circle
radius: 0.25
Sink:
parts:
- color: black
type: rect
height: 0.875
width: 0.625
- color: darkslategray1
type: circle
radius: 0.45
SinkAddon:
parts:
- color: black
type: rect
height: 0.875
width: 0.625
- type: rect
color: gray83
height: 0.8
width: 0.1
center_offset: [ -0.4, 0.1 ]
- type: rect
color: gray83
height: 0.8
width: 0.1
center_offset: [ -0.4, -0.1 ]
- type: rect
color: gray83
height: 0.8
width: 0.1
center_offset: [ -0.4, -0.3 ]
# Items
Tomato:
parts:
......@@ -237,11 +268,17 @@ Cook:
path: images/pixel_cook.png
size: 1
Plate:
CleanPlate:
parts:
- type: image
path: images/plate.png
size: 1
path: images/plate_clean.png
size: 0.8
DirtyPlate:
parts:
- type: image
path: images/plate_dirty.png
size: 0.8
Pot:
parts:
......
......@@ -2,6 +2,8 @@ import logging
import time
from threading import Thread
import numpy as np
from overcooked_simulator import ROOT_DIR
from overcooked_simulator.overcooked_environment import Environment, Action
......@@ -28,12 +30,17 @@ class Simulator(Thread):
layout_path,
frequency: int,
item_info_path=ROOT_DIR / "game_content" / "item_info.yaml",
seed: int = 8654321,
):
# TODO look at https://builtin.com/data-science/numpy-random-seed to change to other random
np.random.seed(seed)
self.finished: bool = False
self.step_frequency: int = frequency
self.preferred_sleep_time_ns: float = 1e9 / self.step_frequency
self.env: Environment = Environment(env_layout_path, layout_path, item_info_path)
self.env: Environment = Environment(
env_layout_path, layout_path, item_info_path
)
super().__init__()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment