Skip to content
Snippets Groups Projects

Resolve "simple pathfinding"

Merged Fabian Heinrich requested to merge 120-simple-pathfinding into dev
Files
5
@@ -22,7 +22,9 @@ from cooperative_cuisine.utils import custom_asdict_factory
TIME_TO_STOP_ACTION = 3.0
ADD_RANDOM_MOVEMENTS = True
ADD_RANDOM_MOVEMENTS = False
DIAGONAL_MOVEMENTS = True
AVOID_OTHER_PLAYERS = True
def get_free_neighbours(
@@ -32,7 +34,7 @@ def get_free_neighbours(
free_space = np.ones((width, height), dtype=bool)
for counter in state["counters"]:
grid_idx = np.array(counter["pos"]).astype(int)
free_space[*grid_idx] = False
free_space[grid_idx[0], grid_idx[1]] = False
i, j = np.array(counter_pos).astype(int)
free = []
@@ -90,8 +92,9 @@ async def agent():
continue
if movement_graph is None:
movement_graph = create_movement_graph(state, diagonal=True)
movement_graph = create_movement_graph(
state, diagonal=DIAGONAL_MOVEMENTS
)
if counters is None:
counters = defaultdict(list)
@@ -170,16 +173,17 @@ async def agent():
paths = []
for free in target_free_spaces:
try:
modified_graph = restrict_movement_graph(
graph=movement_graph.copy(),
player_positions=[
p["pos"]
for p in state["players"]
if p["id"] != args.player_id
],
)
path = networkx.astar_path(
modified_graph,
restrict_movement_graph(
graph=movement_graph,
player_positions=[
p["pos"]
for p in state["players"]
if p["id"] != args.player_id
],
)
if AVOID_OTHER_PLAYERS
else movement_graph,
source,
free,
heuristic=astar_heuristic,
@@ -203,6 +207,8 @@ async def agent():
do_movement = True
else:
# no paths available
print("NO PATHS")
# task_type = None
# task_args = None
do_movement = False
@@ -247,6 +253,7 @@ async def agent():
await websocket.recv()
else:
# Target reached here.
print("TARGET REACHED")
task_type = None
task_args = None
case "INTERACT":
@@ -303,9 +310,8 @@ async def agent():
...
if not task_type:
# task_type = random.choice(["GOTO"])
task_type = random.choice(["GOTO", "PUT"])
# task_type = random.choice(["GOTO", "PUT", "INTERACT"])
task_type = random.choice(["GOTO"])
threshold = datetime.now() + timedelta(seconds=TIME_TO_STOP_ACTION)
if task_type == "GOTO":
# counter_type = random.choice(list(counters.keys()))
@@ -314,7 +320,13 @@ async def agent():
random_counter = random.choice(all_counters)
counter_type = random_counter["type"]
task_args = random_counter["pos"]
print(args.player_hash, args.player_id, task_type, counter_type, task_args)
print(
args.player_hash,
args.player_id,
task_type,
counter_type,
task_args,
)
else:
print(args.player_hash, args.player_id, task_type)
task_args = None
Loading