# test_start.py (22.82 KiB) -- authored by Fabian Heinrich
from collections import deque
from datetime import timedelta
import numpy as np
import pytest
import yaml
from cooperative_cuisine import ROOT_DIR
from cooperative_cuisine.action import ActionType, InterActionData, Action
from cooperative_cuisine.counters import (
Counter,
CuttingBoard,
CookingCounter,
ServingWindow,
Trashcan,
PlateDispenser,
Sink,
)
from cooperative_cuisine.effects import FireEffectManager
from cooperative_cuisine.environment import (
Environment,
)
from cooperative_cuisine.game_server import PlayerRequestType
from cooperative_cuisine.hooks import (
Hooks,
SERVE_NOT_ORDERED_MEAL,
PLAYER_ADDED,
POST_STEP,
)
from cooperative_cuisine.info_msg import InfoMsgManager
from cooperative_cuisine.items import Item, ItemInfo, ItemType, Plate, CookingEquipment
from cooperative_cuisine.scores import ScoreViaHooks
from cooperative_cuisine.server_results import (
PlayerInfo,
CreateEnvResult,
PlayerRequestResult,
)
from cooperative_cuisine.state_representation import (
StateRepresentation,
create_json_schema,
)
from cooperative_cuisine.utils import create_init_env_time, get_touching_counters
# Locations of the configuration files (below ROOT_DIR / "configs") read by the tests.
layouts_folder = ROOT_DIR.joinpath("configs", "layouts")
environment_config_path = ROOT_DIR.joinpath("configs", "environment_config.yaml")
environment_config_no_validation_path = ROOT_DIR.joinpath(
    "configs", "environment_config_no_validation.yaml"
)
# Both layouts live inside the layouts folder defined above.
layout_path = layouts_folder / "basic.layout"
layout_empty_path = layouts_folder / "empty.layout"
item_info_path = ROOT_DIR.joinpath("configs", "item_info.yaml")
# TODO: The tests still use absolute pixel coordinates.
@pytest.fixture(autouse=True)
def test_file_availability():
    """Assert that every config file and folder used by this module exists.

    Declared as an autouse fixture, so pytest runs these checks for each test
    in the module. Every assertion carries its own message so a failure names
    the missing path unambiguously.
    """
    assert layouts_folder.is_dir(), "layouts folder does not exist"
    assert environment_config_path.is_file(), "environment config file does not exist"
    # Distinct message: previously this check reused the text of the one above,
    # which made it impossible to tell which of the two configs was missing.
    assert (
        environment_config_no_validation_path.is_file()
    ), "environment config (no validation) file does not exist"
    assert layout_path.is_file(), "layout config file does not exist"
    assert layout_empty_path.is_file(), "layout empty config file does not exist"
    assert item_info_path.is_file(), "item info config file does not exist"
@pytest.fixture
def env_config():
    """Return the text content of the default environment config file."""
    return environment_config_path.read_text()
@pytest.fixture
def env_config_no_validation():
    """Return the text content of the no-validation environment config file."""
    return environment_config_no_validation_path.read_text()
@pytest.fixture
def layout_config():
    """Return the text content of the ``basic.layout`` file."""
    return layout_path.read_text()
@pytest.fixture
def layout_empty_config():
    """Return the text content of the ``empty.layout`` file."""
    return layout_empty_path.read_text()
@pytest.fixture
def item_info():
    """Return the text content of the item info config file."""
    return item_info_path.read_text()
def test_player_registration(env_config, layout_config, item_info):
    """Players are added one at a time; re-adding a known id raises ValueError."""
    env = Environment(env_config, layout_config, item_info, as_files=False)

    # Every successful registration grows the player collection by one.
    for expected_count, player_id in enumerate(("1", "2"), start=1):
        env.add_player(player_id)
        assert len(env.players) == expected_count, "Wrong number of players"

    # The id "2" is already taken at this point.
    with pytest.raises(ValueError):
        env.add_player("2")
