diff --git a/examples/features/orca/server/main.go b/examples/features/orca/server/main.go index 5d4bdb163a1..e52d5d06eeb 100644 --- a/examples/features/orca/server/main.go +++ b/examples/features/orca/server/main.go @@ -44,9 +44,9 @@ type server struct { func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) { // Report a sample cost for this query. - cmr := orca.CallMetricRecorderFromContext(ctx) + cmr := orca.CallMetricsRecorderFromContext(ctx) if cmr == nil { - return nil, status.Errorf(codes.Internal, "unable to retrieve call metric recorder (missing ORCA ServerOption?)") + return nil, status.Errorf(codes.Internal, "unable to retrieve call metrics recorder (missing ORCA ServerOption?)") } cmr.SetRequestCost("db_queries", 10) @@ -63,27 +63,31 @@ func main() { fmt.Printf("Server listening at %v\n", lis.Addr()) // Create the gRPC server with the orca.CallMetricsServerOption() option, - // which will enable per-call metric recording. - s := grpc.NewServer(orca.CallMetricsServerOption()) + // which will enable per-call metric recording. No ServerMetricsProvider + // is given here because the out-of-band reporting is enabled separately. + s := grpc.NewServer(orca.CallMetricsServerOption(nil)) pb.RegisterEchoServer(s, &server{}) // Register the orca service for out-of-band metric reporting, and set the // minimum reporting interval to 3 seconds. Note that, by default, the // minimum interval must be at least 30 seconds, but 3 seconds is set via // an internal-only option for illustration purposes only. - opts := orca.ServiceOptions{MinReportingInterval: 3 * time.Second} + smr := orca.NewServerMetricsRecorder() + opts := orca.ServiceOptions{ + MinReportingInterval: 3 * time.Second, + ServerMetricsProvider: smr, + } internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&opts) - orcaSvc, err := orca.Register(s, opts) - if err != nil { + if err := orca.Register(s, opts); err != nil { log.Fatalf("Failed to register ORCA service: %v", err) } // Simulate CPU utilization reporting. go func() { for { - orcaSvc.SetCPUUtilization(.5) + smr.SetCPUUtilization(.5) time.Sleep(2 * time.Second) - orcaSvc.SetCPUUtilization(.9) + smr.SetCPUUtilization(.9) time.Sleep(2 * time.Second) } }() diff --git a/orca/call_metric_recorder.go b/orca/call_metric_recorder.go deleted file mode 100644 index 62f2a1a6c22..00000000000 --- a/orca/call_metric_recorder.go +++ /dev/null @@ -1,130 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package orca - -import ( - "context" - "sync" - "sync/atomic" - - v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" -) - -// CallMetricRecorder provides functionality to record per-RPC custom backend -// metrics. See CallMetricsServerOption() for more details. -// -// Safe for concurrent use. -type CallMetricRecorder struct { - cpu atomic.Value // float64 - memory atomic.Value // float64 - - mu sync.RWMutex - requestCost map[string]float64 - utilization map[string]float64 -} - -func newCallMetricRecorder() *CallMetricRecorder { - return &CallMetricRecorder{ - requestCost: make(map[string]float64), - utilization: make(map[string]float64), - } -} - -// SetCPUUtilization records a measurement for the CPU utilization metric. -func (c *CallMetricRecorder) SetCPUUtilization(val float64) { - c.cpu.Store(val) -} - -// SetMemoryUtilization records a measurement for the memory utilization metric. -func (c *CallMetricRecorder) SetMemoryUtilization(val float64) { - c.memory.Store(val) -} - -// SetRequestCost records a measurement for a request cost metric, -// uniquely identifiable by name. -func (c *CallMetricRecorder) SetRequestCost(name string, val float64) { - c.mu.Lock() - c.requestCost[name] = val - c.mu.Unlock() -} - -// SetUtilization records a measurement for a utilization metric uniquely -// identifiable by name. -func (c *CallMetricRecorder) SetUtilization(name string, val float64) { - c.mu.Lock() - c.utilization[name] = val - c.mu.Unlock() -} - -// toLoadReportProto dumps the recorded measurements as an OrcaLoadReport proto. -func (c *CallMetricRecorder) toLoadReportProto() *v3orcapb.OrcaLoadReport { - c.mu.RLock() - defer c.mu.RUnlock() - - cost := make(map[string]float64, len(c.requestCost)) - for k, v := range c.requestCost { - cost[k] = v - } - util := make(map[string]float64, len(c.utilization)) - for k, v := range c.utilization { - util[k] = v - } - cpu, _ := c.cpu.Load().(float64) - mem, _ := c.memory.Load().(float64) - return &v3orcapb.OrcaLoadReport{ - CpuUtilization: cpu, - MemUtilization: mem, - RequestCost: cost, - Utilization: util, - } -} - -type callMetricRecorderCtxKey struct{} - -// CallMetricRecorderFromContext returns the RPC specific custom metrics -// recorder [CallMetricRecorder] embedded in the provided RPC context. -// -// Returns nil if no custom metrics recorder is found in the provided context, -// which will be the case when custom metrics reporting is not enabled. -func CallMetricRecorderFromContext(ctx context.Context) *CallMetricRecorder { - rw, ok := ctx.Value(callMetricRecorderCtxKey{}).(*recorderWrapper) - if !ok { - return nil - } - return rw.recorder() -} - -func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context { - return context.WithValue(ctx, callMetricRecorderCtxKey{}, r) -} - -// recorderWrapper is a wrapper around a CallMetricRecorder to ensures that -// concurrent calls to CallMetricRecorderFromContext() results in only one -// allocation of the underlying metric recorder. -type recorderWrapper struct { - once sync.Once - r *CallMetricRecorder -} - -func (rw *recorderWrapper) recorder() *CallMetricRecorder { - rw.once.Do(func() { - rw.r = newCallMetricRecorder() - }) - return rw.r -} diff --git a/orca/call_metrics.go b/orca/call_metrics.go new file mode 100644 index 00000000000..558c7bce6a8 --- /dev/null +++ b/orca/call_metrics.go @@ -0,0 +1,196 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca + +import ( + "context" + "sync" + + "google.golang.org/grpc" + grpcinternal "google.golang.org/grpc/internal" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/orca/internal" + "google.golang.org/protobuf/proto" +) + +// CallMetricsRecorder allows a service method handler to record per-RPC +// metrics. It contains all utilization-based metrics from +// ServerMetricsRecorder as well as additional request cost metrics. +type CallMetricsRecorder interface { + ServerMetricsRecorder + + // SetRequestCost sets the relevant server metric. + SetRequestCost(name string, val float64) + // DeleteRequestCost deletes the relevant server metric to prevent it + // from being sent. + DeleteRequestCost(name string) + + // SetNamedMetric sets the relevant server metric. + SetNamedMetric(name string, val float64) + // DeleteNamedMetric deletes the relevant server metric to prevent it + // from being sent. + DeleteNamedMetric(name string) +} + +type callMetricsRecorderCtxKey struct{} + +// CallMetricsRecorderFromContext returns the RPC-specific custom metrics +// recorder embedded in the provided RPC context. +// +// Returns nil if no custom metrics recorder is found in the provided context, +// which will be the case when custom metrics reporting is not enabled. +func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder { + rw, ok := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper) + if !ok { + return nil + } + return rw.recorder() +} + +// recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that +// concurrent calls to CallMetricsRecorderFromContext() results in only one +// allocation of the underlying metrics recorder, while also allowing for lazy +// initialization of the recorder itself. +type recorderWrapper struct { + once sync.Once + r CallMetricsRecorder + smp ServerMetricsProvider +} + +func (rw *recorderWrapper) recorder() CallMetricsRecorder { + rw.once.Do(func() { + rw.r = newServerMetricsRecorder() + }) + return rw.r +} + +// setTrailerMetadata adds a trailer metadata entry with key being set to +// `internal.TrailerMetadataKey` and value being set to the binary-encoded +// orca.OrcaLoadReport protobuf message. +// +// This function is called from the unary and streaming interceptors defined +// above. Any errors encountered here are not propagated to the caller because +// they are ignored there. Hence we simply log any errors encountered here at +// warning level, and return nothing. +func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) { + var sm *ServerMetrics + if rw.smp != nil { + sm = rw.smp.ServerMetrics() + sm.merge(rw.r.ServerMetrics()) + } else { + sm = rw.r.ServerMetrics() + } + + b, err := proto.Marshal(sm.toLoadReportProto()) + if err != nil { + logger.Warningf("Failed to marshal load report: %v", err) + return + } + if err := grpc.SetTrailer(ctx, metadata.Pairs(internal.TrailerMetadataKey, string(b))); err != nil { + logger.Warningf("Failed to set trailer metadata: %v", err) + } +} + +var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) + +// CallMetricsServerOption returns a server option which enables the reporting +// of per-RPC custom backend metrics for unary and streaming RPCs. +// +// Server applications interested in injecting custom backend metrics should +// pass the server option returned from this function as the first argument to +// grpc.NewServer(). +// +// Subsequently, server RPC handlers can retrieve a reference to the RPC +// specific custom metrics recorder [CallMetricsRecorder] to be used, via a call +// to CallMetricsRecorderFromContext(), and inject custom metrics at any time +// during the RPC lifecycle. +// +// The injected custom metrics will be sent as part of trailer metadata, as a +// binary-encoded [ORCA LoadReport] protobuf message, with the metadata key +// being set be "endpoint-load-metrics-bin". +// +// If a non-nil ServerMetricsProvider is provided, the gRPC server will +// transmit the metrics it provides, overwritten by any per-RPC metrics given +// to the CallMetricsRecorder. A ServerMetricsProvider is typically obtained +// by calling NewServerMetricsRecorder. +// +// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15 +func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption { + return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt(smp)), grpc.ChainStreamInterceptor(streamInt(smp))) +} + +func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + // We don't allocate the metric recorder here. It will be allocated the + // first time the user calls CallMetricsRecorderFromContext(). + rw := &recorderWrapper{smp: smp} + ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw) + + resp, err := handler(ctxWithRecorder, req) + + // It is safe to access the underlying metric recorder inside the wrapper at + // this point, as the user's RPC handler is done executing, and therefore + // there will be no more calls to CallMetricsRecorderFromContext(), which is + // where the metric recorder is lazy allocated. + if rw.r != nil { + rw.setTrailerMetadata(ctx) + } + return resp, err + } +} + +func streamInt(smp ServerMetricsProvider) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // We don't allocate the metric recorder here. It will be allocated the + // first time the user calls CallMetricsRecorderFromContext(). + rw := &recorderWrapper{smp: smp} + ws := &wrappedStream{ + ServerStream: ss, + ctx: newContextWithRecorderWrapper(ss.Context(), rw), + } + + err := handler(srv, ws) + + // It is safe to access the underlying metric recorder inside the wrapper at + // this point, as the user's RPC handler is done executing, and therefore + // there will be no more calls to CallMetricsRecorderFromContext(), which is + // where the metric recorder is lazy allocated. + if rw.r != nil { + rw.setTrailerMetadata(ss.Context()) + } + return err + } +} + +func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context { + return context.WithValue(ctx, callMetricsRecorderCtxKey{}, r) +} + +// wrappedStream wraps the grpc.ServerStream received by the streaming +// interceptor. Overrides only the Context() method to return a context which +// contains a reference to the CallMetricsRecorder corresponding to this +// stream. +type wrappedStream struct { + grpc.ServerStream + ctx context.Context +} + +func (w *wrappedStream) Context() context.Context { + return w.ctx +} diff --git a/orca/call_metric_recorder_test.go b/orca/call_metrics_test.go similarity index 91% rename from orca/call_metric_recorder_test.go rename to orca/call_metrics_test.go index 43d0e45291e..4374b593b9f 100644 --- a/orca/call_metric_recorder_test.go +++ b/orca/call_metrics_test.go @@ -78,23 +78,24 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { for _, test := range tests { t.Run(test.desc, func(t *testing.T) { - // A server option to enables reporting of per-call backend metrics. - callMetricsServerOption := orca.CallMetricsServerOption() + // A server option to enable reporting of per-call backend metrics. + smr := orca.NewServerMetricsRecorder() + callMetricsServerOption := orca.CallMetricsServerOption(smr) + smr.SetCPUUtilization(1.0) // An interceptor to injects custom backend metrics, added only when // the injectMetrics field in the test is set. injectingInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - recorder := orca.CallMetricRecorderFromContext(ctx) + recorder := orca.CallMetricsRecorderFromContext(ctx) if recorder == nil { err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") t.Error(err) return nil, err } - recorder.SetCPUUtilization(1.0) recorder.SetMemoryUtilization(50.0) // This value will be overwritten by a write to the same metric // from the server handler. - recorder.SetUtilization("queueSize", 1.0) + recorder.SetNamedUtilization("queueSize", 1.0) return handler(ctx, req) } @@ -106,14 +107,14 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { if !test.injectMetrics { return &testpb.Empty{}, nil } - recorder := orca.CallMetricRecorderFromContext(ctx) + recorder := orca.CallMetricsRecorderFromContext(ctx) if recorder == nil { err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") t.Error(err) return nil, err } recorder.SetRequestCost("queryCost", 25.0) - recorder.SetUtilization("queueSize", 75.0) + recorder.SetNamedUtilization("queueSize", 75.0) return &testpb.Empty{}, nil }, } @@ -183,23 +184,24 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { for _, test := range tests { t.Run(test.desc, func(t *testing.T) { - // A server option to enables reporting of per-call backend metrics. - callMetricsServerOption := orca.CallMetricsServerOption() + // A server option to enable reporting of per-call backend metrics. + smr := orca.NewServerMetricsRecorder() + callMetricsServerOption := orca.CallMetricsServerOption(smr) + smr.SetCPUUtilization(1.0) // An interceptor which injects custom backend metrics, added only // when the injectMetrics field in the test is set. injectingInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - recorder := orca.CallMetricRecorderFromContext(ss.Context()) + recorder := orca.CallMetricsRecorderFromContext(ss.Context()) if recorder == nil { err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") t.Error(err) return err } - recorder.SetCPUUtilization(1.0) recorder.SetMemoryUtilization(50.0) // This value will be overwritten by a write to the same metric // from the server handler. - recorder.SetUtilization("queueSize", 1.0) + recorder.SetNamedUtilization("queueSize", 1.0) return handler(srv, ss) } @@ -209,14 +211,14 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { srv := stubserver.StubServer{ FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { if test.injectMetrics { - recorder := orca.CallMetricRecorderFromContext(stream.Context()) + recorder := orca.CallMetricsRecorderFromContext(stream.Context()) if recorder == nil { err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") t.Error(err) return err } recorder.SetRequestCost("queryCost", 25.0) - recorder.SetUtilization("queueSize", 75.0) + recorder.SetNamedUtilization("queueSize", 75.0) } // Streaming implementation replies with a dummy response until the diff --git a/orca/orca.go b/orca/orca.go index 2c958b6902e..771db36af1c 100644 --- a/orca/orca.go +++ b/orca/orca.go @@ -27,128 +27,21 @@ package orca import ( - "context" - "errors" - - "google.golang.org/grpc" "google.golang.org/grpc/grpclog" - igrpc "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/metadata" "google.golang.org/grpc/orca/internal" - "google.golang.org/protobuf/proto" -) - -var ( - logger = grpclog.Component("orca-backend-metrics") - joinServerOptions = igrpc.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) ) -const trailerMetadataKey = "endpoint-load-metrics-bin" - -// CallMetricsServerOption returns a server option which enables the reporting -// of per-RPC custom backend metrics for unary and streaming RPCs. -// -// Server applications interested in injecting custom backend metrics should -// pass the server option returned from this function as the first argument to -// grpc.NewServer(). -// -// Subsequently, server RPC handlers can retrieve a reference to the RPC -// specific custom metrics recorder [CallMetricRecorder] to be used, via a call -// to CallMetricRecorderFromContext(), and inject custom metrics at any time -// during the RPC lifecycle. -// -// The injected custom metrics will be sent as part of trailer metadata, as a -// binary-encoded [ORCA LoadReport] protobuf message, with the metadata key -// being set be "endpoint-load-metrics-bin". -// -// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15 -func CallMetricsServerOption() grpc.ServerOption { - return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt), grpc.ChainStreamInterceptor(streamInt)) -} - -func unaryInt(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - // We don't allocate the metric recorder here. It will be allocated the - // first time the user calls CallMetricRecorderFromContext(). - rw := &recorderWrapper{} - ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw) - - resp, err := handler(ctxWithRecorder, req) - - // It is safe to access the underlying metric recorder inside the wrapper at - // this point, as the user's RPC handler is done executing, and therefore - // there will be no more calls to CallMetricRecorderFromContext(), which is - // where the metric recorder is lazy allocated. - if rw.r == nil { - return resp, err - } - setTrailerMetadata(ctx, rw.r) - return resp, err -} - -func streamInt(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - // We don't allocate the metric recorder here. It will be allocated the - // first time the user calls CallMetricRecorderFromContext(). - rw := &recorderWrapper{} - ws := &wrappedStream{ - ServerStream: ss, - ctx: newContextWithRecorderWrapper(ss.Context(), rw), - } - - err := handler(srv, ws) - - // It is safe to access the underlying metric recorder inside the wrapper at - // this point, as the user's RPC handler is done executing, and therefore - // there will be no more calls to CallMetricRecorderFromContext(), which is - // where the metric recorder is lazy allocated. - if rw.r == nil { - return err - } - setTrailerMetadata(ss.Context(), rw.r) - return err -} - -// setTrailerMetadata adds a trailer metadata entry with key being set to -// `trailerMetadataKey` and value being set to the binary-encoded -// orca.OrcaLoadReport protobuf message. -// -// This function is called from the unary and streaming interceptors defined -// above. Any errors encountered here are not propagated to the caller because -// they are ignored there. Hence we simply log any errors encountered here at -// warning level, and return nothing. -func setTrailerMetadata(ctx context.Context, r *CallMetricRecorder) { - b, err := proto.Marshal(r.toLoadReportProto()) - if err != nil { - logger.Warningf("failed to marshal load report: %v", err) - return - } - if err := grpc.SetTrailer(ctx, metadata.Pairs(trailerMetadataKey, string(b))); err != nil { - logger.Warningf("failed to set trailer metadata: %v", err) - } -} - -// wrappedStream wraps the grpc.ServerStream received by the streaming -// interceptor. Overrides only the Context() method to return a context which -// contains a reference to the CallMetricRecorder corresponding to this stream. -type wrappedStream struct { - grpc.ServerStream - ctx context.Context -} - -func (w *wrappedStream) Context() context.Context { - return w.ctx -} - -// ErrLoadReportMissing indicates no ORCA load report was found in trailers. -var ErrLoadReportMissing = errors.New("orca load report missing in provided metadata") +var logger = grpclog.Component("orca-backend-metrics") // loadParser implements the Parser interface defined in `internal/balancerload` // package. This interface is used by the client stream to parse load reports // sent by the server in trailer metadata. The parsed loads are then sent to // balancers via balancer.DoneInfo. // -// The grpc package cannot directly call orca.ToLoadReport() as that would cause -// an import cycle. Hence this roundabout method is used. +// The grpc package cannot directly call toLoadReport() as that would cause an +// import cycle. Hence this roundabout method is used. type loadParser struct{} func (loadParser) Parse(md metadata.MD) interface{} { diff --git a/orca/producer_test.go b/orca/producer_test.go index f15317995de..be41424063f 100644 --- a/orca/producer_test.go +++ b/orca/producer_test.go @@ -128,11 +128,11 @@ func (s) TestProducer(t *testing.T) { // Register the OpenRCAService with a very short metrics reporting interval. const shortReportingInterval = 50 * time.Millisecond - opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval} + smr := orca.NewServerMetricsRecorder() + opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr} internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts) s := grpc.NewServer() - orcaSrv, err := orca.Register(s, opts) - if err != nil { + if err := orca.Register(s, opts); err != nil { t.Fatalf("orca.Register failed: %v", err) } go s.Serve(lis) @@ -157,9 +157,9 @@ func (s) TestProducer(t *testing.T) { defer oobLis.Stop() // Set a few metrics and wait for them on the client side. - orcaSrv.SetCPUUtilization(10) - orcaSrv.SetMemoryUtilization(100) - orcaSrv.SetUtilization("bob", 555) + smr.SetCPUUtilization(10) + smr.SetMemoryUtilization(100) + smr.SetNamedUtilization("bob", 555) loadReportWant := &v3orcapb.OrcaLoadReport{ CpuUtilization: 10, MemUtilization: 100, @@ -181,9 +181,9 @@ testReport: } // Change and add metrics and wait for them on the client side. - orcaSrv.SetCPUUtilization(50) - orcaSrv.SetMemoryUtilization(200) - orcaSrv.SetUtilization("mary", 321) + smr.SetCPUUtilization(50) + smr.SetMemoryUtilization(200) + smr.SetNamedUtilization("mary", 321) loadReportWant = &v3orcapb.OrcaLoadReport{ CpuUtilization: 50, MemUtilization: 200, diff --git a/orca/server_metrics.go b/orca/server_metrics.go new file mode 100644 index 00000000000..6b63d3d252b --- /dev/null +++ b/orca/server_metrics.go @@ -0,0 +1,270 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca + +import ( + "sync" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" +) + +// ServerMetrics is the data returned from a server to a client to describe the +// current state of the server and/or the cost of a request when used per-call. +type ServerMetrics struct { + CPUUtilization float64 // CPU utilization: [0, 1.0]; unset=-1 + MemUtilization float64 // Memory utilization: [0, 1.0]; unset=-1 + QPS float64 // queries per second: [0, inf); unset=-1 + EPS float64 // errors per second: [0, inf); unset=-1 + + // The following maps must never be nil. + + Utilization map[string]float64 // Custom fields: [0, 1.0] + RequestCost map[string]float64 // Custom fields: [0, inf); not sent OOB + NamedMetrics map[string]float64 // Custom fields: [0, inf); not sent OOB +} + +// toLoadReportProto dumps sm as an OrcaLoadReport proto. +func (sm *ServerMetrics) toLoadReportProto() *v3orcapb.OrcaLoadReport { + ret := &v3orcapb.OrcaLoadReport{ + Utilization: sm.Utilization, + RequestCost: sm.RequestCost, + NamedMetrics: sm.NamedMetrics, + } + if sm.CPUUtilization != -1 { + ret.CpuUtilization = sm.CPUUtilization + } + if sm.MemUtilization != -1 { + ret.MemUtilization = sm.MemUtilization + } + if sm.QPS != -1 { + ret.RpsFractional = sm.QPS + } + if sm.EPS != -1 { + ret.Eps = sm.EPS + } + return ret +} + +// merge merges o into sm, overwriting any values present in both. +func (sm *ServerMetrics) merge(o *ServerMetrics) { + if o.CPUUtilization != -1 { + sm.CPUUtilization = o.CPUUtilization + } + if o.MemUtilization != -1 { + sm.MemUtilization = o.MemUtilization + } + if o.QPS != -1 { + sm.QPS = o.QPS + } + if o.EPS != -1 { + sm.EPS = o.EPS + } + mergeMap(sm.Utilization, o.Utilization) + mergeMap(sm.RequestCost, o.RequestCost) + mergeMap(sm.NamedMetrics, o.NamedMetrics) +} + +func mergeMap(a, b map[string]float64) { + for k, v := range b { + a[k] = v + } +} + +// ServerMetricsRecorder allows for recording and providing out of band server +// metrics. +type ServerMetricsRecorder interface { + ServerMetricsProvider + + // SetCPUUtilization sets the relevant server metric. + SetCPUUtilization(float64) + // DeleteCPUUtilization deletes the relevant server metric to prevent it + // from being sent. + DeleteCPUUtilization() + + // SetMemoryUtilization sets the relevant server metric. + SetMemoryUtilization(float64) + // DeleteMemoryUtilization deletes the relevant server metric to prevent it + // from being sent. + DeleteMemoryUtilization() + + // SetQPS sets the relevant server metric. + SetQPS(float64) + // DeleteQPS deletes the relevant server metric to prevent it from being + // sent. + DeleteQPS() + + // SetEPS sets the relevant server metric. + SetEPS(float64) + // DeleteEPS deletes the relevant server metric to prevent it from being + // sent. + DeleteEPS() + + // SetNamedUtilization sets the relevant server metric. + SetNamedUtilization(name string, val float64) + // DeleteNamedUtilization deletes the relevant server metric to prevent it + // from being sent. + DeleteNamedUtilization(name string) +} + +type serverMetricsRecorder struct { + mu sync.Mutex // protects state + state *ServerMetrics // the current metrics +} + +// NewServerMetricsRecorder returns an in-memory store for ServerMetrics and +// allows for safe setting and retrieving of ServerMetrics. Also implements +// ServerMetricsProvider for use with NewService. +func NewServerMetricsRecorder() ServerMetricsRecorder { + return newServerMetricsRecorder() +} + +func newServerMetricsRecorder() *serverMetricsRecorder { + return &serverMetricsRecorder{ + state: &ServerMetrics{ + CPUUtilization: -1, + MemUtilization: -1, + QPS: -1, + EPS: -1, + Utilization: make(map[string]float64), + RequestCost: make(map[string]float64), + NamedMetrics: make(map[string]float64), + }, + } +} + +// ServerMetrics returns a copy of the current ServerMetrics. +func (s *serverMetricsRecorder) ServerMetrics() *ServerMetrics { + s.mu.Lock() + defer s.mu.Unlock() + return &ServerMetrics{ + CPUUtilization: s.state.CPUUtilization, + MemUtilization: s.state.MemUtilization, + QPS: s.state.QPS, + EPS: s.state.EPS, + Utilization: copyMap(s.state.Utilization), + RequestCost: copyMap(s.state.RequestCost), + NamedMetrics: copyMap(s.state.NamedMetrics), + } +} + +func copyMap(m map[string]float64) map[string]float64 { + ret := make(map[string]float64, len(m)) + for k, v := range m { + ret[k] = v + } + return ret +} + +// SetCPUUtilization records a measurement for the CPU utilization metric. +func (s *serverMetricsRecorder) SetCPUUtilization(val float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.state.CPUUtilization = val +} + +// DeleteCPUUtilization deletes the relevant server metric to prevent it from +// being sent. +func (s *serverMetricsRecorder) DeleteCPUUtilization() { + s.SetCPUUtilization(-1) +} + +// SetMemoryUtilization records a measurement for the memory utilization metric. +func (s *serverMetricsRecorder) SetMemoryUtilization(val float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.state.MemUtilization = val +} + +// DeleteMemoryUtilization deletes the relevant server metric to prevent it +// from being sent. +func (s *serverMetricsRecorder) DeleteMemoryUtilization() { + s.SetMemoryUtilization(-1) +} + +// SetQPS records a measurement for the QPS metric. +func (s *serverMetricsRecorder) SetQPS(val float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.state.QPS = val +} + +// DeleteQPS deletes the relevant server metric to prevent it from being sent. +func (s *serverMetricsRecorder) DeleteQPS() { + s.SetQPS(-1) +} + +// SetEPS records a measurement for the EPS metric. +func (s *serverMetricsRecorder) SetEPS(val float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.state.EPS = val +} + +// DeleteEPS deletes the relevant server metric to prevent it from being sent. +func (s *serverMetricsRecorder) DeleteEPS() { + s.SetEPS(-1) +} + +// SetNamedUtilization records a measurement for a utilization metric uniquely +// identifiable by name. +func (s *serverMetricsRecorder) SetNamedUtilization(name string, val float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.state.Utilization[name] = val +} + +// DeleteNamedUtilization deletes any previously recorded measurement for a +// utilization metric uniquely identifiable by name. +func (s *serverMetricsRecorder) DeleteNamedUtilization(name string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.state.Utilization, name) +} + +// SetRequestCost records a measurement for a utilization metric uniquely +// identifiable by name. +func (s *serverMetricsRecorder) SetRequestCost(name string, val float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.state.RequestCost[name] = val +} + +// DeleteRequestCost deletes any previously recorded measurement for a +// utilization metric uniquely identifiable by name. +func (s *serverMetricsRecorder) DeleteRequestCost(name string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.state.RequestCost, name) +} + +// SetNamedMetric records a measurement for a utilization metric uniquely +// identifiable by name. +func (s *serverMetricsRecorder) SetNamedMetric(name string, val float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.state.NamedMetrics[name] = val +} + +// DeleteNamedMetric deletes any previously recorded measurement for a +// utilization metric uniquely identifiable by name. +func (s *serverMetricsRecorder) DeleteNamedMetric(name string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.state.NamedMetrics, name) +} diff --git a/orca/service.go b/orca/service.go index ae011fd9a9d..7461a6b05a1 100644 --- a/orca/service.go +++ b/orca/service.go @@ -19,7 +19,7 @@ package orca import ( - "sync" + "fmt" "time" "google.golang.org/grpc" @@ -28,7 +28,6 @@ import ( ointernal "google.golang.org/grpc/orca/internal" "google.golang.org/grpc/status" - v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" ) @@ -60,15 +59,16 @@ type Service struct { // Minimum reporting interval, as configured by the user, or the default. minReportingInterval time.Duration - // mu guards the custom metrics injected by the server application. - mu sync.RWMutex - cpu float64 - memory float64 - utilization map[string]float64 + smProvider ServerMetricsProvider } // ServiceOptions contains options to configure the ORCA service implementation. type ServiceOptions struct { + // ServerMetricsProvider is the provider to be used by the service for + // reporting OOB server metrics to clients. Typically obtained via + // NewServerMetricsRecorder. This field is required. + ServerMetricsProvider ServerMetricsProvider + // MinReportingInterval sets the lower bound for how often out-of-band // metrics are reported on the streaming RPC initiated by the client. If // unspecified, negative or less than the default value of 30s, the default @@ -81,11 +81,22 @@ type ServiceOptions struct { allowAnyMinReportingInterval bool } +// A ServerMetricsProvider provides ServerMetrics upon request. +type ServerMetricsProvider interface { + // ServerMetrics returns the current set of server metrics. It should + // return a read-only, immutable copy of the data that is active at the + // time of the call. + ServerMetrics() *ServerMetrics +} + // NewService creates a new ORCA service implementation configured using the // provided options. func NewService(opts ServiceOptions) (*Service, error) { // The default minimum supported reporting interval value can be overridden // for testing purposes through the orca internal package. + if opts.ServerMetricsProvider == nil { + return nil, fmt.Errorf("ServerMetricsProvider not specified") + } if !opts.allowAnyMinReportingInterval { if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval { opts.MinReportingInterval = minReportingInterval @@ -93,20 +104,22 @@ func NewService(opts ServiceOptions) (*Service, error) { } service := &Service{ minReportingInterval: opts.MinReportingInterval, - utilization: make(map[string]float64), + smProvider: opts.ServerMetricsProvider, } return service, nil } // Register creates a new ORCA service implementation configured using the -// provided options and registers the same on the provided service registrar. -func Register(s *grpc.Server, opts ServiceOptions) (*Service, error) { +// provided options and registers the same on the provided grpc Server. +func Register(s *grpc.Server, opts ServiceOptions) error { + // TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with + // grpc.ServiceRegistrar when possible. service, err := NewService(opts) if err != nil { - return nil, err + return err } v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service) - return service, nil + return nil } // determineReportingInterval determines the reporting interval for out-of-band @@ -127,7 +140,7 @@ func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReport } func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { - return stream.Send(s.toLoadReportProto()) + return stream.Send(s.smProvider.ServerMetrics().toLoadReportProto()) } // StreamCoreMetrics streams custom backend metrics injected by the server @@ -148,49 +161,3 @@ func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, } } } - -// SetCPUUtilization records a measurement for the CPU utilization metric. -func (s *Service) SetCPUUtilization(val float64) { - s.mu.Lock() - s.cpu = val - s.mu.Unlock() -} - -// SetMemoryUtilization records a measurement for the memory utilization metric. -func (s *Service) SetMemoryUtilization(val float64) { - s.mu.Lock() - s.memory = val - s.mu.Unlock() -} - -// SetUtilization records a measurement for a utilization metric uniquely -// identifiable by name. -func (s *Service) SetUtilization(name string, val float64) { - s.mu.Lock() - s.utilization[name] = val - s.mu.Unlock() -} - -// DeleteUtilization deletes any previously recorded measurement for a -// utilization metric uniquely identifiable by name. -func (s *Service) DeleteUtilization(name string) { - s.mu.Lock() - delete(s.utilization, name) - s.mu.Unlock() -} - -// toLoadReportProto dumps the recorded measurements as an OrcaLoadReport proto. -func (s *Service) toLoadReportProto() *v3orcapb.OrcaLoadReport { - s.mu.RLock() - defer s.mu.RUnlock() - - util := make(map[string]float64, len(s.utilization)) - for k, v := range s.utilization { - util[k] = v - } - return &v3orcapb.OrcaLoadReport{ - CpuUtilization: s.cpu, - MemUtilization: s.memory, - Utilization: util, - } -} diff --git a/orca/service_test.go b/orca/service_test.go index 715d53241c7..e5cf59fccb4 100644 --- a/orca/service_test.go +++ b/orca/service_test.go @@ -52,7 +52,7 @@ type testServiceImpl struct { requests int64 testgrpc.TestServiceServer - orcaSrv *orca.Service + smr orca.ServerMetricsRecorder } func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { @@ -60,26 +60,26 @@ func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*te t.requests++ t.mu.Unlock() - t.orcaSrv.SetUtilization(requestsMetricKey, float64(t.requests)) - t.orcaSrv.SetCPUUtilization(50.0) - t.orcaSrv.SetMemoryUtilization(99.0) + t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)) + t.smr.SetCPUUtilization(50.0) + t.smr.SetMemoryUtilization(99.0) return &testpb.SimpleResponse{}, nil } func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) { - t.orcaSrv.DeleteUtilization(requestsMetricKey) - t.orcaSrv.SetCPUUtilization(0) - t.orcaSrv.SetMemoryUtilization(0) + t.smr.DeleteNamedUtilization(requestsMetricKey) + t.smr.SetCPUUtilization(0) + t.smr.SetMemoryUtilization(0) return &testpb.Empty{}, nil } -// Test_E2E_CustomBackendMetrics_OutOfBand tests the injection of out-of-band +// TestE2E_CustomBackendMetrics_OutOfBand tests the injection of out-of-band // custom backend metrics from the server application, and verifies that // expected load reports are received at the client. // // TODO: Change this test to use the client API, when ready, to read the // out-of-band metrics pushed by the server. -func (s) Test_E2E_CustomBackendMetrics_OutOfBand(t *testing.T) { +func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -87,18 +87,18 @@ func (s) Test_E2E_CustomBackendMetrics_OutOfBand(t *testing.T) { // Override the min reporting interval in the internal package. const shortReportingInterval = 100 * time.Millisecond - opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval} + smr := orca.NewServerMetricsRecorder() + opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr} internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts) // Register the OpenRCAService with a very short metrics reporting interval. s := grpc.NewServer() - orcaSrv, err := orca.Register(s, opts) - if err != nil { + if err := orca.Register(s, opts); err != nil { t.Fatalf("orca.EnableOutOfBandMetricsReportingForTesting() failed: %v", err) } // Register the test service implementation on the same grpc server, and start serving. - testgrpc.RegisterTestServiceServer(s, &testServiceImpl{orcaSrv: orcaSrv}) + testgrpc.RegisterTestServiceServer(s, &testServiceImpl{smr: smr}) go s.Serve(lis) defer s.Stop() t.Logf("Started gRPC server at %s...", lis.Addr().String())