-
Florian Schröder authored
Implemented multiple code adjustments across various Python modules, including renaming variables for clarity and improving data type definition. Added comprehensive docstrings to all functions and classes to enhance code readability and maintainability. Corrected a bug in debug parameter where it incorrectly employed args.debug, replacing it with args.do_study for correct implementation.
Florian Schröder authoredImplemented multiple code adjustments across various Python modules, including renaming variables for clarity and improving data type definition. Added comprehensive docstrings to all functions and classes to enhance code readability and maintainability. Corrected a bug in debug parameter where it incorrectly employed args.debug, replacing it with args.do_study for correct implementation.
counter_factory.py 21.76 KiB
"""
The `CounterFactory` initializes the `Counter` classes from the characters in the layout config.
The mapping depends on the definition in the `environment_config.yml` in the `layout_chars` section.
```yaml
layout_chars:
_: Free
hash: Counter
A: Agent
P: PlateDispenser
C: CuttingBoard
X: Trashcan
W: ServingWindow
S: Sink
+: SinkAddon
U: Pot # with Stove
Q: Pan # with Stove
O: Peel # with Oven
F: Basket # with DeepFryer
T: Tomato
N: Onion # oNioN
L: Lettuce
K: Potato # Kartoffel
I: Fish # fIIIsh
D: Dough
E: Cheese # chEEEse
G: Sausage # sausaGe
B: Bun
M: Meat
```
# Code Documentation
"""
import inspect
import sys
from random import Random
from typing import Any, Type, TypeVar, Tuple
import numpy as np
import numpy.typing as npt
from cooperative_cuisine.counters import (
Counter,
CookingCounter,
Dispenser,
ServingWindow,
CuttingBoard,
PlateDispenser,
Sink,
PlateConfig,
SinkAddon,
Trashcan,
)
from cooperative_cuisine.effects import EffectManager
from cooperative_cuisine.hooks import Hooks
from cooperative_cuisine.items import (
ItemInfo,
ItemType,
CookingEquipment,
Plate,
Item,
)
from cooperative_cuisine.orders import OrderManager
from cooperative_cuisine.utils import get_closest
T = TypeVar("T")
def convert_words_to_chars(layout_chars_config: dict[str, str]) -> dict[str, str]:
"""Converts words in a given layout chars configuration dictionary to their corresponding characters.
This is useful for characters that can not be keys in a yaml file. For example, `#` produces a comment.
Therefore, you can specify `hash` as a key (`hash: Counter`). `word_refs` defines the conversions. *Click on `▶ View Source`.*
Args:
layout_chars_config: A dictionary containing layout character configurations, where the keys are words
representing layout characters and the values are the corresponding character representations.
Returns:
A dictionary where the keys are the layout characters and the values are their corresponding words.
"""
word_refs = {
"hash": "#",
# "space": " ",
"dot": ".",
"comma": ",",
# "semicolon": ";",
"colon": ":",
"minus": "-",
"exclamation": "!",
"question": "?",
"dquote": '"',
"squote": "'",
"star": "*",
"ampersand": "&",
"equal": "=",
"right": ">",
"left": "<",
"pipe": "|",
"at": "@",
"wave": "~", # ~ is None / null in yaml
"ocurlybracket": "{",
"ccurlybracket": "}",
"osquarebracket": "[",
"csquarebracket": "]",
}
return {word_refs.get(c, c): name for c, name in layout_chars_config.items()}
class CounterFactory:
"""The `CounterFactory` class is responsible for creating counter objects based on the layout configuration and
item information provided. It also provides methods for mapping and filtering the item information.
"""
def __init__(
self,
layout_chars_config: dict[str, str],
item_info: dict[str, ItemInfo],
serving_window_additional_kwargs: dict[str, Any],
plate_config: PlateConfig,
order_manager: OrderManager,
effect_manager_config: dict,
undo_dispenser_pickup: bool,
hook: Hooks,
random: Random,
) -> None:
"""Constructor for the `CounterFactory` class. Set up the attributes necessary to instantiate the counters.
Initializes the object with the provided parameters. It performs the following tasks:
- Converts the layout character configuration from words to characters.
- Sets the item information dictionary.
- Sets the additional keyword arguments for serving window configuration.
- Sets the plate configuration.
Args:
layout_chars_config: A dictionary mapping layout characters to their corresponding names.
item_info: A dictionary containing information about different items.
serving_window_additional_kwargs: Additional keyword arguments for serving window configuration.
plate_config: The configuration for plate usage.
"""
self.layout_chars_config: dict[str, str] = convert_words_to_chars(
layout_chars_config
)
"""Layout chars to the counter names."""
self.item_info: dict[str, ItemInfo] = item_info
"""All item infos from the `item_info` config."""
self.serving_window_additional_kwargs: dict[
str, Any
] = serving_window_additional_kwargs
"""The additional keyword arguments for the serving window."""
self.plate_config: PlateConfig = plate_config
"""The plate config from the `environment_config`"""
self.order_manager: OrderManager = order_manager
"""The order and score manager to pass to `ServingWindow` and the `Tashcan` which can affect the scores."""
self.effect_manager_config: dict = effect_manager_config
"""The effect manager config to setup the effect manager based on the defined effects in the item info."""
self.no_counter_chars: set[str] = set(
c
for c, name in self.layout_chars_config.items()
if name in ["Agent", "Free"]
)
"""A set of characters that represent counters for agents or free spaces."""
self.counter_classes: dict[str, Type] = dict(
filter(
lambda k: issubclass(k[1], Counter),
inspect.getmembers(
sys.modules["cooperative_cuisine.counters"], inspect.isclass
),
)
)
"""A dictionary of counter classes imported from the 'cooperative_cuisine.counters' module."""
self.cooking_counter_equipments: dict[str, set[str]] = {
cooking_counter: {
equipment
for equipment, e_info in self.item_info.items()
if e_info.equipment and e_info.equipment.name == cooking_counter
}
for cooking_counter, info in self.item_info.items()
if info.type == ItemType.Equipment and info.equipment is None
}
"""A dictionary mapping cooking counters to the list of equipment items associated with them."""
self.undo_dispenser_pickup: bool = undo_dispenser_pickup
"""Put back ingredients of the same type on a dispenser."""
self.hook: Hooks = hook
"""Reference to the hook manager."""
self.random: Random = random
"""Random instance."""
def get_counter_object(self, c: str, pos: npt.NDArray[float]) -> Counter:
"""Create and returns a counter object based on the provided character and position."""
assert self.can_map(c), f"Can't map counter char {c}"
counter_class = None
# if c == "@":
# print("-")
if self.layout_chars_config[c] in self.item_info:
item_info = self.item_info[self.layout_chars_config[c]]
if item_info.type == ItemType.Equipment and item_info.equipment:
if item_info.equipment.name not in self.counter_classes:
return CookingCounter(
name=item_info.equipment.name,
equipments=self.cooking_counter_equipments[
item_info.equipment.name
],
pos=pos,
occupied_by=CookingEquipment(
name=item_info.name,
item_info=item_info,
transitions=self.filter_item_info(
by_equipment_name=item_info.name,
add_effects=True,
),
),
hook=self.hook,
)
elif item_info.type == ItemType.Ingredient:
return Dispenser(
pos=pos,
hook=self.hook,
dispensing=item_info,
undo_dispenser_pickup=self.undo_dispenser_pickup,
)
elif item_info.type == ItemType.Tool:
return Counter(
pos=pos,
hook=self.hook,
occupied_by=Item(name=item_info.name, item_info=item_info),
)
if counter_class is None:
if self.layout_chars_config[c] in self.counter_classes:
counter_class = self.counter_classes[self.layout_chars_config[c]]
elif self.layout_chars_config[c] == "Plate":
return Counter(
pos=pos,
hook=self.hook,
occupied_by=Plate(
transitions=self.filter_item_info(
by_item_type=ItemType.Meal, add_effects=True
),
clean=True,
item_info=self.item_info[Plate.__name__],
),
)
kwargs = {
"pos": pos,
"hook": self.hook,
}
if issubclass(counter_class, (CuttingBoard, Sink)):
kwargs["transitions"] = self.filter_item_info(
by_equipment_name=counter_class.__name__,
add_effects=True,
)
elif issubclass(counter_class, PlateDispenser):
kwargs.update(
{
"plate_transitions": self.filter_item_info(
by_item_type=ItemType.Meal, add_effects=True
),
"plate_config": self.plate_config,
"dispensing": self.item_info[Plate.__name__],
"random": self.random,
}
)
elif issubclass(counter_class, ServingWindow):
kwargs.update(self.serving_window_additional_kwargs)
if issubclass(counter_class, (ServingWindow, Trashcan)):
kwargs[
"order_manager"
] = self.order_manager # individual because for the later trash scorer
return counter_class(**kwargs)
def can_map(self, char) -> bool:
"""Check if the provided character can be mapped to a counter object."""
return char in self.layout_chars_config and (
not self.is_counter(char)
or self.layout_chars_config[char] in self.item_info
or self.layout_chars_config[char] in self.counter_classes
)
def is_counter(self, c: str) -> bool:
"""Checks if the provided character represents a counter."""
return c in self.layout_chars_config and c not in self.no_counter_chars
def map_not_counter(self, c: str) -> str:
"""Maps the provided character to a non-counter word based on the layout configuration."""
assert self.can_map(c) and not self.is_counter(
c
), "Cannot map char {c} as a 'not counter'"
return self.layout_chars_config[c]
def filter_item_info(
self,
by_item_type: ItemType = None,
by_equipment_name: str = None,
add_effects: bool = False,
) -> dict[str, ItemInfo]:
"""Filter the item info dict by item type or equipment name.
Args:
by_item_type: An optional parameter of ItemType enum type. If provided, filters the item info by item type.
by_equipment_name: An optional parameter of string type. If provided, filters the item info by equipment name.
add_effects: A boolean parameter indicating whether to include items with effects in the filtered result.
Returns:
A dictionary mapping item names (strings) to ItemInfo objects. The filtered item info based on the provided parameters.
"""
filtered = {}
if by_item_type is not None:
filtered = {
name: info
for name, info in self.item_info.items()
if info.type == by_item_type
}
if by_equipment_name is not None:
filtered = {
name: info
for name, info in self.item_info.items()
if info.equipment is not None
and info.equipment.name == by_equipment_name
}
if add_effects:
for name, effect in self.filter_item_info(
by_item_type=ItemType.Effect
).items():
for need in effect.needs:
if need in filtered:
filtered.update({name: effect})
if by_item_type or by_equipment_name:
return filtered
return self.item_info
def post_counter_setup(self, counters: list[Counter]):
"""Initialize the counters in the environment.
Connect the `ServingWindow`(s) with the `PlateDispenser`.
Find and connect the `SinkAddon`s with the `Sink`s
Args:
counters: list of counters to perform the post setup on.
"""
plate_dispenser = self.get_counter_of_type(PlateDispenser, counters)
assert len(plate_dispenser) > 0, "No Plate Dispenser in the environment"
sink_addons = self.get_counter_of_type(SinkAddon, counters)
for counter in counters:
match counter:
case ServingWindow():
counter: ServingWindow # Pycharm type checker does now work for match statements?
counter.add_plate_dispenser(plate_dispenser[0])
case Sink(pos=pos):
counter: Sink # Pycharm type checker does now work for match statements?
assert len(sink_addons) > 0, "No SinkAddon but normal Sink"
closest_addon = get_closest(pos, sink_addons)
assert 1.0 == np.linalg.norm(
closest_addon.pos - pos
), f"No SinkAddon connected to Sink at pos {pos}"
counter.set_addon(closest_addon)
def setup_effect_manger(self, counters: list[Counter]) -> dict[str, EffectManager]:
"""Setup effect manager for effects in the item info.
Args:
counters (list[Counter]): A list of Counter objects.
Returns:
dict[str, EffectManager]: A dictionary where the keys are manager names and the values are EffectManager objects.
Raises:
AssertionError: If the manager for an effect is not found in the effect_manager_config.
Example Usage:
counters = [counter1, counter2]
effect_manager = setup_effect_manger(counters)
This method sets up the effect managers for the given counters. It iterates over the effects obtained from
the filter_item_info() method using the by_item_type argument set to ItemType.Effect. For each effect,
it checks if the effect's manager is present in the self.effect_manager_config dictionary. If not,
it raises an AssertionError.
If the effect's manager is already present in the effect_manager dictionary, it assigns the manager to the
variable 'manager'. Otherwise, it creates a new instance of the effect manager class using the values from
self.effect_manager_config[effect.manager]["class"] and self.effect_manager_config[effect.manager]["kwargs"].
It then adds the effect to the manager and updates the effect's manager attribute with the assigned manager.
Finally, it returns the effect_manager dictionary containing the assigned effect managers.
Note: This method assumes the following variables and methods are available within the class:
- self.filter_item_info(by_item_type: ItemType) -> dict[str, Effect]: This method returns a dictionary of effects filtered by item type.
- self.effect_manager_config: A dictionary containing the configuration for the effect managers.
- self.hook: An object representing the hook.
- self.random: An object representing a random generator.
- Counter: A class representing a counter.
- EffectManager: A class representing an effect manager.
- ItemType: An enum representing different types of items.
"""
effect_manager = {}
for name, effect in self.filter_item_info(by_item_type=ItemType.Effect).items():
assert (
effect.manager in self.effect_manager_config
), f"Manager for effect not found: {name} -> {effect.manager} not in {list(self.effect_manager_config.keys())}"
if effect.manager in effect_manager:
manager = effect_manager[effect.manager]
else:
manager = self.effect_manager_config[effect.manager]["class"](
hook=self.hook,
random=self.random,
**self.effect_manager_config[effect.manager]["kwargs"],
)
effect_manager[effect.manager] = manager
manager.add_effect(effect)
effect.manager = manager
return effect_manager
@staticmethod
def get_counter_of_type(counter_type: Type[T], counters: list[Counter]) -> list[T]:
"""Filter all counters in the environment for a counter type."""
return list(filter(lambda counter: isinstance(counter, counter_type), counters))
def parse_layout_file(
self,
layout_config: str,
) -> Tuple[
list[Counter], list[npt.NDArray[float]], list[npt.NDArray[float]], int, int
]:
"""Creates layout of kitchen counters in the environment based on layout file.
Counters are arranged in a fixed size grid starting at [0,0]. The center of the first counter is at
[counter_size/2, counter_size/2], counters are directly next to each other (of no empty space is specified
in layout).
Args:
layout_config: the layout config string. Each character corresponds to a counter and each line to a row.
"""
starting_at: float = 0.0
current_y: float = starting_at
counters: list[Counter] = []
designated_player_positions: list[npt.NDArray] = []
free_positions: list[npt.NDArray] = []
lines = layout_config.split("\n")
grid = []
max_width = 0
lines = list(filter(lambda l: l != "", lines))
for line in lines:
line = line.replace(" ", "")
if not line or line.startswith(";"):
break
current_x: float = starting_at
grid_line = []
for character in line:
# character = character.capitalize()
pos = np.array([current_x, current_y])
assert self.can_map(
character
), f"{character=} in layout file can not be mapped"
if self.is_counter(character):
counters.append(self.get_counter_object(character, pos))
grid_line.append(1)
else:
grid_line.append(0)
match self.map_not_counter(character):
case "Agent":
designated_player_positions.append(pos)
case "Free":
free_positions.append(np.array([current_x, current_y]))
current_x += 1
if len(line) >= max_width:
max_width = len(line)
grid.append(grid_line)
current_y += 1
grid = [line + ([0] * (max_width - len(line))) for line in grid]
kitchen_width = int(max_width + starting_at)
kitchen_height = int(current_y)
determine_counter_orientations(
counters, grid, np.array([kitchen_width / 2, kitchen_height / 2])
)
self.post_counter_setup(counters)
return (
counters,
designated_player_positions,
free_positions,
kitchen_width,
kitchen_height,
)
def determine_counter_orientations(
counters: list[Counter], grid: list[list[int]], kitchen_center: npt.NDArray[float]
):
"""Determines the orientation of counters on a grid based on free positions around it.
Args:
counters: A list of Counter objects.
grid: The grid representing the kitchen layout.
kitchen_center: The coordinates of the kitchen center.
"""
grid = np.array(grid).T
grid_width = grid.shape[0]
grid_height = grid.shape[1]
last_counter = None
fst_counter_in_row = None
for c in counters:
grid_idx = np.floor(c.pos).astype(int)
neighbour_offsets = np.array([[0, 1], [0, -1], [1, 0], [-1, 0]], dtype=int)
neighbours_free = []
for offset in neighbour_offsets:
neighbour_pos = grid_idx + offset
if (
neighbour_pos[0] > (grid_width - 1)
or neighbour_pos[0] < 0
or neighbour_pos[1] > (grid_height - 1)
or neighbour_pos[1] < 0
):
pass
else:
if grid[neighbour_pos[0]][neighbour_pos[1]] == 0:
neighbours_free.append(offset)
if len(neighbours_free) > 0:
vector_to_center = c.pos - kitchen_center
vector_to_center /= np.linalg.norm(vector_to_center)
n_idx = np.argmin(
[np.linalg.norm(vector_to_center - n) for n in neighbours_free]
)
nearest_vec = neighbours_free[n_idx]
# print(nearest_vec, type(nearest_vec))
c.set_orientation(nearest_vec)
elif grid_idx[0] == 0:
if grid_idx[1] == 0 or fst_counter_in_row is None:
# counter top left
c.set_orientation(np.array([1, 0]))
else:
c.set_orientation(fst_counter_in_row.orientation)
fst_counter_in_row = c
else:
c.set_orientation(last_counter.orientation)
last_counter = c