Newer
Older
"""
The _Cooperative Cuisine_ environment. It is configured via three configs:
- [`environment_config.yml`](https://gitlab.ub.uni-bielefeld.de/scs/cocosy/cooperative-cuisine/-/blob/main/cooperative_cuisine/configs/environment_config.yaml?ref_type=heads)
- [`xyz.layout`](https://gitlab.ub.uni-bielefeld.de/scs/cocosy/cooperative-cuisine/-/blob/main/cooperative_cuisine/configs/layouts/basic.layout?ref_type=heads)
- [`item_info.yml`](https://gitlab.ub.uni-bielefeld.de/scs/cocosy/cooperative-cuisine/-/blob/main/cooperative_cuisine/configs/item_info.yaml?ref_type=heads)
You can pass either the file paths or the file contents (str) of the config files to the `Environment` class. Set the `as_files` parameter accordingly.
"""
from __future__ import annotations
import inspect
from collections import defaultdict
from datetime import timedelta, datetime
from random import Random
from typing import Literal, TypedDict, Callable, Set, Any

Fabian Heinrich
committed
import numpy.typing as npt
from cooperative_cuisine.action import ActionType, InterActionData, Action
from cooperative_cuisine.counter_factory import (
CounterFactory,
)
from cooperative_cuisine.counters import (

Florian Schröder
committed
PlateConfig,

Florian Schröder
committed
Counter,
)
from cooperative_cuisine.effects import EffectManager
from cooperative_cuisine.hooks import (
ITEM_INFO_LOADED,
LAYOUT_FILE_PARSED,
ENV_INITIALIZED,
PRE_PERFORM_ACTION,
POST_PERFORM_ACTION,
PLAYER_ADDED,
GAME_ENDED_STEP,
PRE_STATE,
STATE_DICT,
JSON_STATE,
PRE_RESET_ENV_TIME,
POST_RESET_ENV_TIME,
Hooks,
ACTION_ON_NOT_REACHABLE_COUNTER,
ACTION_PUT,
ACTION_INTERACT_START,
hooks_via_callback_class,
ADDITIONAL_STATE_UPDATE,
from cooperative_cuisine.items import (
ItemInfo,
ItemType,
)
from cooperative_cuisine.movement import Movement
from cooperative_cuisine.orders import (
from cooperative_cuisine.player import Player, PlayerConfig
from cooperative_cuisine.state_representation import InfoMsg
from cooperative_cuisine.utils import (
create_init_env_time,
get_closest,
)
from cooperative_cuisine.validation import Validation
"""The logger for this module."""
class EnvironmentConfig(TypedDict):
game: dict[
Literal["time_limit_seconds"] | Literal["undo_dispenser_pickup"],
"""Config about the generation of orders for the players to complete."""
player_config: PlayerConfig
layout_chars: dict[str, str]
"""Definition of which characters in the layout file correspond to which kitchen counter."""
hook_callbacks: dict[str, dict]
"""Configuration of callbacks via HookCallbackClass."""
effect_manager: dict[str, dict]
"""Config of different effects in the environment, which control for example fire behavior."""
class Environment:
"""Environment class which handles the game logic for the overcooked-inspired environment.
Handles player movement, collision-detection, counters, cooking processes, recipes, incoming orders, time.
"""

Florian Schröder
committed
# pdoc does not detect attributes in the tuple assignment in init
counters: list[Counter]
"""Counters of the environment."""
designated_player_positions: list[npt.NDArray[float]]
"""The positions new players can spawn based on the layout (`A` char)."""
free_positions: list[npt.NDArray[float]]
"""list of 2D points of free (no counters) positions in the environment."""
kitchen_height: int
"""Grid height of the kitchen."""
kitchen_width: int
"""Grid width of the kitchen."""

Christoph Kowalski
committed
self,
env_config: Path | str,
layout_config: Path | str,
item_info: Path | str,
as_files: bool = True,
env_name: str = "cooperative_cuisine_1",
seed: int = 56789223842348,
yaml_already_loaded: bool = False
"""Constructor of the Environment.
Args:
env_config: The path or string representation of the environment configuration file.
layout_config: The path or string representation of the layout configuration file.
item_info: The path or string representation of the item info file.
as_files: (optional) A flag indicating whether the configuration parameters are file paths.
If True, the method will read the contents of the files. Defaults to True.
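env_name: (optional) The name of the environment. Defaults to "cooperative_cuisine_1".
seed: (optional) The seed for generating random numbers. Defaults to 56789223842348.
yaml_already_loaded: (optional) If True, `env_config` is used directly as the already parsed environment config instead of being parsed as YAML. Defaults to False.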
"""
self.env_name: str = env_name
"""Reference to the run. E.g, the env id."""
self.env_time: datetime = create_init_env_time()
"""the internal time of the environment. An environment starts always with the time from
`create_init_env_time`."""
self.random: Random = Random(seed)
"""Random instance."""
self.hook: Hooks = Hooks(self)
"""Hook manager. Register callbacks and create hook points with additional kwargs."""
self.score: float = 0.0
"""The current score of the environment."""
self.players: dict[str, Player] = {}
"""the player, keyed by their id/name."""

Fabian Heinrich
committed

Florian Schröder
committed
self.as_files: bool = as_files

Florian Schröder
committed
"""Are the configs just the path to the files."""
if self.as_files:
with open(env_config, "r") as file:
env_config = file.read()
with open(layout_config, "r") as layout_file:
layout_config = layout_file.read()
with open(item_info, "r") as file:
item_info = file.read()
if not yaml_already_loaded:
self.environment_config: EnvironmentConfig = yaml.load(
env_config, Loader=yaml.Loader
)
else:
self.environment_config = env_config
"""The config of the environment. All environment specific attributes is configured here."""

Florian Schröder
committed
self.environment_config["player_config"] = PlayerConfig(
**(
self.environment_config["player_config"]
if "player_config" in self.environment_config
else {}
)
)
self.player_view_restricted: bool = self.environment_config[
"player_config"
].restricted_view
"""If field-of-view of players is restricted in this environment."""
if self.player_view_restricted:

Florian Schröder
committed
self.player_view_angle: float = self.environment_config[
"player_config"
].view_angle
self.player_view_range: float = self.environment_config[
"player_config"
].view_range
self.hook_callbacks()

Florian Schröder
committed
self.layout_config: str = layout_config

Florian Schröder
committed
"""The layout config for the environment"""

Fabian Heinrich
committed
# self.counter_side_length = 1 # -> this changed! is 1 now

Fabian Heinrich
committed
self.item_info: dict[str, ItemInfo] = self.load_item_info(item_info)
"""The loaded item info dict. Keys are the item names."""
self.hook(
ITEM_INFO_LOADED, item_info=item_info, parsed_item_info=self.item_info
)
if self.environment_config["orders"]["meals"]["all"]:

Florian Schröder
committed
self.allowed_meal_names: Set[str] = set(

Florian Schröder
committed
[
item
for item, info in self.item_info.items()
if info.type == ItemType.Meal
]
)
else:

Florian Schröder
committed
self.allowed_meal_names: Set[str] = set(
self.environment_config["orders"]["meals"]["list"]
)
"""The allowed meals depend on the `environment_config.yml` configured behaviour. Either all meals that
are possible or only a limited subset."""

Florian Schröder
committed
self.order_manager: OrderManager = OrderManager(
order_config=self.environment_config["orders"],
random=Random(seed),
"""The manager for the orders and score update."""

Fabian Heinrich
committed
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
do_validation = (
self.environment_config["game"]["validate_recipes"]
if "validate_recipes" in self.environment_config["game"].keys()
else True
)
self.recipe_validation: Validation = Validation(
meals=[m for m in self.item_info.values() if m.type == ItemType.Meal]
if self.environment_config["orders"]["meals"]["all"]
else [
self.item_info[m]
for m in self.environment_config["orders"]["meals"]["list"]
if self.item_info[m].type == ItemType.Meal
],
item_info=self.item_info,
order_manager=self.order_manager,
do_validation=do_validation,
)
"""Validates configs and creates recipe graphs."""
plate_config = PlateConfig(
**(
self.environment_config["plates"]
if "plates" in self.environment_config
else {}
)
)
self.recipe_validation.update_plate_config(plate_config, self.environment_config["layout_chars"],
self.layout_config)

Florian Schröder
committed
self.counter_factory: CounterFactory = CounterFactory(
layout_chars_config=self.environment_config["layout_chars"],
item_info=self.item_info,
serving_window_additional_kwargs={
"meals": self.allowed_meal_names,
"env_time_func": self.get_env_time,
},
plate_config=plate_config,
order_manager=self.order_manager,
effect_manager_config=self.environment_config["effect_manager"],
undo_dispenser_pickup=self.environment_config["game"][
"undo_dispenser_pickup"
]
if "game" in self.environment_config

Christoph Kowalski
committed
and "undo_dispenser_pickup" in self.environment_config["game"]
else False,
random=self.random,
"""Handles the creation of counters based on their config."""

Fabian Heinrich
committed
(
self.counters,
self.designated_player_positions,
self.free_positions,
self.kitchen_width,
self.kitchen_height,
) = self.counter_factory.parse_layout_file(self.layout_config)
self.hook(LAYOUT_FILE_PARSED)

Fabian Heinrich
committed

Florian Schröder
committed
self.movement: Movement = Movement(
counter_positions=np.array([c.pos for c in self.counters]),
player_config=self.environment_config["player_config"],
world_borders=np.array(
[[-0.5, self.kitchen_width - 0.5], [-0.5, self.kitchen_height - 0.5]],
dtype=float,
),
hook=self.hook,
"""Does the movement of players in each step."""

Florian Schröder
committed
self.progressing_counters: list[Counter] = []
"""Counters that needs to be called in the step function via the `progress` method."""
self.effect_manager: dict[
str, EffectManager
] = self.counter_factory.setup_effect_manger()
"""Dict of effect managers. Currently only the fire effect manager."""
self.overwrite_counters(self.counters)
meals_to_be_ordered = self.recipe_validation.validate_environment(self.counters)
assert meals_to_be_ordered, "Need possible meals for order generation."
available_meals = {meal: self.item_info[meal] for meal in meals_to_be_ordered}
self.order_manager.set_available_meals(available_meals)

Florian Schröder
committed
self.start_time: datetime = self.env_time
"""The relative env time when it started."""
self.env_time_end: datetime = self.env_time + timedelta(
seconds=self.environment_config["game"]["time_limit_seconds"]

Fabian Heinrich
committed
)
"""The relative env time when it will stop/end"""

Fabian Heinrich
committed
log.debug(f"End time: {self.env_time_end}")
self.info_msgs_per_player: dict[str, list[InfoMsg]] = defaultdict(list)
"""Cache of info messages per player which should be showed in the visualization of each player."""
self.additional_state_content: dict[str, Any] = {}
"""The environment will extend the content of each state with this dictionary. Adapt it with the setter
function."""
self.hook(
ENV_INITIALIZED,
environment_config=env_config,
layout_config=self.layout_config,
env_start_time_worldtime=datetime.now(),

Florian Schröder
committed
@property
def game_ended(self) -> bool:
    """Tell whether the environment has reached its time limit.

    Returns:
        True once `env_time` is at or past `Environment.env_time_end`, False otherwise.
    """
    deadline = self.env_time_end
    return deadline <= self.env_time
def overwrite_counters(self, counters):
    """Replace the environment's counters and refresh everything derived from them.

    Args:
        counters: A list of counter objects representing the counters in the system.

    The counter positions of the movement object are rebuilt from the new counters.
    `progressing_counters` is set to those counters whose exact class is one of the
    classes found in the `cooperative_cuisine.counters` module that have a `progress`
    attribute. Finally, every effect manager receives the new counters.
    """
    self.counters = counters
    self.movement.counter_positions = np.array([c.pos for c in self.counters])

    counters_module = sys.modules["cooperative_cuisine.counters"]
    # getmembers yields (name, class) pairs; the dict keeps one class per name.
    classes_by_name = dict(inspect.getmembers(counters_module, inspect.isclass))
    progress_counter_classes = [
        cls for cls in classes_by_name.values() if hasattr(cls, "progress")
    ]

    # Membership is tested on the exact class, not via isinstance.
    self.progressing_counters = [
        counter
        for counter in self.counters
        if counter.__class__ in progress_counter_classes
    ]

    for manager in self.effect_manager.values():
        manager.set_counters(counters)

Florian Schröder
committed
def get_env_time(self) -> datetime:
    """Return the internal time of the environment.

    An environment starts always with the time from `create_init_env_time`.
    Utility method to pass a reference to the serving window.

    Returns:
        The current value of `env_time`.
    """
    # The method previously had no body and returned None despite its
    # `-> datetime` annotation.
    return self.env_time
def load_item_info(self, item_info: str | dict[str, ItemInfo] ) -> dict[str, ItemInfo]:

Florian Schröder
committed
"""Load `item_info.yml`, create ItemInfo classes and replace equipment strings with item infos."""
self.hook(ITEM_INFO_CONFIG, item_info_config=item_info)
if isinstance(item_info, str):
item_info = yaml.safe_load(item_info)
for item_name in item_info:
item_info[item_name] = ItemInfo(name=item_name, **item_info[item_name])
for item_name, single_item_info in item_info.items():
if single_item_info.equipment:
single_item_info.equipment = item_info[single_item_info.equipment]
return item_info
def perform_action(self, action: Action):
"""Performs an action of a player in the environment. Maps different types of action inputs to the
correct execution of the players.
Possible action types are movement, pickup and interact actions.
Args:
action: The action to be performed
"""
assert action.player in self.players.keys(), "Unknown player."
self.hook(PRE_PERFORM_ACTION, action=action)
player = self.players[action.player]

Fabian Heinrich
committed

Florian Schröder
committed
if action.action_type == ActionType.MOVEMENT:
player.set_movement(
action.action_data,
self.env_time + timedelta(seconds=action.duration),

Fabian Heinrich
committed
else:
counter = get_closest(player.facing_point, self.counters)
if action.action_type == ActionType.PICK_UP_DROP:

Fabian Heinrich
committed
self.hook(ACTION_PUT, action=action, counter=counter, player=action.player)

Florian Schröder
committed
elif action.action_type == ActionType.INTERACT:
if action.action_data == InterActionData.START:
player.perform_interact_start(counter)

Fabian Heinrich
committed
self.hook(ACTION_INTERACT_START, action=action, counter=counter, player=action.player)

Fabian Heinrich
committed
ACTION_ON_NOT_REACHABLE_COUNTER, action=action, counter=counter, player=action.player

Florian Schröder
committed
if action.action_data == InterActionData.STOP:
player.perform_interact_stop()

Fabian Heinrich
committed
self.hook(POST_PERFORM_ACTION, action=action)

Fabian Heinrich
committed
def add_player(self, player_name: str, pos: npt.NDArray = None):

Florian Schröder
committed
"""Add a player to the environment.
Args:
player_name: The id/name of the player to reference actions and in the state.
pos: The optional init position of the player.

Florian Schröder
committed
Raises:
ValueError: if the player_name already exists.

Florian Schröder
committed
"""
if player_name in self.players:
raise ValueError(f"Player {player_name} already exists.")
player = Player(

Florian Schröder
committed
player_name,

Florian Schröder
committed
player_config=self.environment_config["player_config"],
hook=self.hook,

Florian Schröder
committed
pos=pos,

Fabian Heinrich
committed
self.players[player.name] = player
if player.pos is None:
if len(self.designated_player_positions) > 0:
free_idx = self.random.randint(
0, len(self.designated_player_positions) - 1
)

Fabian Heinrich
committed
player.move_abs(self.designated_player_positions[free_idx])
del self.designated_player_positions[free_idx]
elif len(self.free_positions) > 0:
free_idx = self.random.randint(0, len(self.free_positions) - 1)

Fabian Heinrich
committed
player.move_abs(self.free_positions[free_idx])
del self.free_positions[free_idx]
else:
player.update_facing_point()

Fabian Heinrich
committed
self.movement.set_collision_arrays(len(self.players))
self.hook(PLAYER_ADDED, player=player, pos=pos)

Florian Schröder
committed
def step(self, passed_time: timedelta):
"""Performs a step of the environment. Affects time based events such as cooking or cutting things, orders
"""
# self.hook(PRE_STEP, passed_time=passed_time)

Florian Schröder
committed
self.env_time += passed_time

Fabian Heinrich
committed

Fabian Heinrich
committed
self.hook(GAME_ENDED_STEP, served_meals=self.order_manager.served_meals)
for player in self.players.values():
player.progress(passed_time, self.env_time)
self.movement.perform_movement(
passed_time, self.env_time, self.players, self.counters
)
for counter in self.progressing_counters:
counter.progress(passed_time=passed_time, now=self.env_time)
self.order_manager.progress(passed_time=passed_time, now=self.env_time)
for effect_manager in self.effect_manager.values():
effect_manager.progress(passed_time=passed_time, now=self.env_time)
self.hook(POST_STEP, passed_time=passed_time)
def get_state(self, player_id: str = None) -> dict:
"""Get the current state of the game environment. The state here is accessible by the current python objects.

Fabian Heinrich
committed
Args:
player_id: The player for which to get the state.
Returns:
The state of the game as a dict.

Fabian Heinrich
committed
"""
if player_id in self.players:
self.hook(PRE_STATE, player_id=player_id)
state = {
"players": [p.to_dict() for p in self.players.values()],
"counters": [c.to_dict() for c in self.counters],
"kitchen": {"width": self.kitchen_width, "height": self.kitchen_height},
"orders": self.order_manager.order_state(self.env_time),
"ended": self.game_ended,
"env_time": self.env_time.isoformat(),
"remaining_time": max(
self.env_time_end - self.env_time, timedelta(0)
).total_seconds(),
"view_restrictions": [
{
"direction": player.facing_direction.tolist(),
"position": player.pos.tolist(),
"angle": self.player_view_angle,
"counter_mask": None,
"range": self.player_view_range,
}
for player in self.players.values()
]
if self.player_view_restricted
(player, str(meal))
for (meal, time, player) in self.order_manager.served_meals
"info_msg": [
(msg["msg"], msg["level"])
for msg in self.info_msgs_per_player[player_id]

Florian Schröder
committed
if msg["start_time"] < self.env_time < msg["end_time"]
**self.additional_state_content,
}
self.hook(STATE_DICT, state=state, player_id=player_id)
raise ValueError(f"No valid {player_id=}")

Florian Schröder
committed
def get_json_state(self, player_id: str = None) -> str:
    """Return the current state of the game formatted in json dict.

    The state is obtained from `get_state`, serialized with `json.dumps` and
    passed to the `JSON_STATE` hook before it is returned.

    Args:
        player_id: The player for which to get the state.

    Returns:
        The state of the game formatted as a json-string
    """
    state = self.get_state(player_id)
    json_data = json.dumps(state)
    self.hook(JSON_STATE, json_data=json_data, player_id=player_id)
    # assert StateRepresentation.model_validate_json(json_data=json_data)
    # The docstring was unterminated and the serialized state was never returned.
    return json_data

Florian Schröder
committed
def reset_env_time(self):
    """Set the env time back to the initial time given by `create_init_env_time`.

    The `PRE_RESET_ENV_TIME` hook is called with the time before the reset and the
    `POST_RESET_ENV_TIME` hook with the time after it. The new time is also logged
    at debug level.
    """
    time_before_reset = self.env_time
    self.hook(PRE_RESET_ENV_TIME, env_time=time_before_reset)

    self.env_time = create_init_env_time()
    self.hook(POST_RESET_ENV_TIME, env_time=self.env_time)

    log.debug(f"Reset env time to {self.env_time}")
def register_callback_for_hook(self, hook_ref: str | list[str], callback: Callable):
    """Registers a callback function for a given hook reference.

    Args:
        hook_ref (str | list[str]): The reference to the hook or hooks for which the callback should be registered.
            It can be a single string or a list of strings.
        callback (Callable): The callback function to be registered for the specified hook(s).
            The function should accept the necessary parameters and perform the desired actions.
    """
    # The docstring was unterminated, which turned this call into docstring text.
    self.hook.register_callback(hook_ref, callback)
def hook_callbacks(self):
    """Executes extra setup functions specified in the environment configuration.

    Each entry of the `hook_callbacks` section of the environment config maps a
    callback name to the keyword arguments that are forwarded, together with this
    environment, to `hooks_via_callback_class`. A missing or empty section is a
    no-op.
    """
    # `.get` keeps configs without a `hook_callbacks` section working.
    callbacks_config = self.environment_config.get("hook_callbacks")
    if not callbacks_config:
        return
    for callback_name, setup_kwargs in callbacks_config.items():
        # log.info(f"Setup hook callback {callback_name}")
        hooks_via_callback_class(name=callback_name, env=self, **setup_kwargs)
def increment_score(self, score: int | float, info: str = ""):
    """Add `score` to the current score, call the `SCORE_CHANGED` hook and log it.

    Args:
        score: The amount that is added to the current score.
        info: Free-text note that is forwarded to the hook and written to the log.
    """
    self.score += score
    updated_total = self.score
    self.hook(SCORE_CHANGED, score=updated_total, increase=score, info=info)
    log.debug(f"Score: {self.score} ({score}) - {info}")
def update_additional_state_content(self, **kwargs):
    """Merge the given key-value pairs into the additional state content.

    The `ADDITIONAL_STATE_UPDATE` hook is called with the new pairs before they
    are stored.

    Args:
        **kwargs: The key-value pairs to update the additional state content with.
    """
    new_entries = kwargs
    self.hook(ADDITIONAL_STATE_UPDATE, update=new_entries)
    for key, value in new_entries.items():
        self.additional_state_content[key] = value