Skip to content
Snippets Groups Projects
Commit 7e2b9783 authored by Fabian Heinrich's avatar Fabian Heinrich
Browse files

Added simple pathfinding to the random agent

- added function for calculating a graph with possible movements
- used astar on this graph for the random agent
parent d76a8d0a
No related branches found
No related tags found
1 merge request!89Resolve "simple pathfinding"
Pipeline #48867 failed
......@@ -7,21 +7,45 @@ import time
from collections import defaultdict
from datetime import datetime, timedelta
import networkx
import numpy as np
import numpy.typing as npt
from websockets import connect
from cooperative_cuisine.action import ActionType, InterActionData, Action
from cooperative_cuisine.state_representation import (
create_movement_graph,
astar_heuristic,
restrict_movement_graph,
)
from cooperative_cuisine.utils import custom_asdict_factory
TIME_TO_STOP_ACTION = 3.0
def get_free_neighbours(
state: dict, counter_pos: list[float] | tuple[float, float] | npt.NDArray
) -> list[tuple[float, float]]:
width, height = state["kitchen"]["width"], state["kitchen"]["height"]
free_space = np.ones((width, height), dtype=bool)
for counter in state["counters"]:
grid_idx = np.array(counter["pos"]).astype(int)
free_space[*grid_idx] = False
i, j = np.array(counter_pos).astype(int)
free = []
for x, y in [(i - 1, j), (i + 1, j), (i, j - 1), (i, j + 1)]:
if 0 < x < width and 0 < y < height and free_space[x, y]:
free.append((x, y))
return free
async def agent():
parser = argparse.ArgumentParser("Random agent")
parser.add_argument("--uri", type=str)
parser.add_argument("--player_id", type=str)
parser.add_argument("--player_hash", type=str)
parser.add_argument("--step_time", type=float, default=0.5)
parser.add_argument("--step_time", type=float, default=0.1)
args = parser.parse_args()
......@@ -35,6 +59,8 @@ async def agent():
counters = None
movement_graph = None
player_info = {}
current_agent_pos = None
interaction_counter = None
......@@ -60,6 +86,9 @@ async def agent():
if not state["all_players_ready"]:
continue
if movement_graph is None:
movement_graph = create_movement_graph(state, diagonal=True)
if counters is None:
counters = defaultdict(list)
for counter in state["counters"]:
......@@ -125,10 +154,71 @@ async def agent():
task_type = None
match task_type:
case "GOTO":
diff = np.array(task_args) - np.array(current_agent_pos)
dist = np.linalg.norm(diff)
if dist > 1.2:
if dist != 0:
target_diff = np.array(task_args) - np.array(current_agent_pos)
target_dist = np.linalg.norm(target_diff)
source = tuple(
np.round(np.array(current_agent_pos)).astype(int)
)
target = tuple(np.array(task_args).astype(int))
target_free_spaces = get_free_neighbours(state, target)
paths = []
for free in target_free_spaces:
try:
modified_graph = restrict_movement_graph(
graph=movement_graph,
player_positions=[
p["pos"]
for p in state["players"]
if p["id"] != args.player_id
],
)
path = networkx.astar_path(
modified_graph,
source,
free,
heuristic=astar_heuristic,
)
paths.append(path)
except networkx.exception.NetworkXNoPath:
pass
except networkx.exception.NodeNotFound:
pass
if paths:
shortest_path = paths[np.argmin([len(p) for p in paths])]
if len(shortest_path) > 1:
node_diff = shortest_path[1] - np.array(
current_agent_pos
)
node_dist = np.linalg.norm(node_diff)
movement = node_diff / node_dist
else:
movement = target_diff / target_dist
else:
movement = np.array([0, 0])
task_type = None
task_args = None
if target_dist > 1.2:
if target_dist != 0:
# random_small_rotation_angle = (
# np.random.random() * np.pi * 0.1
# )
# rotation_matrix = np.array(
# [
# [
# np.cos(random_small_rotation_angle),
# -np.sin(random_small_rotation_angle),
# ],
# [
# np.sin(random_small_rotation_angle),
# np.cos(random_small_rotation_angle),
# ],
# ]
# )
# movement = rotation_matrix @ movement
await websocket.send(
json.dumps(
{
......@@ -137,7 +227,7 @@ async def agent():
Action(
args.player_id,
ActionType.MOVEMENT,
(diff / dist).tolist(),
movement.tolist(),
args.step_time + 0.01,
),
dict_factory=custom_asdict_factory,
......@@ -204,7 +294,9 @@ async def agent():
...
if not task_type:
task_type = random.choice(["GOTO", "PUT", "INTERACT"])
# task_type = random.choice(["GOTO"])
task_type = random.choice(["GOTO", "PUT"])
# task_type = random.choice(["GOTO", "PUT", "INTERACT"])
threshold = datetime.now() + timedelta(seconds=TIME_TO_STOP_ACTION)
if task_type == "GOTO":
counter_type = random.choice(list(counters.keys()))
......
......@@ -193,30 +193,45 @@ class Movement:
updated_movement * (self.player_movement_speed * d_time)
)
# Check collisions with counters
(
collided,
relevant_axes,
nearest_counter_to_player,
) = self.get_counter_collisions(new_targeted_positions)
new_targeted_positions[collided] = player_positions[collided]
# Check if sliding against counters is possible
for idx, player in enumerate(player_positions):
axis = relevant_axes[idx]
if collided[idx]:
# collide with counter left or top
if nearest_counter_to_player[idx][axis] > 0:
updated_movement[idx, axis] = np.max(
[updated_movement[idx, axis], 0]
)
# collide with counter right or bottom
if nearest_counter_to_player[idx][axis] < 0:
updated_movement[idx, axis] = np.min(
[updated_movement[idx, axis], 0]
)
new_positions = player_positions + (
updated_movement * (self.player_movement_speed * d_time)
projected_x = updated_movement.copy()
projected_x[collided, 1] = 0
new_targeted_positions[collided] = player_positions[collided] + (
projected_x[collided] * (self.player_movement_speed * d_time)
)
(
collided,
relevant_axes,
nearest_counter_to_player,
) = self.get_counter_collisions(new_targeted_positions)
new_targeted_positions[collided] = player_positions[collided]
projected_y = updated_movement.copy()
projected_y[collided, 0] = 0
new_targeted_positions[collided] = player_positions[collided] + (
projected_y[collided] * (self.player_movement_speed * d_time)
)
new_positions = new_targeted_positions
# # Check if sliding against counters is possible
# for idx, player in enumerate(player_positions):
# axis = relevant_axes[idx]
# if collided[idx]:
# # collide with counter left or top
# if nearest_counter_to_player[idx][axis] > 0:
# updated_movement[idx, axis] = np.max(
# [updated_movement[idx, axis], 0]
# )
# # collide with counter right or bottom
# if nearest_counter_to_player[idx][axis] < 0:
# updated_movement[idx, axis] = np.min(
# [updated_movement[idx, axis], 0]
# )
# Check collisions with counters again, now absolute with no sliding possible
(
......
......@@ -134,7 +134,9 @@ class Player:
def update_facing_point(self):
"""Update facing point on the player border circle based on the radius."""
self.facing_point = self.pos + (
self.facing_direction * self.player_config.radius * 0.5
self.facing_direction
* self.player_config.radius
* self.player_config.interaction_range
)
def can_reach(self, counter: Counter) -> bool:
......
......@@ -6,9 +6,15 @@ from datetime import datetime
from enum import Enum
from typing import Any
import networkx
import numpy as np
import numpy.typing as npt
from networkx import Graph
from pydantic import BaseModel
from typing_extensions import Literal, TypedDict
from cooperative_cuisine import ROOT_DIR
class OrderState(TypedDict):
"""Format of the state representation of an order."""
......@@ -186,10 +192,136 @@ class StateRepresentation(BaseModel):
"""Added by the game server, indicate if all players are ready and actions are passed to the environment."""
def astar_heuristic(x, y):
return np.linalg.norm(np.array(list(x)) - np.array((y)))
def create_movement_graph(state: StateRepresentation, diagonal=True) -> Graph:
"""
Creates a graph which represents the connections of empty kitchen tiles.
Args:
state: State representation to determine the graph to.
diagonal: if True use 8 way connection, ie diagonal connections between the spaces.
Returns: Graph representing the connections between empty kitchen tiles.
"""
width, height = state["kitchen"]["width"], state["kitchen"]["height"]
free_space = np.ones((width, height), dtype=bool)
for counter in state["counters"]:
grid_idx = np.array(counter["pos"]).round().astype(int)
free_space[*grid_idx] = False
graph = networkx.Graph()
for i in range(width):
for j in range(height):
if free_space[i, j]:
graph.add_node((i, j))
if diagonal:
for di in range(-1, 2):
for dj in range(-1, 2):
x, y = i + di, j + dj
if (
0 < x < width
and 0 < y < height
and free_space[x, y]
and (di, dj) != (0, 0)
):
if np.sum(np.abs(np.array([di, dj]))) == 2:
if free_space[i + di, j] and free_space[i, j + dj]:
graph.add_edge(
(i, j),
(x, y),
weight=np.linalg.norm(
np.array([i - x, j - y])
),
)
else:
graph.add_edge(
(i, j),
(x, y),
weight=np.linalg.norm(np.array([i - x, j - y])),
)
else:
for x, y in [(i - 1, j), (i + 1, j), (i, j - 1), (i, j + 1)]:
if 0 < x < width and 0 < y < height and free_space[x, y]:
graph.add_edge(
(i, j),
(x, y),
weight=1,
)
return graph
def restrict_movement_graph(
graph: Graph,
player_positions: list[tuple[float, float] | list[float]] | npt.NDArray[float],
) -> Graph:
"""Modifies a given connection graph. Removed the nodes of spaces on which a players stand.
Args:
graph: The graph to modify.
player_positions: Positions of players.
Returns: The modified graph without nodes where players stand.
"""
for pos in player_positions:
tup = tuple(np.array(pos).round().astype(int))
if tup in graph.nodes.keys():
graph.remove_node(tup)
return graph
def create_json_schema() -> dict[str, Any]:
"""Create a json scheme of the state representation of an environment."""
return StateRepresentation.model_json_schema()
if __name__ == "__main__":
print(json.dumps(create_json_schema()))
sample_state_path = ROOT_DIR / "pygame_2d_vis" / "sample_state.json"
with open(sample_state_path, "r") as f:
state = json.load(f)
graph = create_movement_graph(state, diagonal=False)
width, height = state["kitchen"]["width"], state["kitchen"]["height"]
free_space = np.ones((width, height), dtype=bool)
for counter in state["counters"]:
grid_idx = np.array(counter["pos"]).round().astype(int)
free_space[*grid_idx] = False
other_players = [[2, 2], [3, 3]]
restricted = restrict_movement_graph(graph, other_players)
print(graph.nodes)
source = (1, 1)
target = (10, 7)
try:
path = networkx.astar_path(
G=restricted, source=source, target=target, heuristic=astar_heuristic
)
except networkx.exception.NetworkXNoPath:
print("NO PATH FOUND")
path = []
width, height = free_space.shape
for i in range(width):
for j in range(height):
if (i, j) == source:
print(" S ", end="")
elif (i, j) == target:
print(" T ", end="")
elif (i, j) in path:
print(" x ", end="")
elif [i, j] in other_players:
print(" O ", end="")
elif free_space[i, j]:
print(" ", end="")
else:
print(" # ", end="")
print()
# print(json.dumps(create_json_schema()))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment