Skip to content
Snippets Groups Projects
Verified Commit 9c5d4d7a authored by Daniel Göbel
Browse files

Fix race condition during job monitoring

#56
parent 2121c602
No related branches found
No related tags found
No related merge requests found
...@@ -212,7 +212,9 @@ async def start_workflow_execution( ...@@ -212,7 +212,9 @@ async def start_workflow_execution(
await CRUDWorkflowExecution.update_slurm_job_id( await CRUDWorkflowExecution.update_slurm_job_id(
db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id
) )
await _monitor_proper_job_execution(db=db, slurm_client=slurm_client, execution_id=execution.execution_id) await _monitor_proper_job_execution(
db=db, slurm_client=slurm_client, execution_id=execution.execution_id, slurm_job_id=slurm_job_id
)
except (ConnectError, ConnectTimeout): # pragma: no cover except (ConnectError, ConnectTimeout): # pragma: no cover
# Mark job as aborted when there is an error # Mark job as aborted when there is an error
await CRUDWorkflowExecution.cancel( await CRUDWorkflowExecution.cancel(
...@@ -221,7 +223,7 @@ async def start_workflow_execution( ...@@ -221,7 +223,7 @@ async def start_workflow_execution(
async def _monitor_proper_job_execution( async def _monitor_proper_job_execution(
db: AsyncSession, slurm_client: SlurmClient, execution_id: UUID db: AsyncSession, slurm_client: SlurmClient, execution_id: UUID, slurm_job_id: int
) -> None: # pragma: no cover ) -> None: # pragma: no cover
""" """
Checks every settings.SLURM_JOB_STATUS_CHECK_INTERVAL seconds if the slurm job is still running as long as Checks every settings.SLURM_JOB_STATUS_CHECK_INTERVAL seconds if the slurm job is still running as long as
...@@ -235,17 +237,19 @@ async def _monitor_proper_job_execution( ...@@ -235,17 +237,19 @@ async def _monitor_proper_job_execution(
Slurm Rest Client to communicate with Slurm cluster. Slurm Rest Client to communicate with Slurm cluster.
execution_id : uuid.UUID execution_id : uuid.UUID
ID of the workflow execution ID of the workflow execution
slurm_job_id : int
ID of the slurm job to monitor
""" """
while True: while True:
await async_sleep(settings.SLURM_JOB_STATUS_CHECK_INTERVAL) await async_sleep(settings.SLURM_JOB_STATUS_CHECK_INTERVAL)
execution = await CRUDWorkflowExecution.get(db, execution_id=execution_id) if await slurm_client.is_job_finished(slurm_job_id):
if execution is None or execution.end_time is not None: execution = await CRUDWorkflowExecution.get(db, execution_id=execution_id)
break # Check if the execution is marked as finished in the database
if await slurm_client.is_job_finished(execution.slurm_job_id): if execution is not None and execution.end_time is None:
# Mark job as finished with an error when the slurm job is finished # Mark job as finished with an error
await CRUDWorkflowExecution.cancel( await CRUDWorkflowExecution.cancel(
db, execution_id=execution_id, status=WorkflowExecution.WorkflowExecutionStatus.ERROR db, execution_id=execution_id, status=WorkflowExecution.WorkflowExecutionStatus.ERROR
) )
break break
......
...@@ -98,6 +98,6 @@ class SlurmClient: ...@@ -98,6 +98,6 @@ class SlurmClient:
return True return True
try: try:
job_state = response.json()["jobs"][0]["job_state"] job_state = response.json()["jobs"][0]["job_state"]
return job_state == "COMPLETED" or job_state == "FAILED" return job_state == "COMPLETED" or job_state == "FAILED" or job_state == "CANCELLED"
except (KeyError, IndexError): except (KeyError, IndexError):
return True return True
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment