Skip to content

Commit

Permalink
Capture exception logic on the GRPC server side
Browse files Browse the repository at this point in the history
fixes gh-4490
  • Loading branch information
marcingrzejszczak committed Jan 16, 2024
1 parent ecd133a commit 34f993c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ class ObservationGrpcServerCallListener<RespT> extends SimpleForwardingServerCal
@Override
public void onMessage(RespT message) {
this.observation.event(GrpcServerEvents.MESSAGE_RECEIVED);
try (Scope scope = observation.openScope()) {
super.onMessage(message);
}
this.observation.scoped(() -> super.onMessage(message));
}

@Override
public void onHalfClose() {
try (Scope scope = observation.openScope()) {
super.onHalfClose();
}
this.observation.scoped(super::onHalfClose);
}

@Override
public void onCancel() {
try (Scope scope = this.observation.openScope()) {
super.onCancel();
}
catch (Exception exception) {
this.observation.error(exception);
throw exception;
}
finally {
this.observation.stop();
}
Expand All @@ -66,16 +66,18 @@ public void onComplete() {
try (Scope scope = this.observation.openScope()) {
super.onComplete();
}
catch (Exception exception) {
this.observation.error(exception);
throw exception;
}
finally {
this.observation.stop();
}
}

@Override
public void onReady() {
try (Scope scope = this.observation.openScope()) {
super.onReady();
}
this.observation.scoped(super::onReady);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
import io.micrometer.observation.Observation.Context;
import io.micrometer.observation.Observation.Event;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.ObservationTextPublisher;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
Expand Down Expand Up @@ -82,13 +83,14 @@ class GrpcObservationTest {

ObservationGrpcClientInterceptor clientInterceptor;

TestObservationRegistry observationRegistry = TestObservationRegistry.create();

@BeforeEach
void setUp() {
serverHandler = new ContextAndEventHoldingObservationHandler<>(GrpcServerObservationContext.class);
clientHandler = new ContextAndEventHoldingObservationHandler<>(GrpcClientObservationContext.class);

MeterRegistry meterRegistry = new SimpleMeterRegistry();
ObservationRegistry observationRegistry = ObservationRegistry.create();

observationRegistry.observationConfig()
.observationHandler(new ObservationTextPublisher())
.observationHandler(new DefaultMeterObservationHandler(meterRegistry))
Expand Down Expand Up @@ -341,10 +343,13 @@ void unaryRpcFailure() {
MethodType.UNARY);
verifyClientContext("grpc.testing.SimpleService", "UnaryRpc", "grpc.testing.SimpleService/UnaryRpc",
MethodType.UNARY);
assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(serverHandler.getContext().getStatusCode()).isNull();
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNKNOWN);
assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED);
assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT);
TestObservationRegistryAssert.assertThat(observationRegistry)
.hasAnObservation(
observationContextAssert -> observationContextAssert.hasNameEqualTo("grpc.server").hasError());
}

@Test
Expand All @@ -362,10 +367,13 @@ void clientStreamingRpcFailure() {
"grpc.testing.SimpleService/ClientStreamingRpc", MethodType.CLIENT_STREAMING);
verifyServerContext("grpc.testing.SimpleService", "ClientStreamingRpc",
"grpc.testing.SimpleService/ClientStreamingRpc", MethodType.CLIENT_STREAMING);
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNKNOWN);
assertThat(serverHandler.getContext().getStatusCode()).isNull();
assertThat(clientHandler.getEvents()).isEmpty();
assertThat(serverHandler.getEvents()).isEmpty();
TestObservationRegistryAssert.assertThat(observationRegistry)
.hasAnObservation(
observationContextAssert -> observationContextAssert.hasNameEqualTo("grpc.server").hasError());
}

@Test
Expand All @@ -385,10 +393,13 @@ void serverStreamingRpcFailure() {
"grpc.testing.SimpleService/ServerStreamingRpc", MethodType.SERVER_STREAMING);
verifyServerContext("grpc.testing.SimpleService", "ServerStreamingRpc",
"grpc.testing.SimpleService/ServerStreamingRpc", MethodType.SERVER_STREAMING);
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNKNOWN);
assertThat(serverHandler.getContext().getStatusCode()).isNull();
assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT);
assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED);
TestObservationRegistryAssert.assertThat(observationRegistry)
.hasAnObservation(
observationContextAssert -> observationContextAssert.hasNameEqualTo("grpc.server").hasError());
}

@Test
Expand All @@ -407,10 +418,13 @@ void bidiStreamingRpcFailure() {
"grpc.testing.SimpleService/BidiStreamingRpc", MethodType.BIDI_STREAMING);
verifyServerContext("grpc.testing.SimpleService", "BidiStreamingRpc",
"grpc.testing.SimpleService/BidiStreamingRpc", MethodType.BIDI_STREAMING);
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED);
assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNKNOWN);
assertThat(serverHandler.getContext().getStatusCode()).isNull();
assertThat(clientHandler.getEvents()).isEmpty();
assertThat(serverHandler.getEvents()).isEmpty();
TestObservationRegistryAssert.assertThat(observationRegistry)
.hasAnObservation(
observationContextAssert -> observationContextAssert.hasNameEqualTo("grpc.server").hasError());
}

private StreamObserver<SimpleResponse> createResponseObserver(AtomicBoolean errored) {
Expand Down Expand Up @@ -534,6 +548,26 @@ public void onCompleted() {
// Default implementation in the parent class throws UNIMPLEMENTED error
static class ExceptionService extends SimpleServiceImplBase {

@Override
public void unaryRpc(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
throw new IllegalStateException("Boom!");
}

@Override
public StreamObserver<SimpleRequest> clientStreamingRpc(StreamObserver<SimpleResponse> responseObserver) {
throw new IllegalStateException("Boom!");
}

@Override
public void serverStreamingRpc(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
throw new IllegalStateException("Boom!");
}

@Override
public StreamObserver<SimpleRequest> bidiStreamingRpc(StreamObserver<SimpleResponse> responseObserver) {
throw new IllegalStateException("Boom!");
}

}

// Hold reference to the Context and Events happened in ObservationHandler
Expand Down

0 comments on commit 34f993c

Please sign in to comment.