def test_movement(env_config_no_validation, layout_empty_config, item_info):
    """Three 0.1 s moves along +x at speed 1 displace the player as computed."""
    env = Environment(
        env_config_no_validation, layout_empty_config, item_info, as_files=False
    )
    player_name = "1"
    origin = np.array([3, 4])
    env.add_player(player_name, origin)
    env.movement.player_movement_speed = 1

    direction = np.array([1, 0])
    step_action = Action(player_name, ActionType.MOVEMENT, direction, duration=0.1)

    n_moves = 3
    for _ in range(n_moves):
        env.perform_action(action=step_action)
        env.step(timedelta(seconds=0.1))

    # Displacement of a single move: direction * speed * duration.
    target = origin + n_moves * (
        direction * env.movement.player_movement_speed * step_action.duration
    )
    deviation = np.linalg.norm(target - env.players[player_name].pos)
    assert np.isclose(
        deviation, 0
    ), "Performed movement do not move the player as expected."
def test_player_movement_speed(
    env_config_no_validation, layout_empty_config, item_info
):
    """Same walk as the basic movement test but at speed 2, plus a JSON state check."""
    env = Environment(
        env_config_no_validation, layout_empty_config, item_info, as_files=False
    )
    player_name = "1"
    origin = np.array([3, 4])
    env.add_player(player_name, origin)
    env.movement.player_movement_speed = 2

    direction = np.array([1, 0])
    step_action = Action(player_name, ActionType.MOVEMENT, direction, duration=0.1)

    n_moves = 3
    for _ in range(n_moves):
        env.perform_action(action=step_action)
        env.step(timedelta(seconds=0.1))

    # Displacement of a single move: direction * speed * duration.
    target = origin + n_moves * (
        direction * env.movement.player_movement_speed * step_action.duration
    )
    deviation = np.linalg.norm(target - env.players[player_name].pos)
    assert np.isclose(
        deviation, 0
    ), "Performed movement do not move the player as expected."

    # The JSON state produced for the player must parse as a StateRepresentation.
    json_state = env.get_json_state(player_id="1")
    assert StateRepresentation.model_validate_json(
        json_data=json_state
    ), "json state does not match expected StateRepresentation."
def test_player_reach(env_config_no_validation, layout_empty_config, item_info):
    """A counter is out of reach at first and reachable after walking towards it."""
    env = Environment(
        env_config_no_validation, layout_empty_config, item_info, as_files=False
    )

    # The only counter in the kitchen sits at (2, 2); the player starts at (2, 4).
    target_counter = Counter(pos=np.array([2, 2]), hook=Hooks(env))
    env.overwrite_counters([target_counter])
    env.add_player("1", np.array([2, 4]))
    env.movement.player_movement_speed = 1

    player = env.players["1"]
    assert not player.can_reach(target_counter), "Player is too far away."

    # Walk in direction (0, -1) for 30 one-second steps.
    for _ in range(30):
        towards_counter = Action(
            "1", ActionType.MOVEMENT, np.array([0, -1]), duration=1
        )
        env.perform_action(towards_counter)
        env.step(passed_time=timedelta(seconds=1))

    assert player.can_reach(target_counter), "Player can reach counter?"
