Skip to content
Snippets Groups Projects
Verified Commit b3795ce6 authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Initial commit with barebone structure for service

parents
No related branches found
No related tags found
No related merge requests found
Showing
with 777 additions and 0 deletions
.idea/
*/__pycache__/
.env
.venv
env/
venv/
ENV/
README.md
.pytest_cache
.mypy_cache
.ruff_cache
htmlcov
app/tests
figures/
.git
.idea/
__pycache__/
.env
.venv
env/
venv/
ENV/
.coverage
image: ${CI_DEPENDENCY_PROXY_DIRECT_GROUP_IMAGE_PREFIX}/python:3.11-slim
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
PYTHONPATH: "$CI_PROJECT_DIR"
OBJECT_GATEWAY_URI: "http://127.0.0.1:8001"
BUCKET_CEPH_ACCESS_KEY: ""
BUCKET_CEPH_SECRET_KEY: ""
FF_NETWORK_PER_BUILD: 1
PUBLIC_KEY_VALUE: "empty"
OPA_URI: "http://127.0.0.1:8181"
SLURM_TOKEN: "empty"
SLURM_ENDPOINT: "http://127.0.0.1:8002"
cache:
paths:
- .cache/pip
- venv/
default:
tags:
- docker
before_script:
- python --version # For debugging
- pip install virtualenv
- virtualenv venv
- source venv/bin/activate
- python -m pip install --upgrade -r requirements.txt -r requirements-dev.txt
stages: # List of stages for jobs, and their order of execution
- test
- deploy
integration-test-job: # Runs integration tests with the database
stage: test
variables:
DB_PASSWORD: "$TEST_DB_PASSWORD"
DB_USER: "test_api_user"
DB_DATABASE: "integration-test-db"
DB_HOST: "integration-test-db"
services:
- name: ${CI_DEPENDENCY_PROXY_DIRECT_GROUP_IMAGE_PREFIX}/mysql:8
alias: integration-test-db
variables:
MYSQL_RANDOM_ROOT_PASSWORD: "yes"
MYSQL_DATABASE: "$DB_DATABASE"
MYSQL_USER: "$DB_USER"
MYSQL_PASSWORD: "$DB_PASSWORD"
- name: $CI_REGISTRY/cmg/clowm/clowm-database:v2.3
alias: upgrade-db
script:
- python app/check_database_connection.py
- pytest --junitxml=integration-report.xml --cov=app --cov-report=term-missing app/tests/crud
- mkdir coverage-integration
- mv .coverage coverage-integration
artifacts:
paths:
- $CI_PROJECT_DIR/coverage-integration/.coverage
reports:
junit: $CI_PROJECT_DIR/integration-report.xml
e2e-test-job: # Runs e2e tests on the API endpoints
stage: test
variables:
DB_PASSWORD: "$TEST_DB_PASSWORD"
DB_USER: "test_api_user"
DB_DATABASE: "e2e-test-db"
DB_HOST: "e2e-test-db"
services:
- name: ${CI_DEPENDENCY_PROXY_DIRECT_GROUP_IMAGE_PREFIX}/mysql:8
alias: e2e-test-db
variables:
MYSQL_RANDOM_ROOT_PASSWORD: "yes"
MYSQL_DATABASE: "$DB_DATABASE"
MYSQL_USER: "$DB_USER"
MYSQL_PASSWORD: "$DB_PASSWORD"
- name: $CI_REGISTRY/cmg/clowm/clowm-database:v2.3
alias: upgrade-db
script:
- python app/check_database_connection.py
- pytest --junitxml=e2e-report.xml --cov=app --cov-report=term-missing app/tests/api
- mkdir coverage-e2e
- mv .coverage coverage-e2e
artifacts:
paths:
- $CI_PROJECT_DIR/coverage-e2e/.coverage
reports:
junit: $CI_PROJECT_DIR/e2e-report.xml
unit-test-job: # Runs unit tests
stage: test
variables:
DB_PASSWORD: "random"
DB_USER: "random"
DB_DATABASE: "random"
DB_HOST: "random"
script:
- pytest --junitxml=unit-report.xml --cov=app --cov-report=term-missing app/tests/unit
- mkdir coverage-unit
- mv .coverage coverage-unit
artifacts:
paths:
- $CI_PROJECT_DIR/coverage-unit/.coverage
reports:
junit: $CI_PROJECT_DIR/unit-report.xml
combine-test-coverage-job: # Combine coverage reports from different test jobs
stage: test
needs:
- job: "e2e-test-job"
artifacts: true
- job: "integration-test-job"
artifacts: true
- job: "unit-test-job"
artifacts: true
script:
- coverage combine coverage-e2e/.coverage coverage-integration/.coverage coverage-unit/.coverage
- coverage report
- coverage xml --data-file=$CI_PROJECT_DIR/.coverage -o coverage.xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
coverage_report:
coverage_format: cobertura
path: $CI_PROJECT_DIR/coverage.xml
lint-test-job: # Runs linters checks on code
stage: test
script:
- ./scripts/lint.sh
publish-dev-docker-container-job:
stage: deploy
image:
name: gcr.io/kaniko-project/executor:v1.17.0-debug
entrypoint: [""]
dependencies: []
only:
refs:
- development
before_script:
- echo "{\"auths\":{\"${CI_REGISTRY}\":{\"auth\":\"$(printf "%s:%s" "${CI_REGISTRY_USER}" "${CI_REGISTRY_PASSWORD}" | base64 | tr -d '\n')\"},\"$CI_DEPENDENCY_PROXY_SERVER\":{\"auth\":\"$(printf "%s:%s" ${CI_DEPENDENCY_PROXY_USER} "${CI_DEPENDENCY_PROXY_PASSWORD}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
script:
- /kaniko/executor
--context "${CI_PROJECT_DIR}"
--dockerfile "${CI_PROJECT_DIR}/Dockerfile"
--destination "${CI_REGISTRY_IMAGE}:dev-${CI_COMMIT_SHA}"
--destination "${CI_REGISTRY_IMAGE}:dev-latest"
- /kaniko/executor
--context "${CI_PROJECT_DIR}"
--dockerfile "${CI_PROJECT_DIR}/Dockerfile-Gunicorn"
--destination "${CI_REGISTRY_IMAGE}:dev-${CI_COMMIT_SHA}-gunicorn"
--destination "${CI_REGISTRY_IMAGE}:dev-latest-gunicorn"
publish-docker-container-job:
stage: deploy
image:
name: gcr.io/kaniko-project/executor:v1.17.0-debug
entrypoint: [""]
dependencies: []
only:
- tags
before_script:
- echo "{\"auths\":{\"${CI_REGISTRY}\":{\"auth\":\"$(printf "%s:%s" "${CI_REGISTRY_USER}" "${CI_REGISTRY_PASSWORD}" | base64 | tr -d '\n')\"},\"$CI_DEPENDENCY_PROXY_SERVER\":{\"auth\":\"$(printf "%s:%s" ${CI_DEPENDENCY_PROXY_USER} "${CI_DEPENDENCY_PROXY_PASSWORD}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
script:
- /kaniko/executor
--context "${CI_PROJECT_DIR}"
--dockerfile "${CI_PROJECT_DIR}/Dockerfile"
--destination "${CI_REGISTRY_IMAGE}:${CI_COMMIT_TAG}"
--destination "${CI_REGISTRY_IMAGE}:$(echo ${CI_COMMIT_TAG} | cut -d'.' -f1-2)"
--destination "${CI_REGISTRY_IMAGE}:$(echo ${CI_COMMIT_TAG} | cut -d'.' -f1)"
--destination "${CI_REGISTRY_IMAGE}:latest"
- /kaniko/executor
--context "${CI_PROJECT_DIR}"
--dockerfile "${CI_PROJECT_DIR}/Dockerfile-Gunicorn"
--destination "${CI_REGISTRY_IMAGE}:${CI_COMMIT_TAG}-gunicorn"
--destination "${CI_REGISTRY_IMAGE}:$(echo ${CI_COMMIT_TAG} | cut -d'.' -f1-2)-gunicorn"
--destination "${CI_REGISTRY_IMAGE}:$(echo ${CI_COMMIT_TAG} | cut -d'.' -f1)-gunicorn"
--destination "${CI_REGISTRY_IMAGE}:latest-gunicorn"
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: end-of-file-fixer
- id: check-added-large-files
- id: check-toml
- id: check-docstring-first
- id: detect-private-key
- id: trailing-whitespace
- id: check-yaml
- id: debug-statements
- id: check-merge-conflict
- id: check-ast
- repo: https://github.com/psf/black
rev: 23.11.0
hooks:
- id: black
files: app
args: [--check]
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.1.7'
hooks:
- id: ruff
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
files: app
args: [-c]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.7.1
hooks:
- id: mypy
files: app
args: [--config=pyproject.toml]
additional_dependencies:
- types-aiobotocore-lite[s3]>=2.8.0,<2.9.0
- sqlalchemy>=2.0.0.<2.1.0
- pydantic<2.6.0
- types-requests
FROM python:3.11-slim
EXPOSE 8000
# dumb-init forwards the kill signal to the python process
RUN apt-get update && apt-get -y install dumb-init curl
ENTRYPOINT ["/usr/bin/dumb-init", "--"]
HEALTHCHECK --interval=30s --timeout=4s CMD curl -f http://localhost:8000/health || exit 1
RUN useradd -m worker
USER worker
WORKDIR /home/worker/code
ENV PYTHONPATH=/home/worker/code
ENV PATH="/home/worker/.local/bin:${PATH}"
COPY --chown=worker:worker requirements.txt ./requirements.txt
RUN pip install --user --no-cache-dir --upgrade -r requirements.txt
COPY --chown=worker:worker . .
CMD ["./start_service.sh"]
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.11-slim
EXPOSE 8000
ENV PORT=8000
RUN pip install --no-cache-dir httpx[cli]
HEALTHCHECK --interval=30s --timeout=4s CMD httpx http://localhost:$PORT/health || exit 1
COPY ./scripts/prestart.sh /app/prestart.sh
COPY ./requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY ./mako_templates /app/mako_templates
COPY ./app /app/app
# CloWM Resource Service
## Description
This is the Resource service of the CloWM service.
## Environment Variables
### Mandatory / Recommended Variables
| Variable | Default | Value | Description |
|----------------------------------------|--------------------|---------------------------------|---------------------------------------------------------------------------------------|
| `PUBLIC_KEY_VALUE` / `PUBLIC_KEY_FILE` | randomly generated | Public Key / Path to Public Key | Public part of RSA Key in PEM format to verify JWTs |
| `DB_HOST` | unset | <db hostname / IP> | IP or Hostname Address of DB |
| `DB_PORT` | 3306 | Number | Port of the database |
| `DB_USER` | unset | \<db username> | Username of the database user |
| `DB_PASSWORD` | unset | \<db password> | Password of the database user |
| `DB_DATABASE` | unset | \<db name> | Name of the database |
| `OBJECT_GATEWAY_URI` | unset | HTTP URL | HTTP URL of the Ceph Object Gateway |
| `BUCKET_CEPH_ACCESS_KEY` | unset | \<access key> | Ceph access key with admin privileges |
| `BUCKET_CEPH_SECRET_KEY` | unset | \<secret key> | Ceph secret key with admin privileges |
| `OPA_URI` | unset | HTTP URL | HTTP URL of the OPA service |
| `SLURM_ENDPOINT` | unset | HTTP URL | HTTP URL to communicate with the Slurm cluster |
| `SLURM_TOKEN` | unset | \<JWT> | JWT for communication with the Slurm REST API. Should belong to the user `SLURM_USER` |
### Optional Variables
| Variable | Default | Value | Description |
|-----------------------------------|-------------------------|--------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|
| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path |
| `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"&#x7c;"False">` | Enables verbose SQL output.<br>Should be `false` in production |
| `RESOURCE_BUCKET` | `clowm-resources` | Bucket name | Bucket where to save workflow icons. Should be publicly available. |
| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` |
| `SLURM_WORKING_DIRECTORY` | `/tmp` | Path on slurm cluster | Working directory for the slurm job with the nextflow command |
| `OPA_POLICY_PATH` | `/clowm/authz/allow` | URL path | Path to the OPA Policy for Authorization |
| `OTLP_GRPC_ENDPOINT` | unset | <hostname / IP> | OTLP compatible endpoint to send traces via gRPC, e.g. Jaeger |
from typing import Any, Dict, Union
from fastapi import APIRouter, Depends, status
from app.api.dependencies import decode_bearer_token
from app.api.endpoints import resources
from app.schemas.security import ErrorDetail
alternative_responses: Dict[Union[int, str], Dict[str, Any]] = {
status.HTTP_400_BAD_REQUEST: {
"model": ErrorDetail,
"description": "Error decoding JWT Token",
"content": {"application/json": {"example": {"detail": "Malformed JWT Token"}}},
},
status.HTTP_403_FORBIDDEN: {
"model": ErrorDetail,
"description": "Not authenticated",
"content": {"application/json": {"example": {"detail": "Not authenticated"}}},
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorDetail,
"description": "Entity not Found",
"content": {"application/json": {"example": {"detail": "Entity not found."}}},
},
}
api_router = APIRouter()
api_router.include_router(
resources.router,
dependencies=[Depends(decode_bearer_token)],
responses=alternative_responses,
)
from typing import TYPE_CHECKING, Annotated, AsyncIterator, Awaitable, Callable, Dict
from authlib.jose.errors import BadSignatureError, DecodeError, ExpiredTokenError
from clowmdb.db.session import get_async_session
from clowmdb.models import User
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPBearer
from fastapi.security.http import HTTPAuthorizationCredentials
from httpx import AsyncClient
from opentelemetry import trace
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.security import decode_token, request_authorization
from app.crud import CRUDUser
from app.schemas.security import JWT, AuthzRequest, AuthzResponse
from app.slurm.rest_client import SlurmClient
from app.utils.otlp import start_as_current_span_async
if TYPE_CHECKING:
from types_aiobotocore_s3.service_resource import S3ServiceResource
else:
S3ServiceResource = object
bearer_token = HTTPBearer(description="JWT Header")
tracer = trace.get_tracer_provider().get_tracer(__name__)
async def get_s3_resource(request: Request) -> S3ServiceResource:
return request.app.s3_resource
S3Resource = Annotated[S3ServiceResource, Depends(get_s3_resource)]
async def get_db() -> AsyncIterator[AsyncSession]: # pragma: no cover
"""
Get a Session with the database.
FastAPI Dependency Injection Function.
Returns
-------
db : AsyncIterator[AsyncSession, None]
Async session object with the database
"""
async with get_async_session(
str(settings.SQLALCHEMY_DATABASE_ASYNC_URI), verbose=settings.SQLALCHEMY_VERBOSE_LOGGER
) as db:
yield db
DBSession = Annotated[AsyncSession, Depends(get_db)]
async def get_httpx_client(request: Request) -> AsyncClient: # pragma: no cover
# Fetch open http client from the app
return request.app.requests_client
HTTPClient = Annotated[AsyncClient, Depends(get_httpx_client)]
def get_slurm_client(client: AsyncClient = Depends(get_httpx_client)) -> SlurmClient:
return SlurmClient(client=client)
def get_decode_jwt_function() -> Callable[[str], Dict[str, str]]: # pragma: no cover
"""
Get function to decode and verify the JWT.
This will be injected into the function which will handle the JWT. With this approach, the function to decode and
verify the JWT can be overwritten during tests.
Returns
-------
decode : Callable[[str], Dict[str, str]]
Function to decode & verify the token. raw_token -> claims. Dependency Injection
"""
return decode_token
@start_as_current_span_async("decode_jwt", tracer=tracer)
async def decode_bearer_token(
token: HTTPAuthorizationCredentials = Depends(bearer_token),
decode: Callable[[str], Dict[str, str]] = Depends(get_decode_jwt_function),
db: AsyncSession = Depends(get_db),
) -> JWT:
"""
Get the decoded JWT or reject request if it is not valid or the user doesn't exist.
FastAPI Dependency Injection Function.
Parameters
----------
token : fastapi.security.http.HTTPAuthorizationCredentials
Bearer token sent with the HTTP request. Dependency Injection.
decode : Callable[[str], Dict[str, str]]
Function to decode & verify the token. raw_token -> claims. Dependency Injection
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
Returns
-------
token : app.schemas.security.JWT
The verified and decoded JWT.
"""
try:
jwt = JWT(**decode(token.credentials), raw_token=token.credentials)
trace.get_current_span().set_attributes({"exp": jwt.exp.isoformat(), "uid": jwt.sub})
await get_current_user(jwt, db) # make sure the user exists
return jwt
except ExpiredTokenError: # pragma: no cover
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="JWT Signature has expired")
except (DecodeError, BadSignatureError):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Malformed JWT")
class AuthorizationDependency:
"""
Class to parameterize the authorization request with the resource to perform an operation on.
"""
def __init__(self, resource: str):
"""
Parameters
----------
resource : str
Resource parameter for the authorization requests
"""
self.resource = resource
def __call__(
self,
token: JWT = Depends(decode_bearer_token),
client: AsyncClient = Depends(get_httpx_client),
) -> Callable[[str], Awaitable[AuthzResponse]]:
"""
Get the function to request the authorization service with the resource, JWT and HTTP Client already injected.
Parameters
----------
token : app.schemas.security.JWT
The verified and decoded JWT. Dependency Injection.
client : httpx.AsyncClient
HTTP Client with an open connection. Dependency Injection.
Returns
-------
authorization_function : Callable[[str], Awaitable[app.schemas.security.AuthzResponse]]
Async function which ask the Auth service for authorization. It takes the operation as the only input.
"""
async def authorization_wrapper(operation: str) -> AuthzResponse:
params = AuthzRequest(operation=operation, resource=self.resource, uid=token.sub)
return await request_authorization(request_params=params, client=client)
return authorization_wrapper
async def get_current_user(token: JWT = Depends(decode_bearer_token), db: AsyncSession = Depends(get_db)) -> User:
"""
Get the current user from the database based on the JWT.
FastAPI Dependency Injection Function.
Parameters
----------
token : app.schemas.security.JWT
The verified and decoded JWT.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
Returns
-------
user : clowmdb.models.User
User associated with the JWT sent with the HTTP request.
"""
user = await CRUDUser.get(db, token.sub)
if user:
return user
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
CurrentUser = Annotated[User, Depends(get_current_user)]
from typing import Annotated, Any, Awaitable, Callable
from fastapi import APIRouter, Depends
from opentelemetry import trace
from app.api.dependencies import AuthorizationDependency
from app.utils.otlp import start_as_current_span_async
router = APIRouter(prefix="/resources", tags=["Bucket"])
resource_authorization = AuthorizationDependency(resource="resource")
Authorization = Annotated[Callable[[str], Awaitable[Any]], Depends(resource_authorization)]
tracer = trace.get_tracer_provider().get_tracer(__name__)
@router.get("", summary="List buckets of user")
@start_as_current_span_async("api_list_resources", tracer=tracer)
async def list_buckets(
authorization: Authorization,
) -> None:
"""
List all the buckets in the system or of the desired user where the user has READ permissions for.\n
Permission "resource:read" required.
\f
Parameters
----------
authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection.
"""
await authorization("read")
pass
from hashlib import md5
from typing import Mapping, Optional
from fastapi import status
from fastapi.responses import JSONResponse
class HashJSONResponse(JSONResponse):
def init_headers(self, headers: Optional[Mapping[str, str]] = None) -> None:
super().init_headers(headers=headers)
# Add the ETag header (MD5 hash of content) to the response
if self.status_code == status.HTTP_200_OK:
self.headers["ETag"] = md5(self.body).hexdigest()
from typing import Dict
from fastapi import APIRouter, status
miscellaneous_router = APIRouter(include_in_schema=False)
@miscellaneous_router.get(
"/health",
tags=["Miscellaneous"],
responses={
status.HTTP_200_OK: {
"description": "Service Health is OK",
"content": {"application/json": {"example": {"status": "OK"}}},
},
},
)
def health_check() -> Dict[str, str]:
"""
Check if the service is reachable.
\f
Returns
-------
response : Dict[str, str]
status ok
"""
return {"status": "OK"}
import re
from asyncio import sleep as async_sleep
from typing import TYPE_CHECKING
from httpx import HTTPError
from mako.template import Template
from opentelemetry import trace
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.slurm import SlurmClient, SlurmJobSubmission
from app.utils.backoff_strategy import ExponentialBackoff
if TYPE_CHECKING:
from types_aiobotocore_s3.service_resource import S3ServiceResource
else:
S3ServiceResource = object
nextflow_command_template = Template(filename="mako_templates/nextflow_command.tmpl")
# regex to find S3 files in parameters of workflow execution
s3_file_regex = re.compile(
r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*"
)
tracer = trace.get_tracer_provider().get_tracer(__name__)
async def start_workflow_execution(
s3: S3ServiceResource,
db: AsyncSession,
slurm_client: SlurmClient,
) -> None:
"""
Start a workflow on the Slurm cluster.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
slurm_client : app.slurm.rest_client.SlurmClient
Slurm Rest Client to communicate with Slurm cluster.
"""
nextflow_script = nextflow_command_template.render()
# Setup env for the workflow execution
try:
job_submission = SlurmJobSubmission(
script=nextflow_script.strip(),
job={
"current_working_directory": settings.SLURM_WORKING_DIRECTORY,
"name": "somename",
"requeue": False,
},
)
# Try to start the job on the slurm cluster
slurm_job_id = await slurm_client.submit_job(job_submission=job_submission)
await _monitor_proper_job_execution(db=db, slurm_client=slurm_client, slurm_job_id=slurm_job_id)
except (HTTPError, KeyError):
# Mark job as aborted when there is an error
pass
async def _monitor_proper_job_execution(
db: AsyncSession, slurm_client: SlurmClient, slurm_job_id: int
) -> None: # pragma: no cover
"""
Check in an interval based on a backoff strategy if the slurm job is still running
the workflow execution in the database is not marked as finished.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
slurm_client : app.slurm.rest_client.SlurmClient
Slurm Rest Client to communicate with Slurm cluster.
slurm_job_id : int
ID of the slurm job to monitor
"""
# exponential to 5 minutes
sleep_generator = ExponentialBackoff(initial_delay=30, max_value=300)
for sleep_time in sleep_generator:
await async_sleep(sleep_time)
with tracer.start_span("monitor_job") as span:
span.set_attributes({"slurm_job_id": slurm_job_id})
if await slurm_client.is_job_finished(slurm_job_id):
await db.close() # Reset connection
sleep_generator.close()
import logging
from clowmdb import latest_revision
from clowmdb.db.session import get_session
from sqlalchemy import text
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.core.config import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 3 # 3 minutes
wait_seconds = 2
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
with get_session(url=str(settings.SQLALCHEMY_DATABASE_NORMAL_URI)) as db:
# Try to create session to check if DB is awake
db_revision = db.execute(text("SELECT version_num FROM alembic_version LIMIT 1")).scalar_one_or_none()
if db_revision != latest_revision:
raise ValueError(
f"Database revision doesn't match revision defined by package `clowmdb`. Expected {latest_revision}, found {db_revision}" # noqa:E501
)
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Initializing DB")
init()
logger.info("DB finished initializing")
if __name__ == "__main__":
main()
import logging
import httpx
from fastapi import status
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.core.config import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 3 # 3 minutes
wait_seconds = 2
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
response = httpx.get(str(settings.OBJECT_GATEWAY_URI), timeout=5.0, follow_redirects=False)
assert response.status_code == status.HTTP_200_OK
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Check S3 connection")
init()
logger.info("S3 connection established")
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment