Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orca: allow a ServerMetricsProvider to be passed to the ORCA service and ServerOption #6223

Merged
merged 2 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 13 additions & 9 deletions examples/features/orca/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type server struct {

func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
// Report a sample cost for this query.
cmr := orca.CallMetricRecorderFromContext(ctx)
cmr := orca.CallMetricsRecorderFromContext(ctx)
if cmr == nil {
return nil, status.Errorf(codes.Internal, "unable to retrieve call metric recorder (missing ORCA ServerOption?)")
return nil, status.Errorf(codes.Internal, "unable to retrieve call metrics recorder (missing ORCA ServerOption?)")
}
cmr.SetRequestCost("db_queries", 10)

Expand All @@ -63,27 +63,31 @@ func main() {
fmt.Printf("Server listening at %v\n", lis.Addr())

// Create the gRPC server with the orca.CallMetricsServerOption() option,
// which will enable per-call metric recording.
s := grpc.NewServer(orca.CallMetricsServerOption())
// which will enable per-call metric recording. No ServerMetricsProvider
// is given here because the out-of-band reporting is enabled separately.
s := grpc.NewServer(orca.CallMetricsServerOption(nil))
pb.RegisterEchoServer(s, &server{})

// Register the orca service for out-of-band metric reporting, and set the
// minimum reporting interval to 3 seconds. Note that, by default, the
// minimum interval must be at least 30 seconds, but 3 seconds is set via
// an internal-only option for illustration purposes only.
opts := orca.ServiceOptions{MinReportingInterval: 3 * time.Second}
smr := orca.NewServerMetricsRecorder()
opts := orca.ServiceOptions{
MinReportingInterval: 3 * time.Second,
ServerMetricsProvider: smr,
}
internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&opts)
orcaSvc, err := orca.Register(s, opts)
if err != nil {
if err := orca.Register(s, opts); err != nil {
log.Fatalf("Failed to register ORCA service: %v", err)
}

// Simulate CPU utilization reporting.
go func() {
for {
orcaSvc.SetCPUUtilization(.5)
smr.SetCPUUtilization(.5)
time.Sleep(2 * time.Second)
orcaSvc.SetCPUUtilization(.9)
smr.SetCPUUtilization(.9)
time.Sleep(2 * time.Second)
}
}()
Expand Down
130 changes: 0 additions & 130 deletions orca/call_metric_recorder.go

This file was deleted.

196 changes: 196 additions & 0 deletions orca/call_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package orca

import (
"context"
"sync"

"google.golang.org/grpc"
grpcinternal "google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca/internal"
"google.golang.org/protobuf/proto"
)

// CallMetricsRecorder allows a service method handler to record per-RPC
// metrics. It contains all utilization-based metrics from
// ServerMetricsRecorder as well as additional request cost metrics.
type CallMetricsRecorder interface {
ServerMetricsRecorder

// SetRequestCost sets the relevant server metric.
SetRequestCost(name string, val float64)
// DeleteRequestCost deletes the relevant server metric to prevent it
// from being sent.
DeleteRequestCost(name string)

// SetNamedMetric sets the relevant server metric.
SetNamedMetric(name string, val float64)
// DeleteNamedMetric deletes the relevant server metric to prevent it
// from being sent.
DeleteNamedMetric(name string)
}

type callMetricsRecorderCtxKey struct{}

// CallMetricsRecorderFromContext returns the RPC-specific custom metrics
// recorder embedded in the provided RPC context.
//
// Returns nil if no custom metrics recorder is found in the provided context,
// which will be the case when custom metrics reporting is not enabled.
func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder {
rw, ok := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper)
if !ok {
return nil
}
return rw.recorder()
}

// recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that
// concurrent calls to CallMetricsRecorderFromContext() results in only one
// allocation of the underlying metrics recorder, while also allowing for lazy
// initialization of the recorder itself.
type recorderWrapper struct {
once sync.Once
r CallMetricsRecorder
smp ServerMetricsProvider
}

func (rw *recorderWrapper) recorder() CallMetricsRecorder {
rw.once.Do(func() {
rw.r = newServerMetricsRecorder()
})
return rw.r
}

// setTrailerMetadata adds a trailer metadata entry with key being set to
// `internal.TrailerMetadataKey` and value being set to the binary-encoded
// orca.OrcaLoadReport protobuf message.
//
// This function is called from the unary and streaming interceptors defined
// above. Any errors encountered here are not propagated to the caller because
// they are ignored there. Hence we simply log any errors encountered here at
// warning level, and return nothing.
func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) {
var sm *ServerMetrics
if rw.smp != nil {
sm = rw.smp.ServerMetrics()
sm.merge(rw.r.ServerMetrics())
} else {
sm = rw.r.ServerMetrics()
}

b, err := proto.Marshal(sm.toLoadReportProto())
if err != nil {
logger.Warningf("Failed to marshal load report: %v", err)
return
}
if err := grpc.SetTrailer(ctx, metadata.Pairs(internal.TrailerMetadataKey, string(b))); err != nil {
logger.Warningf("Failed to set trailer metadata: %v", err)
}
}

var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)

// CallMetricsServerOption returns a server option which enables the reporting
// of per-RPC custom backend metrics for unary and streaming RPCs.
//
// Server applications interested in injecting custom backend metrics should
// pass the server option returned from this function as the first argument to
// grpc.NewServer().
//
// Subsequently, server RPC handlers can retrieve a reference to the RPC
// specific custom metrics recorder [CallMetricsRecorder] to be used, via a call
// to CallMetricsRecorderFromContext(), and inject custom metrics at any time
// during the RPC lifecycle.
//
// The injected custom metrics will be sent as part of trailer metadata, as a
// binary-encoded [ORCA LoadReport] protobuf message, with the metadata key
// being set be "endpoint-load-metrics-bin".
//
// If a non-nil ServerMetricsProvider is provided, the gRPC server will
// transmit the metrics it provides, overwritten by any per-RPC metrics given
// to the CallMetricsRecorder. A ServerMetricsProvider is typically obtained
// by calling NewServerMetricsRecorder.
//
// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption {
return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt(smp)), grpc.ChainStreamInterceptor(streamInt(smp)))
}

func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// We don't allocate the metric recorder here. It will be allocated the
// first time the user calls CallMetricsRecorderFromContext().
rw := &recorderWrapper{smp: smp}
ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw)

resp, err := handler(ctxWithRecorder, req)

// It is safe to access the underlying metric recorder inside the wrapper at
// this point, as the user's RPC handler is done executing, and therefore
// there will be no more calls to CallMetricsRecorderFromContext(), which is
// where the metric recorder is lazy allocated.
if rw.r != nil {
rw.setTrailerMetadata(ctx)
}
return resp, err
}
}

func streamInt(smp ServerMetricsProvider) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// We don't allocate the metric recorder here. It will be allocated the
// first time the user calls CallMetricsRecorderFromContext().
rw := &recorderWrapper{smp: smp}
ws := &wrappedStream{
ServerStream: ss,
ctx: newContextWithRecorderWrapper(ss.Context(), rw),
}

err := handler(srv, ws)

// It is safe to access the underlying metric recorder inside the wrapper at
// this point, as the user's RPC handler is done executing, and therefore
// there will be no more calls to CallMetricsRecorderFromContext(), which is
// where the metric recorder is lazy allocated.
if rw.r != nil {
rw.setTrailerMetadata(ss.Context())
}
return err
}
}

func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context {
return context.WithValue(ctx, callMetricsRecorderCtxKey{}, r)
}

// wrappedStream wraps the grpc.ServerStream received by the streaming
// interceptor. Overrides only the Context() method to return a context which
// contains a reference to the CallMetricsRecorder corresponding to this
// stream.
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
return w.ctx
}