Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented fault tolerance based on Provenance module #169

Open
wants to merge 227 commits into
base: master
Choose a base branch
from

Conversation

LanderOtto
Copy link
Collaborator

This PR changes the current fault tolerance module. When a step fails, the new fault tolerance module builds a sub-workflow composed of steps from the original workflow, and these steps are added to the sub-workflow if their output tokens are lost. The steps involved and the dependency are obtained using the Provenance module.

Copy link

@github-advanced-security github-advanced-security bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CodeQL found more than 10 potential problems in the proposed changes. Check the Files changed tab for more details.

streamflow/recovery/recovery.py Fixed Show fixed Hide fixed
self.token_instances.pop(old_token_id)

# replace
pass

Check warning

Code scanning / CodeQL

Unnecessary pass Warning

Unnecessary 'pass' statement.
streamflow/recovery/rollback_recovery.py Fixed Show fixed Hide fixed
streamflow/recovery/rollback_recovery.py Fixed Show fixed Hide fixed
streamflow/recovery/recovery.py Fixed Show fixed Hide fixed
Comment on lines +123 to +126
# if DirectGraph.LAST_GRAPH_FLAG in self.graph[src] and len(self.graph[src]) > 1:
# self.graph[src].remove(DirectGraph.LAST_GRAPH_FLAG)
# logger.debug(
# f"DG: Remove {DirectGraph.LAST_GRAPH_FLAG} from {src} next elems"

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +229 to +232
# if not self.is_present(t_id):
# logger.debug(f"Remove orphan token {t_id}")
# self.token_available.pop(t_id, None)
# self.token_instances.pop(t_id, None)

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +294 to +295
# if token.persistent_id == old_token_id:
# return

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +706 to +707
# if not isinstance(t, JobToken):
# self.add(t, None)

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +790 to +794
# for t_id, next_t_ids in self.dag_tokens.items():
# for next_t_id in next_t_ids:
# rdwp.new_add(
# self.info_tokens.get(t_id, None),
# self.info_tokens.get(next_t_id, None),

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Copy link

codecov bot commented May 4, 2024

Codecov Report

Attention: Patch coverage is 18.97356% with 521 lines in your changes are missing coverage. Please review.

Project coverage is 62.15%. Comparing base (f625559) to head (cedf30e).

❗ Current head cedf30e differs from pull request most recent head 4f3f074. Consider uploading reports for the commit 4f3f074 to get more accurate results

Files Patch % Lines
streamflow/recovery/failure_manager.py 13.00% 300 Missing and 1 partial ⚠️
streamflow/token_printer.py 13.37% 149 Missing ⚠️
streamflow/persistence/sqlite.py 23.33% 45 Missing and 1 partial ⚠️
streamflow/workflow/step.py 52.38% 9 Missing and 1 partial ⚠️
streamflow/data/data_manager.py 25.00% 6 Missing ⚠️
streamflow/core/workflow.py 80.00% 1 Missing and 3 partials ⚠️
streamflow/cwl/step.py 0.00% 2 Missing and 1 partial ⚠️
streamflow/cwl/token.py 50.00% 1 Missing ⚠️
streamflow/workflow/token.py 50.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #169      +/-   ##
==========================================
- Coverage   70.18%   62.15%   -8.03%     
==========================================
  Files          83       83              
  Lines       10591    10528      -63     
  Branches     2504     2522      +18     
==========================================
- Hits         7433     6544     -889     
- Misses       2702     3575     +873     
+ Partials      456      409      -47     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

workflow.ports[port_recovery.port.name] = port_recovery.port
if len(job_token_names) == 1:
logger.debug(f"New-workflow {workflow.name} will be empty.")
pass

Check warning

Code scanning / CodeQL

Unnecessary pass Warning

Unnecessary 'pass' statement.
f"\n\tis_avai: {info.is_available}\n\tport: {dict(info.port_row)}"
f"\n\tsteps: {[ s['name'] for s in info.step_rows] }"
)
pass

Check warning

Code scanning / CodeQL

Unnecessary pass Warning

Unnecessary 'pass' statement.
Comment on lines +33 to +35
# if len(token_list) != 2:
# raise FailureHandlingException(
# f"Step recovery {failed_step.name} did not generate the right number of tokens: {len(token_list)}"

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +38 to +41
# if not isinstance(token_list[1], TerminationToken):
# raise FailureHandlingException(
# f"Step recovery {failed_step.name} did not work well. "
# f"It moved two tokens instead of one: {[t.persistent_id for t in token_list]}"

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +121 to +125
# for port in failed_step.get_input_ports().values():
# if port.name not in new_workflow.ports.keys():
# raise FailureHandlingException(
# f"La input port {port.name} dello step fallito {failed_step.name} "
# f"non è presente nel new_workflow {new_workflow.name}"

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +795 to +800
# for next_t_id in self.dag_tokens.succ(job_token.persistent_id):
# if not isinstance(next_t_id, int):
# continue
# step_rows = self.info_tokens[next_t_id].step_rows
# if is_there_step_type(step_rows, (ExecuteStep,)):
# output_token_ids.add(next_t_id)

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
Comment on lines +322 to +326
# if not data_locations:
# # it is possible that src='' following a symbolic link
# # edit. tenere solo per debug. Ho aggiunto un controllo a monte dentro remotepath.py
# raise WorkflowTransferException(
# f"No data locations available {src_path if src_path else 'None'}"

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
@@ -7,6 +7,8 @@
from importlib_resources import files

from streamflow.core.data import DataLocation, DataManager, DataType
from streamflow.core.deployment import Connector
from streamflow.core.exception import WorkflowTransferException

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'WorkflowTransferException' is not used.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant