Skip to content

Commit

Permalink
Create more detailed error responses when submitting batch jobs (#2940)
Browse files Browse the repository at this point in the history
* Enables airflow operator level retry. (#2894)

* Update docker stuff for latest airflow 2.7.0

* Use AirflowException instead of AirflowFailException to allow for retries

Signed-off-by: Rich Scott <richscott@sent.com>

* Remove codecov workflows (#2902)

Signed-off-by: Rich Scott <richscott@sent.com>

* Upgrade Pulsar Client to v0.11 (#2896)

* update

* update pulsar client

* Fix bug causing server spinning

* Abstract out the retry until success logic for testing (#2901)

* Respond to review

---------

Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Daniel Rastelli <rastellidani@gmail.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Sync quickstart/index.md with gh-pages/quickstart.md (#2891)

Signed-off-by: Rich Scott <richscott@sent.com>

* Log Call Site (#2909)

* allow logger to report caller

* allow logger to report caller

* lint

---------

Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
Signed-off-by: Rich Scott <richscott@sent.com>

* Add cleaner test output for mage with os/exec.Command (#2907)

Signed-off-by: Rich Scott <richscott@sent.com>

* feat: Update Semver from version 6.3.0 to 6.3.1 (#2686)

Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* fix: upgrade @typescript-eslint/parser from 5.52.0 to 5.61.0 (#2743)

Snyk has created this PR to upgrade @typescript-eslint/parser from 5.52.0 to 5.61.0.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* fix: upgrade @types/react from 16.14.32 to 16.14.43 (#2747)

Snyk has created this PR to upgrade @types/react from 16.14.32 to 16.14.43.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Bump github.com/go-openapi/jsonreference from 0.20.0 to 0.20.2 (#2316)

Bumps [github.com/go-openapi/jsonreference](https://github.com/go-openapi/jsonreference) from 0.20.0 to 0.20.2.
- [Release notes](https://github.com/go-openapi/jsonreference/releases)
- [Commits](go-openapi/jsonreference@v0.20.0...v0.20.2)

---
updated-dependencies:
- dependency-name: github.com/go-openapi/jsonreference
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Order leased jobs by serial (#2912)

This will ensure the job leased first, gets send to the cluster first

Currently we just order by postgres default sorting - which often picks the most recently leased - causing the first lease jobs to get stuck
 - This only occurs when scheduling is faster than leasing

Signed-off-by: Rich Scott <richscott@sent.com>

* Bump webpack from 5.75.0 to 5.77.0 in /internal/lookout/ui (#2302)

Bumps [webpack](https://github.com/webpack/webpack) from 5.75.0 to 5.77.0.
- [Release notes](https://github.com/webpack/webpack/releases)
- [Commits](webpack/webpack@v5.75.0...v5.77.0)

---
updated-dependencies:
- dependency-name: webpack
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Bump word-wrap from 1.2.3 to 1.2.5 in /internal/lookout/ui (#2806)

Bumps [word-wrap](https://github.com/jonschlinkert/word-wrap) from 1.2.3 to 1.2.5.
- [Release notes](https://github.com/jonschlinkert/word-wrap/releases)
- [Commits](jonschlinkert/word-wrap@1.2.3...1.2.5)

---
updated-dependencies:
- dependency-name: word-wrap
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* resolve flaky (#2914)

Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* fix: upgrade @typescript-eslint/eslint-plugin from 5.52.0 to 5.61.0 (#2744)

Snyk has created this PR to upgrade @typescript-eslint/eslint-plugin from 5.52.0 to 5.61.0.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* fix: upgrade react-router-dom from 6.9.0 to 6.14.1 (#2746)

Snyk has created this PR to upgrade react-router-dom from 6.9.0 to 6.14.1.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Bump semver from 6.3.0 to 6.3.1 in /internal/lookout/ui (#2661)

Bumps [semver](https://github.com/npm/node-semver) from 6.3.0 to 6.3.1.
- [Release notes](https://github.com/npm/node-semver/releases)
- [Changelog](https://github.com/npm/node-semver/blob/v6.3.1/CHANGELOG.md)
- [Commits](npm/node-semver@v6.3.0...v6.3.1)

---
updated-dependencies:
- dependency-name: semver
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Run CodeQL once daily on a schedule (#2918)

Signed-off-by: Rich Scott <richscott@sent.com>

* Helm chart update: executor  (#2917)

* Helm chart update: executor

At the moment the helm chart for the executor doesn't include priorityClass even though one is created in the chart. This means that the executor deployment is unable to set the priorityClass.

Signed-off-by: Rich Scott <richscott@sent.com>

* Patch/dependencies (#2923)

* Bump github.com/go-openapi/strfmt from 0.21.3 to 0.21.7

Bumps [github.com/go-openapi/strfmt](https://github.com/go-openapi/strfmt) from 0.21.3 to 0.21.7.
- [Release notes](https://github.com/go-openapi/strfmt/releases)
- [Commits](go-openapi/strfmt@v0.21.3...v0.21.7)

---
updated-dependencies:
- dependency-name: github.com/go-openapi/strfmt
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump github.com/go-openapi/runtime from 0.24.2 to 0.26.0

Bumps [github.com/go-openapi/runtime](https://github.com/go-openapi/runtime) from 0.24.2 to 0.26.0.
- [Release notes](https://github.com/go-openapi/runtime/releases)
- [Commits](go-openapi/runtime@v0.24.2...v0.26.0)

---
updated-dependencies:
- dependency-name: github.com/go-openapi/runtime
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump github.com/goreleaser/nfpm/v2 from 2.25.1 to 2.29.0

Bumps [github.com/goreleaser/nfpm/v2](https://github.com/goreleaser/nfpm) from 2.25.1 to 2.29.0.
- [Release notes](https://github.com/goreleaser/nfpm/releases)
- [Changelog](https://github.com/goreleaser/nfpm/blob/main/.goreleaser.yml)
- [Commits](goreleaser/nfpm@v2.25.1...v2.29.0)

---
updated-dependencies:
- dependency-name: github.com/goreleaser/nfpm/v2
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump github.com/go-playground/validator/v10 from 10.11.1 to 10.14.1

Bumps [github.com/go-playground/validator/v10](https://github.com/go-playground/validator) from 10.11.1 to 10.14.1.
- [Release notes](https://github.com/go-playground/validator/releases)
- [Commits](go-playground/validator@v10.11.1...v10.14.1)

---
updated-dependencies:
- dependency-name: github.com/go-playground/validator/v10
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump Grpc.Net.Client in /client/DotNet/ArmadaProject.Io.Client

Bumps [Grpc.Net.Client](https://github.com/grpc/grpc-dotnet) from 2.47.0 to 2.52.0.
- [Release notes](https://github.com/grpc/grpc-dotnet/releases)
- [Changelog](https://github.com/grpc/grpc-dotnet/blob/master/doc/release_process.md)
- [Commits](grpc/grpc-dotnet@v2.47.0...v2.52.0)

---
updated-dependencies:
- dependency-name: Grpc.Net.Client
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* fix: upgrade @mui/material from 5.10.17 to 5.13.6

Snyk has created this PR to upgrade @mui/material from 5.10.17 to 5.13.6.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

* fix: upgrade prettier from 2.7.1 to 2.8.8

Snyk has created this PR to upgrade prettier from 2.7.1 to 2.8.8.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

* fix: upgrade @mui/icons-material from 5.10.16 to 5.14.3

Snyk has created this PR to upgrade @mui/icons-material from 5.10.16 to 5.14.3.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

* fix: upgrade eslint-plugin-import from 2.26.0 to 2.28.0

Snyk has created this PR to upgrade eslint-plugin-import from 2.26.0 to 2.28.0.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

* fix: upgrade eslint-config-prettier from 8.5.0 to 8.10.0

Snyk has created this PR to upgrade eslint-config-prettier from 8.5.0 to 8.10.0.

See this package in npm:

See this project in Snyk:
https://app.snyk.io/org/dave-gantenbein/project/5064983e-fa14-4803-8fc2-cfd6f1fa81b6?utm_source=github&utm_medium=referral&page=upgrade-pr

* Trying to update klog

* go mod fix

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Fix bug causing GetJobSetEvents to get stuck (#2903)

* Add error message of final job run to JobFailedMessage

When we hit the maximum retry limit, the JobFailedMessage just says something along the lines of
"Job has been retried too many times, giving up"

Now we include the final run error in that message - to make it easier to work out the cause of retries

* Fix bug causing GetJobSetEvents to get stuck

GetJobSetEvents only increments its fromId variable on sending new messages

However now all redis events produce api events that will be sent downstream

The issue here is if we get 500 redis events in a row that don't produce api events, then the fromId never gets updated
 - Meaning the watching gets stuck here

To fix this, ReadEvents now returns a lastMessageId. So if there are no messages to process, the fromId should be updated using the lastMessageId

* Formatting

Signed-off-by: Rich Scott <richscott@sent.com>

* Bump @adobe/css-tools from 4.0.1 to 4.3.1 in /internal/lookout/ui (#2931)

Bumps [@adobe/css-tools](https://github.com/adobe/css-tools) from 4.0.1 to 4.3.1.
- [Changelog](https://github.com/adobe/css-tools/blob/main/History.md)
- [Commits](https://github.com/adobe/css-tools/commits)

---
updated-dependencies:
- dependency-name: "@adobe/css-tools"
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Update CreateJobs

Signed-off-by: Rich Scott <richscott@sent.com>

* Update CreateJobs to return a SubmitResponse with error details

Signed-off-by: Rich Scott <richscott@sent.com>

* Update createJobs sub functions to check jobs individually

* Update function usage to count errored jobs

Signed-off-by: Rich Scott <richscott@sent.com>

* Fix grammar

Signed-off-by: Rich Scott <richscott@sent.com>

* Added updated test cases

Signed-off-by: Rich Scott <richscott@sent.com>

* Update gang job testing

Signed-off-by: Rich Scott <richscott@sent.com>

* Merge branch 'master' into feat/create_job_error

Signed-off-by: Rich Scott <richscott@sent.com>

* Lint fix

Signed-off-by: Rich Scott <richscott@sent.com>

* Rework gRPC to send JobSubmitResponse over status.details

* Add better nil checking

Signed-off-by: Rich Scott <richscott@sent.com>

* Typo == instead of !=

Signed-off-by: Rich Scott <richscott@sent.com>

* Wrap gRPC SubmitJob function

Signed-off-by: Rich Scott <richscott@sent.com>

* Create new client function instead of sharing

Signed-off-by: Rich Scott <richscott@sent.com>

* Change import order

Signed-off-by: Rich Scott <richscott@sent.com>

* Add a space between imports

Signed-off-by: Rich Scott <richscott@sent.com>

* Avoid nil pointer deference

Signed-off-by: Rich Scott <richscott@sent.com>

* Improved etcd protection (#2925)

* Initial commit

* Delete unused code

* Export metrics collection delay metrics

* Add mutex to InMemoryJobRepository

* Add tests

* Lint

* Update internal/executor/configuration/types.go

* Lint

---------

Co-authored-by: JamesMurkin <jamesmurkin@hotmail.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Stop executor requesting more jobs when it still has leased jobs (#2932)

* Stop executor requesting more jobs when it still has leased jobs

Currently we "queue" jobs to be submitted on the executor - which sit the leased state until they are submitted to kubernetes

However this causes 2 issues with our current setup:
 - It prevents back-pressure from working well on the scheduler side. As it sees all these "Leased" jobs as active, so just keep scheduling more
 - In the case we are slowing submission due to etcd going over its limit. We "queue" lots of jobs, and as soon as etcd goes under its limit we hit it with potentially thousands of jobs

This flow needs further work and thought - however for now this is the minimal fix to prevent bad behaviour

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* WIP

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Fix scheduler side tests

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Implement number of requested jobs on executor side

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Remove unused config

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Fixing panic on startup when etcd health monitor not registered

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Enhance logging

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Set more sensible default for maxLeasedJobs

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

---------

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
Signed-off-by: Rich Scott <richscott@sent.com>

* Fix race in etcd protections (#2937)

* Initial commit

* Fix MultiHealthMonitor race

Signed-off-by: Rich Scott <richscott@sent.com>

* Fix etcd health metric naming conflict (#2939)

* Fix metric naming conflict

* Fix metric names

* Fix metrix prefix

* Fix label

Signed-off-by: Rich Scott <richscott@sent.com>

* lint fix

Signed-off-by: Rich Scott <richscott@sent.com>

* Return clearer errors for multiple-jobs validation.

Signed-off-by: Rich Scott <richscott@sent.com>

* Return more detailed submission/validation errors.

Generate and return more detailed submission and/or validation errors.
If there are numerous jobs with errors, just give the number of failed
jobs (and the total number originally submitted), and truncate the list
of failed jobs errors to just the first 5 (this is defined in a single
constant variable, if neededed to change later).

Signed-off-by: Rich Scott <richscott@sent.com>

---------

Signed-off-by: Rich Scott <richscott@sent.com>
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
Co-authored-by: Clif Houck <me@clifhouck.com>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Co-authored-by: Chris Martin <council_tax@hotmail.com>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Daniel Rastelli <rastellidani@gmail.com>
Co-authored-by: Kanu Mike Chibundu <michotall95@gmail.com>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
Co-authored-by: Dave Gantenbein <dave@gr-oss.io>
Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JamesMurkin <jamesmurkin@hotmail.com>
Co-authored-by: Sarthak Negi <122533767+sarthaksarthak9@users.noreply.github.com>
Co-authored-by: owenthomas17 <owen@owen-thomas.co.uk>
Co-authored-by: Raajheer1 <raaj.patel229@gmail.com>
Co-authored-by: Raaj Patel <41763998+Raajheer1@users.noreply.github.com>
Co-authored-by: Albin Severinson <albin@severinson.org>
  • Loading branch information
17 people committed Dec 12, 2023
1 parent d255ffc commit 356a5f0
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 41 deletions.
2 changes: 2 additions & 0 deletions go.mod
Expand Up @@ -81,6 +81,7 @@ require (
github.com/go-openapi/swag v0.22.4
github.com/go-openapi/validate v0.22.1
github.com/go-playground/validator/v10 v10.15.4
github.com/gogo/status v1.1.1
github.com/golang/mock v1.6.0
github.com/goreleaser/goreleaser v1.15.2
github.com/jackc/pgx/v5 v5.5.0
Expand Down Expand Up @@ -132,6 +133,7 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Expand Up @@ -298,10 +298,14 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a h1:dR8+Q0uO5S2ZBcs2IH6VBKYwSxPo2vYCYq0ot0mu7xA=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg=
github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down Expand Up @@ -1284,6 +1288,7 @@ google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCID
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
Expand Down Expand Up @@ -1330,6 +1335,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand Down
24 changes: 20 additions & 4 deletions internal/armada/server/job_validation.go
@@ -1,6 +1,8 @@
package server

import (
"fmt"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/armada/scheduling"
Expand All @@ -13,16 +15,30 @@ import (
func validateJobsCanBeScheduled(
jobs []*api.Job,
allClusterSchedulingInfo map[string]*api.ClusterSchedulingInfoReport,
) (bool, error) {
) (bool, []*api.JobSubmitResponseItem, error) {
activeClusterSchedulingInfo := scheduling.FilterActiveClusterSchedulingInfoReports(allClusterSchedulingInfo)
responseItems := make([]*api.JobSubmitResponseItem, 0, len(jobs))
for i, job := range jobs {
if ok, err := scheduling.MatchSchedulingRequirementsOnAnyCluster(job, activeClusterSchedulingInfo); !ok {
if err != nil {
return false, errors.WithMessagef(err, "%d-th job can't be scheduled", i)
response := &api.JobSubmitResponseItem{
JobId: job.Id,
Error: fmt.Sprintf("%d-th job can't be scheduled: %v", i, err),
}
responseItems = append(responseItems, response)
} else {
return false, errors.Errorf("%d-th job can't be scheduled", i)
response := &api.JobSubmitResponseItem{
JobId: job.Id,
Error: fmt.Sprintf("%d-th job can't be scheduled", i),
}
responseItems = append(responseItems, response)
}
}
}
return true, nil

if len(responseItems) > 0 {
return false, responseItems, errors.New("[createJobs] Failed to validate jobs can be scheduled")
}

return true, nil, nil
}
6 changes: 3 additions & 3 deletions internal/armada/server/lease.go
Expand Up @@ -970,11 +970,11 @@ func (q *AggregatedQueueServer) addAvoidNodeAffinity(
}

changed := addAvoidNodeAffinity(jobs[0], labels, func(jobsToValidate []*api.Job) error {
if ok, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok {
if ok, responseItems, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok {
if err != nil {
return errors.WithMessage(err, "can't schedule at least 1 job")
return errors.WithMessagef(err, "can't schedule %d (out of %d submitted) job(s)", len(responseItems), len(jobsToValidate))
} else {
return errors.Errorf("can't schedule at least 1 job")
return errors.Errorf("can't schedule %d (out of %d submitted) job(s)", len(responseItems), len(jobsToValidate))
}
}
return nil
Expand Down
132 changes: 110 additions & 22 deletions internal/armada/server/submit.go
Expand Up @@ -9,11 +9,11 @@ import (
"time"

"github.com/gogo/protobuf/types"
"github.com/gogo/status"
pool "github.com/jolestar/go-commons-pool"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/utils/strings/slices"

"github.com/armadaproject/armada/internal/armada/configuration"
Expand Down Expand Up @@ -42,6 +42,21 @@ type SubmitServer struct {
compressorPool *pool.ObjectPool
}

type JobSubmitError struct {
JobErrorsDetails []*api.JobSubmitResponseItem
Err error
}

func (e *JobSubmitError) Error() string {
output := ""
for _, jobError := range e.JobErrorsDetails {
output += fmt.Sprintf("Error - Job %s: %s\n", jobError.JobId, jobError.Error)
}

output += fmt.Sprintf("\nError - %s", e.Err.Error())
return output
}

func NewSubmitServer(
authorizer ActionAuthorizer,
jobRepository repository.JobRepository,
Expand Down Expand Up @@ -285,13 +300,51 @@ func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubm
ctx := armadacontext.FromGrpcCtx(grpcCtx)
principal := authorization.GetPrincipal(ctx)

jobs, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames())
const maxResponseItems = 5
var lastIdx int

jobs, responseItems, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames())
if e != nil {
if len(responseItems) > maxResponseItems {
lastIdx = maxResponseItems
} else {
lastIdx = len(responseItems)
}

reqJson, _ := json.Marshal(req)
return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e)
createJobsErrFmt := "[SubmitJobs] error creating %d of %d job(s) submitted; %s for user %s; first %d errors:%v"
numFails := len(responseItems)
numSubmitted := numFails + len(jobs)
details := &api.JobSubmitResponse{JobResponseItems: responseItems[:lastIdx]}

st, err := status.Newf(codes.InvalidArgument, createJobsErrFmt, numFails, numSubmitted, reqJson,
principal.GetName(), maxResponseItems, e).WithDetails(details)
if err != nil {
subJobUserFmt := "[SubmitJobs] error submitting job %s for user %s; : %v"
return nil, status.Errorf(codes.InvalidArgument, subJobUserFmt, reqJson, principal.GetName(), e)
}
return nil, st.Err()
}
if err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil {
return nil, err

if responseItems, err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil {
reqJson, _ := json.Marshal(req)
numFails := len(responseItems)
numSubmitted := len(jobs)
if len(responseItems) > maxResponseItems {
lastIdx = maxResponseItems
} else {
lastIdx = len(responseItems)
}

details := &api.JobSubmitResponse{JobResponseItems: responseItems[:lastIdx]}
validJobsErrFmt := "[SubmitJobs] error validating %d of %d job(s) submitted; %s for user %s; first %d errors:%v"
st, err := status.Newf(codes.InvalidArgument, validJobsErrFmt, numFails, numSubmitted, reqJson,
principal.GetName(), e).WithDetails(details)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, validJobsErrFmt, numFails, numSubmitted, reqJson,
principal.GetName(), e)
}
return nil, st.Err()
}

q, err := server.getQueueOrCreate(ctx, req.Queue)
Expand All @@ -301,9 +354,7 @@ func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubm

err = server.submittingJobsWouldSurpassLimit(*q, req)
if err != nil {
return nil, status.Errorf(
codes.InvalidArgument,
"[SubmitJobs] error checking queue limit: %s", err)
return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] error checking queue limit: %s", err)
}

err = server.authorizer.AuthorizeQueueAction(ctx, *q, permissions.SubmitAnyJobs, queue.PermissionVerbSubmit)
Expand All @@ -321,9 +372,24 @@ func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubm
return nil, status.Errorf(codes.InvalidArgument, "error getting scheduling info: %s", err)
}

if ok, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok {
if ok, responseItems, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok {
if err != nil {
return nil, errors.WithMessagef(err, "can't schedule job for user %s", principal.GetName())
numFails := len(responseItems)
numSubmitted := len(jobs)
if len(responseItems) > maxResponseItems {
lastIdx = maxResponseItems
} else {
lastIdx = len(responseItems)
}
details := &api.JobSubmitResponse{JobResponseItems: responseItems[:lastIdx]}
validJobsErrFmt := "[SubmitJobs] error validating %d of %d job(s) submitted for user %s; first %d errors:%v"

st, e := status.Newf(codes.InvalidArgument, validJobsErrFmt, numFails, numSubmitted,
principal.GetName(), maxResponseItems, err).WithDetails(details)
if e != nil {
return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] error validating jobs: %s", err)
}
return nil, st.Err()
}
return nil, errors.Errorf("can't schedule job for user %s", principal.GetName())
}
Expand Down Expand Up @@ -762,16 +828,16 @@ func (server *SubmitServer) getQueueOrCreate(ctx *armadacontext.Context, queueNa
// createJobs returns a list of objects representing the jobs in a JobSubmitRequest.
// This function validates the jobs in the request and the pod specs. in each job.
// If any job or pod in invalid, an error is returned.
func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, error) {
func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, []*api.JobSubmitResponseItem, error) {
return server.createJobsObjects(request, owner, ownershipGroups, time.Now, util.NewULID)
}

func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, owner string, ownershipGroups []string,
getTime func() time.Time, getUlid func() string,
) ([]*api.Job, error) {
) ([]*api.Job, []*api.JobSubmitResponseItem, error) {
compressor, err := server.compressorPool.BorrowObject(armadacontext.Background())
if err != nil {
return nil, err
return nil, nil, err
}
defer func(compressorPool *pool.ObjectPool, ctx *armadacontext.Context, object interface{}) {
err := compressorPool.ReturnObject(ctx, object)
Expand All @@ -781,29 +847,45 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
}(server.compressorPool, armadacontext.Background(), compressor)
compressedOwnershipGroups, err := compress.CompressStringArray(ownershipGroups, compressor.(compress.Compressor))
if err != nil {
return nil, err
return nil, nil, err
}

jobs := make([]*api.Job, 0, len(request.JobRequestItems))

if request.JobSetId == "" {
return nil, errors.Errorf("[createJobs] job set not specified")
return nil, nil, errors.Errorf("[createJobs] job set not specified")
}

if request.Queue == "" {
return nil, errors.Errorf("[createJobs] queue not specified")
return nil, nil, errors.Errorf("[createJobs] queue not specified")
}

responseItems := make([]*api.JobSubmitResponseItem, 0, len(request.JobRequestItems))
for i, item := range request.JobRequestItems {
jobId := getUlid()

if item.PodSpec != nil && len(item.PodSpecs) > 0 {
return nil, errors.Errorf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId),
}
responseItems = append(responseItems, response)
}
podSpec := item.GetMainPodSpec()
if podSpec == nil {
return nil, errors.Errorf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId),
}
responseItems = append(responseItems, response)
continue // Safety check, to avoid possible nil pointer dereference below
}
if err := validation.ValidateJobSubmitRequestItem(item); err != nil {
return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err),
}
responseItems = append(responseItems, response)
}
namespace := item.Namespace
if namespace == "" {
Expand All @@ -813,7 +895,11 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
applyDefaultsToAnnotations(item.Annotations, *server.schedulingConfig)
applyDefaultsToPodSpec(podSpec, *server.schedulingConfig)
if err := validation.ValidatePodSpec(podSpec, server.schedulingConfig); err != nil {
return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err),
}
responseItems = append(responseItems, response)
}

// TODO: remove, RequiredNodeLabels is deprecated and will be removed in future versions
Expand All @@ -824,7 +910,6 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
podSpec.NodeSelector[k] = v
}

jobId := getUlid()
enrichText(item.Labels, jobId)
enrichText(item.Annotations, jobId)
j := &api.Job{
Expand Down Expand Up @@ -855,7 +940,10 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
jobs = append(jobs, j)
}

return jobs, nil
if len(responseItems) > 0 {
return nil, responseItems, errors.New("[createJobs] error creating jobs, check JobSubmitResponse for details")
}
return jobs, nil, nil
}

func enrichText(labels map[string]string, jobId string) {
Expand Down

0 comments on commit 356a5f0

Please sign in to comment.