From aac539f7fcc4a72e110f3e2cd0c29efabebaa74f Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Fri, 26 Jan 2024 13:42:58 +0000
Subject: [PATCH 1/2] Bump black from 23.12.1 to 24.1.0

Bumps [black](https://github.com/psf/black) from 23.12.1 to 24.1.0.
- [Release notes](https://github.com/psf/black/releases)
- [Changelog](https://github.com/psf/black/blob/main/CHANGES.md)
- [Commits](https://github.com/psf/black/compare/23.12.1...24.1.0)

---
updated-dependencies:
- dependency-name: black
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot]
---
 lint-requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lint-requirements.txt b/lint-requirements.txt
index ad7a6234..ef16c98f 100644
--- a/lint-requirements.txt
+++ b/lint-requirements.txt
@@ -1,4 +1,4 @@
-black==23.12.1
+black==24.1.0
 codespell==2.2.6
 flake8-bugbear==24.1.17
 pyupgrade==3.15.0
\ No newline at end of file

From 936c8bf78aa5e71051bacbe7ebb7bfaf6bd192d8 Mon Sep 17 00:00:00 2001
From: Alberto Mulone
Date: Sat, 27 Jan 2024 16:58:17 +0100
Subject: [PATCH 2/2] Apply the new black 24.1.0 format style

Reformat the codebase with black 24.1.0. The new stable style collapses
dummy implementations whose body is only `...` onto the definition line,
parenthesizes conditional expressions that must be split across lines,
prefers splitting the right-hand side of overlong assignments, and
enforces a blank line after module docstrings.
---
 streamflow/__main__.py | 1 +
 streamflow/core/asyncache.py | 1 +
 streamflow/core/context.py | 3 +-
 streamflow/core/data.py | 36 ++--
 streamflow/core/deployment.py | 42 ++---
 streamflow/core/exception.py | 12 +-
 streamflow/core/persistence.py | 174 +++++++-----------
 streamflow/core/provenance.py | 3 +-
 streamflow/core/recovery.py | 15 +-
 streamflow/core/scheduling.py | 21 +--
 streamflow/core/workflow.py | 28 ++-
 streamflow/cwl/command.py | 6 +-
 streamflow/cwl/processor.py | 44 +++--
 .../cwl/requirement/docker/translator.py | 3 +-
 streamflow/cwl/runner.py | 6 +-
 streamflow/cwl/translator.py | 52 +++---
 streamflow/cwl/utils.py | 24 ++-
 streamflow/data/manager.py | 8 +-
 streamflow/data/remotepath.py | 12 +-
 streamflow/deployment/connector/base.py | 7 +-
 streamflow/deployment/connector/container.py | 15 +-
 streamflow/deployment/connector/kubernetes.py | 9 +-
 streamflow/deployment/connector/local.py | 24 ++-
 .../deployment/connector/queue_manager.py | 26 ++-
 streamflow/deployment/connector/ssh.py | 26 ++-
 streamflow/ext/plugin.py | 3 +-
 streamflow/provenance/run_crate.py | 9 +-
 streamflow/recovery/failure_manager.py | 3 +-
 streamflow/scheduling/scheduler.py | 24 ++-
 streamflow/workflow/step.py | 46 +++--
 streamflow/workflow/token.py | 3 +-
 31 files changed, 303 insertions(+), 383 deletions(-)

diff --git a/streamflow/__main__.py b/streamflow/__main__.py
index 4bf4b2ab..ec1215a8 100644
--- a/streamflow/__main__.py
+++ b/streamflow/__main__.py
@@ -1,4 +1,5 @@
 """Default entrypoint for the streamflow module."""
+
 import sys
 
 from streamflow import main
diff --git a/streamflow/core/asyncache.py b/streamflow/core/asyncache.py
index b05bec73..de9f343b 100644
--- a/streamflow/core/asyncache.py
+++ b/streamflow/core/asyncache.py
@@ -2,6 +2,7 @@
 Helpers to use [cachetools](https://github.com/tkem/cachetools) with asyncio.
 """
+
 import functools
 import inspect
diff --git a/streamflow/core/context.py b/streamflow/core/context.py
index 46c7bc97..30c9a683 100644
--- a/streamflow/core/context.py
+++ b/streamflow/core/context.py
@@ -18,8 +18,7 @@ class SchemaEntity(ABC):
     @classmethod
     @abstractmethod
-    def get_schema(cls) -> str:
-        ...
+    def get_schema(cls) -> str: ...
class StreamFlowContext: diff --git a/streamflow/core/data.py b/streamflow/core/data.py index 8952ec35..c2f20f4d 100644 --- a/streamflow/core/data.py +++ b/streamflow/core/data.py @@ -60,8 +60,7 @@ def __init__(self, context: StreamFlowContext): self.context: StreamFlowContext = context @abstractmethod - async def close(self) -> None: - ... + async def close(self) -> None: ... @abstractmethod def get_data_locations( @@ -70,18 +69,15 @@ def get_data_locations( deployment: str | None = None, location_name: str | None = None, data_type: DataType | None = None, - ) -> MutableSequence[DataLocation]: - ... + ) -> MutableSequence[DataLocation]: ... @abstractmethod def get_source_location( self, path: str, dst_deployment: str - ) -> DataLocation | None: - ... + ) -> DataLocation | None: ... @abstractmethod - def invalidate_location(self, location: Location, path: str) -> None: - ... + def invalidate_location(self, location: Location, path: str) -> None: ... @abstractmethod def register_path( @@ -90,14 +86,12 @@ def register_path( path: str, relpath: str | None = None, data_type: DataType = DataType.PRIMARY, - ) -> DataLocation: - ... + ) -> DataLocation: ... @abstractmethod def register_relation( self, src_location: DataLocation, dst_location: DataLocation - ) -> None: - ... + ) -> None: ... @abstractmethod async def transfer_data( @@ -107,8 +101,7 @@ async def transfer_data( dst_locations: MutableSequence[Location], dst_path: str, writable: bool = False, - ) -> None: - ... + ) -> None: ... class FileType(Enum): @@ -121,23 +114,18 @@ def __init__(self, stream: Any): self.stream: Any = stream @abstractmethod - async def close(self): - ... + async def close(self): ... @abstractmethod - async def read(self, size: int | None = None): - ... + async def read(self, size: int | None = None): ... @abstractmethod - async def write(self, data: Any): - ... + async def write(self, data: Any): ... class StreamWrapperContextManager(ABC): @abstractmethod - async def __aenter__(self) -> StreamWrapper: - ... + async def __aenter__(self) -> StreamWrapper: ... @abstractmethod - async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: - ... + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ... diff --git a/streamflow/core/deployment.py b/streamflow/core/deployment.py index dec909bb..33290990 100644 --- a/streamflow/core/deployment.py +++ b/streamflow/core/deployment.py @@ -62,8 +62,7 @@ class BindingFilter(SchemaEntity): @abstractmethod async def get_targets( self, job: Job, targets: MutableSequence[Target] - ) -> MutableSequence[Target]: - ... + ) -> MutableSequence[Target]: ... class Connector(SchemaEntity): @@ -79,8 +78,7 @@ async def copy_local_to_remote( dst: str, locations: MutableSequence[Location], read_only: bool = False, - ) -> None: - ... + ) -> None: ... @abstractmethod async def copy_remote_to_local( @@ -89,8 +87,7 @@ async def copy_remote_to_local( dst: str, locations: MutableSequence[Location], read_only: bool = False, - ) -> None: - ... + ) -> None: ... @abstractmethod async def copy_remote_to_remote( @@ -101,12 +98,10 @@ async def copy_remote_to_remote( source_location: Location, source_connector: Connector | None = None, read_only: bool = False, - ) -> None: - ... + ) -> None: ... @abstractmethod - async def deploy(self, external: bool) -> None: - ... + async def deploy(self, external: bool) -> None: ... 
@abstractmethod async def get_available_locations( @@ -115,8 +110,7 @@ async def get_available_locations( input_directory: str | None = None, output_directory: str | None = None, tmp_directory: str | None = None, - ) -> MutableMapping[str, AvailableLocation]: - ... + ) -> MutableMapping[str, AvailableLocation]: ... @abstractmethod async def run( @@ -131,18 +125,15 @@ async def run( capture_output: bool = False, timeout: int | None = None, job_name: str | None = None, - ) -> tuple[Any | None, int] | None: - ... + ) -> tuple[Any | None, int] | None: ... @abstractmethod - async def undeploy(self, external: bool) -> None: - ... + async def undeploy(self, external: bool) -> None: ... @abstractmethod async def get_stream_reader( self, location: Location, src: str - ) -> StreamWrapperContextManager: - ... + ) -> StreamWrapperContextManager: ... class DeploymentManager(SchemaEntity): @@ -150,24 +141,19 @@ def __init__(self, context: StreamFlowContext) -> None: self.context: StreamFlowContext = context @abstractmethod - async def close(self) -> None: - ... + async def close(self) -> None: ... @abstractmethod - async def deploy(self, deployment_config: DeploymentConfig) -> None: - ... + async def deploy(self, deployment_config: DeploymentConfig) -> None: ... @abstractmethod - def get_connector(self, deployment_name: str) -> Connector | None: - ... + def get_connector(self, deployment_name: str) -> Connector | None: ... @abstractmethod - async def undeploy(self, deployment_name: str) -> None: - ... + async def undeploy(self, deployment_name: str) -> None: ... @abstractmethod - async def undeploy_all(self): - ... + async def undeploy_all(self): ... class DeploymentConfig(Config, PersistableEntity): diff --git a/streamflow/core/exception.py b/streamflow/core/exception.py index e915f048..01d02010 100644 --- a/streamflow/core/exception.py +++ b/streamflow/core/exception.py @@ -7,27 +7,27 @@ class WorkflowException(Exception): - ... + pass class WorkflowDefinitionException(WorkflowException): - ... + pass class WorkflowExecutionException(WorkflowException): - ... + pass class WorkflowProvenanceException(WorkflowException): - ... + pass class FailureHandlingException(WorkflowException): - ... + pass class InvalidPluginException(Exception): - ... + pass class UnrecoverableTokenException(WorkflowException): diff --git a/streamflow/core/persistence.py b/streamflow/core/persistence.py index b08fd66b..02a65755 100644 --- a/streamflow/core/persistence.py +++ b/streamflow/core/persistence.py @@ -14,60 +14,46 @@ class DatabaseLoadingContext(ABC): @abstractmethod - def add_deployment(self, persistent_id: int, deployment: DeploymentConfig): - ... + def add_deployment(self, persistent_id: int, deployment: DeploymentConfig): ... @abstractmethod - def add_filter(self, persistent_id: int, filter_config: FilterConfig): - ... + def add_filter(self, persistent_id: int, filter_config: FilterConfig): ... @abstractmethod - def add_port(self, persistent_id: int, port: Port): - ... + def add_port(self, persistent_id: int, port: Port): ... @abstractmethod - def add_step(self, persistent_id: int, step: Step): - ... + def add_step(self, persistent_id: int, step: Step): ... @abstractmethod - def add_target(self, persistent_id: int, target: Target): - ... + def add_target(self, persistent_id: int, target: Target): ... @abstractmethod - def add_token(self, persistent_id: int, token: Token): - ... + def add_token(self, persistent_id: int, token: Token): ... 
@abstractmethod - def add_workflow(self, persistent_id: int, workflow: Workflow): - ... + def add_workflow(self, persistent_id: int, workflow: Workflow): ... @abstractmethod - async def load_deployment(self, context: StreamFlowContext, persistent_id: int): - ... + async def load_deployment(self, context: StreamFlowContext, persistent_id: int): ... @abstractmethod - async def load_filter(self, context: StreamFlowContext, persistent_id: int): - ... + async def load_filter(self, context: StreamFlowContext, persistent_id: int): ... @abstractmethod - async def load_port(self, context: StreamFlowContext, persistent_id: int): - ... + async def load_port(self, context: StreamFlowContext, persistent_id: int): ... @abstractmethod - async def load_step(self, context: StreamFlowContext, persistent_id: int): - ... + async def load_step(self, context: StreamFlowContext, persistent_id: int): ... @abstractmethod - async def load_target(self, context: StreamFlowContext, persistent_id: int): - ... + async def load_target(self, context: StreamFlowContext, persistent_id: int): ... @abstractmethod - async def load_token(self, context: StreamFlowContext, persistent_id: int): - ... + async def load_token(self, context: StreamFlowContext, persistent_id: int): ... @abstractmethod - async def load_workflow(self, context: StreamFlowContext, persistent_id: int): - ... + async def load_workflow(self, context: StreamFlowContext, persistent_id: int): ... class PersistableEntity: @@ -82,12 +68,10 @@ async def load( context: StreamFlowContext, persistent_id: int, loading_context: DatabaseLoadingContext, - ) -> PersistableEntity: - ... + ) -> PersistableEntity: ... @abstractmethod - async def save(self, context: StreamFlowContext) -> None: - ... + async def save(self, context: StreamFlowContext) -> None: ... class DependencyType(Enum): @@ -102,8 +86,7 @@ def __init__(self, context: StreamFlowContext): @abstractmethod async def add_dependency( self, step: int, port: int, type: DependencyType, name: str - ) -> None: - ... + ) -> None: ... @abstractmethod async def add_deployment( @@ -114,12 +97,10 @@ async def add_deployment( external: bool, lazy: bool, workdir: str | None, - ) -> int: - ... + ) -> int: ... @abstractmethod - async def add_execution(self, step_id: int, tag: str, cmd: str) -> int: - ... + async def add_execution(self, step_id: int, tag: str, cmd: str) -> int: ... @abstractmethod async def add_filter( @@ -127,8 +108,7 @@ async def add_filter( name: str, type: str, config: str, - ) -> int: - ... + ) -> int: ... @abstractmethod async def add_port( @@ -137,12 +117,12 @@ async def add_port( workflow_id: int, type: type[Port], params: MutableMapping[str, Any], - ) -> int: - ... + ) -> int: ... @abstractmethod - async def add_provenance(self, inputs: MutableSequence[int], token: int) -> None: - ... + async def add_provenance( + self, inputs: MutableSequence[int], token: int + ) -> None: ... @abstractmethod async def add_step( @@ -152,8 +132,7 @@ async def add_step( status: int, type: type[Step], params: MutableMapping[str, Any], - ) -> int: - ... + ) -> int: ... @abstractmethod async def add_target( @@ -164,171 +143,142 @@ async def add_target( locations: int = 1, service: str | None = None, workdir: str | None = None, - ) -> int: - ... + ) -> int: ... @abstractmethod async def add_token( self, tag: str, type: type[Token], value: Any, port: int | None = None - ) -> int: - ... + ) -> int: ... 
@abstractmethod async def add_workflow( self, name: str, params: MutableMapping[str, Any], status: int, type: str - ) -> int: - ... + ) -> int: ... @abstractmethod - async def close(self) -> None: - ... + async def close(self) -> None: ... @abstractmethod async def get_dependees( self, token_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def get_dependers( self, token_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod - async def get_deployment(self, deployment_id: int) -> MutableMapping[str, Any]: - ... + async def get_deployment(self, deployment_id: int) -> MutableMapping[str, Any]: ... @abstractmethod - async def get_execution(self, execution_id: int) -> MutableMapping[str, Any]: - ... + async def get_execution(self, execution_id: int) -> MutableMapping[str, Any]: ... @abstractmethod async def get_executions_by_step( self, step_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod - async def get_filter(self, filter_id: int) -> MutableMapping[str, Any]: - ... + async def get_filter(self, filter_id: int) -> MutableMapping[str, Any]: ... @abstractmethod async def get_input_ports( self, step_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def get_input_steps( self, port_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def get_output_ports( self, step_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def get_output_steps( self, port_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod - async def get_port(self, port_id: int) -> MutableMapping[str, Any]: - ... + async def get_port(self, port_id: int) -> MutableMapping[str, Any]: ... @abstractmethod - async def get_port_from_token(self, token_id: int) -> MutableMapping[str, Any]: - ... + async def get_port_from_token(self, token_id: int) -> MutableMapping[str, Any]: ... @abstractmethod - async def get_port_tokens(self, port_id: int) -> MutableSequence[int]: - ... + async def get_port_tokens(self, port_id: int) -> MutableSequence[int]: ... @abstractmethod async def get_reports( self, workflow: str, last_only: bool = False - ) -> MutableSequence[MutableSequence[MutableMapping[str, Any]]]: - ... + ) -> MutableSequence[MutableSequence[MutableMapping[str, Any]]]: ... @abstractmethod - async def get_step(self, step_id: int) -> MutableMapping[str, Any]: - ... + async def get_step(self, step_id: int) -> MutableMapping[str, Any]: ... @abstractmethod - async def get_target(self, target_id: int) -> MutableMapping[str, Any]: - ... + async def get_target(self, target_id: int) -> MutableMapping[str, Any]: ... @abstractmethod - async def get_token(self, token_id: int) -> MutableMapping[str, Any]: - ... + async def get_token(self, token_id: int) -> MutableMapping[str, Any]: ... @abstractmethod - async def get_workflow(self, workflow_id: int) -> MutableMapping[str, Any]: - ... + async def get_workflow(self, workflow_id: int) -> MutableMapping[str, Any]: ... 
@abstractmethod async def get_workflow_ports( self, workflow_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def get_workflow_steps( self, workflow_id: int - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def get_workflows_by_name( self, workflow_name: str, last_only: bool = False - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def get_workflows_list( self, name: str | None - ) -> MutableSequence[MutableMapping[str, Any]]: - ... + ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod async def update_deployment( self, deployment_id: int, updates: MutableMapping[str, Any] - ) -> int: - ... + ) -> int: ... @abstractmethod async def update_execution( self, execution_id: int, updates: MutableMapping[str, Any] - ) -> int: - ... + ) -> int: ... @abstractmethod async def update_filter( self, filter_id: int, updates: MutableMapping[str, Any] - ) -> int: - ... + ) -> int: ... @abstractmethod - async def update_port(self, port_id: int, updates: MutableMapping[str, Any]) -> int: - ... + async def update_port( + self, port_id: int, updates: MutableMapping[str, Any] + ) -> int: ... @abstractmethod - async def update_step(self, step_id: int, updates: MutableMapping[str, Any]) -> int: - ... + async def update_step( + self, step_id: int, updates: MutableMapping[str, Any] + ) -> int: ... @abstractmethod async def update_target( self, target_id: str, updates: MutableMapping[str, Any] - ) -> int: - ... + ) -> int: ... @abstractmethod async def update_workflow( self, workflow_id: int, updates: MutableMapping[str, Any] - ) -> int: - ... + ) -> int: ... diff --git a/streamflow/core/provenance.py b/streamflow/core/provenance.py index 3f069435..4cab8749 100644 --- a/streamflow/core/provenance.py +++ b/streamflow/core/provenance.py @@ -29,5 +29,4 @@ async def create_archive( config: str | None, additional_files: MutableSequence[MutableMapping[str, str]] | None, additional_properties: MutableSequence[MutableMapping[str, str]] | None, - ): - ... + ): ... diff --git a/streamflow/core/recovery.py b/streamflow/core/recovery.py index 72af4649..f0f8ff0e 100644 --- a/streamflow/core/recovery.py +++ b/streamflow/core/recovery.py @@ -17,12 +17,10 @@ def __init__(self, context: StreamFlowContext): self.context: StreamFlowContext = context @abstractmethod - async def close(self): - ... + async def close(self): ... @abstractmethod - def register(self, data_location: DataLocation) -> None: - ... + def register(self, data_location: DataLocation) -> None: ... class FailureManager(SchemaEntity): @@ -30,20 +28,17 @@ def __init__(self, context: StreamFlowContext): self.context: StreamFlowContext = context @abstractmethod - async def close(self): - ... + async def close(self): ... @abstractmethod async def handle_exception( self, job: Job, step: Step, exception: BaseException - ) -> CommandOutput: - ... + ) -> CommandOutput: ... @abstractmethod async def handle_failure( self, job: Job, step: Step, command_output: CommandOutput - ) -> CommandOutput: - ... + ) -> CommandOutput: ... 
class ReplayRequest: diff --git a/streamflow/core/scheduling.py b/streamflow/core/scheduling.py index 60aded54..70fdc7c8 100644 --- a/streamflow/core/scheduling.py +++ b/streamflow/core/scheduling.py @@ -90,18 +90,15 @@ async def _load( context: StreamFlowContext, row: MutableMapping[str, Any], loading_context: DatabaseLoadingContext, - ): - ... + ): ... @abstractmethod async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - ... + ) -> MutableMapping[str, Any]: ... @abstractmethod - def eval(self, inputs: MutableMapping[str, Token]) -> Hardware: - ... + def eval(self, inputs: MutableMapping[str, Token]) -> Hardware: ... @classmethod async def load( @@ -196,8 +193,7 @@ async def get_location( available_locations: MutableMapping[str, AvailableLocation], jobs: MutableMapping[str, JobAllocation], locations: MutableMapping[str, MutableMapping[str, LocationAllocation]], - ) -> Location | None: - ... + ) -> Location | None: ... class Scheduler(SchemaEntity): @@ -209,8 +205,7 @@ def __init__(self, context: StreamFlowContext): ] = {} @abstractmethod - async def close(self) -> None: - ... + async def close(self) -> None: ... def get_allocation(self, job_name: str) -> JobAllocation | None: return self.job_allocations.get(job_name) @@ -245,11 +240,9 @@ def get_service(self, job_name: str) -> str | None: return allocation.target.service if allocation else None @abstractmethod - async def notify_status(self, job_name: str, status: Status) -> None: - ... + async def notify_status(self, job_name: str, status: Status) -> None: ... @abstractmethod async def schedule( self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware - ) -> None: - ... + ) -> None: ... diff --git a/streamflow/core/workflow.py b/streamflow/core/workflow.py index 3d97fbbc..84a934e4 100644 --- a/streamflow/core/workflow.py +++ b/streamflow/core/workflow.py @@ -28,8 +28,7 @@ def __init__(self, step: Step): self.step: Step = step @abstractmethod - async def execute(self, job: Job) -> CommandOutput: - ... + async def execute(self, job: Job) -> CommandOutput: ... @classmethod async def load( @@ -106,9 +105,11 @@ async def _load( return cls( name=row["name"], workflow=await loading_context.load_workflow(context, row["workflow"]), - target=(await loading_context.load_target(context, row["workflow"])) - if row["target"] - else None, + target=( + (await loading_context.load_target(context, row["workflow"])) + if row["target"] + else None + ), ) async def _save_additional_params(self, context: StreamFlowContext): @@ -138,8 +139,7 @@ async def process( job: Job, command_output: CommandOutput, connector: Connector | None = None, - ) -> Token | None: - ... + ) -> Token | None: ... async def save(self, context: StreamFlowContext): return { @@ -153,8 +153,7 @@ def __init__(self, workflow: Workflow): self.workflow: Workflow = workflow @abstractmethod - async def run(self) -> MutableMapping[str, Any]: - ... + async def run(self) -> MutableMapping[str, Any]: ... class Job: @@ -459,8 +458,7 @@ async def load( return step @abstractmethod - async def run(self): - ... + async def run(self): ... async def save(self, context: StreamFlowContext) -> None: async with self.persistence_lock: @@ -498,8 +496,7 @@ async def save(self, context: StreamFlowContext) -> None: await asyncio.gather(*save_tasks) @abstractmethod - async def terminate(self, status: Status): - ... + async def terminate(self, status: Status): ... 
class Token(PersistableEntity): @@ -604,8 +601,9 @@ async def load( return await type._load(context, row["params"], loading_context) @abstractmethod - async def process(self, inputs: MutableMapping[str, Token], token: Token) -> Token: - ... + async def process( + self, inputs: MutableMapping[str, Token], token: Token + ) -> Token: ... async def save(self, context: StreamFlowContext): return { diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index 4f37d3d2..d3c3fc63 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -446,9 +446,9 @@ async def _prepare_work_dir( await utils.write_remote_file( context=self.step.workflow.context, job=job, - content=listing["contents"] - if "contents" in listing - else "", + content=( + listing["contents"] if "contents" in listing else "" + ), path=dest_path, ) # If `listing` is present, recursively process folder contents diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py index 91efea2c..fc78edfd 100644 --- a/streamflow/cwl/processor.py +++ b/streamflow/cwl/processor.py @@ -166,9 +166,11 @@ async def _load( ), # todo: fix multiple instances full_js=row["full_js"], load_contents=row["load_contents"], - load_listing=LoadListing(row["load_listing"]) - if row["load_listing"] is not None - else None, + load_listing=( + LoadListing(row["load_listing"]) + if row["load_listing"] is not None + else None + ), only_propagate_secondary_files=row["only_propagate_secondary_files"], optional=row["optional"], secondary_files=[ @@ -394,9 +396,11 @@ async def _load( return cls( name=row["name"], workflow=await loading_context.load_workflow(context, row["workflow"]), - target=(await loading_context.load_target(context, row["workflow"])) - if row["target"] - else None, + target=( + (await loading_context.load_target(context, row["workflow"])) + if row["target"] + else None + ), token_type=row["token_type"], enum_symbols=row["enum_symbols"], expression_lib=row["expression_lib"], @@ -404,9 +408,11 @@ async def _load( full_js=row["full_js"], glob=row["glob"], load_contents=row["load_contents"], - load_listing=LoadListing(row["load_listing"]) - if row["load_listing"] is not None - else None, + load_listing=( + LoadListing(row["load_listing"]) + if row["load_listing"] is not None + else None + ), optional=row["optional"], output_eval=row["output_eval"], secondary_files=[ @@ -585,9 +591,7 @@ async def _process_command_output( return ( token_list if len(token_list) > 1 - else token_list[0] - if len(token_list) == 1 - else None + else token_list[0] if len(token_list) == 1 else None ) # Otherwise, fill context['self'] with glob data and proceed else: @@ -797,9 +801,11 @@ async def _load( processor=await CommandOutputProcessor.load( context, row["processor"], loading_context ), - target=(await loading_context.load_target(context, row["workflow"])) - if row["target"] - else None, + target=( + (await loading_context.load_target(context, row["workflow"])) + if row["target"] + else None + ), ) async def process( @@ -968,9 +974,11 @@ async def _load( return cls( name=row["name"], workflow=await loading_context.load_workflow(context, row["workflow"]), - target=(await loading_context.load_target(context, row["workflow"])) - if row["target"] - else None, + target=( + (await loading_context.load_target(context, row["workflow"])) + if row["target"] + else None + ), processors={ k: v for k, v in zip( diff --git a/streamflow/cwl/requirement/docker/translator.py b/streamflow/cwl/requirement/docker/translator.py index 1e90cd31..8c17b500 100644 
--- a/streamflow/cwl/requirement/docker/translator.py +++ b/streamflow/cwl/requirement/docker/translator.py @@ -32,5 +32,4 @@ def get_target( output_directory: str | None, network_access: bool, target: Target, - ) -> Target: - ... + ) -> Target: ... diff --git a/streamflow/cwl/runner.py b/streamflow/cwl/runner.py index b6c964c2..1b5414ac 100644 --- a/streamflow/cwl/runner.py +++ b/streamflow/cwl/runner.py @@ -79,9 +79,9 @@ async def _async_main(args: argparse.Namespace): {"file": os.path.abspath(args.processfile)} ) if args.jobfile: - streamflow_config["workflows"][workflow_name]["config"][ - "settings" - ] = os.path.abspath(args.jobfile) + streamflow_config["workflows"][workflow_name]["config"]["settings"] = ( + os.path.abspath(args.jobfile) + ) validator.validate(streamflow_config) streamflow_config["path"] = ( args.streamflow_file if args.streamflow_file is not None else os.getcwd() diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 9b1c7408..3c25d246 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -758,9 +758,7 @@ def _create_token_processor( port_type = ( "long" if port_type == "int" - else "double" - if port_type == "float" - else port_type + else "double" if port_type == "float" else port_type ) return CWLTokenProcessor( name=port_name, @@ -825,9 +823,7 @@ def _get_command_token( token_type = ( "long" if token_type == "int" # nosec - else "double" - if token_type == "float" # nosec - else token_type + else "double" if token_type == "float" else token_type # nosec ) if isinstance(binding, MutableMapping): item_separator = binding.get("itemSeparator", None) @@ -1064,9 +1060,11 @@ def _get_secondary_files( return [ SecondaryFile( pattern=sf["pattern"], - required=sf.get("required") - if sf.get("required") is not None - else default_required, + required=( + sf.get("required") + if sf.get("required") is not None + else default_required + ), ) for sf in cwl_element ] @@ -1074,9 +1072,11 @@ def _get_secondary_files( return [ SecondaryFile( pattern=cwl_element["pattern"], - required=cwl_element.get("required") - if cwl_element.get("required") is not None - else default_required, + required=( + cwl_element.get("required") + if cwl_element.get("required") is not None + else default_required + ), ) ] else: @@ -2217,9 +2217,9 @@ def _translate_workflow_step( cls=GatherStep, name=global_name + "-gather", size_port=size_port, - depth=1 - if scatter_method == "dotproduct" - else len(scatter_inputs), + depth=( + 1 if scatter_method == "dotproduct" else len(scatter_inputs) + ), ) internal_output_ports[global_name] = workflow.create_port() gather_step.add_input_port( @@ -2280,9 +2280,11 @@ def _translate_workflow_step( ) # Create loop output step loop_output_step = workflow.create_step( - cls=CWLLoopOutputLastStep - if output_method == "last" - else CWLLoopOutputAllStep, + cls=( + CWLLoopOutputLastStep + if output_method == "last" + else CWLLoopOutputAllStep + ), name=global_name + "-loop-output", ) loop_output_step.add_input_port( @@ -2468,9 +2470,11 @@ def _translate_workflow_step_input( expression_lib=expression_lib, ) input_dependencies[global_name] = set.union( - {global_name} - if "source" in element_input or "default" in element_input - else set(), + ( + {global_name} + if "source" in element_input or "default" in element_input + else set() + ), {posixpath.join(step_name, d) for d in local_deps}, ) or {global_name} # If `source` entry is present, process output dependencies @@ -2695,9 +2699,9 @@ def translate(self) -> Workflow: ) 
transfer_step.add_output_port(port_name, workflow.create_port()) # Add the output port of the TransferStep to the workflow output ports - workflow.output_ports[ - port_name - ] = transfer_step.get_output_port().name + workflow.output_ports[port_name] = ( + transfer_step.get_output_port().name + ) # Return the final workflow object return workflow diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 782c2cfa..9662f99f 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -413,9 +413,11 @@ def eval_expression( return cwl_utils.expression.interpolate( expression, context, - jslib=cwl_utils.expression.jshead(expression_lib or [], context) - if full_js - else "", + jslib=( + cwl_utils.expression.jshead(expression_lib or [], context) + if full_js + else "" + ), fullJS=full_js, strip_whitespace=strip_whitespace, timeout=timeout, @@ -786,9 +788,9 @@ async def process_secondary_files( sf_specs.append(secondary_file) for sf_value, sf_spec in zip(await asyncio.gather(*sf_tasks), sf_specs): if sf_value is not None: - sf_map[ - get_path_from_token(cast(MutableMapping[str, Any], sf_value)) - ] = sf_value + sf_map[get_path_from_token(cast(MutableMapping[str, Any], sf_value))] = ( + sf_value + ) else: required = eval_expression( expression=sf_spec.required, @@ -807,7 +809,7 @@ async def register_data( connector: Connector, locations: MutableSequence[Location], base_path: str | None, - token_value: (MutableSequence[MutableMapping[str, Any]] | MutableMapping[str, Any]), + token_value: MutableSequence[MutableMapping[str, Any]] | MutableMapping[str, Any], ): # If `token_value` is a list, process every item independently if isinstance(token_value, MutableSequence): @@ -878,9 +880,11 @@ def resolve_dependencies( cwl_utils.expression.interpolate( expression, context, - jslib=cwl_utils.expression.jshead(expression_lib or [], context) - if full_js - else "", + jslib=( + cwl_utils.expression.jshead(expression_lib or [], context) + if full_js + else "" + ), fullJS=full_js, strip_whitespace=strip_whitespace, timeout=timeout, diff --git a/streamflow/data/manager.py b/streamflow/data/manager.py index cb18e039..6872bbbf 100644 --- a/streamflow/data/manager.py +++ b/streamflow/data/manager.py @@ -409,9 +409,11 @@ def put( else: location = DataLocation( path=node_path, - relpath=relpath - if relpath and node_path.endswith(relpath) - else path_processor.basename(node_path), + relpath=( + relpath + if relpath and node_path.endswith(relpath) + else path_processor.basename(node_path) + ), deployment=data_location.deployment, service=data_location.service, data_type=DataType.PRIMARY, diff --git a/streamflow/data/remotepath.py b/streamflow/data/remotepath.py index 90f1b2e9..04234560 100644 --- a/streamflow/data/remotepath.py +++ b/streamflow/data/remotepath.py @@ -279,11 +279,13 @@ async def listdir( else: command = 'find -L "{path}" -mindepth 1 -maxdepth 1 {type}'.format( path=path, - type="-type {type}".format( - type="d" if file_type == FileType.DIRECTORY else "f" - ) - if file_type is not None - else "", + type=( + "-type {type}".format( + type="d" if file_type == FileType.DIRECTORY else "f" + ) + if file_type is not None + else "" + ), ).split() content, status = await connector.run( location=location, command=command, capture_output=True diff --git a/streamflow/deployment/connector/base.py b/streamflow/deployment/connector/base.py index 732146c7..ba64e3d6 100644 --- a/streamflow/deployment/connector/base.py +++ b/streamflow/deployment/connector/base.py @@ -173,8 +173,8 @@ async def 
_copy_remote_to_remote( ) ) # Open source StreamReader - async with ( - await source_connector.get_stream_reader(source_location, src) + async with await source_connector.get_stream_reader( + source_location, src ) as reader: # Open a target StreamWriter for each location writers = await asyncio.gather( @@ -220,8 +220,7 @@ async def _copy_remote_to_remote( @abstractmethod def _get_run_command( self, command: str, location: Location, interactive: bool = False - ) -> str: - ... + ) -> str: ... def _get_shell(self) -> str: return "sh" diff --git a/streamflow/deployment/connector/container.py b/streamflow/deployment/connector/container.py index 1ee6460c..c0d7dee5 100644 --- a/streamflow/deployment/connector/container.py +++ b/streamflow/deployment/connector/container.py @@ -272,18 +272,15 @@ async def _get_effective_locations( @abstractmethod async def _get_bind_mounts( self, location: Location - ) -> MutableSequence[MutableMapping[str, str]]: - ... + ) -> MutableSequence[MutableMapping[str, str]]: ... @abstractmethod - async def _get_location(self, location_name: str) -> AvailableLocation | None: - ... + async def _get_location(self, location_name: str) -> AvailableLocation | None: ... @abstractmethod async def _get_volumes( self, location: Location - ) -> MutableSequence[MutableMapping[str, str]]: - ... + ) -> MutableSequence[MutableMapping[str, str]]: ... async def _is_bind_transfer( self, location: Location, host_path: str, instance_path: str @@ -1079,9 +1076,9 @@ async def _get_volumes( # Exclude `overlay` and `tmpfs` mounts return [ { - "Source": line.split()[2] - if line.split()[1] == "/" - else line.split()[1], + "Source": ( + line.split()[2] if line.split()[1] == "/" else line.split()[1] + ), "Destination": line.split()[2], } for line in bind_mounts.splitlines() diff --git a/streamflow/deployment/connector/kubernetes.py b/streamflow/deployment/connector/kubernetes.py index 6fe12be0..140bf24c 100644 --- a/streamflow/deployment/connector/kubernetes.py +++ b/streamflow/deployment/connector/kubernetes.py @@ -270,7 +270,7 @@ async def _copy_local_to_remote_single( async def _copy_remote_to_local( self, src: str, dst: str, location: Location, read_only: bool = False ): - async with (await self.get_stream_reader(location, src)) as reader: + async with await self.get_stream_reader(location, src) as reader: try: async with aiotarstream.open( stream=reader, @@ -311,8 +311,8 @@ async def _copy_remote_to_remote( dst=dst, ) ) - async with ( - await source_connector.get_stream_reader(source_location, src) + async with await source_connector.get_stream_reader( + source_location, src ) as reader: # Open a target response for each location writers = [ @@ -425,8 +425,7 @@ def _get_run_command( ) @abstractmethod - async def _get_running_pods(self) -> V1PodList: - ... + async def _get_running_pods(self) -> V1PodList: ... 
async def get_stream_reader( self, location: Location, src: str diff --git a/streamflow/deployment/connector/local.py b/streamflow/deployment/connector/local.py index 49ab14d8..fa8fd0a3 100644 --- a/streamflow/deployment/connector/local.py +++ b/streamflow/deployment/connector/local.py @@ -94,15 +94,21 @@ async def get_available_locations( hardware=Hardware( cores=self.cores, memory=self.memory, - input_directory=_get_disk_usage(Path(input_directory)) - if input_directory - else float("inf"), - output_directory=_get_disk_usage(Path(output_directory)) - if output_directory - else float("inf"), - tmp_directory=_get_disk_usage(Path(tmp_directory)) - if tmp_directory - else float("inf"), + input_directory=( + _get_disk_usage(Path(input_directory)) + if input_directory + else float("inf") + ), + output_directory=( + _get_disk_usage(Path(output_directory)) + if output_directory + else float("inf") + ), + tmp_directory=( + _get_disk_usage(Path(tmp_directory)) + if tmp_directory + else float("inf") + ), ), ) } diff --git a/streamflow/deployment/connector/queue_manager.py b/streamflow/deployment/connector/queue_manager.py index 98f31849..e13b7ce1 100644 --- a/streamflow/deployment/connector/queue_manager.py +++ b/streamflow/deployment/connector/queue_manager.py @@ -422,20 +422,16 @@ async def _get_location(self): return list(locations.values())[0] @abstractmethod - async def _get_output(self, job_id: str, location: Location) -> str: - ... + async def _get_output(self, job_id: str, location: Location) -> str: ... @abstractmethod - async def _get_returncode(self, job_id: str, location: Location) -> int: - ... + async def _get_returncode(self, job_id: str, location: Location) -> int: ... @abstractmethod - async def _get_running_jobs(self, location: Location) -> bool: - ... + async def _get_running_jobs(self, location: Location) -> bool: ... @abstractmethod - async def _remove_jobs(self, location: Location) -> None: - ... + async def _remove_jobs(self, location: Location) -> None: ... @abstractmethod async def _run_batch_command( @@ -448,13 +444,11 @@ async def _run_batch_command( stdout: int | str = asyncio.subprocess.STDOUT, stderr: int | str = asyncio.subprocess.STDOUT, timeout: int | None = None, - ) -> str: - ... + ) -> str: ... @property @abstractmethod - def _service_class(self) -> type[QueueManagerService]: - ... + def _service_class(self) -> type[QueueManagerService]: ... 
async def get_available_locations( self, @@ -543,9 +537,11 @@ async def run( await asyncio.sleep(self.pollingInterval) self.scheduledJobs.remove(job_id) return ( - await self._get_output(job_id, location) - if stdout == asyncio.subprocess.STDOUT - else None, + ( + await self._get_output(job_id, location) + if stdout == asyncio.subprocess.STDOUT + else None + ), await self._get_returncode(job_id, location), ) else: diff --git a/streamflow/deployment/connector/ssh.py b/streamflow/deployment/connector/ssh.py index 4bb0dd2b..5ad2ae18 100644 --- a/streamflow/deployment/connector/ssh.py +++ b/streamflow/deployment/connector/ssh.py @@ -345,9 +345,9 @@ def __init__( self.sshKey: str | None = sshKey self.sshKeyPassphraseFile: str | None = sshKeyPassphraseFile self.ssh_context_factories: MutableMapping[str, SSHContextFactory] = {} - self.data_transfer_context_factories: MutableMapping[ - str, SSHContextFactory - ] = {} + self.data_transfer_context_factories: MutableMapping[str, SSHContextFactory] = ( + {} + ) self.username: str = username self.tunnel: SSHConfig | None = self._get_config(tunnel) self.dataTransferConfig: SSHConfig | None = self._get_config( @@ -440,8 +440,8 @@ async def _copy_remote_to_remote( locations[i : i + rounds] for i in range(0, len(locations), rounds) ] for location_group in location_groups: - async with ( - await source_connector.get_stream_reader(source_location, src) + async with await source_connector.get_stream_reader( + source_location, src ) as reader: async with contextlib.AsyncExitStack() as exit_stack: # Open a target StreamWriter for each location @@ -511,13 +511,13 @@ def _get_config(self, node: str | MutableMapping[str, Any]): return SSHConfig( hostname=node["hostname"], username=node["username"] if "username" in node else self.username, - check_host_key=node["checkHostKey"] - if "checkHostKey" in node - else self.checkHostKey, + check_host_key=( + node["checkHostKey"] if "checkHostKey" in node else self.checkHostKey + ), client_keys=[ssh_key] if ssh_key is not None else [], - password_file=node["passwordFile"] - if "passwordFile" in node - else self.passwordFile, + password_file=( + node["passwordFile"] if "passwordFile" in node else self.passwordFile + ), ssh_key_passphrase_file=( node["sshKeyPassphraseFile"] if "sshKeyPassphraseFile" in node @@ -526,9 +526,7 @@ def _get_config(self, node: str | MutableMapping[str, Any]): tunnel=( self._get_config(node["tunnel"]) if "tunnel" in node - else self.tunnel - if hasattr(self, "tunnel") - else None + else self.tunnel if hasattr(self, "tunnel") else None ), ) diff --git a/streamflow/ext/plugin.py b/streamflow/ext/plugin.py index cd33e85e..6a523c41 100644 --- a/streamflow/ext/plugin.py +++ b/streamflow/ext/plugin.py @@ -57,8 +57,7 @@ def _register(self, name: str, cls: type, extension_point: str): extension_points[extension_point][name] = cls @abstractmethod - def register(self) -> None: - ... + def register(self) -> None: ... def register_binding_filter(self, name: str, cls: type[BindingFilter]): self._register(name, cls, "binding_filter") diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index 789e7973..da131e91 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -664,8 +664,7 @@ async def add_file(self, file: MutableMapping[str, str]) -> None: self.graph[dst][k] = v @abstractmethod - async def add_initial_inputs(self, wf_id: int, workflow: Workflow) -> None: - ... 
+ async def add_initial_inputs(self, wf_id: int, workflow: Workflow) -> None: ... async def add_property(self, key: str, value: str): current_obj = self.graph @@ -893,14 +892,12 @@ async def create_archive( print(f"Successfully created run_crate archive at {path}") @abstractmethod - async def get_main_entity(self) -> MutableMapping[str, Any]: - ... + async def get_main_entity(self) -> MutableMapping[str, Any]: ... @abstractmethod async def get_property_value( self, name: str, token: Token - ) -> MutableMapping[str, Any] | None: - ... + ) -> MutableMapping[str, Any] | None: ... def register_input_port( self, streamflow_name: str, run_crate_param: MutableMapping[str, Any] diff --git a/streamflow/recovery/failure_manager.py b/streamflow/recovery/failure_manager.py index 9cf77cf7..98579032 100644 --- a/streamflow/recovery/failure_manager.py +++ b/streamflow/recovery/failure_manager.py @@ -257,8 +257,7 @@ async def replay_job(self, replay_request: ReplayRequest) -> ReplayResponse: class DummyFailureManager(FailureManager): - async def close(self): - ... + async def close(self): ... @classmethod def get_schema(cls) -> str: diff --git a/streamflow/scheduling/scheduler.py b/streamflow/scheduling/scheduler.py index 4704c540..dfc1f2e6 100644 --- a/streamflow/scheduling/scheduler.py +++ b/streamflow/scheduling/scheduler.py @@ -212,9 +212,11 @@ async def _process_target( logger.debug( "Retrieving available locations for job {} on {}.".format( job_context.job.name, - posixpath.join(deployment, target.service) - if target.service - else deployment, + ( + posixpath.join(deployment, target.service) + if target.service + else deployment + ), ) ) available_locations = dict( @@ -240,9 +242,11 @@ async def _process_target( logger.debug( "Available locations for job {} on {} are {}.".format( job_context.job.name, - posixpath.join(deployment, target.service) - if target.service - else deployment, + ( + posixpath.join(deployment, target.service) + if target.service + else deployment + ), list(valid_locations.keys()), ) ) @@ -315,9 +319,11 @@ async def _process_target( logger.debug( "No location available for job {} on deployment {}.".format( job_context.job.name, - posixpath.join(deployment, target.service) - if target.service - else deployment, + ( + posixpath.join(deployment, target.service) + if target.service + else deployment + ), ) ) try: diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py index c63add41..b55a8ad4 100644 --- a/streamflow/workflow/step.py +++ b/streamflow/workflow/step.py @@ -205,8 +205,7 @@ def get_items(self, recursive: bool = False) -> set[str]: @abstractmethod async def combine( self, port_name: str, token: Token - ) -> AsyncIterable[MutableMapping[str, Token]]: - ... + ) -> AsyncIterable[MutableMapping[str, Token]]: ... @classmethod async def load( @@ -367,16 +366,13 @@ def __init__(self, name: str, workflow: Workflow): super().__init__(name, workflow) @abstractmethod - async def _eval(self, inputs: MutableMapping[str, Token]): - ... + async def _eval(self, inputs: MutableMapping[str, Token]): ... @abstractmethod - async def _on_true(self, inputs: MutableMapping[str, Token]): - ... + async def _on_true(self, inputs: MutableMapping[str, Token]): ... @abstractmethod - async def _on_false(self, inputs: MutableMapping[str, Token]): - ... + async def _on_false(self, inputs: MutableMapping[str, Token]): ... 
async def run(self): try: @@ -740,9 +736,9 @@ def add_output_port( self, name: str, port: Port, output_processor: CommandOutputProcessor = None ) -> None: super().add_output_port(name, port) - self.output_processors[ - name - ] = output_processor or DefaultCommandOutputProcessor(name, self.workflow) + self.output_processors[name] = ( + output_processor or DefaultCommandOutputProcessor(name, self.workflow) + ) async def run(self) -> None: jobs = [] @@ -1005,8 +1001,7 @@ def add_output_port(self, name: str, port: Port) -> None: ) @abstractmethod - async def process_input(self, job: Job, token_value: Any) -> Token: - ... + async def process_input(self, job: Job, token_value: Any) -> Token: ... async def run(self): input_ports = { @@ -1166,8 +1161,7 @@ def __init__(self, name: str, workflow: Workflow): self.termination_map: MutableMapping[str, bool] = {} @abstractmethod - async def _process_output(self, tag: str) -> Token: - ... + async def _process_output(self, tag: str) -> Token: ... def add_input_port(self, name: str, port: Port) -> None: if not self.input_ports or name in self.input_ports: @@ -1345,9 +1339,11 @@ async def _propagate_job( location=location, path=directory, relpath=directory, - data_type=DataType.PRIMARY - if realpath == directory - else DataType.SYMBOLIC_LINK, + data_type=( + DataType.PRIMARY + if realpath == directory + else DataType.SYMBOLIC_LINK + ), ) # Propagate job token_inputs = [] @@ -1471,9 +1467,11 @@ async def run(self): await self.workflow.context.scheduler.schedule( job, self.binding_config, - self.hardware_requirement.eval({}) - if self.hardware_requirement - else None, + ( + self.hardware_requirement.eval({}) + if self.hardware_requirement + else None + ), ) locations = self.workflow.context.scheduler.get_locations(job.name) await self._propagate_job( @@ -1683,8 +1681,7 @@ async def run(self): await self.terminate(status) @abstractmethod - async def transfer(self, job: Job, token: Token) -> Token: - ... + async def transfer(self, job: Job, token: Token) -> Token: ... class Transformer(BaseStep, ABC): @@ -1764,5 +1761,4 @@ async def run(self): @abstractmethod async def transform( self, inputs: MutableMapping[str, Token] - ) -> MutableMapping[str, Token | MutableSequence[Token]]: - ... + ) -> MutableMapping[str, Token | MutableSequence[Token]]: ... diff --git a/streamflow/workflow/token.py b/streamflow/workflow/token.py index 19dda0ee..aa580af4 100644 --- a/streamflow/workflow/token.py +++ b/streamflow/workflow/token.py @@ -35,8 +35,7 @@ async def _load( class FileToken(Token, ABC): @abstractmethod - async def get_paths(self, context: StreamFlowContext) -> MutableSequence[str]: - ... + async def get_paths(self, context: StreamFlowContext) -> MutableSequence[str]: ... class JobToken(Token):
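
Nearly all of the churn in PATCH 2/2 comes from two rules of the black
24.1.0 stable style. The before/after fragments below are excerpted from
the streamflow/core/data.py and streamflow/cwl/processor.py hunks above;
they are shown as fragments for comparison, not as a runnable module:

    # black 23.12.1: the dummy body `...` sits on its own line
    @abstractmethod
    async def close(self) -> None:
        ...

    # black 24.1.0: dummy implementations collapse onto the definition line
    @abstractmethod
    async def close(self) -> None: ...

    # black 23.12.1: a wrapped conditional expression splits bare
    load_listing=LoadListing(row["load_listing"])
    if row["load_listing"] is not None
    else None,

    # black 24.1.0: a wrapped conditional expression gains parentheses
    load_listing=(
        LoadListing(row["load_listing"])
        if row["load_listing"] is not None
        else None
    ),

The remaining hunks follow the same release's preference for splitting
the right-hand side of overlong assignments (e.g. the
workflow.output_ports[port_name] rewrite in streamflow/cwl/translator.py)
and its blank line after module docstrings (streamflow/__main__.py,
streamflow/core/asyncache.py).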