diff --git a/go.mod b/go.mod index c2b38a85577..467fcd057c6 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( github.com/go-openapi/swag v0.22.4 github.com/go-openapi/validate v0.22.1 github.com/go-playground/validator/v10 v10.15.4 + github.com/gogo/status v1.1.1 github.com/golang/mock v1.6.0 github.com/goreleaser/goreleaser v1.15.2 github.com/jackc/pgx/v5 v5.5.0 @@ -132,6 +133,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/snappy v0.0.3 // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect diff --git a/go.sum b/go.sum index bdf62892c4f..b92ecbcd4fe 100644 --- a/go.sum +++ b/go.sum @@ -298,10 +298,14 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a h1:dR8+Q0uO5S2ZBcs2IH6VBKYwSxPo2vYCYq0ot0mu7xA= +github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg= +github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -1284,6 +1288,7 @@ google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCID google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1330,6 +1335,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1: google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/internal/armada/server/job_validation.go b/internal/armada/server/job_validation.go index a584a5c1fd4..eba5edf1488 100644 --- a/internal/armada/server/job_validation.go +++ b/internal/armada/server/job_validation.go @@ -1,6 +1,8 @@ package server import ( + "fmt" + "github.com/pkg/errors" "github.com/armadaproject/armada/internal/armada/scheduling" @@ -13,16 +15,30 @@ import ( func validateJobsCanBeScheduled( jobs []*api.Job, allClusterSchedulingInfo map[string]*api.ClusterSchedulingInfoReport, -) (bool, error) { +) (bool, []*api.JobSubmitResponseItem, error) { activeClusterSchedulingInfo := scheduling.FilterActiveClusterSchedulingInfoReports(allClusterSchedulingInfo) + responseItems := make([]*api.JobSubmitResponseItem, 0, len(jobs)) for i, job := range jobs { if ok, err := scheduling.MatchSchedulingRequirementsOnAnyCluster(job, activeClusterSchedulingInfo); !ok { if err != nil { - return false, errors.WithMessagef(err, "%d-th job can't be scheduled", i) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: fmt.Sprintf("%d-th job can't be scheduled: %v", i, err), + } + responseItems = append(responseItems, response) } else { - return false, errors.Errorf("%d-th job can't be scheduled", i) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: fmt.Sprintf("%d-th job can't be scheduled", i), + } + responseItems = append(responseItems, response) } } } - return true, nil + + if len(responseItems) > 0 { + return false, responseItems, errors.New("[createJobs] Failed to validate jobs can be scheduled") + } + + return true, nil, nil } diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 72ce1189c65..23465c6b805 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -970,11 +970,11 @@ func (q *AggregatedQueueServer) addAvoidNodeAffinity( } changed := addAvoidNodeAffinity(jobs[0], labels, func(jobsToValidate []*api.Job) error { - if ok, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok { + if ok, responseItems, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok { if err != nil { - return errors.WithMessage(err, "can't schedule at least 1 job") + return errors.WithMessagef(err, "can't schedule %d (out of %d submitted) job(s)", len(responseItems), len(jobsToValidate)) } else { - return errors.Errorf("can't schedule at least 1 job") + return errors.Errorf("can't schedule %d (out of %d submitted) job(s)", len(responseItems), len(jobsToValidate)) } } return nil diff --git a/internal/armada/server/submit.go b/internal/armada/server/submit.go index 7fcafd25bdb..c5efe682f48 100644 --- a/internal/armada/server/submit.go +++ b/internal/armada/server/submit.go @@ -9,11 +9,11 @@ import ( "time" "github.com/gogo/protobuf/types" + "github.com/gogo/status" pool "github.com/jolestar/go-commons-pool" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "k8s.io/utils/strings/slices" "github.com/armadaproject/armada/internal/armada/configuration" @@ -42,6 +42,21 @@ type SubmitServer struct { compressorPool *pool.ObjectPool } +type JobSubmitError struct { + JobErrorsDetails []*api.JobSubmitResponseItem + Err error +} + +func (e *JobSubmitError) Error() string { + output := "" + for _, jobError := range e.JobErrorsDetails { + output += fmt.Sprintf("Error - Job %s: %s\n", jobError.JobId, jobError.Error) + } + + output += fmt.Sprintf("\nError - %s", e.Err.Error()) + return output +} + func NewSubmitServer( authorizer ActionAuthorizer, jobRepository repository.JobRepository, @@ -285,13 +300,51 @@ func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubm ctx := armadacontext.FromGrpcCtx(grpcCtx) principal := authorization.GetPrincipal(ctx) - jobs, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames()) + const maxResponseItems = 5 + var lastIdx int + + jobs, responseItems, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames()) if e != nil { + if len(responseItems) > maxResponseItems { + lastIdx = maxResponseItems + } else { + lastIdx = len(responseItems) + } + reqJson, _ := json.Marshal(req) - return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e) + createJobsErrFmt := "[SubmitJobs] error creating %d of %d job(s) submitted; %s for user %s; first %d errors:%v" + numFails := len(responseItems) + numSubmitted := numFails + len(jobs) + details := &api.JobSubmitResponse{JobResponseItems: responseItems[:lastIdx]} + + st, err := status.Newf(codes.InvalidArgument, createJobsErrFmt, numFails, numSubmitted, reqJson, + principal.GetName(), maxResponseItems, e).WithDetails(details) + if err != nil { + subJobUserFmt := "[SubmitJobs] error submitting job %s for user %s; : %v" + return nil, status.Errorf(codes.InvalidArgument, subJobUserFmt, reqJson, principal.GetName(), e) + } + return nil, st.Err() } - if err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil { - return nil, err + + if responseItems, err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil { + reqJson, _ := json.Marshal(req) + numFails := len(responseItems) + numSubmitted := len(jobs) + if len(responseItems) > maxResponseItems { + lastIdx = maxResponseItems + } else { + lastIdx = len(responseItems) + } + + details := &api.JobSubmitResponse{JobResponseItems: responseItems[:lastIdx]} + validJobsErrFmt := "[SubmitJobs] error validating %d of %d job(s) submitted; %s for user %s; first %d errors:%v" + st, err := status.Newf(codes.InvalidArgument, validJobsErrFmt, numFails, numSubmitted, reqJson, + principal.GetName(), e).WithDetails(details) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, validJobsErrFmt, numFails, numSubmitted, reqJson, + principal.GetName(), e) + } + return nil, st.Err() } q, err := server.getQueueOrCreate(ctx, req.Queue) @@ -301,9 +354,7 @@ func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubm err = server.submittingJobsWouldSurpassLimit(*q, req) if err != nil { - return nil, status.Errorf( - codes.InvalidArgument, - "[SubmitJobs] error checking queue limit: %s", err) + return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] error checking queue limit: %s", err) } err = server.authorizer.AuthorizeQueueAction(ctx, *q, permissions.SubmitAnyJobs, queue.PermissionVerbSubmit) @@ -321,9 +372,24 @@ func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubm return nil, status.Errorf(codes.InvalidArgument, "error getting scheduling info: %s", err) } - if ok, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok { + if ok, responseItems, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok { if err != nil { - return nil, errors.WithMessagef(err, "can't schedule job for user %s", principal.GetName()) + numFails := len(responseItems) + numSubmitted := len(jobs) + if len(responseItems) > maxResponseItems { + lastIdx = maxResponseItems + } else { + lastIdx = len(responseItems) + } + details := &api.JobSubmitResponse{JobResponseItems: responseItems[:lastIdx]} + validJobsErrFmt := "[SubmitJobs] error validating %d of %d job(s) submitted for user %s; first %d errors:%v" + + st, e := status.Newf(codes.InvalidArgument, validJobsErrFmt, numFails, numSubmitted, + principal.GetName(), maxResponseItems, err).WithDetails(details) + if e != nil { + return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] error validating jobs: %s", err) + } + return nil, st.Err() } return nil, errors.Errorf("can't schedule job for user %s", principal.GetName()) } @@ -762,16 +828,16 @@ func (server *SubmitServer) getQueueOrCreate(ctx *armadacontext.Context, queueNa // createJobs returns a list of objects representing the jobs in a JobSubmitRequest. // This function validates the jobs in the request and the pod specs. in each job. // If any job or pod in invalid, an error is returned. -func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, error) { +func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, []*api.JobSubmitResponseItem, error) { return server.createJobsObjects(request, owner, ownershipGroups, time.Now, util.NewULID) } func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, owner string, ownershipGroups []string, getTime func() time.Time, getUlid func() string, -) ([]*api.Job, error) { +) ([]*api.Job, []*api.JobSubmitResponseItem, error) { compressor, err := server.compressorPool.BorrowObject(armadacontext.Background()) if err != nil { - return nil, err + return nil, nil, err } defer func(compressorPool *pool.ObjectPool, ctx *armadacontext.Context, object interface{}) { err := compressorPool.ReturnObject(ctx, object) @@ -781,29 +847,45 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own }(server.compressorPool, armadacontext.Background(), compressor) compressedOwnershipGroups, err := compress.CompressStringArray(ownershipGroups, compressor.(compress.Compressor)) if err != nil { - return nil, err + return nil, nil, err } jobs := make([]*api.Job, 0, len(request.JobRequestItems)) if request.JobSetId == "" { - return nil, errors.Errorf("[createJobs] job set not specified") + return nil, nil, errors.Errorf("[createJobs] job set not specified") } if request.Queue == "" { - return nil, errors.Errorf("[createJobs] queue not specified") + return nil, nil, errors.Errorf("[createJobs] queue not specified") } + responseItems := make([]*api.JobSubmitResponseItem, 0, len(request.JobRequestItems)) for i, item := range request.JobRequestItems { + jobId := getUlid() + if item.PodSpec != nil && len(item.PodSpecs) > 0 { - return nil, errors.Errorf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId), + } + responseItems = append(responseItems, response) } podSpec := item.GetMainPodSpec() if podSpec == nil { - return nil, errors.Errorf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId), + } + responseItems = append(responseItems, response) + continue // Safety check, to avoid possible nil pointer dereference below } if err := validation.ValidateJobSubmitRequestItem(item); err != nil { - return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err), + } + responseItems = append(responseItems, response) } namespace := item.Namespace if namespace == "" { @@ -813,7 +895,11 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own applyDefaultsToAnnotations(item.Annotations, *server.schedulingConfig) applyDefaultsToPodSpec(podSpec, *server.schedulingConfig) if err := validation.ValidatePodSpec(podSpec, server.schedulingConfig); err != nil { - return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err), + } + responseItems = append(responseItems, response) } // TODO: remove, RequiredNodeLabels is deprecated and will be removed in future versions @@ -824,7 +910,6 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own podSpec.NodeSelector[k] = v } - jobId := getUlid() enrichText(item.Labels, jobId) enrichText(item.Annotations, jobId) j := &api.Job{ @@ -855,7 +940,10 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own jobs = append(jobs, j) } - return jobs, nil + if len(responseItems) > 0 { + return nil, responseItems, errors.New("[createJobs] error creating jobs, check JobSubmitResponse for details") + } + return jobs, nil, nil } func enrichText(labels map[string]string, jobId string) { diff --git a/internal/armada/server/submit_test.go b/internal/armada/server/submit_test.go index 9521cb28432..e01ee30a9e4 100644 --- a/internal/armada/server/submit_test.go +++ b/internal/armada/server/submit_test.go @@ -1628,6 +1628,9 @@ func TestSubmitServer_CreateJobs_WithJobIdReplacement(t *testing.T) { }, } + // Empty - no response items expected as no jobs were submitted without errors + var expectedResponseItems []*api.JobSubmitResponseItem + request := &api.JobSubmitRequest{ Queue: "test", JobSetId: "test-jobsetid", @@ -1667,8 +1670,90 @@ func TestSubmitServer_CreateJobs_WithJobIdReplacement(t *testing.T) { } ownershipGroups := make([]string, 0) withSubmitServer(func(s *SubmitServer, events *repository.TestEventStore) { - output, err := s.createJobsObjects(request, "test", ownershipGroups, mockNow, mockNewULID) + output, responseItems, err := s.createJobsObjects(request, "test", ownershipGroups, mockNow, mockNewULID) assert.NoError(t, err) + assert.Equal(t, expectedResponseItems, responseItems) assert.Equal(t, expected, output) }) } + +func TestSubmitServer_CreateJobs_WithDuplicatePodSpec(t *testing.T) { + timeNow := time.Now() + mockNow := func() time.Time { + return timeNow + } + mockNewULID := func() string { + return "test-ulid" + } + + expectedResponseItems := []*api.JobSubmitResponseItem{ + { + JobId: "test-ulid", + Error: "[createJobs] job 0 in job set test-jobsetid contains both podSpec and podSpecs, but may only contain either", + }, + } + expectedError := "[createJobs] error creating jobs, check JobSubmitResponse for details" + + request := &api.JobSubmitRequest{ + Queue: "test", + JobSetId: "test-jobsetid", + JobRequestItems: []*api.JobSubmitRequestItem{ + { + Priority: 1, + Namespace: "test", + ClientId: "0", + Labels: map[string]string{ + "a.label": "job-id-is-{JobId}", + }, + Annotations: map[string]string{ + "a.nnotation": "job-id-is-{JobId}", + }, + PodSpecs: []*v1.PodSpec{ + { + Containers: []v1.Container{ + { + Name: "app", + Image: "test:latest", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + }, + }, + }, + }, + }, + PodSpec: &v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "app", + Image: "test:latest", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + }, + }, + }, + }, + }, + }, + } + ownershipGroups := make([]string, 0) + withSubmitServer(func(s *SubmitServer, events *repository.TestEventStore) { + output, responseItems, err := s.createJobsObjects(request, "test", ownershipGroups, mockNow, mockNewULID) + assert.Equal(t, expectedError, err.Error()) + assert.Equal(t, expectedResponseItems, responseItems) + assert.Nil(t, output) + }) +} diff --git a/internal/armada/server/submit_to_log.go b/internal/armada/server/submit_to_log.go index f54411642f8..49ad9c01b97 100644 --- a/internal/armada/server/submit_to_log.go +++ b/internal/armada/server/submit_to_log.go @@ -10,12 +10,12 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/gogo/protobuf/types" + "github.com/gogo/status" "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/armadaproject/armada/internal/armada/permissions" "github.com/armadaproject/armada/internal/armada/repository" @@ -94,12 +94,29 @@ func (srv *PulsarSubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobS // Create legacy API jobs from the requests. // We use the legacy code for the conversion to ensure that behaviour doesn't change. - apiJobs, err := srv.SubmitServer.createJobs(req, userId, groups) + apiJobs, responseItems, err := srv.SubmitServer.createJobs(req, userId, groups) if err != nil { - return nil, err + details := &api.JobSubmitResponse{ + JobResponseItems: responseItems, + } + + st, e := status.Newf(codes.InvalidArgument, "[SubmitJobs] Failed to parse job request: %s", err.Error()).WithDetails(details) + if e != nil { + return nil, status.Newf(codes.Internal, "[SubmitJobs] Failed to parse job request: %s", e.Error()).Err() + } + + return nil, st.Err() } - if err := commonvalidation.ValidateApiJobs(apiJobs, *srv.SubmitServer.schedulingConfig); err != nil { - return nil, err + if responseItems, err := commonvalidation.ValidateApiJobs(apiJobs, *srv.SubmitServer.schedulingConfig); err != nil { + details := &api.JobSubmitResponse{ + JobResponseItems: responseItems, + } + + st, e := status.Newf(codes.InvalidArgument, "[SubmitJobs] Failed to parse job request: %s", err.Error()).WithDetails(details) + if e != nil { + return nil, status.Newf(codes.Internal, "[SubmitJobs] Failed to parse job request: %s", e.Error()).Err() + } + return nil, st.Err() } schedulersByJobId, err := srv.assignScheduler(apiJobs) diff --git a/internal/armadactl/submit.go b/internal/armadactl/submit.go index e779b7198fd..26514c2d66c 100644 --- a/internal/armadactl/submit.go +++ b/internal/armadactl/submit.go @@ -31,10 +31,19 @@ func (a *App) Submit(path string, dryRun bool) error { } requests := client.CreateChunkedSubmitRequests(submitFile.Queue, submitFile.JobSetId, submitFile.Jobs) - return client.WithSubmitClient(a.Params.ApiConnectionDetails, func(c api.SubmitClient) error { + return client.WithSubmitClient(a.Params.ApiConnectionDetails, func(originalClient api.SubmitClient) error { + c := api.CustomSubmitClient{Inner: originalClient} + for _, request := range requests { - response, err := client.SubmitJobs(c, request) + response, err := client.CustomClientSubmitJobs(c, request) if err != nil { + if response != nil { + fmt.Fprintln(a.Out, "[JobSubmitResponse]") + for _, jobResponseItem := range response.JobResponseItems { + fmt.Fprintf(a.Out, "Error submitting job with id %s, details: %s\n", jobResponseItem.JobId, jobResponseItem.Error) + } + } + fmt.Fprintln(a.Out, "[Error]") return errors.WithMessagef(err, "error submitting request %#v", request) } diff --git a/internal/common/validation/job.go b/internal/common/validation/job.go index 60e89778a4e..0bb4028dbee 100644 --- a/internal/common/validation/job.go +++ b/internal/common/validation/job.go @@ -11,16 +11,26 @@ import ( "github.com/armadaproject/armada/pkg/api" ) -func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) error { +func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) ([]*api.JobSubmitResponseItem, error) { if _, err := validateGangs(jobs); err != nil { - return err + return nil, err } + + responseItems := make([]*api.JobSubmitResponseItem, 0, len(jobs)) for _, job := range jobs { if err := ValidateApiJob(job, config); err != nil { - return err + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: err.Error(), + } + responseItems = append(responseItems, response) } } - return nil + + if len(responseItems) > 0 { + return responseItems, errors.New("[createJobs] Failed to validate jobs") + } + return nil, nil } type gangDetails = struct { diff --git a/pkg/api/submit.go b/pkg/api/submit.go new file mode 100644 index 00000000000..0a913f1215b --- /dev/null +++ b/pkg/api/submit.go @@ -0,0 +1,27 @@ +package api + +import ( + "context" + + "github.com/gogo/status" + "google.golang.org/grpc" +) + +type CustomSubmitClient struct { + Inner SubmitClient +} + +func (c *CustomSubmitClient) SubmitJobs(ctx context.Context, in *JobSubmitRequest, opts ...grpc.CallOption) (*JobSubmitResponse, error) { + out, err := c.Inner.SubmitJobs(ctx, in, opts...) + if err != nil { + st := status.Convert(err) + for _, detail := range st.Details() { + switch t := detail.(type) { + case *JobSubmitResponse: + return t, err + } + } + return nil, err + } + return out, nil +} diff --git a/pkg/client/submit.go b/pkg/client/submit.go index 234dad8f49e..4214ca7ca71 100644 --- a/pkg/client/submit.go +++ b/pkg/client/submit.go @@ -38,6 +38,13 @@ func SubmitJobs(submitClient api.SubmitClient, request *api.JobSubmitRequest) (* return submitClient.SubmitJobs(ctx, request) } +func CustomClientSubmitJobs(submitClient api.CustomSubmitClient, request *api.JobSubmitRequest) (*api.JobSubmitResponse, error) { + AddClientIds(request.JobRequestItems) + ctx, cancel := common.ContextWithDefaultTimeout() + defer cancel() + return submitClient.SubmitJobs(ctx, request) +} + func CreateChunkedSubmitRequests(queue string, jobSetId string, jobs []*api.JobSubmitRequestItem) []*api.JobSubmitRequest { requests := make([]*api.JobSubmitRequest, 0, 10)