diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index bf3536ce28c10ae1c4d2742a0928589ef60bb763..9ddf5f1c61268e767592f55048abc01e6316df72 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -13,7 +13,6 @@ variables: SLURM_ENDPOINT: "http://127.0.0.1:8002" ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3 DEV_SYSTEM: "True" - SLURM_JOB_STATUS_CHECK_INTERVAL: 0 SLURM_JOB_MONITORING: "NOMONITORING" cache: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 399192f0085161a6a07561d48994cdf79b74ab67..74497a0c33bd2467a01c728ed79f3c5353baaf50 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,13 +15,13 @@ repos: - id: check-merge-conflict - id: check-ast - repo: https://github.com/psf/black - rev: 23.10.0 + rev: 23.10.1 hooks: - id: black files: app args: [--check] - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: 'v0.1.1' + rev: 'v0.1.2' hooks: - id: ruff - repo: https://github.com/PyCQA/isort diff --git a/app/api/utils.py b/app/api/utils.py index 5d9a439c388d475589663d45f3a7a63401c78d28..e0a18856f75eed22269c88b3e27955db9855eb1f 100644 --- a/app/api/utils.py +++ b/app/api/utils.py @@ -29,7 +29,7 @@ if TYPE_CHECKING: else: S3ServiceResource = object -nextflow_command_template = Template(filename="mako_templates/nextflow_command.template") +nextflow_command_template = Template(filename="mako_templates/nextflow_command.tmpl") # regex to find S3 files in parameters of workflow execution s3_file_regex = re.compile( r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*" @@ -218,7 +218,10 @@ async def start_workflow_execution( try: # Try to start the job on the slurm cluster slurm_job_id = await slurm_client.submit_job( - nextflow_script=nextflow_command, execution_id=execution.execution_id, scm_file_name=scm_file_name + nextflow_script=nextflow_command, + execution_id=execution.execution_id, + git_repo=git_repo, + scm_file_name=scm_file_name, ) await CRUDWorkflowExecution.update_slurm_job_id( db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id diff --git a/app/git_repository/abstract_repository.py b/app/git_repository/abstract_repository.py index 156ffc680cd83d5896aa709155600b417090975b..ec974eae7b98665fa1080ce08ddf53e875bf9830 100644 --- a/app/git_repository/abstract_repository.py +++ b/app/git_repository/abstract_repository.py @@ -29,21 +29,9 @@ class GitRepository(ABC): ... @property - @abstractmethod - def repo_url(self) -> str: - ... - - @property - @abstractmethod - def git_commit_hash(self) -> str: - ... - - @property - @abstractmethod def token(self) -> Optional[str]: - ... + return self._token - @abstractmethod def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None): """ Initialize Git repository object. @@ -57,7 +45,10 @@ class GitRepository(ABC): token : str | None Token to access a private git repository """ - ... + self.url = url + self.name = (url[:-1] if url.endswith("/") else url).split("/")[-1] + self.commit = git_commit_hash + self._token = token @abstractmethod async def download_file_url(self, filepath: str, client: AsyncClient) -> AnyHttpUrl: @@ -154,7 +145,7 @@ class GitRepository(ABC): Flags if the files exist. """ with tracer.start_as_current_span("git_check_files_exists") as span: - span.set_attribute("repository", self.repo_url) + span.set_attribute("repository", self.url) tasks = [asyncio.ensure_future(self.check_file_exists(file, client=client)) for file in files] result = await asyncio.gather(*tasks) if raise_error: @@ -180,7 +171,7 @@ class GitRepository(ABC): Async HTTP Client with an open connection. """ with tracer.start_as_current_span("git_copy_file_to_bucket") as span: - span.set_attributes({"repository": self.repo_url, "file": filepath}) + span.set_attributes({"repository": self.url, "file": filepath}) with SpooledTemporaryFile(max_size=512000) as f: # temporary file with 500kB data spooled in memory await self.download_file(filepath, client=client, file_handle=f) f.seek(0) @@ -203,7 +194,7 @@ class GitRepository(ABC): Async iterator over the bytes of the file """ with tracer.start_as_current_span("git_stream_file_content") as span: - span.set_attributes({"repository": self.repo_url, "file": filepath}) + span.set_attributes({"repository": self.url, "file": filepath}) async with client.stream( method="GET", url=str(await self.download_file_url(filepath, client)), @@ -227,6 +218,6 @@ class GitRepository(ABC): Write the file into this stream in binary mode. """ with tracer.start_as_current_span("git_download_file") as span: - span.set_attributes({"repository": self.repo_url, "file": filepath}) + span.set_attributes({"repository": self.url, "file": filepath}) async for chunk in self.download_file_stream(filepath, client): file_handle.write(chunk) diff --git a/app/git_repository/github.py b/app/git_repository/github.py index 1f4fdd5146df59dfb7dc993a5472cc29b63b82cf..29497e33a89e0f8b9fd99591c7c487908cc7dc2d 100644 --- a/app/git_repository/github.py +++ b/app/git_repository/github.py @@ -17,22 +17,10 @@ class GitHubRepository(GitRepository): Implementation for GitHub Repository """ - @property - def token(self) -> Optional[str]: - return self._token - @property def provider(self) -> str: return "github" - @property - def repo_url(self) -> str: - return self.url - - @property - def git_commit_hash(self) -> str: - return self.commit - @cached_property def request_auth(self) -> Optional[BasicAuth]: if self._token is not None: @@ -44,13 +32,11 @@ class GitHubRepository(GitRepository): return {"Accept": "application/vnd.github.object+json", "X-GitHub-Api-Version": "2022-11-28"} def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None): + super().__init__(url=url, git_commit_hash=git_commit_hash, token=token) parse_result = urlparse(url) path_parts = parse_result.path[1:].split("/") - self.url = url self.account = path_parts[0] self.repository = path_parts[1] - self.commit = git_commit_hash - self._token = token def check_file_url(self, filepath: str) -> AnyHttpUrl: return AnyHttpUrl.build( @@ -70,7 +56,7 @@ class GitHubRepository(GitRepository): ) # If the repo is private, request a download URL with a token from the GitHub API with tracer.start_as_current_span("github_get_download_link") as span: - span.set_attributes({"repository": self.repo_url, "file": filepath}) + span.set_attributes({"repository": self.url, "file": filepath}) response = await client.get( str(self.check_file_url(filepath)), auth=USE_CLIENT_DEFAULT if self.request_auth is None else self.request_auth, diff --git a/app/git_repository/gitlab.py b/app/git_repository/gitlab.py index d927d58433358895be77609d2653d964e5683f2e..01429e16114c559a6505027fb4e3e9d77b4afbd0 100644 --- a/app/git_repository/gitlab.py +++ b/app/git_repository/gitlab.py @@ -22,22 +22,10 @@ class GitlabRepository(GitRepository): Implementation for a Gitlab Repository """ - @property - def token(self) -> Optional[str]: - return self._token - @property def provider(self) -> str: return "gitlab" - @property - def repo_url(self) -> str: - return self.url - - @property - def git_commit_hash(self) -> str: - return self.commit - @cached_property def request_auth(self) -> Optional[BearerAuth]: if self._token is not None: @@ -49,12 +37,10 @@ class GitlabRepository(GitRepository): return {} def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None): - self.url = url + super().__init__(url=url, git_commit_hash=git_commit_hash, token=token) parse_result = urlparse(url) self.domain = parse_result.netloc self.project = parse_result.path[1:] - self.commit = git_commit_hash - self._token = token def check_file_url(self, filepath: str) -> AnyHttpUrl: return AnyHttpUrl.build( diff --git a/app/scm/scm.py b/app/scm/scm.py index 1783a027390f83c72c12a7df0423882d0c3ea83f..ce14cb9b7415f160451a2a2dc56b847c262325a2 100644 --- a/app/scm/scm.py +++ b/app/scm/scm.py @@ -118,7 +118,7 @@ class Provider: return GitlabProvider( name=name, password=repo.token, server=str(AnyHttpUrl.build(scheme="https", host=repo.domain))[:-1] ) - return Provider(name=name, password=repo.token, server=repo.repo_url) # pragma: no cover + return Provider(name=name, password=repo.token, server=repo.url) # pragma: no cover class GitHubProvider(Provider): diff --git a/app/slurm/slurm_rest_client.py b/app/slurm/slurm_rest_client.py index 84ed6b6a44a8ac9b6dea6f33a7a27be6267ad5c8..7a75a720292a7ebca73f867b3ea8a77d3a54340d 100644 --- a/app/slurm/slurm_rest_client.py +++ b/app/slurm/slurm_rest_client.py @@ -1,4 +1,5 @@ from os import environ +from pathlib import Path from typing import Optional from uuid import UUID @@ -8,6 +9,7 @@ from httpx import AsyncClient from opentelemetry import trace from app.core.config import settings +from app.git_repository import GitRepository tracer = trace.get_tracer_provider().get_tracer(__name__) @@ -31,7 +33,9 @@ class SlurmClient: self._headers = {"X-SLURM-USER-TOKEN": settings.SLURM_TOKEN, "X-SLURM-USER-NAME": settings.SLURM_USER} self.version = version - async def submit_job(self, nextflow_script: str, execution_id: UUID, scm_file_name: Optional[str] = None) -> int: + async def submit_job( + self, nextflow_script: str, execution_id: UUID, git_repo: GitRepository, scm_file_name: Optional[str] = None + ) -> int: """ Submit a job to the slurm cluster. @@ -41,6 +45,8 @@ class SlurmClient: Script to execute on the slurm cluster. execution_id : uuid.UUID ID of the workflow execution. + git_repo : app.git_repository.abstract_repository.GitRepository + The Git repository from which the workflow is executed scm_file_name : str | None, default None Name of a SCM file if the workflow is in a private repository @@ -54,6 +60,10 @@ class SlurmClient: # Reference a SCM file for a private repository if scm_file_name is not None: env["NXF_SCM_FILE"] = f"{settings.PARAMS_BUCKET_MOUNT_PATH}/{scm_file_name}" + if "NXF_ASSETS" in env.keys(): # pragma: no cover + env["NXF_ASSETS"] = str(Path(env["NXF_ASSETS"]) / f"{git_repo.name}_{git_repo.commit}") + else: # pragma: no cover + env["NXF_ASSETS"] = f"$HOME/.nextflow/assets/{git_repo.name}_{git_repo.commit}" body = { "script": nextflow_script, diff --git a/mako_templates/nextflow_command.template b/mako_templates/nextflow_command.tmpl similarity index 83% rename from mako_templates/nextflow_command.template rename to mako_templates/nextflow_command.tmpl index 72dd2fd08c3227dd8e52203be3d89ac900a0d490..b3e7a07b1b0010b2648b2006976a896e2186fdb8 100644 --- a/mako_templates/nextflow_command.template +++ b/mako_templates/nextflow_command.tmpl @@ -1,6 +1,10 @@ #!/bin/bash -${nx_bin} run ${repo.repo_url} \ +if [ -d "$NXF_ASSETS" ]; then + touch "$NXF_ASSETS" +fi + +${nx_bin} run ${repo.url} \ % if scm_file_id is not None and repo.provider != 'github': -hub ${scm_file_id} \ % else: @@ -16,7 +20,7 @@ ${nx_bin} run ${repo.repo_url} \ % if configuration is not None: -c ${configuration} \ % endif --revision ${repo.git_commit_hash} \ +-revision ${repo.commit} \ % for param_name, param_value in parameters.items(): --${param_name} ${param_value} \ % endfor diff --git a/requirements-dev.txt b/requirements-dev.txt index f63739f710d32f34ab3dd2dc88c361bb5369421a..d0e72cf34023f4028e1b6d4eb7f07b0276c6444c 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,7 +4,7 @@ pytest-asyncio>=0.21.0,<0.22.0 pytest-cov>=4.1.0,<4.2.0 coverage[toml]>=7.3.0,<7.4.0 # Linters -ruff>=0.1.1,<0.1.2 +ruff>=0.1.2,<0.1.3 black>=23.10.0,<23.11.0 isort>=5.12.0,<5.13.0 mypy>=1.6.0,<1.7.0