def test_pickup(env_config, layout_config, item_info):
    """Pick-up/drop only works while the player can reach the counter."""
    env = Environment(env_config, layout_config, item_info, as_files=False)

    # A single counter at (2, 2) holding a tomato.
    counter = Counter(pos=np.array([2, 2]), hook=Hooks(env))
    counter.occupied_by = Item(name="Tomato", item_info=None)
    env.overwrite_counters([counter])

    env.add_player("1", np.array([2, 3]))
    player = env.players["1"]
    env.movement.player_movement_speed = 1

    move_down = Action("1", ActionType.MOVEMENT, np.array([0, -1]), duration=1)
    move_up = Action("1", ActionType.MOVEMENT, np.array([0, 1]), duration=1)
    pick = Action("1", ActionType.PICK_UP_DROP, None)

    def walk(action, times):
        """Perform ``action`` ``times`` times, stepping one second after each."""
        for _ in range(times):
            env.perform_action(action)
            env.step(timedelta(seconds=1))

    # Step towards the counter and take the tomato.
    walk(move_down, 1)
    assert player.can_reach(counter), "Player can reach counter?"
    env.perform_action(pick)
    assert player.holding is not None, "Player should have picked up tomato."
    assert player.holding.name == "Tomato", "Should be tomato."

    # Walk away; the drop attempt must leave the tomato in the player's hands.
    walk(move_up, 5)
    env.perform_action(pick)
    assert (
        player.holding is not None
    ), "Player should be too far away to put tomato down."

    # Walk back: 4 looped + 3 unrolled single steps, 7 in total.
    # NOTE(review): indentation was lost in the reviewed copy; confirm that the
    # three unrolled move/step pairs were not part of the loop body.
    walk(move_down, 7)
    env.perform_action(pick)
    assert player.holding is None, "Player should have put tomato down."
    assert (
        counter.occupied_by is not None and counter.occupied_by.name == "Tomato"
    ), "Tomato should be here now."
def test_processing(env_config, layout_config, item_info):
    """Interacting with a cutting board turns a Tomato into a ChoppedTomato."""
    env = Environment(env_config, layout_config, item_info, as_files=False)

    # Cutting board at (2, 2) whose only transition is Tomato -> ChoppedTomato (0.5 s).
    chopped_tomato_info = ItemInfo(
        name="ChoppedTomato",
        seconds=0.5,
        equipment=ItemInfo(name="CuttingBoard", type=ItemType.Equipment),
        type=ItemType.Ingredient,
        needs=["Tomato"],
    )
    board = CuttingBoard(
        pos=np.array([2, 2]),
        hook=Hooks(env),
        transitions={"ChoppedTomato": chopped_tomato_info},
    )
    env.counters.append(board)
    env.overwrite_counters(env.counters)

    tomato = Item(name="Tomato", item_info=None)
    env.add_player("1", np.array([2, 3]))
    player = env.players["1"]
    env.movement.player_movement_speed = 1
    player.holding = tomato

    approach = Action("1", ActionType.MOVEMENT, np.array([0, -1]), duration=1)
    pick_or_drop = Action("1", ActionType.PICK_UP_DROP, None)

    # Walk towards the board and issue the pick-up/drop action with the tomato in hand.
    env.perform_action(approach)
    env.step(timedelta(seconds=1))
    env.perform_action(pick_or_drop)

    # Start interacting; the item must change only once time has passed.
    env.perform_action(Action("1", ActionType.INTERACT, InterActionData.START))
    assert tomato.name != "ChoppedTomato", "Tomato is not finished yet."
    env.step(timedelta(seconds=1))
    assert tomato.name == "ChoppedTomato", "Tomato should be finished."

    # Stop interacting and take the result back.
    env.perform_action(Action("1", ActionType.INTERACT, InterActionData.STOP))
    env.perform_action(pick_or_drop)
    assert player.holding.name == "ChoppedTomato", "Tomato should be finished."
def test_time_passed():
    """``env.step`` advances ``env.env_time`` by exactly the passed duration."""
    np.random.seed(42)
    # The module-level constants point at the same three files as before.
    env = Environment(environment_config_path, layout_path, item_info_path)
    env.add_player("0")
    env.reset_env_time()

    first_span = timedelta(seconds=10)
    env.step(first_span)
    assert (
        env.env_time == create_init_env_time() + first_span
    ), "Env time needs to be updated via the step function"

    second_span = timedelta(seconds=12)
    env.step(second_span)
    assert (
        env.env_time == create_init_env_time() + first_span + second_span
    ), "Env time needs to be updated via the step function"
