diff --git a/cooperative_cuisine/reinforcement_learning/config/additional_configs/additional_config_base.yaml b/cooperative_cuisine/reinforcement_learning/config/additional_configs/additional_config_base.yaml
index 4ba0f49b5cb740b6bb6f1ba3d36fa1518950ef6b..5920176eb35e5249725a055a5e8e7cd93c9b5068 100644
--- a/cooperative_cuisine/reinforcement_learning/config/additional_configs/additional_config_base.yaml
+++ b/cooperative_cuisine/reinforcement_learning/config/additional_configs/additional_config_base.yaml
@@ -1,6 +1,6 @@
 # Here the filename of the converter should be given. The converter class needs to be called StateConverter and implement the abstract StateToObservationConverter class
 state_converter:
-  _target_: "cooperative_cuisine.reinforcement_learning.obs_converter.base_converter.BaseStateConverter"
+  _target_: "cooperative_cuisine.reinforcement_learning.obs_converter.advanced_converter_int.AdvancedStateConverterInt"
 log_path: "logs/reinforcement_learning"
 checkpoint_path: "rl_agent_checkpoints"
 render_mode: "rgb_array"
diff --git a/cooperative_cuisine/reinforcement_learning/gym_env.py b/cooperative_cuisine/reinforcement_learning/gym_env.py
index 96842152ad3f1c02a04219af8984c5b716c59c71..b255ac1f36ecba5d39be3e75463f6587df9f9dc6 100644
--- a/cooperative_cuisine/reinforcement_learning/gym_env.py
+++ b/cooperative_cuisine/reinforcement_learning/gym_env.py
@@ -130,7 +130,7 @@ class StateToObservationConverter:
     """
 
     @abstractmethod
-    def setup(self, env):
+    def setup(self, env, item_info):
         ...
 
     @abstractmethod
@@ -197,7 +197,8 @@ class EnvGymWrapper(Env):
         self.seen_items = []
 
         self.converter = instantiate(config.additional_configs.state_converter)
-        self.converter.setup(self.env)
+        # Pass the item info config so that the converter knows all the possible items.
+        self.converter.setup(self.env, self.config_item_info)
         if hasattr(self.converter, "onehot"):
             self.onehot_state = self.converter.onehot
         else:
diff --git a/cooperative_cuisine/reinforcement_learning/obs_converter/advanced_converter_array.py b/cooperative_cuisine/reinforcement_learning/obs_converter/advanced_converter_array.py
new file mode 100644
index 0000000000000000000000000000000000000000..f3702bebb126d575f90e841cdbe1d506f998f495
--- /dev/null
+++ b/cooperative_cuisine/reinforcement_learning/obs_converter/advanced_converter_array.py
@@ -0,0 +1,159 @@
+import time
+from collections import deque
+
+import numpy as np
+
+from cooperative_cuisine.counters import CookingCounter, Counter, Dispenser
+from cooperative_cuisine.items import CookingEquipment
+from cooperative_cuisine.reinforcement_learning.gym_env import StateToObservationConverter
+
+
+class AdvancedStateConverterArray(StateToObservationConverter):
+    """
+    Converts an environment state to an encoding where each counter/item has its unique value.
+
+    Every item is encoded as six one-element rows: item index, progress and four content slots.
+    """
+
+    def __init__(self):
+        self.onehot = False
+        self.grid_height: int | None = None
+        self.grid_width: int | None = None
+        self.counter_list = [
+            "Empty",
+            "Counter",
+            "PlateDispenser",
+            "TomatoDispenser",
+            "OnionDispenser",
+            "ServingWindow",
+            "PlateReturn",
+            "Trashcan",
+            "Stove",
+            "CuttingBoard",
+            "LettuceDispenser",
+        ]
+        self.player = "0"
+        self.item_list = ["None"]
+
+    def setup(self, env, item_info):
+        """
+        Set the grid size and collect the names of all items that can be encoded.
+
+        Args:
+            env: The environment object used
+            item_info: Mapping from item name to its config entry (needs a "type" key)
+        """
+        self.grid_width, self.grid_height = int(env.kitchen_width), int(
+            env.kitchen_height)
+        # Start from a fresh list so that a repeated setup call adds no duplicates.
+        self.item_list = ["None"]
+        for name, info in item_info.items():
+            # Equipment entries without an "equipment" key are left out.
+            if info["type"] == "Equipment" and "equipment" not in info:
+                continue
+            self.item_list.append(name)
+
+        # TODO: Here we should initialize the dict that holds all items in order to retrieve them easily afterwards
+
+    def convert_state_to_observation(self, env) -> np.ndarray:
+        """
+        Convert the environment into a flat encoding that is not onehot.
+
+        Args:
+            env: The environment object used
+
+        Returns: The counter types, the counter items, the player position and facing direction
+            and the held item, concatenated into one array
+        """
+        empty_item = [[0], [0], [0], [0], [0], [0]]
+        # Indexed as [x][y]; cells without a counter keep the all-zero item encoding.
+        counter_items = [[empty_item for _ in range(self.grid_height)] for _ in
+                         range(self.grid_width)]
+        counters = np.full(
+            (self.grid_width, self.grid_height),
+            self.counter_list.index("Empty"),
+            dtype=int,
+        )
+
+        for counter in env.counters:
+            grid_idx = np.floor(counter.pos).astype(int)
+            x, y = int(grid_idx[0]), int(grid_idx[1])
+            # Index both axes explicitly: indexing with the position array itself
+            # selects whole rows instead of a single cell.
+            counters[x, y] = self.vectorize_counter(counter, self.counter_list)
+            counter_items[x][y] = self.vectorize_item(
+                counter.occupied_by, self.item_list
+            )
+
+        player = env.players[self.player]
+        player_data = np.concatenate(
+            (player.pos.astype(int), player.facing_direction.astype(int)), axis=0
+        )
+        player_item = self.vectorize_item(player.holding, self.item_list)
+
+        return np.concatenate(
+            (
+                counters.flatten(),
+                np.array(counter_items).flatten(),
+                player_data.flatten(),
+                # Flattened so that every part is one-dimensional.
+                player_item.flatten(),
+            ),
+            axis=0,
+        )
+
+    def vectorize_item(self, item, item_list):
+        """
+        Encode an item as rows holding its index, its progress and the indices of its contents.
+
+        Args:
+            item: The item to encode, None, or a deque of which only the first item is encoded
+            item_list: The names of all known items
+
+        Returns: An array with one value per row; contents are padded with zeros to four rows
+        """
+        if isinstance(item, deque):
+            # An empty deque is encoded like no item at all.
+            item = item[0] if len(item) > 0 else None
+        item_name = "None" if item is None else item.name
+        assert item_name in item_list, f"Unknown item {item_name}."
+        progress = 0 if item is None else item.progress_percentage
+
+        containing_items = []
+        if isinstance(item, CookingEquipment):
+            for content in item.content_list:
+                assert content.name in item_list, f"Unknown item {content.name}."
+                containing_items.append(item_list.index(content.name))
+        while len(containing_items) < 4:
+            containing_items.append(0)
+        containing_items.sort(reverse=True)
+
+        encoding = [[item_list.index(item_name)], [progress]]
+        encoding.extend([index] for index in containing_items)
+        return np.array(encoding)
+
+    @staticmethod
+    def vectorize_counter(counter, counter_list):
+        """
+        Look up the index of a counter's name in counter_list.
+
+        Args:
+            counter: The counter to encode
+            counter_list: The names of all known counters
+
+        Returns: The index of the counter name in counter_list
+        """
+        # Counters other than cooking counters are named after their class;
+        # `isinstance(Counter, Dispenser)` tested the class object and never matched.
+        if isinstance(counter, CookingCounter):
+            counter_name = counter.name
+        else:
+            counter_name = counter.__class__.__name__
+        if counter_name == "Dispenser":
+            counter_name = f"{counter.occupied_by.name}Dispenser"
+        assert counter_name in counter_list, f"Unknown Counter {counter}"
+
+        # Only reachable with an unknown name when assertions are disabled.
+        if counter_name not in counter_list:
+            return counter_list.index("Empty")
+        return counter_list.index(counter_name)
diff --git a/cooperative_cuisine/reinforcement_learning/obs_converter/advanced_converter_int.py b/cooperative_cuisine/reinforcement_learning/obs_converter/advanced_converter_int.py
new file mode 100644
index 0000000000000000000000000000000000000000..5b0d20f336deefab6d0719216d4987398f9cf233
--- /dev/null
+++ b/cooperative_cuisine/reinforcement_learning/obs_converter/advanced_converter_int.py
@@ -0,0 +1,165 @@
+import time
+from collections import deque
+
+import numpy as np
+
+from cooperative_cuisine.counters import CookingCounter, Counter, Dispenser
+from cooperative_cuisine.items import CookingEquipment
+from cooperative_cuisine.reinforcement_learning.gym_env import StateToObservationConverter
+
+
+class AdvancedStateConverterInt(StateToObservationConverter):
+    """
+    Converts an environment state to an encoding where each counter/item has its unique value.
+
+    Every item is packed into a single number made of two-digit fields.
+    """
+
+    def __init__(self):
+        self.onehot = False
+        self.grid_height: int | None = None
+        self.grid_width: int | None = None
+        self.counter_list = [
+            "Empty",
+            "Counter",
+            "PlateDispenser",
+            "TomatoDispenser",
+            "OnionDispenser",
+            "ServingWindow",
+            "PlateReturn",
+            "Trashcan",
+            "Stove",
+            "CuttingBoard",
+            "LettuceDispenser",
+        ]
+        self.player = "0"
+        self.item_list = ["None"]
+
+    def setup(self, env, item_info):
+        """
+        Set the grid size and collect the names of all items that can be encoded.
+
+        Args:
+            env: The environment object used
+            item_info: Mapping from item name to its config entry (needs a "type" key)
+        """
+        self.grid_width, self.grid_height = int(env.kitchen_width), int(
+            env.kitchen_height)
+        # Start from a fresh list so that a repeated setup call adds no duplicates.
+        self.item_list = ["None"]
+        for name, info in item_info.items():
+            # Equipment entries without an "equipment" key are left out.
+            if info["type"] == "Equipment" and "equipment" not in info:
+                continue
+            self.item_list.append(name)
+
+        # TODO: Here we should initialize the dict that holds all items in order to retrieve them easily afterwards
+
+    def convert_state_to_observation(self, env) -> np.ndarray:
+        """
+        Convert the environment into a flat encoding that is not onehot.
+
+        Args:
+            env: The environment object used
+
+        Returns: The counter types, the counter items, the player position and facing direction
+            and the held item, concatenated into one array
+        """
+        counter_items = np.zeros((self.grid_width, self.grid_height), dtype=int)
+        counters = np.full(
+            (self.grid_width, self.grid_height),
+            self.counter_list.index("Empty"),
+            dtype=int,
+        )
+
+        for counter in env.counters:
+            grid_idx = np.floor(counter.pos).astype(int)
+            x, y = int(grid_idx[0]), int(grid_idx[1])
+            # Index both axes explicitly: indexing with the position array itself
+            # selects whole rows instead of a single cell.
+            counters[x, y] = self.vectorize_counter(counter, self.counter_list)
+            counter_items[x, y] = self.vectorize_item(
+                counter.occupied_by, self.item_list
+            )
+
+        player = env.players[self.player]
+        player_data = np.concatenate(
+            (player.pos.astype(int), player.facing_direction.astype(int)), axis=0
+        )
+        player_item = [self.vectorize_item(player.holding, self.item_list)]
+
+        return np.concatenate(
+            (
+                counters.flatten(),
+                counter_items.flatten(),
+                player_data.flatten(),
+                player_item,
+            ),
+            axis=0,
+        )
+
+    def vectorize_item(self, item, item_list):
+        """
+        Pack an item into one number made of two-digit fields.
+
+        From most to least significant the fields are the item index, the progress and the
+        indices of the contained items (padded with zeros to four, sorted in descending order).
+
+        Args:
+            item: The item to encode, None, or a deque of which only the first item is encoded
+            item_list: The names of all known items
+
+        Returns: The packed encoding as a float
+        """
+        if isinstance(item, deque):
+            # An empty deque is encoded like no item at all.
+            item = item[0] if len(item) > 0 else None
+        item_name = "None" if item is None else item.name
+        assert item_name in item_list, f"Unknown item {item_name}."
+
+        progress = 0
+        if item is not None:
+            # NOTE(review): assumes progress_percentage is on a 0-100 scale - confirm.
+            # Capped at 99 so that a finished item still fits into two digits.
+            progress = min(int(item.progress_percentage), 99)
+
+        containing_items = []
+        if isinstance(item, CookingEquipment):
+            for content in item.content_list:
+                assert content.name in item_list, f"Unknown item {content.name}."
+                containing_items.append(item_list.index(content.name))
+        while len(containing_items) < 4:
+            containing_items.append(0)
+        containing_items.sort(reverse=True)
+
+        encoding = item_list.index(item_name) * 100 + progress
+        for index in containing_items:
+            encoding = encoding * 100 + index
+        # A float keeps the concatenated observation a float array.
+        return float(encoding)
+
+    @staticmethod
+    def vectorize_counter(counter, counter_list):
+        """
+        Look up the index of a counter's name in counter_list.
+
+        Args:
+            counter: The counter to encode
+            counter_list: The names of all known counters
+
+        Returns: The index of the counter name in counter_list
+        """
+        # Counters other than cooking counters are named after their class;
+        # `isinstance(Counter, Dispenser)` tested the class object and never matched.
+        if isinstance(counter, CookingCounter):
+            counter_name = counter.name
+        else:
+            counter_name = counter.__class__.__name__
+        if counter_name == "Dispenser":
+            counter_name = f"{counter.occupied_by.name}Dispenser"
+        assert counter_name in counter_list, f"Unknown Counter {counter}"
+
+        # Only reachable with an unknown name when assertions are disabled.
+        if counter_name not in counter_list:
+            return counter_list.index("Empty")
+        return counter_list.index(counter_name)
diff --git a/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter.py b/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter.py
index bccd685674a5de259f9b871dc323cf60764f9d0d..df55ec9a259dccbc37dbc2593b0a9d42903fe7ef 100644
--- a/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter.py
+++ b/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter.py
@@ -56,7 +56,7 @@ class BaseStateConverter(StateToObservationConverter):
         ]
         self.player = "0"
 
-    def setup(self, env):
+    def setup(self, env, item_info):
         """
         Constructor setting basic variables as attributes.
 
diff --git a/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter_onehot.py b/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter_onehot.py index 1a5ce31a7ba25c110c8a8978c79e385538386392..257441bba2975583c7515c30a0dbdb2abe696de4 100644 --- a/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter_onehot.py +++ b/cooperative_cuisine/reinforcement_learning/obs_converter/base_converter_onehot.py @@ -60,12 +60,13 @@ class BaseStateConverterOnehot(StateToObservationConverter): ] self.player = "0" - def setup(self, env): + def setup(self, env, item_info): """ Set the grid width and height according to the present environment Args: env: The environment object used + item_info: The data from the item_config_rl """ self.grid_width, self.grid_height = int(env.kitchen_width), int(