Fix scheduler logic to plan new dag runs by ignoring manual runs #34027
# In such case, schedule next only if last_dag_run is finished and was an automated run.
if last_dag_run and not (
    last_dag_run.state in State.finished_dr_states
    and last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB]
This is the actual bugfix: previously we didn't check the run type, so manual runs were interfering with scheduling decisions. Compare it with line 2943 in 3ae6b4e:
or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED),
where exactly the same filtering is applied before calculate_dagrun_date_fields is eventually invoked for the filtered dag runs.
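A minimal, self-contained sketch of the guard quoted at the top of this thread, assuming simplified stand-ins: `RunType`, `RunState`, `FakeDagRun` and `should_schedule_next` are hypothetical names, not Airflow's classes, and the finished states are assumed to be SUCCESS and FAILED in the spirit of `State.finished_dr_states`. It shows that a finished manual run is no longer a reason to plan the next scheduled run, while an unfinished automated run is not one either.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class RunType(Enum):
    # Hypothetical stand-in for DagRunType
    MANUAL = "manual"
    SCHEDULED = "scheduled"
    BACKFILL_JOB = "backfill"


class RunState(Enum):
    # Hypothetical stand-in for DagRunState
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Assumed terminal states, modelled on State.finished_dr_states.
FINISHED_STATES = frozenset({RunState.SUCCESS, RunState.FAILED})
# Only these run types may drive scheduling decisions.
AUTOMATED_TYPES = frozenset({RunType.SCHEDULED, RunType.BACKFILL_JOB})


@dataclass
class FakeDagRun:
    state: RunState
    run_type: RunType


def should_schedule_next(last_dag_run: FakeDagRun | None) -> bool:
    """Return False if the triggering run is unfinished or was not automated."""
    if last_dag_run and not (
        last_dag_run.state in FINISHED_STATES
        and last_dag_run.run_type in AUTOMATED_TYPES
    ):
        return False
    return True


print(should_schedule_next(FakeDagRun(RunState.SUCCESS, RunType.SCHEDULED)))  # True
print(should_schedule_next(FakeDagRun(RunState.SUCCESS, RunType.MANUAL)))  # False
print(should_schedule_next(FakeDagRun(RunState.RUNNING, RunType.SCHEDULED)))  # False
print(should_schedule_next(None))  # True
```

The second call is the case the fix targets: before the run-type check, a finished manual run would have passed the state test alone.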
@@ -1465,11 +1478,9 @@ def _schedule_dag_run(
return callback
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
# Check if DAG not scheduled then skip interval calculation to same scheduler runtime
if dag_run.state in State.finished_dr_states:
This part was pulled into _should_update_dag_next_dagruns.
self,
dag: DAG,
dag_model: DagModel,
last_dag_run: DagRun | None = None,
There are 3 places where this method is used:
- when a dag run finishes successfully
- when a dag run fails
- when new dag runs are created
The first 2 cases are similar: we look at the last dag run to make this decision. In the 3rd case there is no previous dag run, so I made this parameter optional and skip the check when it is not provided.
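The three call sites can be sketched with an optional, keyword-only `last_dag_run` parameter (the final version of this PR made `last_dag_run` and `total_active_runs` keyword-only). Everything here is a hypothetical simplification: `should_update_next_dagruns`, `Run`, the string run types, and the `max_active_runs` cap are illustrative assumptions, not Airflow's code.

```python
from __future__ import annotations

from dataclasses import dataclass

# Hypothetical string stand-ins for DagRunType / DagRunState values.
AUTOMATED_RUN_TYPES = frozenset({"scheduled", "backfill"})
FINISHED_STATES = frozenset({"success", "failed"})


@dataclass
class Run:
    state: str
    run_type: str


def should_update_next_dagruns(
    max_active_runs: int,
    *,  # parameters after the bare * can only be passed by keyword
    last_dag_run: Run | None = None,
    total_active_runs: int = 0,
) -> bool:
    # Call sites 1 and 2 (a run succeeded or failed) pass the run that just finished.
    # Call site 3 (new runs were created) has no such run, omits it, and skips this check.
    if last_dag_run is not None and not (
        last_dag_run.state in FINISHED_STATES
        and last_dag_run.run_type in AUTOMATED_RUN_TYPES
    ):
        return False
    # Assumed cap, for illustration only: stop planning once active runs reach the limit.
    return total_active_runs < max_active_runs


# Site 1: a scheduled run finished successfully.
print(should_update_next_dagruns(16, last_dag_run=Run("success", "scheduled"), total_active_runs=3))  # True
# Site 2: a manual run failed; it must not influence scheduling.
print(should_update_next_dagruns(16, last_dag_run=Run("failed", "manual"), total_active_runs=3))  # False
# Site 3: new runs were just created, so there is no previous run to inspect.
print(should_update_next_dagruns(16, total_active_runs=2))  # True
```

Because of the bare `*`, a call such as `should_update_next_dagruns(16, Run("success", "scheduled"))` raises `TypeError`, which keeps each call site explicit about which optional argument it is supplying.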
Looks good, but the renaming should go in a separate PR if it's actually needed.
2b950a6 to df068d5
Follow-up to discussion in apache#34027. Some variables used when invoking calculate_dagrun_date_fields were renamed to make it explicit that this method expects automated dag runs only; clearer naming should make it less likely that maintainers accidentally misuse this method again.
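The naming argument can be illustrated with a hypothetical helper whose name states its contract: only SCHEDULED or BACKFILL runs are considered, in the same spirit as the `or_(...)` filter on run_type quoted above. `latest_automated_run`, `RunRecord` and the string run types are assumptions for illustration, not the names chosen in #34049.

```python
from __future__ import annotations

from datetime import datetime
from typing import NamedTuple

# Hypothetical stand-ins for DagRunType.SCHEDULED / DagRunType.BACKFILL_JOB.
AUTOMATED_RUN_TYPES = frozenset({"scheduled", "backfill"})


class RunRecord(NamedTuple):
    run_type: str
    logical_date: datetime


def latest_automated_run(runs: list[RunRecord]) -> RunRecord | None:
    """Return the most recent SCHEDULED/BACKFILL run, ignoring manual ones."""
    automated = [r for r in runs if r.run_type in AUTOMATED_RUN_TYPES]
    # default=None covers the case where only manual runs (or none) exist.
    return max(automated, key=lambda r: r.logical_date, default=None)


runs = [
    RunRecord("scheduled", datetime(2023, 9, 1)),
    RunRecord("manual", datetime(2023, 9, 1, 15, 30)),
    RunRecord("backfill", datetime(2023, 8, 30)),
]
last_automated_run = latest_automated_run(runs)
print(last_automated_run.logical_date)  # 2023-09-01 00:00:00
```

A variable named `last_automated_run` makes it harder to pass a manual run by accident than a generic `last_run` would.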
OK, I removed it and moved it to a separate PR: #34049.
Nice catch
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
* Fix manual task triggering scheduled tasks
Fixes #33949
* fix static checks
* static checks
* add unit test
* static check
* Undo renaming
* Update airflow/jobs/scheduler_job_runner.py
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
* use keyword-only arguments for last_dag_run and total_active_runs
---------
Co-authored-by: daniel.dylag <danieldylag1990@gmail.com>
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 20d8142)
Fixes #33949
Added a filter to exclude manual runs from triggering the calculate_dagrun_date_fields method, which was causing extra scheduled runs.
Before this change, manual runs triggered a recalculation of next_dagrun_create_after, which is supposed to be calculated only after an automated (SCHEDULED or BACKFILL) run finishes, so that the scheduler can schedule the next DAG run. There is no reason for manual runs to affect scheduled runs, so they are now excluded by an extra condition.
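A toy simulation of the effect described above, assuming a daily schedule where the marker simply moves one day past the logical date of the run that just finished. `ToyDagModel`, `on_run_finished`, `simulate` and the one-day step are hypothetical simplifications; they do not reproduce how Airflow's timetables actually compute dates, only which runs are allowed to move `next_dagrun_create_after`.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta

AUTOMATED_RUN_TYPES = frozenset({"scheduled", "backfill"})
INTERVAL = timedelta(days=1)  # assumed daily schedule


@dataclass
class ToyDagModel:
    next_dagrun_create_after: datetime


def on_run_finished(
    model: ToyDagModel, run_type: str, logical_date: datetime, *, filter_manual: bool
) -> None:
    """Toy recalculation of the marker when a run finishes."""
    if filter_manual and run_type not in AUTOMATED_RUN_TYPES:
        return  # with the fix, manual runs leave the marker untouched
    model.next_dagrun_create_after = logical_date + INTERVAL


def simulate(filter_manual: bool) -> datetime:
    model = ToyDagModel(next_dagrun_create_after=datetime(2023, 9, 1))
    # A scheduled run for 2023-09-01 finishes, moving the marker to 2023-09-02.
    on_run_finished(model, "scheduled", datetime(2023, 9, 1), filter_manual=filter_manual)
    # A manual run with logical date 2023-09-01 15:30 finishes afterwards.
    on_run_finished(model, "manual", datetime(2023, 9, 1, 15, 30), filter_manual=filter_manual)
    return model.next_dagrun_create_after


print(simulate(filter_manual=False))  # 2023-09-02 15:30:00 (marker shifted by the manual run)
print(simulate(filter_manual=True))  # 2023-09-02 00:00:00 (only the scheduled run counts)
```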