def test_time_limit():
    """The game is flagged as ended only after the full game duration was stepped."""
    np.random.seed(42)
    # The module-level constants point at the same three files as before.
    env = Environment(environment_config_path, layout_path, item_info_path)
    env.add_player("0")
    env.reset_env_time()
    assert not env.game_ended, "Game has not ended yet"

    env.step(timedelta(seconds=10))
    assert not env.game_ended, "Game has not ended yet"

    # Step by the complete span between start and end time on top of the 10 s.
    game_duration_s = (env.env_time_end - env.start_time).total_seconds()
    env.step(timedelta(seconds=game_duration_s))
    assert env.game_ended, "Game has ended now."
def test_json_schema():
    """``create_json_schema`` returns a dict."""
    schema = create_json_schema()
    assert isinstance(schema, dict)
def test_server_result_definition():
    """Smoke test: build the server result models from literal values."""
    player_info = PlayerInfo(client_id="123", player_hash="234567890", player_id="0")

    CreateEnvResult(
        env_id="123344",
        player_info={"0": player_info},
        recipe_graphs=[],
        kitchen_size=(0, 0),
    )

    PlayerRequestResult(
        request_type=PlayerRequestType.READY,
        status=200,
        msg="123",
        player_hash="1234324",
    )
# Scenario test: a patty is cooked until it burns and catches fire, the fire is
# checked on neighbouring counters, and finally the fire on the stove is put
# out with the extinguisher.
# NOTE(review): indentation was lost in this copy of the file; in particular the
# nesting of the statements inside the `for c in touching_counters` loop must be
# confirmed against the upstream source before re-indenting.
def test_fire(env_config, layout_config, item_info):
env = Environment(env_config, layout_config, item_info, as_files=False)
env.add_player("0")
# Find a CookingCounter named "Stove" that currently holds a "Pan".
oven = None
for c in env.counters:
if (
isinstance(c, CookingCounter)
and c.name == "Stove"
and c.occupied_by.name == "Pan"
):
oven = c
break
assert oven is not None
# Drop a raw patty on the stove; it must end up in the cooking equipment.
raw_patty = Item(name="RawPatty", item_info=env.item_info["RawPatty"])
assert oven.can_drop_off(raw_patty)
oven.drop_off(raw_patty, "0")
assert isinstance(oven.occupied_by, CookingEquipment)
assert oven.occupied_by.content_list == [raw_patty]
# Step by the configured durations: RawPatty -> CookedPatty -> BurntCookedPatty -> Fire.
env.step(timedelta(seconds=env.item_info["CookedPatty"].seconds))
assert oven.occupied_by.content_list[0].name == "CookedPatty"
env.step(timedelta(seconds=env.item_info["BurntCookedPatty"].seconds))
assert oven.occupied_by.content_list[0].name == "BurntCookedPatty"
env.step(timedelta(seconds=env.item_info["Fire"].seconds))
assert len(oven.occupied_by.active_effects) != 0
assert oven.occupied_by.active_effects[0].name == "Fire"
fire_manager = env.effect_manager["FireManager"]
assert isinstance(fire_manager, FireEffectManager)
# Advance to the fire manager's next finished timer; afterwards every counter
# touching the stove is expected to carry exactly one effect (on its item if
# it holds one, otherwise on the counter itself).
env.step(fire_manager.next_finished_timer - env.env_time)
touching_counters = get_touching_counters(oven, env.counters)
next_empty = None
connect_counter = None
for c in touching_counters:
if c.occupied_by:
assert len(c.occupied_by.active_effects) == 1
else:
assert len(c.active_effects) == 1
# Look one counter further: remember a plain "Counter" that does not touch the
# stove (emptied here) together with the touching counter it was found from.
next_touching = get_touching_counters(c, env.counters)
for a in next_touching:
if a not in touching_counters and a.__class__.__name__ == "Counter":
a.occupied_by = None
next_empty = a
connect_counter = c
env.step(timedelta(seconds=0.01))
assert next_empty is not None
# Place a tomato on the remembered counter and step until the timer registered
# for connect_counter's first effect; the tomato must then carry one effect.
next_empty.occupied_by = Item(name="Tomato", item_info=env.item_info["Tomato"])
env.step(
fire_manager.effect_to_timer[connect_counter.active_effects[0].uuid]
- env.env_time
)
assert len(next_empty.occupied_by.active_effects) == 1
# Take the item named "Extinguisher" off whichever counter holds it.
fire_extinguisher = None
for c in env.counters:
if c.occupied_by and c.occupied_by.name == "Extinguisher":
fire_extinguisher = c.occupied_by
c.occupied_by = None
break
assert fire_extinguisher is not None
env.players["0"].holding = fire_extinguisher
# NOTE(review): this assigns the stove's own `pos` object to the player, so the
# in-place `+=` below presumably changes `oven.pos` as well if it is a mutable
# array -- confirm whether a copy was intended.
env.players["0"].pos = oven.pos
env.players["0"].pos[1] += 1.0
# Move in direction (0, -1) for 0.1 s, then start interacting.
env.perform_action(
Action(
player="0",
action_type=ActionType.MOVEMENT,
action_data=np.array([0.0, -1.0]),
duration=0.1,
)
)
env.step(timedelta(seconds=0.1))
env.perform_action(
Action(
player="0",
action_type=ActionType.INTERACT,
action_data=InterActionData.START,
)
)
# Two steps of the extinguisher's configured duration; the stove's item must be
# free of effects afterwards.
env.step(timedelta(seconds=env.item_info["Extinguisher"].seconds))
env.step(timedelta(seconds=env.item_info["Extinguisher"].seconds))
assert len(oven.occupied_by.active_effects) == 0
def test_score(env_config, layout_config, item_info):
    """Serving a not-ordered TomatoSoup triggers all registered score callbacks."""

    def incr_score_callback(hook_ref, env: Environment, meal, meal_name, **kwargs):
        # Plain-function callback: checks the hook arguments and adds 1000 points.
        assert isinstance(meal, Item)
        assert isinstance(meal_name, str)
        assert meal_name == "TomatoSoup"
        env.increment_score(1_000, "Soup Soup")

    env = Environment(env_config, layout_config, item_info, as_files=False)
    assert env.score == 0.0
    env.add_player("0")

    env.register_callback_for_hook(SERVE_NOT_ORDERED_MEAL, incr_score_callback)

    # Four ScoreViaHooks configurations worth 2000, 4000, 8000 and 16000 points.
    score_hook_kwargs = [
        {
            "name": "123",
            "score_on_specific_kwarg": "meal_name",
            "score_map": {"TomatoSoup": 2_000},
        },
        {"name": "124", "score_map": {SERVE_NOT_ORDERED_MEAL: 4_000}},
        {
            "name": "124",
            "score_map": {SERVE_NOT_ORDERED_MEAL: 8_000},
            "kwarg_filter": {"meal_name": "TomatoSoup"},
        },
        {
            "name": "123",
            "score_on_specific_kwarg": "meal_name",
            "static_score": 16_000,
            "score_map": {},
        },
    ]
    for hook_kwargs in score_hook_kwargs:
        env.register_callback_for_hook(
            SERVE_NOT_ORDERED_MEAL, ScoreViaHooks(env=env, **hook_kwargs)
        )

    serving_window = next(
        (c for c in env.counters if isinstance(c, ServingWindow)), None
    )
    assert serving_window is not None

    # Allow serving meals that were not ordered and clear all open orders.
    env.order_manager.serving_not_ordered_meals = True
    env.order_manager.open_orders = []

    meal_transitions = env.counter_factory.filter_item_info(
        by_item_type=ItemType.Meal, add_effects=True
    )
    plate = Plate(
        transitions=meal_transitions,
        clean=True,
        item_info=env.item_info["Plate"],
        hook=env.hook,
    )
    plate.content_list = [
        Item(name="TomatoSoup", item_info=env.item_info["TomatoSoup"])
    ]

    assert serving_window.can_drop_off(plate)
    assert serving_window.drop_off(plate, "0") is None

    plates_before = len(serving_window.plate_dispenser.occupied_by)
    # 1000 + 2000 + 4000 + 8000 + 16000
    assert env.score >= 31_000

    # One plate is pending on the dispenser's timer; step until that timer.
    assert len(serving_window.plate_dispenser.out_of_kitchen_timer) == 1
    env.step(serving_window.plate_dispenser.out_of_kitchen_timer[0] - env.env_time)
    assert len(serving_window.plate_dispenser.out_of_kitchen_timer) == 0
    assert len(serving_window.plate_dispenser.occupied_by) == plates_before + 1
    assert (
        serving_window.plate_dispenser.occupied_by[0].clean
        != serving_window.plate_dispenser.plate_config.return_dirty
    )
def test_info_msgs(env_config, layout_config, item_info):
    """Info messages configured via hook callbacks show up per player and expire."""
    config = yaml.load(env_config, Loader=yaml.Loader)

    # TODO change after merge with 115
    extra_callbacks = {
        "dummy_msg": {
            "hooks": [PLAYER_ADDED],
            "callback_class": InfoMsgManager,
            "callback_class_kwargs": {"msg": "hello there"},
        },
        "dummy_msg_2": {
            "hooks": [POST_STEP],
            "callback_class": InfoMsgManager,
            "callback_class_kwargs": {"msg": "step step"},
        },
    }
    for callback_name, callback_config in extra_callbacks.items():
        config["hook_callbacks"][callback_name] = callback_config

    env = Environment(yaml.dump(config), layout_config, item_info, as_files=False)
    env.add_player("0")

    def player_msgs():
        # Looked up freshly on every access instead of caching the list object.
        return env.info_msgs_per_player["0"]

    assert player_msgs()[0]["msg"] == "hello there"

    env.step(timedelta(seconds=0.1))
    assert player_msgs()[1]["msg"] == "step step"

    # Step to the end time of the first message; only "step step" entries remain.
    env.step(player_msgs()[0]["end_time"] - env.env_time)
    assert len(player_msgs()) == 2
    assert player_msgs()[0]["msg"] == "step step"
def test_trashcan(env_config, layout_config, item_info):
    """The trashcan swallows plain items and hands plates back emptied."""
    env = Environment(env_config, layout_config, item_info, as_files=False)
    env.add_player("0")

    trash = next((c for c in env.counters if isinstance(c, Trashcan)), None)
    assert trash is not None

    # A plain item is accepted and nothing is returned.
    tomato = Item(name="Tomato", item_info=env.item_info["Tomato"])
    assert trash.can_drop_off(tomato)
    assert trash.drop_off(tomato, "0") is None

    # A filled plate comes back as the same plate with an empty content list.
    meal_transitions = env.counter_factory.filter_item_info(
        by_item_type=ItemType.Meal, add_effects=True
    )
    plate = Plate(
        transitions=meal_transitions,
        clean=True,
        item_info=env.item_info["Plate"],
        hook=env.hook,
    )
    plate.content_list = [
        Item(name="TomatoSoup", item_info=env.item_info["TomatoSoup"])
    ]
    assert trash.can_drop_off(plate)
    assert trash.drop_off(plate, "0") == plate
    assert plate.content_list == []

    # Picking up from the trashcan yields None.
    assert trash.pick_up(True, "0") is None
def test_plate_dispenser(env_config, layout_config, item_info):
    """Exercise drop-off and pick-up on the plate dispenser, with and without plates."""
    env = Environment(env_config, layout_config, item_info, as_files=False)
    env.add_player("0")

    plate_dis = next((c for c in env.counters if isinstance(c, PlateDispenser)), None)
    assert plate_dis is not None
    assert (
        len(plate_dis.occupied_by) > 0
    )  # Otherwise adapt env_config above before passing to Environment
    n_plates = len(plate_dis.occupied_by)

    # NOTE(review): a second environment is created here, while `plate_dis` still
    # refers to the dispenser found in the first one -- confirm this is intended.
    env = Environment(env_config, layout_config, item_info, as_files=False)
    env.add_player("0")

    # Dropping an ingredient keeps the plate count; the picked plate then contains it.
    item = Item(name="ChoppedTomato", item_info=env.item_info["ChoppedTomato"])
    assert plate_dis.can_drop_off(item)
    assert plate_dis.drop_off(item, "0") is None
    assert len(plate_dis.occupied_by) == n_plates

    first_plate = plate_dis.pick_up(True, "0")
    assert isinstance(first_plate, Plate)
    assert (
        first_plate.content_list and first_plate.content_list[0].name == "ChoppedTomato"
    )

    # Dropping the filled plate returns an empty plate; the content stays on the last plate.
    assert plate_dis.can_drop_off(first_plate)
    swapped = plate_dis.drop_off(first_plate, "0")
    assert isinstance(swapped, Plate) and len(swapped.content_list) == 0
    assert plate_dis.occupied_by[-1].content_list[0].name == "ChoppedTomato"

    # With no plates left, the item itself is stored on the dispenser.
    plate_dis.occupied_by = deque()
    assert plate_dis.can_drop_off(item)
    assert plate_dis.drop_off(item, "0") is None
    assert plate_dis.occupied_by[0].name == "ChoppedTomato"
def test_sink(env_config, layout_config, item_info):
    """Dirty plates dropped in the sink are cleaned one by one into the sink addon."""
    env = Environment(env_config, layout_config, item_info, as_files=False)
    env.add_player("0")

    sink = next((c for c in env.counters if isinstance(c, Sink)), None)
    assert sink is not None

    # NOTE(review): the player's `pos` is bound to the sink's own `pos` object, so
    # the in-place `-=` presumably changes `sink.pos` too if it is a mutable
    # array -- confirm whether a copy was intended.
    env.players["0"].pos = sink.pos
    env.players["0"].pos[1] -= 1.0
    env.perform_action(
        Action(
            player="0",
            action_type=ActionType.MOVEMENT,
            action_data=np.array([0.0, 1.0]),
            duration=0.1,
        )
    )
    env.step(timedelta(seconds=0.1))

    def make_plate(clean):
        """Build an empty plate with the meal transitions and the given clean flag."""
        return Plate(
            transitions=env.counter_factory.filter_item_info(
                by_item_type=ItemType.Meal, add_effects=True
            ),
            clean=clean,
            item_info=env.item_info["Plate"],
            hook=env.hook,
        )

    plate = make_plate(False)
    plate_2 = make_plate(False)
    clean_plate = make_plate(True)

    # Two dirty plates are accepted, a clean one is refused.
    for dirty_plate in (plate, plate_2):
        assert sink.can_drop_off(dirty_plate)
        assert sink.drop_off(dirty_plate, "0") is None
    assert not sink.can_drop_off(clean_plate)
    assert len(sink.occupied_by) == 2

    env.perform_action(
        Action(
            player="0",
            action_type=ActionType.INTERACT,
            action_data=InterActionData.START,
        )
    )
    wash_time = timedelta(seconds=env.item_info["Plate"].seconds)

    # After one plate duration: one clean plate on the addon, one dirty plate left.
    env.step(wash_time)
    assert len(sink.occupied_by) == 1
    assert len(sink.sink_addon.occupied_by) == 1
    assert sink.sink_addon.occupied_by[0].clean
    assert not sink.occupied_by[0].clean
    assert sink.pick_up(True, "0") is None

    # After a second plate duration: the sink is empty, both plates are clean.
    env.step(wash_time)
    assert len(sink.occupied_by) == 0
    assert len(sink.sink_addon.occupied_by) == 2
    assert sink.sink_addon.occupied_by[0].clean
    assert sink.sink_addon.occupied_by[1].clean

    # An ingredient dropped on the addon ends up on the plate that is picked up.
    item = Item(name="ChoppedTomato", item_info=env.item_info["ChoppedTomato"])
    assert sink.sink_addon.can_drop_off(item)
    assert sink.sink_addon.drop_off(item, "0") is None
    assert sink.sink_addon.pick_up(True, "0").content_list[0].name == "ChoppedTomato"
    assert len(sink.sink_addon.occupied_by) == 1