from collections import deque from datetime import timedelta import numpy as np import pytest import yaml from cooperative_cuisine import ROOT_DIR from cooperative_cuisine.action import ActionType, InterActionData, Action from cooperative_cuisine.counters import ( Counter, CuttingBoard, CookingCounter, ServingWindow, Trashcan, PlateDispenser, Sink, ) from cooperative_cuisine.effects import FireEffectManager from cooperative_cuisine.environment import ( Environment, ) from cooperative_cuisine.game_server import PlayerRequestType from cooperative_cuisine.hooks import ( Hooks, SERVE_NOT_ORDERED_MEAL, PLAYER_ADDED, POST_STEP, ) from cooperative_cuisine.info_msg import InfoMsgManager from cooperative_cuisine.items import Item, ItemInfo, ItemType, Plate, CookingEquipment from cooperative_cuisine.scores import ScoreViaHooks from cooperative_cuisine.server_results import ( PlayerInfo, CreateEnvResult, PlayerRequestResult, ) from cooperative_cuisine.state_representation import ( StateRepresentation, create_json_schema, ) from cooperative_cuisine.utils import create_init_env_time, get_touching_counters layouts_folder = ROOT_DIR / "configs" / "layouts" environment_config_path = ROOT_DIR / "configs" / "environment_config.yaml" environment_config_no_validation_path = ( ROOT_DIR / "configs" / "environment_config_no_validation.yaml" ) layout_path = ROOT_DIR / "configs" / "layouts" / "basic.layout" layout_empty_path = ROOT_DIR / "configs" / "layouts" / "empty.layout" item_info_path = ROOT_DIR / "configs" / "item_info.yaml" # TODO: TESTs are in absolute pixel coordinates still. @pytest.fixture(autouse=True) def test_file_availability(): assert layouts_folder.is_dir(), "layouts folder does not exists" assert environment_config_path.is_file(), "environment config file does not exists" assert ( environment_config_no_validation_path.is_file() ), "environment config file does not exists" assert layout_path.is_file(), "layout config file does not exists" assert layout_empty_path.is_file(), "layout empty config file does not exists" assert item_info_path.is_file(), "item info config file does not exists" @pytest.fixture def env_config(): with open(environment_config_path, "r") as file: env_config = file.read() return env_config @pytest.fixture def env_config_no_validation(): with open(environment_config_no_validation_path, "r") as file: env_config = file.read() return env_config @pytest.fixture def layout_config(): with open(layout_path, "r") as file: layout = file.read() return layout @pytest.fixture def layout_empty_config(): with open(layout_empty_path, "r") as file: layout = file.read() return layout @pytest.fixture def item_info(): with open(item_info_path, "r") as file: item_info = file.read() return item_info def test_player_registration(env_config, layout_config, item_info): env = Environment(env_config, layout_config, item_info, as_files=False) env.add_player("1") assert len(env.players) == 1, "Wrong number of players" env.add_player("2") assert len(env.players) == 2, "Wrong number of players" with pytest.raises(ValueError): env.add_player("2") def test_movement(env_config_no_validation, layout_empty_config, item_info): env = Environment( env_config_no_validation, layout_empty_config, item_info, as_files=False ) player_name = "1" start_pos = np.array([3, 4]) env.add_player(player_name, start_pos) env.movement.player_movement_speed = 1 move_direction = np.array([1, 0]) move_action = Action(player_name, ActionType.MOVEMENT, move_direction, duration=0.1) do_moves_number = 3 for i in range(do_moves_number): env.perform_action(action=move_action) env.step(timedelta(seconds=0.1)) expected = start_pos + do_moves_number * ( move_direction * env.movement.player_movement_speed * move_action.duration ) assert np.isclose( np.linalg.norm(expected - env.players[player_name].pos), 0 ), "Performed movement do not move the player as expected." def test_player_movement_speed( env_config_no_validation, layout_empty_config, item_info ): env = Environment( env_config_no_validation, layout_empty_config, item_info, as_files=False ) player_name = "1" start_pos = np.array([3, 4]) env.add_player(player_name, start_pos) env.movement.player_movement_speed = 2 move_direction = np.array([1, 0]) move_action = Action(player_name, ActionType.MOVEMENT, move_direction, duration=0.1) do_moves_number = 3 for i in range(do_moves_number): env.perform_action(action=move_action) env.step(timedelta(seconds=0.1)) expected = start_pos + do_moves_number * ( move_direction * env.movement.player_movement_speed * move_action.duration ) assert np.isclose( np.linalg.norm(expected - env.players[player_name].pos), 0 ), "Performed movement do not move the player as expected." assert StateRepresentation.model_validate_json( json_data=env.get_json_state(player_id="1") ), "json state does not match expected StateRepresentation." def test_player_reach(env_config_no_validation, layout_empty_config, item_info): env = Environment( env_config_no_validation, layout_empty_config, item_info, as_files=False ) counter_pos = np.array([2, 2]) counter = Counter(pos=counter_pos, hook=Hooks(env)) env.overwrite_counters([counter]) env.add_player("1", np.array([2, 4])) env.movement.player_movement_speed = 1 player = env.players["1"] assert not player.can_reach(counter), "Player is too far away." do_moves_number = 30 for i in range(do_moves_number): move_action = Action("1", ActionType.MOVEMENT, np.array([0, -1]), duration=1) env.perform_action(move_action) env.step(passed_time=timedelta(seconds=1)) assert player.can_reach(counter), "Player can reach counter?" def test_pickup(env_config, layout_config, item_info): env = Environment(env_config, layout_config, item_info, as_files=False) counter_pos = np.array([2, 2]) counter = Counter(pos=counter_pos, hook=Hooks(env)) counter.occupied_by = Item(name="Tomato", item_info=None) env.overwrite_counters([counter]) env.add_player("1", np.array([2, 3])) player = env.players["1"] env.movement.player_movement_speed = 1 move_down = Action("1", ActionType.MOVEMENT, np.array([0, -1]), duration=1) move_up = Action("1", ActionType.MOVEMENT, np.array([0, 1]), duration=1) pick = Action("1", ActionType.PICK_UP_DROP, None) env.perform_action(move_down) env.step(timedelta(seconds=1)) assert player.can_reach(counter), "Player can reach counter?" env.perform_action(pick) assert player.holding is not None, "Player should have picked up tomato." assert player.holding.name == "Tomato", "Should be tomato." for _ in range(5): env.perform_action(move_up) env.step(timedelta(seconds=1)) env.perform_action(pick) assert ( player.holding is not None ), "Player should be too far away to put tomato down." for _ in range(4): env.perform_action(move_down) env.step(timedelta(seconds=1)) env.perform_action(move_down) env.step(timedelta(seconds=1)) env.perform_action(move_down) env.step(timedelta(seconds=1)) env.perform_action(move_down) env.step(timedelta(seconds=1)) env.perform_action(pick) assert player.holding is None, "Player should have put tomato down." assert ( counter.occupied_by is not None and counter.occupied_by.name == "Tomato" ), "Tomato should be here now." def test_processing(env_config, layout_config, item_info): env = Environment(env_config, layout_config, item_info, as_files=False) counter_pos = np.array([2, 2]) counter = CuttingBoard( pos=counter_pos, hook=Hooks(env), transitions={ "ChoppedTomato": ItemInfo( name="ChoppedTomato", seconds=0.5, equipment=ItemInfo(name="CuttingBoard", type=ItemType.Equipment), type=ItemType.Ingredient, needs=["Tomato"], ) }, ) env.counters.append(counter) env.overwrite_counters(env.counters) tomato = Item(name="Tomato", item_info=None) env.add_player("1", np.array([2, 3])) player = env.players["1"] env.movement.player_movement_speed = 1 player.holding = tomato move = Action("1", ActionType.MOVEMENT, np.array([0, -1]), duration=1) pick = Action("1", ActionType.PICK_UP_DROP, None) env.perform_action(move) env.step(timedelta(seconds=1)) env.perform_action(pick) hold_down = Action("1", ActionType.INTERACT, InterActionData.START) env.perform_action(hold_down) assert tomato.name != "ChoppedTomato", "Tomato is not finished yet." env.step(timedelta(seconds=1)) assert tomato.name == "ChoppedTomato", "Tomato should be finished." button_up = Action("1", ActionType.INTERACT, InterActionData.STOP) env.perform_action(button_up) env.perform_action(pick) assert player.holding.name == "ChoppedTomato", "Tomato should be finished." def test_time_passed(): np.random.seed(42) env = Environment( ROOT_DIR / "configs" / "environment_config.yaml", layouts_folder / "basic.layout", ROOT_DIR / "configs" / "item_info.yaml", ) env.add_player("0") env.reset_env_time() passed_time = timedelta(seconds=10) env.step(passed_time) assert ( env.env_time == create_init_env_time() + passed_time ), "Env time needs to be updated via the step function" passed_time_2 = timedelta(seconds=12) env.step(passed_time_2) assert ( env.env_time == create_init_env_time() + passed_time + passed_time_2 ), "Env time needs to be updated via the step function" def test_time_limit(): np.random.seed(42) env = Environment( ROOT_DIR / "configs" / "environment_config.yaml", layouts_folder / "basic.layout", ROOT_DIR / "configs" / "item_info.yaml", ) env.add_player("0") env.reset_env_time() assert not env.game_ended, "Game has not ended yet" passed_time = timedelta(seconds=10) env.step(passed_time) assert not env.game_ended, "Game has not ended yet" passed_time_2 = timedelta( seconds=(env.env_time_end - env.start_time).total_seconds() ) env.step(passed_time_2) assert env.game_ended, "Game has ended now." def test_json_schema(): assert isinstance(create_json_schema(), dict) def test_server_result_definition(): plater_info = PlayerInfo(client_id="123", player_hash="234567890", player_id="0") CreateEnvResult( env_id="123344", player_info={"0": plater_info}, recipe_graphs=[], kitchen_size=(0, 0), ) PlayerRequestResult( request_type=PlayerRequestType.READY, status=200, msg="123", player_hash="1234324", ) def test_fire(env_config, layout_config, item_info): env = Environment(env_config, layout_config, item_info, as_files=False) env.add_player("0") oven = None for c in env.counters: if ( isinstance(c, CookingCounter) and c.name == "Stove" and c.occupied_by.name == "Pan" ): oven = c break assert oven is not None raw_patty = Item(name="RawPatty", item_info=env.item_info["RawPatty"]) assert oven.can_drop_off(raw_patty) oven.drop_off(raw_patty, "0") assert isinstance(oven.occupied_by, CookingEquipment) assert oven.occupied_by.content_list == [raw_patty] env.step(timedelta(seconds=env.item_info["CookedPatty"].seconds)) assert oven.occupied_by.content_list[0].name == "CookedPatty" env.step(timedelta(seconds=env.item_info["BurntCookedPatty"].seconds)) assert oven.occupied_by.content_list[0].name == "BurntCookedPatty" env.step(timedelta(seconds=env.item_info["Fire"].seconds)) assert len(oven.occupied_by.active_effects) != 0 assert oven.occupied_by.active_effects[0].name == "Fire" fire_manager = env.effect_manager["FireManager"] assert isinstance(fire_manager, FireEffectManager) env.step(fire_manager.next_finished_timer - env.env_time) touching_counters = get_touching_counters(oven, env.counters) next_empty = None connect_counter = None for c in touching_counters: if c.occupied_by: assert len(c.occupied_by.active_effects) == 1 else: assert len(c.active_effects) == 1 next_touching = get_touching_counters(c, env.counters) for a in next_touching: if a not in touching_counters and a.__class__.__name__ == "Counter": a.occupied_by = None next_empty = a connect_counter = c env.step(timedelta(seconds=0.01)) assert next_empty is not None next_empty.occupied_by = Item(name="Tomato", item_info=env.item_info["Tomato"]) env.step( fire_manager.effect_to_timer[connect_counter.active_effects[0].uuid] - env.env_time ) assert len(next_empty.occupied_by.active_effects) == 1 fire_extinguisher = None for c in env.counters: if c.occupied_by and c.occupied_by.name == "Extinguisher": fire_extinguisher = c.occupied_by c.occupied_by = None break assert fire_extinguisher is not None env.players["0"].holding = fire_extinguisher env.players["0"].pos = oven.pos env.players["0"].pos[1] += 1.0 env.perform_action( Action( player="0", action_type=ActionType.MOVEMENT, action_data=np.array([0.0, -1.0]), duration=0.1, ) ) env.step(timedelta(seconds=0.1)) env.perform_action( Action( player="0", action_type=ActionType.INTERACT, action_data=InterActionData.START, ) ) env.step(timedelta(seconds=env.item_info["Extinguisher"].seconds)) env.step(timedelta(seconds=env.item_info["Extinguisher"].seconds)) assert len(oven.occupied_by.active_effects) == 0 def test_score(env_config, layout_config, item_info): def incr_score_callback(hook_ref, env: Environment, meal, meal_name, **kwargs): assert isinstance(meal, Item) assert isinstance(meal_name, str) assert meal_name == "TomatoSoup" env.increment_score(1_000, "Soup Soup") env = Environment(env_config, layout_config, item_info, as_files=False) assert env.score == 0.0 env.add_player("0") env.register_callback_for_hook(SERVE_NOT_ORDERED_MEAL, incr_score_callback) env.register_callback_for_hook( SERVE_NOT_ORDERED_MEAL, ScoreViaHooks( name="123", env=env, score_on_specific_kwarg="meal_name", score_map={"TomatoSoup": 2_000}, ), ) env.register_callback_for_hook( SERVE_NOT_ORDERED_MEAL, ScoreViaHooks(name="124", env=env, score_map={SERVE_NOT_ORDERED_MEAL: 4_000}), ) env.register_callback_for_hook( SERVE_NOT_ORDERED_MEAL, ScoreViaHooks( name="124", env=env, score_map={SERVE_NOT_ORDERED_MEAL: 8_000}, kwarg_filter={"meal_name": "TomatoSoup"}, ), ) env.register_callback_for_hook( SERVE_NOT_ORDERED_MEAL, ScoreViaHooks( name="123", env=env, score_on_specific_kwarg="meal_name", static_score=16_000, score_map={}, ), ) serving_window = None for c in env.counters: if isinstance(c, ServingWindow): serving_window = c break assert serving_window is not None env.order_manager.serving_not_ordered_meals = True env.order_manager.open_orders = [] plate = Plate( transitions=env.counter_factory.filter_item_info( by_item_type=ItemType.Meal, add_effects=True ), clean=True, item_info=env.item_info["Plate"], hook=env.hook, ) plate.content_list = [ Item(name="TomatoSoup", item_info=env.item_info["TomatoSoup"]) ] assert serving_window.can_drop_off(plate) returned = serving_window.drop_off(plate, "0") assert returned is None plates_prev = len(serving_window.plate_dispenser.occupied_by) assert env.score >= 31_000 assert len(serving_window.plate_dispenser.out_of_kitchen_timer) == 1 env.step(serving_window.plate_dispenser.out_of_kitchen_timer[0] - env.env_time) assert len(serving_window.plate_dispenser.out_of_kitchen_timer) == 0 assert len(serving_window.plate_dispenser.occupied_by) == plates_prev + 1 assert ( serving_window.plate_dispenser.occupied_by[0].clean != serving_window.plate_dispenser.plate_config.return_dirty ) def test_info_msgs(env_config, layout_config, item_info): env_config_dict = yaml.load(env_config, Loader=yaml.Loader) # TODO change after merge with 115 env_config_dict["hook_callbacks"]["dummy_msg"] = { "hooks": [PLAYER_ADDED], "callback_class": InfoMsgManager, "callback_class_kwargs": {"msg": "hello there"}, } env_config_dict["hook_callbacks"]["dummy_msg_2"] = { "hooks": [POST_STEP], "callback_class": InfoMsgManager, "callback_class_kwargs": {"msg": "step step"}, } env_config = yaml.dump(env_config_dict) env = Environment(env_config, layout_config, item_info, as_files=False) env.add_player("0") assert env.info_msgs_per_player["0"][0]["msg"] == "hello there" env.step(timedelta(seconds=0.1)) assert env.info_msgs_per_player["0"][1]["msg"] == "step step" env.step(env.info_msgs_per_player["0"][0]["end_time"] - env.env_time) assert len(env.info_msgs_per_player["0"]) == 2 assert env.info_msgs_per_player["0"][0]["msg"] == "step step" def test_trashcan(env_config, layout_config, item_info): env = Environment(env_config, layout_config, item_info, as_files=False) env.add_player("0") trash = None for c in env.counters: if isinstance(c, Trashcan): trash = c break assert trash is not None item = Item(name="Tomato", item_info=env.item_info["Tomato"]) assert trash.can_drop_off(item) assert trash.drop_off(item, "0") is None plate = Plate( transitions=env.counter_factory.filter_item_info( by_item_type=ItemType.Meal, add_effects=True ), clean=True, item_info=env.item_info["Plate"], hook=env.hook, ) plate.content_list = [ Item(name="TomatoSoup", item_info=env.item_info["TomatoSoup"]) ] assert trash.can_drop_off(plate) assert trash.drop_off(plate, "0") == plate assert plate.content_list == [] assert trash.pick_up(True, "0") is None def test_plate_dispenser(env_config, layout_config, item_info): env = Environment(env_config, layout_config, item_info, as_files=False) env.add_player("0") plate_dis = None for c in env.counters: if isinstance(c, PlateDispenser): plate_dis = c break assert plate_dis is not None assert ( len(plate_dis.occupied_by) > 0 ) # Otherwise adapt env_config above before passing to Environment n_plates = len(plate_dis.occupied_by) env = Environment(env_config, layout_config, item_info, as_files=False) env.add_player("0") item = Item(name="ChoppedTomato", item_info=env.item_info["ChoppedTomato"]) assert plate_dis.can_drop_off(item) returned = plate_dis.drop_off(item, "0") assert returned is None assert len(plate_dis.occupied_by) == n_plates first_plate = plate_dis.pick_up(True, "0") assert isinstance(first_plate, Plate) assert ( first_plate.content_list and first_plate.content_list[0].name == "ChoppedTomato" ) assert plate_dis.can_drop_off(first_plate) returned = plate_dis.drop_off(first_plate, "0") assert isinstance(returned, Plate) and len(returned.content_list) == 0 assert plate_dis.occupied_by[-1].content_list[0].name == "ChoppedTomato" plate_dis.occupied_by = deque() assert plate_dis.can_drop_off(item) returned = plate_dis.drop_off(item, "0") assert returned is None assert plate_dis.occupied_by[0].name == "ChoppedTomato" def test_sink(env_config, layout_config, item_info): env = Environment(env_config, layout_config, item_info, as_files=False) env.add_player("0") sink = None for c in env.counters: if isinstance(c, Sink): sink = c break assert sink is not None env.players["0"].pos = sink.pos env.players["0"].pos[1] -= 1.0 env.perform_action( Action( player="0", action_type=ActionType.MOVEMENT, action_data=np.array([0.0, 1.0]), duration=0.1, ) ) env.step(timedelta(seconds=0.1)) plate = Plate( transitions=env.counter_factory.filter_item_info( by_item_type=ItemType.Meal, add_effects=True ), clean=False, item_info=env.item_info["Plate"], hook=env.hook, ) plate_2 = Plate( transitions=env.counter_factory.filter_item_info( by_item_type=ItemType.Meal, add_effects=True ), clean=False, item_info=env.item_info["Plate"], hook=env.hook, ) clean_plate = Plate( transitions=env.counter_factory.filter_item_info( by_item_type=ItemType.Meal, add_effects=True ), clean=True, item_info=env.item_info["Plate"], hook=env.hook, ) assert sink.can_drop_off(plate) assert sink.drop_off(plate, "0") is None assert sink.can_drop_off(plate_2) assert sink.drop_off(plate_2, "0") is None assert not sink.can_drop_off(clean_plate) assert len(sink.occupied_by) == 2 env.perform_action( Action( player="0", action_type=ActionType.INTERACT, action_data=InterActionData.START, ) ) env.step(timedelta(seconds=env.item_info["Plate"].seconds)) assert len(sink.occupied_by) == 1 assert len(sink.sink_addon.occupied_by) == 1 assert sink.sink_addon.occupied_by[0].clean assert not sink.occupied_by[0].clean assert sink.pick_up(True, "0") is None env.step(timedelta(seconds=env.item_info["Plate"].seconds)) assert len(sink.occupied_by) == 0 assert len(sink.sink_addon.occupied_by) == 2 assert sink.sink_addon.occupied_by[0].clean assert sink.sink_addon.occupied_by[1].clean item = Item(name="ChoppedTomato", item_info=env.item_info["ChoppedTomato"]) assert sink.sink_addon.can_drop_off(item) assert sink.sink_addon.drop_off(item, "0") is None assert sink.sink_addon.pick_up(True, "0").content_list[0].name == "ChoppedTomato" assert len(sink.sink_addon.occupied_by) == 1