From 83983c78d13f305b5504c20476068d4d57fee4c4 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 9 Dec 2022 09:34:56 -0800 Subject: [PATCH 1/5] balancer: support injection of per-call metadata from LB policies --- balancer/balancer.go | 14 +++++ clientconn.go | 2 +- picker_wrapper.go | 28 ++++++---- stream.go | 25 +++++---- test/balancer_test.go | 120 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 167 insertions(+), 22 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 392b21fb2d8..06e055a5e5d 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -233,6 +233,11 @@ type PickInfo struct { FullMethodName string // Ctx is the RPC's context, and may contain relevant RPC-level information // like the outgoing header metadata. + // + // If an LB policy wishes to mutate the outgoing header metadata on a + // per-call basis, it could get the existing outgoing metadata from this + // context using a call to metadata.FromOutgoingContext(), mutate it and + // pass it back in the Metadata field of PickResult. Ctx context.Context } @@ -279,6 +284,15 @@ type PickResult struct { // type, Done may not be called. May be nil if the balancer does not wish // to be notified when the RPC completes. Done func(DoneInfo) + + // Metadata provides a way for LB policies to inject arbitrary per-call + // metadata. If this is non-nil, it will be used as-is and will overwrite + // any existing metadata stored in the RPC context. LB policies are advised + // to use this with caution. See PickInfo.Ctx for more details. + // + // LB policies with child policies are responsible for propagating metadata + // injected by their children to the ClientConn, as part of Pick(). + Metatada metadata.MD } // TransientFailureError returns e. It exists for backward compatibility and diff --git a/clientconn.go b/clientconn.go index 78c81a108ed..a23f6ba4881 100644 --- a/clientconn.go +++ b/clientconn.go @@ -934,7 +934,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { return cc.sc.healthCheckConfig } -func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { +func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) { return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ Ctx: ctx, FullMethodName: method, diff --git a/picker_wrapper.go b/picker_wrapper.go index a5d5516ee06..c525dc070fc 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -58,12 +58,18 @@ func (pw *pickerWrapper) updatePicker(p balancer.Picker) { pw.mu.Unlock() } -func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) { +// doneChannelzWrapper performs the following: +// - increments the calls started channelz counter +// - wraps the done function in the passed in result to increment the calls +// failed or calls succeeded channelz counter before invoking the actual +// done function. +func doneChannelzWrapper(acw *acBalancerWrapper, result *balancer.PickResult) { acw.mu.Lock() ac := acw.ac acw.mu.Unlock() ac.incrCallsStarted() - return func(b balancer.DoneInfo) { + done := result.Done + result.Done = func(b balancer.DoneInfo) { if b.Err != nil && b.Err != io.EOF { ac.incrCallsFailed() } else { @@ -82,7 +88,7 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f // - the current picker returns other errors and failfast is false. 
// - the subConn returned by the current picker is not READY // When one of these situations happens, pick blocks until the picker gets updated. -func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) { +func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) { var ch chan struct{} var lastPickErr error @@ -90,7 +96,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. pw.mu.Lock() if pw.done { pw.mu.Unlock() - return nil, nil, ErrClientConnClosing + return nil, balancer.PickResult{}, ErrClientConnClosing } if pw.picker == nil { @@ -111,9 +117,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. } switch ctx.Err() { case context.DeadlineExceeded: - return nil, nil, status.Error(codes.DeadlineExceeded, errStr) + return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr) case context.Canceled: - return nil, nil, status.Error(codes.Canceled, errStr) + return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr) } case <-ch: } @@ -125,7 +131,6 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. pw.mu.Unlock() pickResult, err := p.Pick(info) - if err != nil { if err == balancer.ErrNoSubConnAvailable { continue @@ -136,7 +141,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. if istatus.IsRestrictedControlPlaneCode(st) { err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err) } - return nil, nil, dropError{error: err} + return nil, balancer.PickResult{}, dropError{error: err} } // For all other errors, wait for ready RPCs should block and other // RPCs should fail with unavailable. @@ -144,7 +149,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. lastPickErr = err continue } - return nil, nil, status.Error(codes.Unavailable, err.Error()) + return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error()) } acw, ok := pickResult.SubConn.(*acBalancerWrapper) @@ -154,9 +159,10 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. } if t := acw.getAddrConn().getReadyTransport(); t != nil { if channelz.IsOn() { - return t, doneChannelzWrapper(acw, pickResult.Done), nil + doneChannelzWrapper(acw, &pickResult) + return t, pickResult, nil } - return t, pickResult.Done, nil + return t, pickResult, nil } if pickResult.Done != nil { // Calling done with nil error, no bytes sent and no bytes received. diff --git a/stream.go b/stream.go index 0f8e6c0149d..b22e124e274 100644 --- a/stream.go +++ b/stream.go @@ -438,7 +438,7 @@ func (a *csAttempt) getTransport() error { cs := a.cs var err error - a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method) + a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method) if err != nil { if de, ok := err.(dropError); ok { err = de.error @@ -455,7 +455,12 @@ func (a *csAttempt) getTransport() error { func (a *csAttempt) newStream() error { cs := a.cs cs.callHdr.PreviousAttempts = cs.numRetries - s, err := a.t.NewStream(a.ctx, cs.callHdr) + // Metadata from PickResult takes precedence over metadata from RPC context. 
+	ctx := a.ctx
+	if a.pickResult.Metatada != nil {
+		ctx = metadata.NewOutgoingContext(a.ctx, a.pickResult.Metatada)
+	}
+	s, err := a.t.NewStream(ctx, cs.callHdr)
 	if err != nil {
 		nse, ok := err.(*transport.NewStreamError)
 		if !ok {
@@ -529,12 +534,12 @@ type clientStream struct {
 // csAttempt implements a single transport stream attempt within a
 // clientStream.
 type csAttempt struct {
-	ctx  context.Context
-	cs   *clientStream
-	t    transport.ClientTransport
-	s    *transport.Stream
-	p    *parser
-	done func(balancer.DoneInfo)
+	ctx        context.Context
+	cs         *clientStream
+	t          transport.ClientTransport
+	s          *transport.Stream
+	p          *parser
+	pickResult balancer.PickResult
 
 	finished  bool
 	dc        Decompressor
@@ -1103,12 +1108,12 @@ func (a *csAttempt) finish(err error) {
 		tr = a.s.Trailer()
 	}
 
-	if a.done != nil {
+	if a.pickResult.Done != nil {
 		br := false
 		if a.s != nil {
 			br = a.s.BytesReceived()
 		}
-		a.done(balancer.DoneInfo{
+		a.pickResult.Done(balancer.DoneInfo{
 			Err:       err,
 			Trailer:   tr,
 			BytesSent: a.s != nil,
diff --git a/test/balancer_test.go b/test/balancer_test.go
index c919f1e0f7c..5e43fb3b9f0 100644
--- a/test/balancer_test.go
+++ b/test/balancer_test.go
@@ -866,3 +866,123 @@ func (s) TestAuthorityInBuildOptions(t *testing.T) {
 		})
 	}
 }
+
+// wrappedPickFirstBalancerBuilder builds a custom balancer which wraps an
+// underlying pick_first balancer.
+type wrappedPickFirstBalancerBuilder struct {
+	name string
+}
+
+func (*wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	builder := balancer.Get(grpc.PickFirstBalancerName)
+	wpfb := &wrappedPickFirstBalancer{
+		ClientConn: cc,
+	}
+	pf := builder.Build(wpfb, opts)
+	wpfb.Balancer = pf
+	return wpfb
+}
+
+func (wbb *wrappedPickFirstBalancerBuilder) Name() string {
+	return wbb.name
+}
+
+// wrappedPickFirstBalancer contains a pick_first balancer and forwards all
+// calls from the ClientConn to it. For state updates from the pick_first
+// balancer, it creates a custom picker which injects arbitrary metadata on a
+// per-call basis.
+type wrappedPickFirstBalancer struct {
+	balancer.Balancer
+	balancer.ClientConn
+}
+
+func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
+	state.Picker = &wrappedPicker{p: state.Picker}
+	wb.ClientConn.UpdateState(state)
+}
+
+const (
+	metadataHeaderInjectedByWrappedBalancer = "metadata-header-injected-by-wrapped-balancer"
+	metadataValueInjectedByWrappedBalancer  = "metadata-value-injected-by-wrapped-balancer"
+)
+
+// wrappedPicker wraps the picker returned by the pick_first balancer.
+type wrappedPicker struct {
+	p balancer.Picker
+}
+
+func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+	res, err := wp.p.Pick(info)
+	if err != nil {
+		return balancer.PickResult{}, err
+	}
+
+	if res.Metatada == nil {
+		res.Metatada = metadata.Pairs(metadataHeaderInjectedByWrappedBalancer, metadataValueInjectedByWrappedBalancer)
+	} else {
+		res.Metatada.Append(metadataHeaderInjectedByWrappedBalancer, metadataValueInjectedByWrappedBalancer)
+	}
+	return res, nil
+}
+
+// TestMetadataInPickResult tests the scenario where an LB policy injects
+// arbitrary metadata on a per-call basis and verifies that the injected
+// metadata makes it all the way to the server RPC handler.
+func (s) TestMetadataInPickResult(t *testing.T) { + mdChan := make(chan []string, 1) + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if ok { + select { + case mdChan <- md[metadataHeaderInjectedByWrappedBalancer]: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return &testpb.Empty{}, nil + }, + } + if err := ss.StartServer(); err != nil { + t.Fatalf("Starting test backend: %v", err) + } + defer ss.Stop() + t.Logf("Started test backend at %q", ss.Address) + + b := &wrappedPickFirstBalancerBuilder{name: t.Name() + "wrappedPickFirstBalancer"} + balancer.Register(b) + t.Logf("Registered test balancer with name %q", b.Name()) + + r := manual.NewBuilderWithScheme("whatever") + r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}}) + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, b.Name())), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.Dial(): %v", err) + } + defer cc.Close() + tc := testpb.NewTestServiceClient(cc) + t.Log("Created ClientConn to test backend") + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() RPC: %v", err) + } + t.Log("EmptyCall() RPC succeeded") + + var gotMD []string + select { + case gotMD = <-mdChan: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for custom metadata in test backend") + } + wantMD := []string{metadataValueInjectedByWrappedBalancer} + if !cmp.Equal(gotMD, wantMD) { + t.Fatalf("Mismatch in custom metadata received at test backend, got: %v, want %v", gotMD, wantMD) + } +} From 296c47778afc619e237ee593ab452d30c900b6e8 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 12 Dec 2022 12:07:06 -0800 Subject: [PATCH 2/5] merge metadata from LB policy instead of overwriting --- balancer/balancer.go | 10 ++------ stream.go | 15 ++++++++++-- test/balancer_test.go | 54 ++++++++++++++++++++++++++++--------------- 3 files changed, 50 insertions(+), 29 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 06e055a5e5d..09d61dd1b55 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -233,11 +233,6 @@ type PickInfo struct { FullMethodName string // Ctx is the RPC's context, and may contain relevant RPC-level information // like the outgoing header metadata. - // - // If an LB policy wishes to mutate the outgoing header metadata on a - // per-call basis, it could get the existing outgoing metadata from this - // context using a call to metadata.FromOutgoingContext(), mutate it and - // pass it back in the Metadata field of PickResult. Ctx context.Context } @@ -286,9 +281,8 @@ type PickResult struct { Done func(DoneInfo) // Metadata provides a way for LB policies to inject arbitrary per-call - // metadata. If this is non-nil, it will be used as-is and will overwrite - // any existing metadata stored in the RPC context. LB policies are advised - // to use this with caution. See PickInfo.Ctx for more details. + // metadata. Any metadata returned here will be merged with existing + // metadata added by the client application. 
 	//
 	// LB policies with child policies are responsible for propagating metadata
 	// injected by their children to the ClientConn, as part of Pick().
diff --git a/stream.go b/stream.go
index b22e124e274..5f30d6c02f8 100644
--- a/stream.go
+++ b/stream.go
@@ -455,11 +455,22 @@ func (a *csAttempt) getTransport() error {
 func (a *csAttempt) newStream() error {
 	cs := a.cs
 	cs.callHdr.PreviousAttempts = cs.numRetries
-	// Metadata from PickResult takes precedence over metadata from RPC context.
+
+	// Merge metadata stored in PickResult, if any, with existing call metadata.
 	ctx := a.ctx
 	if a.pickResult.Metatada != nil {
-		ctx = metadata.NewOutgoingContext(a.ctx, a.pickResult.Metatada)
+		// We currently do not have a function in the metadata package which
+		// merges given metadata with existing metadata in a context. Exising
+		// function `AppendToOutgoingContext()` takes a variadic argument of key
+		// value pairs.
+		//
+		// TODO: Make it possible to retrieve key value pairs from metadata.MD
+		// in a form passable to AppendToOutgoingContext().
+		md, _ := metadata.FromOutgoingContext(ctx)
+		md = metadata.Join(md, a.pickResult.Metatada)
+		ctx = metadata.NewOutgoingContext(a.ctx, md)
 	}
+
 	s, err := a.t.NewStream(ctx, cs.callHdr)
 	if err != nil {
 		nse, ok := err.(*transport.NewStreamError)
diff --git a/test/balancer_test.go b/test/balancer_test.go
index 5e43fb3b9f0..8b146dbb9ba 100644
--- a/test/balancer_test.go
+++ b/test/balancer_test.go
@@ -902,8 +902,10 @@ func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
 }
 
 const (
-	metadataHeaderInjectedByWrappedBalancer = "metadata-header-injected-by-wrapped-balancer"
-	metadataValueInjectedByWrappedBalancer  = "metadata-value-injected-by-wrapped-balancer"
+	metadataHeaderInjectedByBalancer    = "metadata-header-injected-by-balancer"
+	metadataHeaderInjectedByApplication = "metadata-header-injected-by-application"
+	metadataValueInjectedByBalancer     = "metadata-value-injected-by-balancer"
+	metadataValueInjectedByApplication  = "metadata-value-injected-by-application"
 )
 
 // wrappedPicker wraps the picker returned by the pick_first balancer.
@@ -918,9 +920,9 @@ func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, erro
 	}
 
 	if res.Metatada == nil {
-		res.Metatada = metadata.Pairs(metadataHeaderInjectedByWrappedBalancer, metadataValueInjectedByWrappedBalancer)
+		res.Metatada = metadata.Pairs(metadataHeaderInjectedByBalancer, metadataValueInjectedByBalancer)
 	} else {
-		res.Metatada.Append(metadataHeaderInjectedByWrappedBalancer, metadataValueInjectedByWrappedBalancer)
+		res.Metatada.Append(metadataHeaderInjectedByBalancer, metadataValueInjectedByBalancer)
 	}
 	return res, nil
 }
@@ -929,16 +931,15 @@ func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, erro
 // TestMetadataInPickResult tests the scenario where an LB policy injects
 // arbitrary metadata on a per-call basis and verifies that the injected
 // metadata makes it all the way to the server RPC handler.
func (s) TestMetadataInPickResult(t *testing.T) { - mdChan := make(chan []string, 1) + t.Log("Starting test backend...") + mdChan := make(chan metadata.MD, 1) ss := &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { - md, ok := metadata.FromIncomingContext(ctx) - if ok { - select { - case mdChan <- md[metadataHeaderInjectedByWrappedBalancer]: - case <-ctx.Done(): - return nil, ctx.Err() - } + md, _ := metadata.FromIncomingContext(ctx) + select { + case mdChan <- md: + case <-ctx.Done(): + return nil, ctx.Err() } return &testpb.Empty{}, nil }, @@ -949,10 +950,12 @@ func (s) TestMetadataInPickResult(t *testing.T) { defer ss.Stop() t.Logf("Started test backend at %q", ss.Address) + name := t.Name() + "wrappedPickFirstBalancer" + t.Logf("Registering test balancer with name %q...", name) b := &wrappedPickFirstBalancerBuilder{name: t.Name() + "wrappedPickFirstBalancer"} balancer.Register(b) - t.Logf("Registered test balancer with name %q", b.Name()) + t.Log("Creating ClientConn to test backend...") r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}}) dopts := []grpc.DialOption{ @@ -966,23 +969,36 @@ func (s) TestMetadataInPickResult(t *testing.T) { } defer cc.Close() tc := testpb.NewTestServiceClient(cc) - t.Log("Created ClientConn to test backend") + t.Log("Making EmptyCall() RPC with custom metadata...") ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + md := metadata.Pairs(metadataHeaderInjectedByApplication, metadataValueInjectedByApplication) + ctx = metadata.NewOutgoingContext(ctx, md) if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall() RPC: %v", err) } t.Log("EmptyCall() RPC succeeded") - var gotMD []string + t.Log("Waiting for custom metadata to be received at the test backend...") + var gotMD metadata.MD select { case gotMD = <-mdChan: case <-ctx.Done(): - t.Fatalf("Timed out waiting for custom metadata in test backend") + t.Fatalf("Timed out waiting for custom metadata to be received at the test backend") } - wantMD := []string{metadataValueInjectedByWrappedBalancer} - if !cmp.Equal(gotMD, wantMD) { - t.Fatalf("Mismatch in custom metadata received at test backend, got: %v, want %v", gotMD, wantMD) + + t.Log("Veirfying custom metadata added by the client application is recieved at the test backend...") + wantMDVal := []string{metadataValueInjectedByApplication} + gotMDVal := gotMD.Get(metadataHeaderInjectedByApplication) + if !cmp.Equal(gotMDVal, wantMDVal) { + t.Fatalf("Mismatch in custom metadata received at test backend, got: %v, want %v", gotMDVal, wantMDVal) + } + + t.Log("Veirfying custom metadata added by the LB policy is recieved at the test backend...") + wantMDVal = []string{metadataValueInjectedByBalancer} + gotMDVal = gotMD.Get(metadataHeaderInjectedByBalancer) + if !cmp.Equal(gotMDVal, wantMDVal) { + t.Fatalf("Mismatch in custom metadata received at test backend, got: %v, want %v", gotMDVal, wantMDVal) } } From 64db6af01e0295fae39c7a6a9e26f06e05ba447e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 12 Dec 2022 12:17:20 -0800 Subject: [PATCH 3/5] fix typos --- stream.go | 2 +- test/balancer_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/stream.go b/stream.go index 5f30d6c02f8..0ab5b0dbf9c 100644 --- a/stream.go +++ b/stream.go @@ -460,7 +460,7 @@ func (a *csAttempt) newStream() error { ctx := a.ctx if a.pickResult.Metatada != nil { 
 		// We currently do not have a function in the metadata package which
-		// merges given metadata with existing metadata in a context. Exising
+		// merges given metadata with existing metadata in a context. Existing
 		// function `AppendToOutgoingContext()` takes a variadic argument of key
 		// value pairs.
 		//
diff --git a/test/balancer_test.go b/test/balancer_test.go
index 8b146dbb9ba..bd782ffa6e4 100644
--- a/test/balancer_test.go
+++ b/test/balancer_test.go
@@ -988,14 +988,14 @@ func (s) TestMetadataInPickResult(t *testing.T) {
 		t.Fatalf("Timed out waiting for custom metadata to be received at the test backend")
 	}
 
-	t.Log("Veirfying custom metadata added by the client application is recieved at the test backend...")
+	t.Log("Verifying custom metadata added by the client application is received at the test backend...")
 	wantMDVal := []string{metadataValueInjectedByApplication}
 	gotMDVal := gotMD.Get(metadataHeaderInjectedByApplication)
 	if !cmp.Equal(gotMDVal, wantMDVal) {
 		t.Fatalf("Mismatch in custom metadata received at test backend, got: %v, want %v", gotMDVal, wantMDVal)
 	}
 
-	t.Log("Veirfying custom metadata added by the LB policy is recieved at the test backend...")
+	t.Log("Verifying custom metadata added by the LB policy is received at the test backend...")
 	wantMDVal = []string{metadataValueInjectedByBalancer}
 	gotMDVal = gotMD.Get(metadataHeaderInjectedByBalancer)
 	if !cmp.Equal(gotMDVal, wantMDVal) {

From 17b0110fe30c5c473fff6923c645094a8f7eb8e7 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Tue, 20 Dec 2022 09:21:51 -0800
Subject: [PATCH 4/5] overwrite the attempt's context

---
 stream.go | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/stream.go b/stream.go
index 0ab5b0dbf9c..7bd76823f67 100644
--- a/stream.go
+++ b/stream.go
@@ -457,7 +457,9 @@ func (a *csAttempt) newStream() error {
 	cs.callHdr.PreviousAttempts = cs.numRetries
 
 	// Merge metadata stored in PickResult, if any, with existing call metadata.
-	ctx := a.ctx
+	// It is safe to overwrite the csAttempt's context here, since all state
+	// maintained in it is local to the attempt. When the attempt has to be
+	// retried, a new instance of csAttempt will be created.
 	if a.pickResult.Metatada != nil {
 		// We currently do not have a function in the metadata package which
@@ -466,12 +468,12 @@ func (a *csAttempt) newStream() error {
 		//
 		// TODO: Make it possible to retrieve key value pairs from metadata.MD
 		// in a form passable to AppendToOutgoingContext().
-		md, _ := metadata.FromOutgoingContext(ctx)
+		md, _ := metadata.FromOutgoingContext(a.ctx)
 		md = metadata.Join(md, a.pickResult.Metatada)
-		ctx = metadata.NewOutgoingContext(a.ctx, md)
+		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
 	}
 
-	s, err := a.t.NewStream(ctx, cs.callHdr)
+	s, err := a.t.NewStream(a.ctx, cs.callHdr)
 	if err != nil {
 		nse, ok := err.(*transport.NewStreamError)
 		if !ok {

From 23184d99a0a7a94f181a5e571bdee81adb79f3a1 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Tue, 20 Dec 2022 09:50:34 -0800
Subject: [PATCH 5/5] update TODO

---
 stream.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/stream.go b/stream.go
index 7bd76823f67..175aee9583e 100644
--- a/stream.go
+++ b/stream.go
@@ -467,7 +467,8 @@ func (a *csAttempt) newStream() error {
 		// value pairs.
 		//
 		// TODO: Make it possible to retrieve key value pairs from metadata.MD
-		// in a form passable to AppendToOutgoingContext().
+ // in a form passable to AppendToOutgoingContext(), or create a version + // of AppendToOutgoingContext() that accepts a metadata.MD. md, _ := metadata.FromOutgoingContext(a.ctx) md = metadata.Join(md, a.pickResult.Metatada) a.ctx = metadata.NewOutgoingContext(a.ctx, md)
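
A minimal sketch of how an LB policy might consume the PickResult field added
by this series. It is illustrative only: the header name and value are made up
for the example and are not part of this change, and the field is spelled
Metatada to match the patches above.

	package example

	import (
		"google.golang.org/grpc/balancer"
		"google.golang.org/grpc/metadata"
	)

	// mdInjectingPicker wraps a delegate picker and stamps every RPC with one
	// extra header via the per-call metadata field introduced in this series.
	type mdInjectingPicker struct {
		delegate balancer.Picker
	}

	func (p *mdInjectingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
		res, err := p.delegate.Pick(info)
		if err != nil {
			return balancer.PickResult{}, err
		}
		// Append instead of overwriting, so metadata injected by a child
		// policy (if any) is preserved. "x-example-header" is a hypothetical
		// header name used only for illustration.
		if res.Metatada == nil {
			res.Metatada = metadata.Pairs("x-example-header", "x-example-value")
		} else {
			res.Metatada.Append("x-example-header", "x-example-value")
		}
		return res, nil
	}

Because the merge in newStream() uses metadata.Join (patch 2 onwards), values
injected this way are appended to any values the application already set for
the same key, rather than replacing them.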