Verified Commit 39949c60 authored by Daniel Göbel's avatar Daniel Göbel
Implement multiple backoff strategy for monitoring the workflow execution

#60
parent dd8070a1
...@@ -14,6 +14,7 @@ variables: ...@@ -14,6 +14,7 @@ variables:
ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3 ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3
DEV_SYSTEM: "True" DEV_SYSTEM: "True"
SLURM_JOB_STATUS_CHECK_INTERVAL: 0 SLURM_JOB_STATUS_CHECK_INTERVAL: 0
SLURM_JOB_MONITORING: "NOMONITORING"
cache: cache:
paths: paths:
......
...@@ -25,24 +25,24 @@ This is the Workflow service of the CloWM service. ...@@ -25,24 +25,24 @@ This is the Workflow service of the CloWM service.
### Optional Variables ### Optional Variables
| Variable | Default | Value | Description | | Variable | Default | Value | Description |
|-----------------------------------|-------------------------|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------| |-----------------------------------|-------------------------|--------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|
| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path | | `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path |
| `BACKEND_CORS_ORIGINS` | `[]` | json formatted list of urls | List of valid CORS origins | | `BACKEND_CORS_ORIGINS` | `[]` | json formatted list of urls | List of valid CORS origins |
| `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"&#x7c;"False">` | Enables verbose SQL output.<br>Should be `false` in production | | `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"&#x7c;"False">` | Enables verbose SQL output.<br>Should be `false` in production |
| `PARAMS_BUCKET` | `nxf-params` | Bucket Name | Bucket where the nextflow configurations for each execution should be saved | | `PARAMS_BUCKET` | `nxf-params` | Bucket Name | Bucket where the nextflow configurations for each execution should be saved |
| `WORKFLOW_BUCKET` | `clowm-workflows` | Bucket Name | Bucket where to save important workflow files | | `WORKFLOW_BUCKET` | `clowm-workflows` | Bucket Name | Bucket where to save important workflow files |
| `ICON_BUCKET` | `clowm-icons` | Bucket name | Bucket where to save workflow icons. Should be publicly available. | | `ICON_BUCKET` | `clowm-icons` | Bucket name | Bucket where to save workflow icons. Should be publicly available. |
| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` | | `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` |
| `PARAMS_BUCKET_MOUNT_PATH` | `/mnt/params-bucket` | Path on slurm cluster | Folder where the S3 bucket `PARAMS_BUCKET` will be mounted on the slurm cluster | | `PARAMS_BUCKET_MOUNT_PATH` | `/mnt/params-bucket` | Path on slurm cluster | Folder where the S3 bucket `PARAMS_BUCKET` will be mounted on the slurm cluster |
| `NX_CONFIG` | unset | Path on slurm cluster | Configuration file on the slurm cluster that is the same for every nextflow run | | `NX_CONFIG` | unset | Path on slurm cluster | Configuration file on the slurm cluster that is the same for every nextflow run |
| `NX_BIN` | `nextflow` | Path on slurm cluster | Path to the nextflow executable. By default, it is resolved via the `PATH` | | `NX_BIN` | `nextflow` | Path on slurm cluster | Path to the nextflow executable. By default, it is resolved via the `PATH` |
| `SLURM_WORKING_DIRECTORY` | `/tmp` | Path on slurm cluster | Working directory for the slurm job with the nextflow command | | `SLURM_WORKING_DIRECTORY` | `/tmp` | Path on slurm cluster | Working directory for the slurm job with the nextflow command |
| `ACTIVE_WORKFLOW_EXECUTION_LIMIT` | `3` | Integer | Limit of active workflow executions a user is allowed to have. `-1` means infinite. | | `ACTIVE_WORKFLOW_EXECUTION_LIMIT` | `3` | Integer | Limit of active workflow executions a user is allowed to have. `-1` means infinite. |
| `DEV_SYSTEM` | `False` | `<"True"&#x7c;"False">` | Activates an endpoint that allows execution of a workflow from an arbitrary Git repository.<br>HAS TO BE `False` in PRODUCTION! | | `DEV_SYSTEM` | `False` | `<"True"&#x7c;"False">` | Activates an endpoint that allows execution of a workflow from an arbitrary Git repository.<br>HAS TO BE `False` in PRODUCTION! |
| `OPA_POLICY_PATH` | `/clowm/authz/allow` | URL path | Path to the OPA Policy for Authorization | | `OPA_POLICY_PATH` | `/clowm/authz/allow` | URL path | Path to the OPA Policy for Authorization |
| `SLURM_JOB_STATUS_CHECK_INTERVAL` | 30 | integer (seconds) | Interval for checking the slurm jobs status after starting a workflow execution in seconds. If 0, then workflow execution is not monitored | | `SLURM_JOB_MONITORING` | `EXPONENTIAL` | `<"EXPONENTIAL"&#x7c;"LINEAR"&#x7c;"CONSTANT"&#x7c;"NOMONITORING">` | Strategy for polling the slurm job status for monitoring the workflow execution. |
| `OTLP_GRPC_ENDPOINT` | unset | <hostname / IP> | OTLP compatible endpoint to send traces via gRPC, e.g. Jaeger | | `OTLP_GRPC_ENDPOINT` | unset | <hostname / IP> | OTLP compatible endpoint to send traces via gRPC, e.g. Jaeger |
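The mapping from a `SLURM_JOB_MONITORING` value to the enum added in this commit can be illustrated with a small standard-library sketch. It does not use the service's pydantic `Settings` class; the helper `parse_strategy` is hypothetical and only shows that the member values are matched case-sensitively and that `EXPONENTIAL` is the documented default.

```python
from enum import Enum
from typing import Optional


class MonitorJobBackoffStrategy(str, Enum):
    # Same members as the enum added to app/core/config.py in this commit.
    LINEAR = "LINEAR"
    EXPONENTIAL = "EXPONENTIAL"
    CONSTANT = "CONSTANT"
    NOMONITORING = "NOMONITORING"


def parse_strategy(raw: Optional[str]) -> MonitorJobBackoffStrategy:
    """Hypothetical helper: turn a raw environment value into a strategy."""
    if raw is None:
        # The README lists EXPONENTIAL as the default.
        return MonitorJobBackoffStrategy.EXPONENTIAL
    # Lookup by value is case-sensitive and raises ValueError for unknown names.
    return MonitorJobBackoffStrategy(raw)
```

For example, `parse_strategy("NOMONITORING")` returns the member that disables monitoring, while a misspelled or lowercase value raises `ValueError` in this sketch.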
### Nextflow Variables ### Nextflow Variables
......
...@@ -15,9 +15,9 @@ from app.ceph.rgw import s3_resource ...@@ -15,9 +15,9 @@ from app.ceph.rgw import s3_resource
from app.core.config import settings from app.core.config import settings
from app.core.security import decode_token, request_authorization from app.core.security import decode_token, request_authorization
from app.crud import CRUDUser, CRUDWorkflow, CRUDWorkflowExecution, CRUDWorkflowVersion from app.crud import CRUDUser, CRUDWorkflow, CRUDWorkflowExecution, CRUDWorkflowVersion
from app.otlp import start_as_current_span_async
from app.schemas.security import JWT, AuthzRequest, AuthzResponse from app.schemas.security import JWT, AuthzRequest, AuthzResponse
from app.slurm.slurm_rest_client import SlurmClient from app.slurm.slurm_rest_client import SlurmClient
from app.utils.otlp import start_as_current_span_async
if TYPE_CHECKING: if TYPE_CHECKING:
from mypy_boto3_s3.service_resource import S3ServiceResource from mypy_boto3_s3.service_resource import S3ServiceResource
......
...@@ -11,10 +11,10 @@ from app.core.config import settings ...@@ -11,10 +11,10 @@ from app.core.config import settings
from app.crud import CRUDWorkflow, CRUDWorkflowVersion from app.crud import CRUDWorkflow, CRUDWorkflowVersion
from app.crud.crud_workflow_mode import CRUDWorkflowMode from app.crud.crud_workflow_mode import CRUDWorkflowMode
from app.git_repository import GitHubRepository, build_repository from app.git_repository import GitHubRepository, build_repository
from app.otlp import start_as_current_span_async
from app.schemas.workflow import WorkflowIn, WorkflowOut, WorkflowStatistic, WorkflowUpdate from app.schemas.workflow import WorkflowIn, WorkflowOut, WorkflowStatistic, WorkflowUpdate
from app.schemas.workflow_version import WorkflowVersion as WorkflowVersionSchema from app.schemas.workflow_version import WorkflowVersion as WorkflowVersionSchema
from app.scm import SCM, Provider from app.scm import SCM, Provider
from app.utils.otlp import start_as_current_span_async
router = APIRouter(prefix="/workflows", tags=["Workflow"]) router = APIRouter(prefix="/workflows", tags=["Workflow"])
workflow_authorization = AuthorizationDependency(resource="workflow") workflow_authorization = AuthorizationDependency(resource="workflow")
......
...@@ -9,9 +9,9 @@ from app.core.config import settings ...@@ -9,9 +9,9 @@ from app.core.config import settings
from app.crud.crud_workflow import CRUDWorkflow from app.crud.crud_workflow import CRUDWorkflow
from app.crud.crud_workflow_version import CRUDWorkflowVersion from app.crud.crud_workflow_version import CRUDWorkflowVersion
from app.git_repository import GitHubRepository, build_repository from app.git_repository import GitHubRepository, build_repository
from app.otlp import start_as_current_span_async
from app.schemas.workflow import WorkflowCredentialsIn, WorkflowCredentialsOut from app.schemas.workflow import WorkflowCredentialsIn, WorkflowCredentialsOut
from app.scm import SCM, Provider from app.scm import SCM, Provider
from app.utils.otlp import start_as_current_span_async
router = APIRouter(prefix="/workflows/{wid}/credentials", tags=["Workflow Credentials"]) router = APIRouter(prefix="/workflows/{wid}/credentials", tags=["Workflow Credentials"])
workflow_authorization = AuthorizationDependency(resource="workflow") workflow_authorization = AuthorizationDependency(resource="workflow")
......
...@@ -26,10 +26,10 @@ from app.api.utils import ( ...@@ -26,10 +26,10 @@ from app.api.utils import (
from app.core.config import settings from app.core.config import settings
from app.crud import CRUDWorkflowExecution, CRUDWorkflowVersion from app.crud import CRUDWorkflowExecution, CRUDWorkflowVersion
from app.git_repository import GitHubRepository, build_repository from app.git_repository import GitHubRepository, build_repository
from app.otlp import start_as_current_span_async
from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn, WorkflowExecutionOut from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn, WorkflowExecutionOut
from app.scm import SCM, Provider from app.scm import SCM, Provider
from app.slurm.slurm_rest_client import SlurmClient from app.slurm.slurm_rest_client import SlurmClient
from app.utils.otlp import start_as_current_span_async
router = APIRouter(prefix="/workflow_executions", tags=["Workflow Execution"]) router = APIRouter(prefix="/workflow_executions", tags=["Workflow Execution"])
workflow_authorization = AuthorizationDependency(resource="workflow_execution") workflow_authorization = AuthorizationDependency(resource="workflow_execution")
......
...@@ -6,8 +6,8 @@ from opentelemetry import trace ...@@ -6,8 +6,8 @@ from opentelemetry import trace
from app.api.dependencies import AuthorizationDependency, DBSession from app.api.dependencies import AuthorizationDependency, DBSession
from app.crud.crud_workflow_mode import CRUDWorkflowMode from app.crud.crud_workflow_mode import CRUDWorkflowMode
from app.otlp import start_as_current_span_async
from app.schemas.workflow_mode import WorkflowModeOut from app.schemas.workflow_mode import WorkflowModeOut
from app.utils.otlp import start_as_current_span_async
router = APIRouter(prefix="/workflow_modes", tags=["Workflow Mode"]) router = APIRouter(prefix="/workflow_modes", tags=["Workflow Mode"])
workflow_authorization = AuthorizationDependency(resource="workflow") workflow_authorization = AuthorizationDependency(resource="workflow")
......
...@@ -20,10 +20,10 @@ from app.api.utils import delete_remote_icon, upload_icon ...@@ -20,10 +20,10 @@ from app.api.utils import delete_remote_icon, upload_icon
from app.core.config import settings from app.core.config import settings
from app.crud import CRUDWorkflowVersion from app.crud import CRUDWorkflowVersion
from app.git_repository import build_repository from app.git_repository import build_repository
from app.otlp import start_as_current_span_async
from app.schemas.workflow_version import IconUpdateOut from app.schemas.workflow_version import IconUpdateOut
from app.schemas.workflow_version import WorkflowVersion as WorkflowVersionSchema from app.schemas.workflow_version import WorkflowVersion as WorkflowVersionSchema
from app.schemas.workflow_version import WorkflowVersionStatus from app.schemas.workflow_version import WorkflowVersionStatus
from app.utils.otlp import start_as_current_span_async
router = APIRouter(prefix="/{wid}/versions", tags=["Workflow Version"]) router = APIRouter(prefix="/{wid}/versions", tags=["Workflow Version"])
workflow_authorization = AuthorizationDependency(resource="workflow") workflow_authorization = AuthorizationDependency(resource="workflow")
......
...@@ -16,12 +16,13 @@ from opentelemetry import trace ...@@ -16,12 +16,13 @@ from opentelemetry import trace
from PIL import Image, UnidentifiedImageError from PIL import Image, UnidentifiedImageError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings from app.core.config import MonitorJobBackoffStrategy, settings
from app.crud import CRUDBucket, CRUDWorkflowExecution, CRUDWorkflowVersion from app.crud import CRUDBucket, CRUDWorkflowExecution, CRUDWorkflowVersion
from app.git_repository.abstract_repository import GitRepository from app.git_repository.abstract_repository import GitRepository
from app.schemas.workflow_mode import WorkflowModeIn from app.schemas.workflow_mode import WorkflowModeIn
from app.scm import SCM from app.scm import SCM
from app.slurm.slurm_rest_client import SlurmClient from app.slurm.slurm_rest_client import SlurmClient
from app.utils.backoff_strategy import BackoffStrategy, ExponentialBackoff, LinearBackoff, NoBackoff
if TYPE_CHECKING: if TYPE_CHECKING:
from mypy_boto3_s3.service_resource import S3ServiceResource from mypy_boto3_s3.service_resource import S3ServiceResource
...@@ -222,7 +223,7 @@ async def start_workflow_execution( ...@@ -222,7 +223,7 @@ async def start_workflow_execution(
await CRUDWorkflowExecution.update_slurm_job_id( await CRUDWorkflowExecution.update_slurm_job_id(
db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id
) )
if settings.SLURM_JOB_STATUS_CHECK_INTERVAL > 0: # pragma: no cover if not settings.SLURM_JOB_MONITORING == MonitorJobBackoffStrategy.NOMONITORING: # pragma: no cover
await _monitor_proper_job_execution( await _monitor_proper_job_execution(
db=db, slurm_client=slurm_client, execution_id=execution.execution_id, slurm_job_id=slurm_job_id db=db, slurm_client=slurm_client, execution_id=execution.execution_id, slurm_job_id=slurm_job_id
) )
...@@ -252,8 +253,21 @@ async def _monitor_proper_job_execution( ...@@ -252,8 +253,21 @@ async def _monitor_proper_job_execution(
ID of the slurm job to monitor ID of the slurm job to monitor
""" """
previous_span_link = None previous_span_link = None
while True: if settings.SLURM_JOB_MONITORING == MonitorJobBackoffStrategy.EXPONENTIAL:
await async_sleep(settings.SLURM_JOB_STATUS_CHECK_INTERVAL) # exponential backoff, capped at 5 minutes (300 seconds)
sleep_generator: BackoffStrategy = ExponentialBackoff(initial_delay=30, max_value=300) # type: ignore[no-redef]
elif settings.SLURM_JOB_MONITORING == MonitorJobBackoffStrategy.LINEAR:
# 5 seconds increase to 5 minutes
sleep_generator: BackoffStrategy = LinearBackoff( # type: ignore[no-redef]
initial_delay=30, backoff=5, max_value=300
)
elif settings.SLURM_JOB_MONITORING == MonitorJobBackoffStrategy.CONSTANT:
# constant 30 seconds polling
sleep_generator: BackoffStrategy = NoBackoff(initial_delay=30, constant_value=30) # type: ignore[no-redef]
else:
return
for sleep_time in sleep_generator:
await async_sleep(sleep_time)
with tracer.start_span("monitor_job", links=previous_span_link) as span: with tracer.start_span("monitor_job", links=previous_span_link) as span:
span.set_attributes({"execution_id": str(execution_id), "slurm_job_id": slurm_job_id}) span.set_attributes({"execution_id": str(execution_id), "slurm_job_id": slurm_job_id})
if await slurm_client.is_job_finished(slurm_job_id): if await slurm_client.is_job_finished(slurm_job_id):
...@@ -267,7 +281,7 @@ async def _monitor_proper_job_execution( ...@@ -267,7 +281,7 @@ async def _monitor_proper_job_execution(
await CRUDWorkflowExecution.cancel( await CRUDWorkflowExecution.cancel(
db, execution_id=execution_id, status=WorkflowExecution.WorkflowExecutionStatus.ERROR db, execution_id=execution_id, status=WorkflowExecution.WorkflowExecutionStatus.ERROR
) )
break sleep_generator.close()
previous_span_link = [trace.Link(span.get_span_context())] previous_span_link = [trace.Link(span.get_span_context())]
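The monitoring loop in this hunk follows a common pattern: take the next delay from the strategy, sleep, then check the job. A minimal sketch of that pattern, independent of Slurm and tracing, might look as follows. The names `poll_until_finished`, `is_finished` and the injectable `sleep` parameter are assumptions made for illustration, and the sketch leaves the loop with `break` where the commit calls `sleep_generator.close()`.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def poll_until_finished(
    delays: Iterable[float],
    is_finished: Callable[[], Awaitable[bool]],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Sleep for each delay, then check the job; return the number of checks made."""
    checks = 0
    for delay in delays:
        await sleep(delay)
        checks += 1
        if await is_finished():
            # Stop polling as soon as the job is reported as finished.
            break
    return checks
```

Passing the sleep function as a parameter keeps the loop testable without real waiting; any iterable of delays, including a backoff generator, can be supplied as `delays`.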
......
from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
...@@ -28,6 +29,13 @@ def _load_public_key(pub_key_val: Optional[str], pub_key_file: Optional[Path]) - ...@@ -28,6 +29,13 @@ def _load_public_key(pub_key_val: Optional[str], pub_key_file: Optional[Path]) -
return pub_key return pub_key
class MonitorJobBackoffStrategy(str, Enum):
    LINEAR = "LINEAR"
    EXPONENTIAL = "EXPONENTIAL"
    CONSTANT = "CONSTANT"
    NOMONITORING = "NOMONITORING"
class Settings(BaseSettings): class Settings(BaseSettings):
API_PREFIX: str = Field("/api/workflow-service", description="Path Prefix for all API endpoints.") API_PREFIX: str = Field("/api/workflow-service", description="Path Prefix for all API endpoints.")
...@@ -105,11 +113,9 @@ class Settings(BaseSettings): ...@@ -105,11 +113,9 @@ class Settings(BaseSettings):
"/tmp", description="Working directory for the slurm job with the nextflow command" "/tmp", description="Working directory for the slurm job with the nextflow command"
) )
ACTIVE_WORKFLOW_EXECUTION_LIMIT: int = Field(3, description="The limit of active workflow executions per user.") ACTIVE_WORKFLOW_EXECUTION_LIMIT: int = Field(3, description="The limit of active workflow executions per user.")
SLURM_JOB_STATUS_CHECK_INTERVAL: int = Field( SLURM_JOB_MONITORING: MonitorJobBackoffStrategy = Field(
30, MonitorJobBackoffStrategy.EXPONENTIAL,
ge=0, description="Strategy for polling the slurm job status for monitoring the workflow execution.",
le=600,
description="Interval for checking the slurm jobs status after starting a workflow execution in seconds. If 0, then workflow execution is not monitored",
) )
DEV_SYSTEM: bool = Field(False, description="Open an endpoint where to execute arbitrary workflows.") DEV_SYSTEM: bool = Field(False, description="Open an endpoint where to execute arbitrary workflows.")
OTLP_GRPC_ENDPOINT: Optional[str] = Field( OTLP_GRPC_ENDPOINT: Optional[str] = Field(
......
...@@ -5,7 +5,7 @@ from opentelemetry import trace ...@@ -5,7 +5,7 @@ from opentelemetry import trace
from sqlalchemy import func, or_, select from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.otlp import start_as_current_span_async from app.utils.otlp import start_as_current_span_async
tracer = trace.get_tracer_provider().get_tracer(__name__) tracer = trace.get_tracer_provider().get_tracer(__name__)
......
from math import ceil, floor, log2
from app.utils.backoff_strategy import ExponentialBackoff, LinearBackoff, NoBackoff
class TestExponentialBackoffStrategy:
    def test_exponential_without_initial_delay(self) -> None:
        """
        Test generating a bounded exponential backoff strategy series in seconds without an initial delay.
        """
        for max_val in range(1023, 1026):  # numbers around a power of 2
            index_at_maximum = floor(log2(max_val))
            sleep_generator = ExponentialBackoff(initial_delay=0, max_value=max_val)
            for i, sleep in enumerate(sleep_generator):
                if i < index_at_maximum:
                    assert sleep == 2 ** (i + 1)
                elif i == index_at_maximum:
                    assert sleep == max_val
                elif i == index_at_maximum + 1:
                    assert sleep == max_val
                    sleep_generator.close()
                else:
                    assert False, "Iteration should have stopped"

    def test_unbounded_exponential_without_initial_delay(self) -> None:
        """
        Test generating an unbounded exponential backoff strategy series in seconds without an initial delay
        """
        sleep_generator = ExponentialBackoff(initial_delay=0, max_value=-1)
        for i, sleep in enumerate(sleep_generator):
            assert sleep == 2 ** (i + 1)
            if i == 20:
                sleep_generator.close()
            elif i > 20:
                assert False, "Iteration should have stopped"

    def test_exponential_with_initial_delay(self) -> None:
        """
        Test generating a bounded exponential backoff strategy series in seconds with an initial delay
        """
        for max_val in range(1023, 1026):  # numbers around a power of 2
            index_at_maximum = ceil(log2(max_val))
            sleep_generator = ExponentialBackoff(initial_delay=30, max_value=max_val)
            for i, sleep in enumerate(sleep_generator):
                if i == 0:
                    assert sleep == 30
                elif i < index_at_maximum:
                    assert sleep == 2**i
                elif i == index_at_maximum:
                    assert sleep == max_val
                elif i == index_at_maximum + 1:
                    assert sleep == max_val
                    sleep_generator.close()
                else:
                    assert False, "Iteration should have stopped"
class TestLinearBackoffStrategy:
    def test_linear_without_initial_delay(self) -> None:
        """
        Test generating a bounded linear backoff strategy series in seconds without an initial delay
        """
        linear_backoff = 5
        repetition_factor = 5
        for max_val in range((linear_backoff * repetition_factor) - 1, (linear_backoff * repetition_factor) + 2):
            index_at_maximum = max_val // linear_backoff
            sleep_generator = LinearBackoff(initial_delay=0, backoff=linear_backoff, max_value=max_val)
            for i, sleep in enumerate(sleep_generator):
                if i < index_at_maximum:
                    assert sleep == linear_backoff * (i + 1)
                elif i == index_at_maximum:
                    assert sleep == max_val
                elif i == index_at_maximum + 1:
                    assert sleep == max_val
                    sleep_generator.close()
                else:
                    assert False, "Iteration should have stopped"

    def test_unbounded_linear_without_initial_delay(self) -> None:
        """
        Test generating an unbounded linear backoff strategy series in seconds without an initial delay
        """
        sleep_generator = LinearBackoff(initial_delay=0, backoff=6, max_value=-1)
        for i, sleep in enumerate(sleep_generator):
            assert sleep == 6 * (i + 1)
            if i == 200:
                sleep_generator.close()
            elif i > 200:
                assert False, "Iteration should have stopped"

    def test_linear_with_initial_delay(self) -> None:
        """
        Test generating a bounded linear backoff strategy series in seconds with an initial delay
        """
        linear_backoff = 5
        repetition_factor = 5
        for max_val in range((linear_backoff * repetition_factor) - 1, (linear_backoff * repetition_factor) + 2):
            index_at_maximum = (max_val // linear_backoff) + 1
            sleep_generator = LinearBackoff(initial_delay=30, backoff=linear_backoff, max_value=max_val)
            for i, sleep in enumerate(sleep_generator):
                if i == 0:
                    assert sleep == 30
                elif i < index_at_maximum:
                    assert sleep == linear_backoff * i
                elif i == index_at_maximum:
                    assert sleep == max_val
                elif i == index_at_maximum + 1:
                    assert sleep == max_val
                    sleep_generator.close()
                else:
                    assert False, "Iteration should have stopped"
class TestNoBackoffStrategy:
    def test_no_backoff_without_initial_delay(self) -> None:
        """
        Test generating no backoff strategy series in seconds without an initial delay
        """
        sleep_generator = NoBackoff(initial_delay=0, constant_value=40)
        for i, sleep in enumerate(sleep_generator):
            assert sleep == 40
            if i == 20:
                sleep_generator.close()
            elif i > 20:
                assert False, "Iteration should have stopped"

    def test_no_backoff_with_initial_delay(self) -> None:
        """
        Test generating no backoff strategy series in seconds with an initial delay
        """
        sleep_generator = NoBackoff(initial_delay=20, constant_value=40)
        for i, sleep in enumerate(sleep_generator):
            if i == 0:
                assert sleep == 20
            else:
                assert sleep == 40
            if i == 20:
                sleep_generator.close()
            elif i > 20:
                assert False, "Iteration should have stopped"
from abc import ABC, abstractmethod
from collections.abc import Generator
from types import TracebackType
from typing import Any, Type
class BackoffStrategy(ABC, Generator):
    def __init__(self, initial_delay: int = 0):
        """
        Initialize the class BackoffStrategy

        Parameters
        ----------
        initial_delay : int, default 0
            The initial delay in seconds that should be emitted first. If smaller than 1, this value is not emitted.
        """
        self._current_val = 0
        self._delay = initial_delay
        self._delay_first_iteration = initial_delay > 0
        self._iteration = 0
        self._stop_next = False

    def throw(  # type: ignore
        self, __typ: Type[BaseException], __val: BaseException | object = "", __tb: TracebackType | None = None
    ) -> Any:  # pragma: no cover
        raise __typ(__val, __tb)

    def send(self, __value: Any) -> None:  # pragma: no cover
        pass

    def close(self) -> None:
        """
        Stop the iteration before the next turn.
        """
        self._stop_next = True

    def __iter__(self) -> "BackoffStrategy":
        return self

    @abstractmethod
    def _compute_next_value(self, iteration: int) -> int:
        """
        Compute the next value in the infinite series for this backoff strategy.

        Returns
        -------
        val : int
            Next value to emit.
        """
        ...

    def __next__(self) -> int:
        """
        Emits the next value for this Generator.
        If there is an initial delay, it will be emitted first.

        Returns
        -------
        val : int
            Next value in the iteration
        """
        if self._stop_next:
            raise StopIteration
        self._iteration += 1
        if self._delay_first_iteration:
            self._delay_first_iteration = False
            self._iteration -= 1
            return self._delay
        self._current_val = self._compute_next_value(self._iteration)
        return self._current_val

    def __str__(self) -> str:
        return f"BackoffStrategyGenerator(iterations={self._iteration}, current_val={self._current_val})"

    def __repr__(self) -> str:
        return str(self)
class ExponentialBackoff(BackoffStrategy):
    """
    An exponential Backoff strategy based on the power of two. The generated values should be put into a sleep function.
    """

    def __init__(self, initial_delay: int = 0, max_value: int = 300):
        """
        Initialize the exponential BackoffStrategy class

        Parameters
        ----------
        initial_delay : int, default 0
            The initial delay in seconds that should be emitted first. If smaller than 1, this value is not emitted.
        max_value : int, default 300
            The maximum this generator can emit. If smaller than 1 then this series is unbounded.
        """
        super().__init__(initial_delay=initial_delay)
        self.max_value = max_value
        self._reached_max = False

    def _compute_next_value(self, iteration: int) -> int:
        if self._reached_max:
            return self._current_val
        next_val = 2 << (iteration - 1)
        if 0 < self.max_value < next_val:
            self._reached_max = True
            return self.max_value
        return next_val
class NoBackoff(BackoffStrategy):
    """
    No Backoff strategy. It always emits a constant value. The generated values should be put into a sleep function.
    """

    def __init__(self, initial_delay: int = 0, constant_value: int = 30):
        """
        Initialize the no BackoffStrategy class

        Parameters
        ----------
        initial_delay : int, default 0
            The initial delay in seconds that should be emitted first. If smaller than 1, this value is not emitted.
        constant_value : int, default 30
            The constant value this generator should emit.
        """
        super().__init__(initial_delay=initial_delay)
        self._val = constant_value

    def _compute_next_value(self, iteration: int) -> int:
        return self._val
class LinearBackoff(BackoffStrategy):
    """
    A linear Backoff strategy. The generated values should be put into a sleep function.
    """

    def __init__(self, initial_delay: int = 0, backoff: int = 5, max_value: int = 300):
        """
        Initialize the linear BackoffStrategy class

        Parameters
        ----------
        initial_delay : int, default 0
            The initial delay in seconds that should be emitted first. If smaller than 1, this value is not emitted.
        backoff : int, default 5
            The linear factor that is added each iteration.
        max_value : int, default 300
            The maximum this generator can emit. If smaller than 1 then this series is unbounded.
        """
        super().__init__(initial_delay=initial_delay)
        self.max_value = max_value
        self._backoff = backoff
        self._reached_max = False

    def _compute_next_value(self, iteration: int) -> int:
        if self._reached_max:
            return self._current_val
        next_val = self._current_val + self._backoff
        if 0 < self.max_value < next_val:
            self._reached_max = True
            return self.max_value
        return next_val
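To make the emitted series concrete, the following standalone sketch mirrors the semantics of the classes above with plain generator functions. The function names `exponential_delays` and `linear_delays` are hypothetical and not part of the commit. The sketch reflects the behaviour the tests describe: the initial delay is emitted once, and the series then starts from its first term.

```python
from itertools import islice
from typing import Iterator


def exponential_delays(initial_delay: int = 0, max_value: int = 300) -> Iterator[int]:
    """Yield initial_delay once (if positive), then 2, 4, 8, ... capped at max_value."""
    if initial_delay > 0:
        yield initial_delay
    exponent = 1
    while True:
        value = 2**exponent
        # A max_value below 1 means the series is unbounded.
        if 0 < max_value < value:
            break
        yield value
        exponent += 1
    while True:
        yield max_value


def linear_delays(initial_delay: int = 0, backoff: int = 5, max_value: int = 300) -> Iterator[int]:
    """Yield initial_delay once (if positive), then backoff, 2*backoff, ... capped at max_value."""
    if initial_delay > 0:
        yield initial_delay
    value = backoff
    while not (0 < max_value < value):
        yield value
        value += backoff
    while True:
        yield max_value


# Parameters used for the EXPONENTIAL strategy in the monitoring function.
print(list(islice(exponential_delays(30, 300), 12)))
# → [30, 2, 4, 8, 16, 32, 64, 128, 256, 300, 300, 300]
```

With the parameters used in the monitoring function, the first wait is therefore 30 seconds, followed by short waits that grow until they reach the 300-second cap.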
File moved