Skip to content
Snippets Groups Projects
Verified Commit e45f52c7 authored by Daniel Göbel
Browse files

Set assets directory for each workflow version differently

#60
parent 0aee78f5
No related branches found
No related tags found
2 merge requests: !69 "Delete dev branch", !58 "Resolve "Set ASSETS directory for every workflow version differently""
Pipeline #38965 passed
...@@ -13,7 +13,6 @@ variables: ...@@ -13,7 +13,6 @@ variables:
SLURM_ENDPOINT: "http://127.0.0.1:8002" SLURM_ENDPOINT: "http://127.0.0.1:8002"
ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3 ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3
DEV_SYSTEM: "True" DEV_SYSTEM: "True"
SLURM_JOB_STATUS_CHECK_INTERVAL: 0
SLURM_JOB_MONITORING: "NOMONITORING" SLURM_JOB_MONITORING: "NOMONITORING"
cache: cache:
......
...@@ -15,13 +15,13 @@ repos: ...@@ -15,13 +15,13 @@ repos:
- id: check-merge-conflict - id: check-merge-conflict
- id: check-ast - id: check-ast
- repo: https://github.com/psf/black - repo: https://github.com/psf/black
rev: 23.10.0 rev: 23.10.1
hooks: hooks:
- id: black - id: black
files: app files: app
args: [--check] args: [--check]
- repo: https://github.com/charliermarsh/ruff-pre-commit - repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.1.1' rev: 'v0.1.2'
hooks: hooks:
- id: ruff - id: ruff
- repo: https://github.com/PyCQA/isort - repo: https://github.com/PyCQA/isort
......
...@@ -29,7 +29,7 @@ if TYPE_CHECKING: ...@@ -29,7 +29,7 @@ if TYPE_CHECKING:
else: else:
S3ServiceResource = object S3ServiceResource = object
nextflow_command_template = Template(filename="mako_templates/nextflow_command.template") nextflow_command_template = Template(filename="mako_templates/nextflow_command.tmpl")
# regex to find S3 files in parameters of workflow execution # regex to find S3 files in parameters of workflow execution
s3_file_regex = re.compile( s3_file_regex = re.compile(
r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*" r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*"
...@@ -218,7 +218,10 @@ async def start_workflow_execution( ...@@ -218,7 +218,10 @@ async def start_workflow_execution(
try: try:
# Try to start the job on the slurm cluster # Try to start the job on the slurm cluster
slurm_job_id = await slurm_client.submit_job( slurm_job_id = await slurm_client.submit_job(
nextflow_script=nextflow_command, execution_id=execution.execution_id, scm_file_name=scm_file_name nextflow_script=nextflow_command,
execution_id=execution.execution_id,
git_repo=git_repo,
scm_file_name=scm_file_name,
) )
await CRUDWorkflowExecution.update_slurm_job_id( await CRUDWorkflowExecution.update_slurm_job_id(
db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id
......
...@@ -29,21 +29,9 @@ class GitRepository(ABC): ...@@ -29,21 +29,9 @@ class GitRepository(ABC):
... ...
@property @property
@abstractmethod
def repo_url(self) -> str:
...
@property
@abstractmethod
def git_commit_hash(self) -> str:
...
@property
@abstractmethod
def token(self) -> Optional[str]: def token(self) -> Optional[str]:
... return self._token
@abstractmethod
def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None): def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None):
""" """
Initialize Git repository object. Initialize Git repository object.
...@@ -57,7 +45,10 @@ class GitRepository(ABC): ...@@ -57,7 +45,10 @@ class GitRepository(ABC):
token : str | None token : str | None
Token to access a private git repository Token to access a private git repository
""" """
... self.url = url
self.name = (url[:-1] if url.endswith("/") else url).split("/")[-1]
self.commit = git_commit_hash
self._token = token
@abstractmethod @abstractmethod
async def download_file_url(self, filepath: str, client: AsyncClient) -> AnyHttpUrl: async def download_file_url(self, filepath: str, client: AsyncClient) -> AnyHttpUrl:
...@@ -154,7 +145,7 @@ class GitRepository(ABC): ...@@ -154,7 +145,7 @@ class GitRepository(ABC):
Flags if the files exist. Flags if the files exist.
""" """
with tracer.start_as_current_span("git_check_files_exists") as span: with tracer.start_as_current_span("git_check_files_exists") as span:
span.set_attribute("repository", self.repo_url) span.set_attribute("repository", self.url)
tasks = [asyncio.ensure_future(self.check_file_exists(file, client=client)) for file in files] tasks = [asyncio.ensure_future(self.check_file_exists(file, client=client)) for file in files]
result = await asyncio.gather(*tasks) result = await asyncio.gather(*tasks)
if raise_error: if raise_error:
...@@ -180,7 +171,7 @@ class GitRepository(ABC): ...@@ -180,7 +171,7 @@ class GitRepository(ABC):
Async HTTP Client with an open connection. Async HTTP Client with an open connection.
""" """
with tracer.start_as_current_span("git_copy_file_to_bucket") as span: with tracer.start_as_current_span("git_copy_file_to_bucket") as span:
span.set_attributes({"repository": self.repo_url, "file": filepath}) span.set_attributes({"repository": self.url, "file": filepath})
with SpooledTemporaryFile(max_size=512000) as f: # temporary file with 500kB data spooled in memory with SpooledTemporaryFile(max_size=512000) as f: # temporary file with 500kB data spooled in memory
await self.download_file(filepath, client=client, file_handle=f) await self.download_file(filepath, client=client, file_handle=f)
f.seek(0) f.seek(0)
...@@ -203,7 +194,7 @@ class GitRepository(ABC): ...@@ -203,7 +194,7 @@ class GitRepository(ABC):
Async iterator over the bytes of the file Async iterator over the bytes of the file
""" """
with tracer.start_as_current_span("git_stream_file_content") as span: with tracer.start_as_current_span("git_stream_file_content") as span:
span.set_attributes({"repository": self.repo_url, "file": filepath}) span.set_attributes({"repository": self.url, "file": filepath})
async with client.stream( async with client.stream(
method="GET", method="GET",
url=str(await self.download_file_url(filepath, client)), url=str(await self.download_file_url(filepath, client)),
...@@ -227,6 +218,6 @@ class GitRepository(ABC): ...@@ -227,6 +218,6 @@ class GitRepository(ABC):
Write the file into this stream in binary mode. Write the file into this stream in binary mode.
""" """
with tracer.start_as_current_span("git_download_file") as span: with tracer.start_as_current_span("git_download_file") as span:
span.set_attributes({"repository": self.repo_url, "file": filepath}) span.set_attributes({"repository": self.url, "file": filepath})
async for chunk in self.download_file_stream(filepath, client): async for chunk in self.download_file_stream(filepath, client):
file_handle.write(chunk) file_handle.write(chunk)
...@@ -17,22 +17,10 @@ class GitHubRepository(GitRepository): ...@@ -17,22 +17,10 @@ class GitHubRepository(GitRepository):
Implementation for GitHub Repository Implementation for GitHub Repository
""" """
@property
def token(self) -> Optional[str]:
return self._token
@property @property
def provider(self) -> str: def provider(self) -> str:
return "github" return "github"
@property
def repo_url(self) -> str:
return self.url
@property
def git_commit_hash(self) -> str:
return self.commit
@cached_property @cached_property
def request_auth(self) -> Optional[BasicAuth]: def request_auth(self) -> Optional[BasicAuth]:
if self._token is not None: if self._token is not None:
...@@ -44,13 +32,11 @@ class GitHubRepository(GitRepository): ...@@ -44,13 +32,11 @@ class GitHubRepository(GitRepository):
return {"Accept": "application/vnd.github.object+json", "X-GitHub-Api-Version": "2022-11-28"} return {"Accept": "application/vnd.github.object+json", "X-GitHub-Api-Version": "2022-11-28"}
def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None): def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None):
super().__init__(url=url, git_commit_hash=git_commit_hash, token=token)
parse_result = urlparse(url) parse_result = urlparse(url)
path_parts = parse_result.path[1:].split("/") path_parts = parse_result.path[1:].split("/")
self.url = url
self.account = path_parts[0] self.account = path_parts[0]
self.repository = path_parts[1] self.repository = path_parts[1]
self.commit = git_commit_hash
self._token = token
def check_file_url(self, filepath: str) -> AnyHttpUrl: def check_file_url(self, filepath: str) -> AnyHttpUrl:
return AnyHttpUrl.build( return AnyHttpUrl.build(
...@@ -70,7 +56,7 @@ class GitHubRepository(GitRepository): ...@@ -70,7 +56,7 @@ class GitHubRepository(GitRepository):
) )
# If the repo is private, request a download URL with a token from the GitHub API # If the repo is private, request a download URL with a token from the GitHub API
with tracer.start_as_current_span("github_get_download_link") as span: with tracer.start_as_current_span("github_get_download_link") as span:
span.set_attributes({"repository": self.repo_url, "file": filepath}) span.set_attributes({"repository": self.url, "file": filepath})
response = await client.get( response = await client.get(
str(self.check_file_url(filepath)), str(self.check_file_url(filepath)),
auth=USE_CLIENT_DEFAULT if self.request_auth is None else self.request_auth, auth=USE_CLIENT_DEFAULT if self.request_auth is None else self.request_auth,
......
...@@ -22,22 +22,10 @@ class GitlabRepository(GitRepository): ...@@ -22,22 +22,10 @@ class GitlabRepository(GitRepository):
Implementation for a Gitlab Repository Implementation for a Gitlab Repository
""" """
@property
def token(self) -> Optional[str]:
return self._token
@property @property
def provider(self) -> str: def provider(self) -> str:
return "gitlab" return "gitlab"
@property
def repo_url(self) -> str:
return self.url
@property
def git_commit_hash(self) -> str:
return self.commit
@cached_property @cached_property
def request_auth(self) -> Optional[BearerAuth]: def request_auth(self) -> Optional[BearerAuth]:
if self._token is not None: if self._token is not None:
...@@ -49,12 +37,10 @@ class GitlabRepository(GitRepository): ...@@ -49,12 +37,10 @@ class GitlabRepository(GitRepository):
return {} return {}
def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None): def __init__(self, url: str, git_commit_hash: str, token: Optional[str] = None):
self.url = url super().__init__(url=url, git_commit_hash=git_commit_hash, token=token)
parse_result = urlparse(url) parse_result = urlparse(url)
self.domain = parse_result.netloc self.domain = parse_result.netloc
self.project = parse_result.path[1:] self.project = parse_result.path[1:]
self.commit = git_commit_hash
self._token = token
def check_file_url(self, filepath: str) -> AnyHttpUrl: def check_file_url(self, filepath: str) -> AnyHttpUrl:
return AnyHttpUrl.build( return AnyHttpUrl.build(
......
...@@ -118,7 +118,7 @@ class Provider: ...@@ -118,7 +118,7 @@ class Provider:
return GitlabProvider( return GitlabProvider(
name=name, password=repo.token, server=str(AnyHttpUrl.build(scheme="https", host=repo.domain))[:-1] name=name, password=repo.token, server=str(AnyHttpUrl.build(scheme="https", host=repo.domain))[:-1]
) )
return Provider(name=name, password=repo.token, server=repo.repo_url) # pragma: no cover return Provider(name=name, password=repo.token, server=repo.url) # pragma: no cover
class GitHubProvider(Provider): class GitHubProvider(Provider):
......
from os import environ from os import environ
from pathlib import Path
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
...@@ -8,6 +9,7 @@ from httpx import AsyncClient ...@@ -8,6 +9,7 @@ from httpx import AsyncClient
from opentelemetry import trace from opentelemetry import trace
from app.core.config import settings from app.core.config import settings
from app.git_repository import GitRepository
tracer = trace.get_tracer_provider().get_tracer(__name__) tracer = trace.get_tracer_provider().get_tracer(__name__)
...@@ -31,7 +33,9 @@ class SlurmClient: ...@@ -31,7 +33,9 @@ class SlurmClient:
self._headers = {"X-SLURM-USER-TOKEN": settings.SLURM_TOKEN, "X-SLURM-USER-NAME": settings.SLURM_USER} self._headers = {"X-SLURM-USER-TOKEN": settings.SLURM_TOKEN, "X-SLURM-USER-NAME": settings.SLURM_USER}
self.version = version self.version = version
async def submit_job(self, nextflow_script: str, execution_id: UUID, scm_file_name: Optional[str] = None) -> int: async def submit_job(
self, nextflow_script: str, execution_id: UUID, git_repo: GitRepository, scm_file_name: Optional[str] = None
) -> int:
""" """
Submit a job to the slurm cluster. Submit a job to the slurm cluster.
...@@ -41,6 +45,8 @@ class SlurmClient: ...@@ -41,6 +45,8 @@ class SlurmClient:
Script to execute on the slurm cluster. Script to execute on the slurm cluster.
execution_id : uuid.UUID execution_id : uuid.UUID
ID of the workflow execution. ID of the workflow execution.
git_repo : app.git_repository.abstract_repository.GitRepository
The Git repository from which the workflow is executed
scm_file_name : str | None, default None scm_file_name : str | None, default None
Name of a SCM file if the workflow is in a private repository Name of a SCM file if the workflow is in a private repository
...@@ -54,6 +60,10 @@ class SlurmClient: ...@@ -54,6 +60,10 @@ class SlurmClient:
# Reference a SCM file for a private repository # Reference a SCM file for a private repository
if scm_file_name is not None: if scm_file_name is not None:
env["NXF_SCM_FILE"] = f"{settings.PARAMS_BUCKET_MOUNT_PATH}/{scm_file_name}" env["NXF_SCM_FILE"] = f"{settings.PARAMS_BUCKET_MOUNT_PATH}/{scm_file_name}"
if "NXF_ASSETS" in env.keys(): # pragma: no cover
env["NXF_ASSETS"] = str(Path(env["NXF_ASSETS"]) / f"{git_repo.name}_{git_repo.commit}")
else: # pragma: no cover
env["NXF_ASSETS"] = f"$HOME/.nextflow/assets/{git_repo.name}_{git_repo.commit}"
body = { body = {
"script": nextflow_script, "script": nextflow_script,
......
#!/bin/bash #!/bin/bash
${nx_bin} run ${repo.repo_url} \ if [ -d "$NXF_ASSETS" ]; then
touch "$NXF_ASSETS"
fi
${nx_bin} run ${repo.url} \
% if scm_file_id is not None and repo.provider != 'github': % if scm_file_id is not None and repo.provider != 'github':
-hub ${scm_file_id} \ -hub ${scm_file_id} \
% else: % else:
...@@ -16,7 +20,7 @@ ${nx_bin} run ${repo.repo_url} \ ...@@ -16,7 +20,7 @@ ${nx_bin} run ${repo.repo_url} \
% if configuration is not None: % if configuration is not None:
-c ${configuration} \ -c ${configuration} \
% endif % endif
-revision ${repo.git_commit_hash} \ -revision ${repo.commit} \
% for param_name, param_value in parameters.items(): % for param_name, param_value in parameters.items():
--${param_name} ${param_value} \ --${param_name} ${param_value} \
% endfor % endfor
...@@ -4,7 +4,7 @@ pytest-asyncio>=0.21.0,<0.22.0 ...@@ -4,7 +4,7 @@ pytest-asyncio>=0.21.0,<0.22.0
pytest-cov>=4.1.0,<4.2.0 pytest-cov>=4.1.0,<4.2.0
coverage[toml]>=7.3.0,<7.4.0 coverage[toml]>=7.3.0,<7.4.0
# Linters # Linters
ruff>=0.1.1,<0.1.2 ruff>=0.1.2,<0.1.3
black>=23.10.0,<23.11.0 black>=23.10.0,<23.11.0
isort>=5.12.0,<5.13.0 isort>=5.12.0,<5.13.0
mypy>=1.6.0,<1.7.0 mypy>=1.6.0,<1.7.0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment