Skip to content
Snippets Groups Projects
Commit cf2c3aed authored by Christoph Kowalski's avatar Christoph Kowalski
Browse files

Added a new approach to converting the observation to a state

parent 433bf225
No related branches found
No related tags found
2 merge requests!110V1.2.0 changes,!109SB3 RL with Hydra
Pipeline #60269 passed
# Dotted import path (_target_) of the converter class to instantiate. The class must implement the abstract StateToObservationConverter class.
state_converter:
_target_: "cooperative_cuisine.reinforcement_learning.obs_converter.base_converter.BaseStateConverter"
_target_: "cooperative_cuisine.reinforcement_learning.obs_converter.advanced_converter_int.AdvancedStateConverterInt"
log_path: "logs/reinforcement_learning"
checkpoint_path: "rl_agent_checkpoints"
render_mode: "rgb_array"
......
......@@ -130,7 +130,7 @@ class StateToObservationConverter:
"""
@abstractmethod
def setup(self, env):
def setup(self, env, item_info):
...
@abstractmethod
......@@ -197,7 +197,8 @@ class EnvGymWrapper(Env):
self.seen_items = []
self.converter = instantiate(config.additional_configs.state_converter)
self.converter.setup(self.env)
# self.converter.setup could also get the item info config in order to get all the possible items.
self.converter.setup(self.env, self.config_item_info)
if hasattr(self.converter, "onehot"):
self.onehot_state = self.converter.onehot
else:
......
import time
from collections import deque
import numpy as np
from cooperative_cuisine.counters import CookingCounter, Counter, Dispenser
from cooperative_cuisine.items import CookingEquipment
from cooperative_cuisine.reinforcement_learning.gym_env import StateToObservationConverter
class AdvancedStateConverterArray(StateToObservationConverter):
    """
    Converts an environment state to an integer (not one-hot) encoding.

    Every grid cell contributes the index of its counter type and a
    fixed-layout item vector; the observed player's position, facing
    direction and held item are appended at the end.
    """

    # Number of content slots reserved for items inside cooking equipment.
    _CONTENT_SLOTS = 4

    def __init__(self):
        self.onehot = False
        self.grid_height: int | None = None
        self.grid_width: int | None = None
        # The position in this list is the integer code of a counter type.
        self.counter_list = [
            "Empty",
            "Counter",
            "PlateDispenser",
            "TomatoDispenser",
            "OnionDispenser",
            "ServingWindow",
            "PlateReturn",
            "Trashcan",
            "Stove",
            "CuttingBoard",
            "LettuceDispenser",
        ]
        # Key of the player whose point of view is encoded.
        self.player = "0"
        # The position in this list is the integer code of an item;
        # index 0 ("None") stands for "no item".
        self.item_list = ["None"]

    def setup(self, env, item_info):
        """
        Read the grid size from the environment and build the item list.

        Args:
            env: The environment object used; must expose ``kitchen_width``
                and ``kitchen_height``.
            item_info: Mapping from item name to its info mapping. Entries
                whose ``"type"`` is ``"Equipment"`` and that have no
                ``"equipment"`` key are left out of the item list.
        """
        self.grid_width, self.grid_height = (
            int(env.kitchen_width),
            int(env.kitchen_height),
        )
        # Rebuild from scratch so that calling setup twice does not append
        # every item a second time.
        self.item_list = ["None"]
        for key, info in item_info.items():
            if info["type"] == "Equipment" and "equipment" not in info:
                continue
            self.item_list.append(key)
        # TODO: Initialize the dict that holds all items here so they can
        # be retrieved easily afterwards.

    def convert_state_to_observation(self, env) -> np.ndarray:
        """
        Convert the environment into a flat, non-one-hot encoding.

        Args:
            env: The environment object used

        Returns: A 1-D array made of, in order: the counter-type index of
            every grid cell, the item vector of every grid cell, the
            player's position and facing direction, and the item vector of
            the item the player holds.
        """
        # Cells without a counter keep the code of "Empty".
        counters = np.full(
            (self.grid_width, self.grid_height),
            self.counter_list.index("Empty"),
            dtype=int,
        )
        # Indexed as [x][y], so the outer dimension has to be the width.
        # Cells without a counter keep the all-zero item vector.
        slots = 2 + self._CONTENT_SLOTS
        counter_items = [
            [[[0] for _ in range(slots)] for _ in range(self.grid_height)]
            for _ in range(self.grid_width)
        ]

        for counter in env.counters:
            grid_idx = np.floor(counter.pos).astype(int)
            x, y = int(grid_idx[0]), int(grid_idx[1])
            # Index with two scalars: indexing with the position array
            # itself would overwrite two whole rows instead of one cell.
            counters[x, y] = self.vectorize_counter(counter, self.counter_list)
            counter_items[x][y] = self.vectorize_item(
                counter.occupied_by, self.item_list
            )

        player = env.players[self.player]
        player_data = np.concatenate(
            (player.pos.astype(int), player.facing_direction.astype(int)),
            axis=0,
        )
        # Flatten so that every part handed to concatenate is 1-D.
        player_item = self.vectorize_item(
            player.holding, self.item_list
        ).flatten()

        return np.concatenate(
            (
                counters.flatten(),
                np.array(counter_items).flatten(),
                player_data.flatten(),
                player_item,
            ),
            axis=0,
        )

    def vectorize_item(self, item, item_list):
        """
        Encode one item as a column vector.

        Args:
            item: ``None``, an item, or a deque of items (only the first
                element of a deque is encoded; an empty deque counts as
                no item).
            item_list: List of known item names; positions are the codes.

        Returns: An array of single-element rows: item index, progress
            percentage, then the indices of the items contained in cooking
            equipment, sorted descending and zero-padded to four rows.
        """
        if isinstance(item, deque):
            # An empty deque must become None, otherwise the progress
            # lookup below would be attempted on the deque itself.
            item = item[0] if item else None
        item_name = "None" if item is None else item.name
        assert item_name in item_list, f"Unknown item {item_name}."

        progress = 0 if item is None else item.progress_percentage

        contents = []
        if isinstance(item, CookingEquipment):
            for content in item.content_list:
                assert content.name in item_list, f"Unknown item {content.name}."
                contents.append(item_list.index(content.name))
        contents.sort(reverse=True)
        # Pad with plain ints; mixing ints and lists made the sort fail.
        contents.extend([0] * (self._CONTENT_SLOTS - len(contents)))
        # NOTE(review): equipment holding more than four items yields a
        # longer vector than the other cells - confirm this cannot happen.

        encoding = [[item_list.index(item_name)], [progress]]
        encoding.extend([content_idx] for content_idx in contents)
        return np.array(encoding)

    @staticmethod
    def vectorize_counter(counter, counter_list):
        """
        Return the index of the counter's type name in ``counter_list``.

        Cooking counters are looked up by their ``name``; every other
        counter by its class name. Dispensers are disambiguated by the
        name of the item they are occupied by.

        Args:
            counter: The counter to encode.
            counter_list: List of known counter names; positions are the
                codes.

        Returns: The index of the counter name, or the index of "Empty"
            for an unknown name when assertions are disabled.
        """
        # The former isinstance(Counter, Dispenser) check tested the
        # Counter class itself and was never true; dispensers are resolved
        # through their class name below.
        if isinstance(counter, CookingCounter):
            counter_name = counter.name
        else:
            counter_name = counter.__class__.__name__
        if counter_name == "Dispenser":
            counter_name = f"{counter.occupied_by.name}Dispenser"
        assert counter_name in counter_list, f"Unknown Counter {counter}"
        if counter_name in counter_list:
            return counter_list.index(counter_name)
        return counter_list.index("Empty")
import time
from collections import deque
import numpy as np
from cooperative_cuisine.counters import CookingCounter, Counter, Dispenser
from cooperative_cuisine.items import CookingEquipment
from cooperative_cuisine.reinforcement_learning.gym_env import StateToObservationConverter
class AdvancedStateConverterInt(StateToObservationConverter):
    """
    Converts an environment state to an integer (not one-hot) encoding.

    Every grid cell contributes the index of its counter type and a single
    number that packs the item on that counter; the observed player's
    position, facing direction and held item are appended at the end.
    """

    # Number of content slots reserved for items inside cooking equipment.
    _CONTENT_SLOTS = 4
    # Largest progress value that still fits into its two-digit slot.
    _MAX_PROGRESS = 99

    def __init__(self):
        self.onehot = False
        self.grid_height: int | None = None
        self.grid_width: int | None = None
        # The position in this list is the integer code of a counter type.
        self.counter_list = [
            "Empty",
            "Counter",
            "PlateDispenser",
            "TomatoDispenser",
            "OnionDispenser",
            "ServingWindow",
            "PlateReturn",
            "Trashcan",
            "Stove",
            "CuttingBoard",
            "LettuceDispenser",
        ]
        # Key of the player whose point of view is encoded.
        self.player = "0"
        # The position in this list is the integer code of an item;
        # index 0 ("None") stands for "no item".
        self.item_list = ["None"]

    def setup(self, env, item_info):
        """
        Read the grid size from the environment and build the item list.

        Args:
            env: The environment object used; must expose ``kitchen_width``
                and ``kitchen_height``.
            item_info: Mapping from item name to its info mapping. Entries
                whose ``"type"`` is ``"Equipment"`` and that have no
                ``"equipment"`` key are left out of the item list.
        """
        self.grid_width, self.grid_height = (
            int(env.kitchen_width),
            int(env.kitchen_height),
        )
        # Rebuild from scratch so that calling setup twice does not append
        # every item a second time.
        self.item_list = ["None"]
        for key, info in item_info.items():
            if info["type"] == "Equipment" and "equipment" not in info:
                continue
            self.item_list.append(key)
        # TODO: Initialize the dict that holds all items here so they can
        # be retrieved easily afterwards.

    def convert_state_to_observation(self, env) -> np.ndarray:
        """
        Convert the environment into a flat, non-one-hot encoding.

        Args:
            env: The environment object used

        Returns: A 1-D array made of, in order: the counter-type index of
            every grid cell, the packed item code of every grid cell, the
            player's position and facing direction, and the packed code of
            the item the player holds.
        """
        grid_shape = (self.grid_width, self.grid_height)
        # Cells without a counter keep the code of "Empty".
        counters = np.full(grid_shape, self.counter_list.index("Empty"), dtype=int)
        # Packed item codes exceed the 32-bit range, so force 64 bits.
        counter_items = np.zeros(grid_shape, dtype=np.int64)

        for counter in env.counters:
            grid_idx = np.floor(counter.pos).astype(int)
            x, y = int(grid_idx[0]), int(grid_idx[1])
            # Index with two scalars: indexing with the position array
            # itself would overwrite two whole rows instead of one cell.
            counters[x, y] = self.vectorize_counter(counter, self.counter_list)
            counter_items[x, y] = self.vectorize_item(
                counter.occupied_by, self.item_list
            )

        player = env.players[self.player]
        player_data = np.concatenate(
            (player.pos.astype(int), player.facing_direction.astype(int)),
            axis=0,
        )
        player_item = [self.vectorize_item(player.holding, self.item_list)]

        return np.concatenate(
            (
                counters.flatten(),
                counter_items.flatten(),
                player_data.flatten(),
                player_item,
            ),
            axis=0,
        )

    def vectorize_item(self, item, item_list):
        """
        Pack one item into a single number using base-100 "digits".

        From most to least significant the digits are: item index,
        progress percentage, then the indices of the items contained in
        cooking equipment, sorted descending and zero-padded to four.

        Args:
            item: ``None``, an item, or a deque of items (only the first
                element of a deque is encoded; an empty deque counts as
                no item).
            item_list: List of known item names; positions are the codes.

        Returns: The packed encoding as a float.
        """
        if isinstance(item, deque):
            item = item[0] if item else None
        item_name = "None" if item is None else item.name
        assert item_name in item_list, f"Unknown item {item_name}."

        # Use the index as it is: multiplying single-digit indices by ten
        # made items 1..9 indistinguishable from items 10, 20, ..., 90.
        encoding = item_list.index(item_name)

        # Cap the progress so it cannot spill into the item-index digit.
        # NOTE(review): assumes progress_percentage is on a 0..100 scale
        # and that there are fewer than 100 item types - confirm.
        progress = 0
        if item is not None:
            progress = min(item.progress_percentage, self._MAX_PROGRESS)
        encoding = encoding * 100 + progress

        contents = []
        if isinstance(item, CookingEquipment):
            for content in item.content_list:
                assert content.name in item_list, f"Unknown item {content.name}."
                contents.append(item_list.index(content.name))
        contents.sort(reverse=True)
        contents.extend([0] * (self._CONTENT_SLOTS - len(contents)))
        for content_idx in contents:
            encoding = encoding * 100 + content_idx
        return float(encoding)

    @staticmethod
    def vectorize_counter(counter, counter_list):
        """
        Return the index of the counter's type name in ``counter_list``.

        Cooking counters are looked up by their ``name``; every other
        counter by its class name. Dispensers are disambiguated by the
        name of the item they are occupied by.

        Args:
            counter: The counter to encode.
            counter_list: List of known counter names; positions are the
                codes.

        Returns: The index of the counter name, or the index of "Empty"
            for an unknown name when assertions are disabled.
        """
        # The former isinstance(Counter, Dispenser) check tested the
        # Counter class itself and was never true; dispensers are resolved
        # through their class name below.
        if isinstance(counter, CookingCounter):
            counter_name = counter.name
        else:
            counter_name = counter.__class__.__name__
        if counter_name == "Dispenser":
            counter_name = f"{counter.occupied_by.name}Dispenser"
        assert counter_name in counter_list, f"Unknown Counter {counter}"
        if counter_name in counter_list:
            return counter_list.index(counter_name)
        return counter_list.index("Empty")
......@@ -56,7 +56,7 @@ class BaseStateConverter(StateToObservationConverter):
]
self.player = "0"
def setup(self, env):
def setup(self, env, item_info):
"""
Constructor setting basic variables as attributes.
......
......@@ -60,12 +60,13 @@ class BaseStateConverterOnehot(StateToObservationConverter):
]
self.player = "0"
def setup(self, env):
def setup(self, env, item_info):
"""
Set the grid width and height according to the present environment
Args:
env: The environment object used
item_info: The data from the item_config_rl
"""
self.grid_width, self.grid_height = int(env.kitchen_width), int(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment