Skip to content

Commit

Permalink
stats/opencensus: Add per call latency metric (#6017)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Feb 16, 2023
1 parent 0f02ca5 commit abff344
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 5 deletions.
36 changes: 36 additions & 0 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type callInfo struct {
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
onFinish []func(err error)
}

func defaultCallInfo() *callInfo {
Expand Down Expand Up @@ -295,6 +296,41 @@ func (o FailFastCallOption) before(c *callInfo) error {
}
func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {}

// OnFinish returns a CallOption that configures a callback to be called when
// the call completes. The error passed to the callback is the status of the
// RPC, and may be nil. The onFinish callback provided will only be called once
// by gRPC. This is mainly used to be used by streaming interceptors, to be
// notified when the RPC completes along with information about the status of
// the RPC.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func OnFinish(onFinish func(err error)) CallOption {
return OnFinishCallOption{
OnFinish: onFinish,
}
}

// OnFinishCallOption is CallOption that indicates a callback to be called when
// the call completes.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type OnFinishCallOption struct {
OnFinish func(error)
}

func (o OnFinishCallOption) before(c *callInfo) error {
c.onFinish = append(c.onFinish, o.OnFinish)
return nil
}

func (o OnFinishCallOption) after(c *callInfo, attempt *csAttempt) {}

// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can receive. If this is not set, gRPC uses the default
// 4MB.
Expand Down
14 changes: 14 additions & 0 deletions stats/opencensus/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
clientRoundtripLatency = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds)
clientStartedRPCs = stats.Int64("grpc.io/client/started_rpcs", "The total number of client RPCs ever opened, including those that have not completed.", stats.UnitDimensionless)
clientServerLatency = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds)
// Per call measure:
clientAPILatency = stats.Float64("grpc.io/client/api_latency", "The end-to-end time the gRPC library takes to complete an RPC from the application’s perspective", stats.UnitMilliseconds)
)

var (
Expand Down Expand Up @@ -103,6 +105,18 @@ var (
TagKeys: []tag.Key{keyClientMethod},
Aggregation: millisecondsDistribution,
}

// The following metric is per call:

// ClientAPILatencyView is the distribution of client api latency for the
// full RPC call, keyed on method and status.
ClientAPILatencyView = &view.View{
Measure: clientAPILatency,
Name: "grpc.io/client/api_latency",
Description: "Distribution of client api latency, by method and status",
TagKeys: []tag.Key{keyClientMethod, keyClientStatus},
Aggregation: millisecondsDistribution,
}
)

// DefaultClientViews is the set of client views which are considered the
Expand Down
7 changes: 6 additions & 1 deletion stats/opencensus/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
ServerReceivedMessagesPerRPCView,
ClientRoundtripLatencyView,
ServerLatencyView,
ClientAPILatencyView,
}
view.Register(allViews...)
// Unregister unconditionally in this defer to correctly cleanup globals in
Expand Down Expand Up @@ -760,6 +761,10 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
{
metric: ServerLatencyView,
},
// Per call metrics:
{
metric: ClientAPILatencyView,
},
}
// Unregister all the views. Unregistering a view causes a synchronous
// upload of any collected data for the view to any registered exporters.
Expand All @@ -780,7 +785,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
// declare the exact data you want, make sure the latency
// measurement points for the two RPCs above fall within buckets
// that fall into less than 5 seconds, which is the rpc timeout.
if metricName == "grpc.io/client/roundtrip_latency" || metricName == "grpc.io/server/server_latency" {
if metricName == "grpc.io/client/roundtrip_latency" || metricName == "grpc.io/server/server_latency" || metricName == "grpc.io/client/api_latency" {
// RPCs have a context timeout of 5s, so all the recorded
// measurements (one per RPC - two total) should fall within 5
// second buckets.
Expand Down
47 changes: 43 additions & 4 deletions stats/opencensus/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ package opencensus

import (
"context"
"time"

ocstats "go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"

"google.golang.org/grpc"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

var (
Expand Down Expand Up @@ -78,15 +82,50 @@ func ServerOption(to TraceOptions) grpc.ServerOption {
}

// unaryInterceptor handles per RPC context management. It also handles per RPC
// tracing and stats.
// tracing and stats, and records the latency for the full RPC call.
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
startTime := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
callLatency := float64(time.Since(startTime)) / float64(time.Millisecond)

ocstats.RecordWithOptions(ctx,
ocstats.WithTags(
tag.Upsert(keyClientMethod, removeLeadingSlash(method)),
tag.Upsert(keyClientStatus, canonicalString(status.Code(err))),
),
ocstats.WithMeasurements(
clientAPILatency.M(callLatency),
),
)

return err
}

// streamInterceptor handles per RPC context management. It also handles per RPC
// tracing and stats.
// tracing and stats, and records the latency for the full RPC call.
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(ctx, desc, cc, method, opts...)
startTime := time.Now()

callback := func(err error) {
callLatency := float64(time.Since(startTime)) / float64(time.Millisecond)
ocstats.RecordWithOptions(context.Background(),
ocstats.WithTags(
tag.Upsert(keyClientMethod, method),
tag.Upsert(keyClientStatus, canonicalString(status.Code(err))),
),
ocstats.WithMeasurements(
clientAPILatency.M(callLatency),
),
)
}

opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)

s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return s, nil
}

type rpcInfo struct {
Expand Down
3 changes: 3 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,9 @@ func (cs *clientStream) finish(err error) {
return
}
cs.finished = true
for _, onFinish := range cs.callInfo.onFinish {
onFinish(err)
}
cs.commitAttemptLocked()
if cs.attempt != nil {
cs.attempt.finish(err)
Expand Down

0 comments on commit abff344

Please sign in to comment.