""" The `CounterFactory` initializes the `Counter` classes from the characters in the layout config. The mapping depends on the definition in the `environment_config.yml` in the `layout_chars` section. ```yaml layout_chars: _: Free hash: Counter A: Agent P: PlateDispenser C: CuttingBoard X: Trashcan W: ServingWindow S: Sink +: SinkAddon U: Pot # with Stove Q: Pan # with Stove O: Peel # with Oven F: Basket # with DeepFryer T: Tomato N: Onion # oNioN L: Lettuce K: Potato # Kartoffel I: Fish # fIIIsh D: Dough E: Cheese # chEEEse G: Sausage # sausaGe B: Bun M: Meat ``` # Code Documentation """ import inspect import sys from random import Random from typing import Any, Type, TypeVar, Tuple import numpy as np import numpy.typing as npt from cooperative_cuisine.counters import ( Counter, CookingCounter, Dispenser, ServingWindow, CuttingBoard, PlateDispenser, Sink, PlateConfig, SinkAddon, Trashcan, ) from cooperative_cuisine.effects import EffectManager from cooperative_cuisine.hooks import Hooks from cooperative_cuisine.items import ( ItemInfo, ItemType, CookingEquipment, Plate, Item, ) from cooperative_cuisine.orders import OrderManager from cooperative_cuisine.utils import get_closest T = TypeVar("T") def convert_words_to_chars(layout_chars_config: dict[str, str]) -> dict[str, str]: """Converts words in a given layout chars configuration dictionary to their corresponding characters. This is useful for characters that can not be keys in a yaml file. For example, `#` produces a comment. Therefore, you can specify `hash` as a key (`hash: Counter`). `word_refs` defines the conversions. *Click on `▶ View Source`.* Args: layout_chars_config: A dictionary containing layout character configurations, where the keys are words representing layout characters and the values are the corresponding character representations. Returns: A dictionary where the keys are the layout characters and the values are their corresponding words. """ word_refs = { "hash": "#", # "space": " ", "dot": ".", "comma": ",", # "semicolon": ";", "colon": ":", "minus": "-", "exclamation": "!", "question": "?", "dquote": '"', "squote": "'", "star": "*", "ampersand": "&", "equal": "=", "right": ">", "left": "<", "pipe": "|", "at": "@", "wave": "~", # ~ is None / null in yaml "ocurlybracket": "{", "ccurlybracket": "}", "osquarebracket": "[", "csquarebracket": "]", } return {word_refs.get(c, c): name for c, name in layout_chars_config.items()} class CounterFactory: """The `CounterFactory` class is responsible for creating counter objects based on the layout configuration and item information provided. It also provides methods for mapping and filtering the item information. """ def __init__( self, layout_chars_config: dict[str, str], item_info: dict[str, ItemInfo], serving_window_additional_kwargs: dict[str, Any], plate_config: PlateConfig, order_manager: OrderManager, effect_manager_config: dict, undo_dispenser_pickup: bool, hook: Hooks, random: Random, ) -> None: """Constructor for the `CounterFactory` class. Set up the attributes necessary to instantiate the counters. Initializes the object with the provided parameters. It performs the following tasks: - Converts the layout character configuration from words to characters. - Sets the item information dictionary. - Sets the additional keyword arguments for serving window configuration. - Sets the plate configuration. Args: layout_chars_config: A dictionary mapping layout characters to their corresponding names. item_info: A dictionary containing information about different items. serving_window_additional_kwargs: Additional keyword arguments for serving window configuration. plate_config: The configuration for plate usage. """ self.layout_chars_config: dict[str, str] = convert_words_to_chars( layout_chars_config ) """Layout chars to the counter names.""" self.item_info: dict[str, ItemInfo] = item_info """All item infos from the `item_info` config.""" self.serving_window_additional_kwargs: dict[ str, Any ] = serving_window_additional_kwargs """The additional keyword arguments for the serving window.""" self.plate_config: PlateConfig = plate_config """The plate config from the `environment_config`""" self.order_manager: OrderManager = order_manager """The order and score manager to pass to `ServingWindow` and the `Tashcan` which can affect the scores.""" self.effect_manager_config: dict = effect_manager_config """The effect manager config to setup the effect manager based on the defined effects in the item info.""" self.no_counter_chars: set[str] = set( c for c, name in self.layout_chars_config.items() if name in ["Agent", "Free"] ) """A set of characters that represent counters for agents or free spaces.""" self.counter_classes: dict[str, Type] = dict( filter( lambda k: issubclass(k[1], Counter), inspect.getmembers( sys.modules["cooperative_cuisine.counters"], inspect.isclass ), ) ) """A dictionary of counter classes imported from the 'cooperative_cuisine.counters' module.""" self.cooking_counter_equipments: dict[str, set[str]] = { cooking_counter: { equipment for equipment, e_info in self.item_info.items() if e_info.equipment and e_info.equipment.name == cooking_counter } for cooking_counter, info in self.item_info.items() if info.type == ItemType.Equipment and info.equipment is None } """A dictionary mapping cooking counters to the list of equipment items associated with them.""" self.undo_dispenser_pickup: bool = undo_dispenser_pickup """Put back ingredients of the same type on a dispenser.""" self.hook: Hooks = hook """Reference to the hook manager.""" self.random: Random = random """Random instance.""" def get_counter_object(self, c: str, pos: npt.NDArray[float]) -> Counter: """Create and returns a counter object based on the provided character and position.""" assert self.can_map(c), f"Can't map counter char {c}" counter_class = None if self.layout_chars_config[c] in self.item_info: item_info = self.item_info[self.layout_chars_config[c]] if item_info.type == ItemType.Equipment and item_info.equipment: if item_info.equipment.name not in self.counter_classes: return CookingCounter( name=item_info.equipment.name, equipments=self.cooking_counter_equipments[ item_info.equipment.name ], pos=pos, occupied_by=CookingEquipment( name=item_info.name, item_info=item_info, transitions=self.filter_item_info( by_equipment_name=item_info.name, add_effects=True, ), hook=self.hook, ), hook=self.hook, ) elif item_info.type == ItemType.Ingredient: return Dispenser( pos=pos, hook=self.hook, dispensing=item_info, undo_dispenser_pickup=self.undo_dispenser_pickup, ) elif item_info.type == ItemType.Tool: return Counter( pos=pos, hook=self.hook, occupied_by=Item(name=item_info.name, item_info=item_info), ) if counter_class is None: if self.layout_chars_config[c] in self.counter_classes: counter_class = self.counter_classes[self.layout_chars_config[c]] elif self.layout_chars_config[c] == "Plate": return Counter( pos=pos, hook=self.hook, occupied_by=Plate( transitions=self.filter_item_info( by_item_type=ItemType.Meal, add_effects=True ), clean=True, item_info=self.item_info[Plate.__name__], hook=self.hook, ), ) kwargs = { "pos": pos, "hook": self.hook, } if issubclass(counter_class, (CuttingBoard, Sink)): kwargs["transitions"] = self.filter_item_info( by_equipment_name=counter_class.__name__, add_effects=True, ) elif issubclass(counter_class, PlateDispenser): kwargs.update( { "plate_transitions": self.filter_item_info( by_item_type=ItemType.Meal, add_effects=True ), "plate_config": self.plate_config, "dispensing": self.item_info[Plate.__name__], "random": self.random, } ) elif issubclass(counter_class, ServingWindow): kwargs.update(self.serving_window_additional_kwargs) if issubclass(counter_class, (ServingWindow, Trashcan)): kwargs[ "order_manager" ] = self.order_manager # individual because for the later trash scorer return counter_class(**kwargs) def can_map(self, char) -> bool: """Check if the provided character can be mapped to a counter object.""" return char in self.layout_chars_config and ( not self.is_counter(char) or self.layout_chars_config[char] in self.item_info or self.layout_chars_config[char] in self.counter_classes ) def is_counter(self, c: str) -> bool: """Checks if the provided character represents a counter.""" return c in self.layout_chars_config and c not in self.no_counter_chars def map_not_counter(self, c: str) -> str: """Maps the provided character to a non-counter word based on the layout configuration.""" assert self.can_map(c) and not self.is_counter( c ), "Cannot map char {c} as a 'not counter'" return self.layout_chars_config[c] def filter_item_info( self, by_item_type: ItemType = None, by_equipment_name: str = None, add_effects: bool = False, ) -> dict[str, ItemInfo]: """Filter the item info dict by item type or equipment name. Args: by_item_type: An optional parameter of ItemType enum type. If provided, filters the item info by item type. by_equipment_name: An optional parameter of string type. If provided, filters the item info by equipment name. add_effects: A boolean parameter indicating whether to include items with effects in the filtered result. Returns: A dictionary mapping item names (strings) to ItemInfo objects. The filtered item info based on the provided parameters. """ filtered = {} if by_item_type is not None: filtered = { name: info for name, info in self.item_info.items() if info.type == by_item_type } if by_equipment_name is not None: filtered = { name: info for name, info in self.item_info.items() if info.equipment is not None and info.equipment.name == by_equipment_name } if add_effects: for name, effect in self.filter_item_info( by_item_type=ItemType.Effect ).items(): for need in effect.needs: if need in filtered: filtered.update({name: effect}) if by_item_type or by_equipment_name: return filtered return self.item_info def post_counter_setup(self, counters: list[Counter]): """Initialize the counters in the environment. Connect the `ServingWindow`(s) with the `PlateDispenser`. Find and connect the `SinkAddon`s with the `Sink`s Args: counters: list of counters to perform the post setup on. """ plate_dispenser = self.get_counter_of_type(PlateDispenser, counters) assert len(plate_dispenser) > 0, "No Plate Dispenser in the environment" sink_addons = self.get_counter_of_type(SinkAddon, counters) for counter in counters: match counter: case ServingWindow(): counter: ServingWindow # Pycharm type checker does now work for match statements? counter.add_plate_dispenser(plate_dispenser[0]) case Sink(pos=pos): counter: Sink # Pycharm type checker does now work for match statements? assert len(sink_addons) > 0, "No SinkAddon but normal Sink" closest_addon = get_closest(pos, sink_addons) assert 1.0 == np.linalg.norm( closest_addon.pos - pos ), f"No SinkAddon connected to Sink at pos {pos}" counter.set_addon(closest_addon) def setup_effect_manger(self) -> dict[str, EffectManager]: """Setup effect manager for effects in the item info. Returns: dict[str, EffectManager]: A dictionary where the keys are manager names and the values are EffectManager objects. Raises: AssertionError: If the manager for an effect is not found in the effect_manager_config. Example Usage: counters = [counter1, counter2] effect_manager = setup_effect_manger(counters) This method sets up the effect managers for the given counters. It iterates over the effects obtained from the filter_item_info() method using the by_item_type argument set to ItemType.Effect. For each effect, it checks if the effect's manager is present in the self.effect_manager_config dictionary. If not, it raises an AssertionError. If the effect's manager is already present in the effect_manager dictionary, it assigns the manager to the variable 'manager'. Otherwise, it creates a new instance of the effect manager class using the values from self.effect_manager_config[effect.manager]["class"] and self.effect_manager_config[effect.manager]["kwargs"]. It then adds the effect to the manager and updates the effect's manager attribute with the assigned manager. Finally, it returns the effect_manager dictionary containing the assigned effect managers. Note: This method assumes the following variables and methods are available within the class: - self.filter_item_info(by_item_type: ItemType) -> dict[str, Effect]: This method returns a dictionary of effects filtered by item type. - self.effect_manager_config: A dictionary containing the configuration for the effect managers. - self.hook: An object representing the hook. - self.random: An object representing a random generator. - Counter: A class representing a counter. - EffectManager: A class representing an effect manager. - ItemType: An enum representing different types of items. """ effect_manager = {} for name, effect in self.filter_item_info(by_item_type=ItemType.Effect).items(): assert ( effect.manager in self.effect_manager_config ), f"Manager for effect not found: {name} -> {effect.manager} not in {list(self.effect_manager_config.keys())}" if effect.manager in effect_manager: manager = effect_manager[effect.manager] else: manager = self.effect_manager_config[effect.manager]["class"]( hook=self.hook, random=self.random, **self.effect_manager_config[effect.manager]["kwargs"], ) effect_manager[effect.manager] = manager manager.add_effect(effect) effect.manager = manager return effect_manager @staticmethod def get_counter_of_type(counter_type: Type[T], counters: list[Counter]) -> list[T]: """Filter all counters in the environment for a counter type.""" return list(filter(lambda counter: isinstance(counter, counter_type), counters)) def parse_layout_file( self, layout_config: str, ) -> Tuple[ list[Counter], list[npt.NDArray[float]], list[npt.NDArray[float]], int, int ]: """Creates layout of kitchen counters in the environment based on layout file. Counters are arranged in a fixed size grid starting at [0,0]. The center of the first counter is at [counter_size/2, counter_size/2], counters are directly next to each other (of no empty space is specified in layout). Args: layout_config: the layout config string. Each character corresponds to a counter and each line to a row. """ starting_at: float = 0.0 current_y: float = starting_at counters: list[Counter] = [] designated_player_positions: list[npt.NDArray] = [] free_positions: list[npt.NDArray] = [] lines = layout_config.split("\n") grid = [] max_width = 0 for line in lines: line = line.strip() if not line or line.startswith(";"): break current_x: float = starting_at grid_line = [] for character in line: # character = character.capitalize() pos = np.array([current_x, current_y]) assert self.can_map( character ), f"{character=} in layout file can not be mapped" if self.is_counter(character): counters.append(self.get_counter_object(character, pos)) grid_line.append(1) else: grid_line.append(0) match self.map_not_counter(character): case "Agent": designated_player_positions.append(pos) case "Free": free_positions.append(np.array([current_x, current_y])) current_x += 1 if len(line) >= max_width: max_width = len(line) grid.append(grid_line) current_y += 1 grid = [line + ([0] * (max_width - len(line))) for line in grid] kitchen_width = int(max_width + starting_at) kitchen_height = int(current_y) determine_counter_orientations( counters, grid, np.array([kitchen_width / 2, kitchen_height / 2]) ) self.post_counter_setup(counters) return ( counters, designated_player_positions, free_positions, kitchen_width, kitchen_height, ) def determine_counter_orientations( counters: list[Counter], grid: list[list[int]], kitchen_center: npt.NDArray[float] ): """Determines the orientation of counters on a grid based on free positions around it. Args: counters: A list of Counter objects. grid: The grid representing the kitchen layout. kitchen_center: The coordinates of the kitchen center. """ grid = np.array(grid).T grid_width = grid.shape[0] grid_height = grid.shape[1] last_counter = None fst_counter_in_row = None for c in counters: grid_idx = np.floor(c.pos).astype(int) neighbour_offsets = np.array([[0, 1], [0, -1], [1, 0], [-1, 0]], dtype=int) neighbours_free = [] for offset in neighbour_offsets: neighbour_pos = grid_idx + offset if ( neighbour_pos[0] > (grid_width - 1) or neighbour_pos[0] < 0 or neighbour_pos[1] > (grid_height - 1) or neighbour_pos[1] < 0 ): pass else: if grid[neighbour_pos[0]][neighbour_pos[1]] == 0: neighbours_free.append(offset) if len(neighbours_free) > 0: vector_to_center = c.pos - kitchen_center vector_to_center /= np.linalg.norm(vector_to_center) n_idx = np.argmin( [np.linalg.norm(vector_to_center - n) for n in neighbours_free] ) nearest_vec = neighbours_free[n_idx] c.set_orientation(nearest_vec) elif grid_idx[0] == 0: if grid_idx[1] == 0 or fst_counter_in_row is None: # counter top left c.set_orientation(np.array([1, 0])) else: c.set_orientation(fst_counter_in_row.orientation) fst_counter_in_row = c else: c.set_orientation(last_counter.orientation) last_counter = c