Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC integration and aio interceptors #2369

Merged
merged 63 commits into from Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
c341091
feat: Add interceptor for async gRPC server
Apr 28, 2023
a554078
feat: Add exception handling to async gRPC interceptor
Apr 28, 2023
0296157
feat: Add gRPC integration with monkeypatch for synchronous client si…
fdellekart Aug 26, 2023
dfb8726
feat: Add patch with gRPC server side interceptors for grpc.server
fdellekart Aug 26, 2023
aaf705e
test: Add tests to verify gRPC integration does not break other inter…
fdellekart Aug 27, 2023
155e998
feat: Add monkeypatch for async server to gRPC integrations
fdellekart Aug 27, 2023
0b487e1
feat(grpc): Add async unary unary client interceptor
fdellekart Sep 1, 2023
ce73a36
feat(grpc): Add monkeypatching for async channels to integration
fdellekart Sep 1, 2023
96c2c91
test(grpc): Add test for aio client integration
fdellekart Sep 1, 2023
4103929
refactor: Avoid unnecessary code duplication in grpc integration
fdellekart Sep 10, 2023
a8ddcf6
fix: gRPC async metadata can be tuple although differently typed
fdellekart Sep 13, 2023
1861aa8
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Sep 18, 2023
9f1bc88
refactor(grpc): consistent naming and imports
fdellekart Sep 22, 2023
fac3ee3
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Sep 27, 2023
b43784f
Added pytest-asyncio to test deps
antonpirker Sep 27, 2023
34bad4c
Added types to linter requirements
antonpirker Sep 27, 2023
5a89d9b
Made mechanism type lowercase
antonpirker Sep 27, 2023
3c493bc
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Sep 29, 2023
8341a0d
fix: typing of async gRPC integration
fdellekart Sep 29, 2023
69704d4
feat(gRPC): Add async unary-stream interceptor
fdellekart Sep 29, 2023
53bb01c
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 2, 2023
4cde781
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 2, 2023
ccacf0a
Trying to make typing work in older Python versions
antonpirker Oct 2, 2023
c1c0be7
The tests need the types.
antonpirker Oct 2, 2023
1dff2a7
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 2, 2023
4c71549
Fixed typo
antonpirker Oct 2, 2023
c734463
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
antonpirker Oct 2, 2023
b0b80fc
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 3, 2023
d2e1bbe
Cleaned up tests
antonpirker Oct 3, 2023
d4ffa1f
Prevent flake8 from checking generated files
antonpirker Oct 3, 2023
d737830
Tell black to not check auto generated files.
antonpirker Oct 3, 2023
02cfbde
Added pyproject.toml to right directory
antonpirker Oct 3, 2023
4bf2c83
Fixed some linting
antonpirker Oct 3, 2023
0fb680e
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
fdellekart Oct 3, 2023
2437f87
test(gRPC): Add test for unary stream client interceptor
fdellekart Oct 3, 2023
30ada2f
refactor(gRPC): Remove external APIs from server side interceptor
fdellekart Oct 3, 2023
8b3e7e6
Cleaned up tests
antonpirker Oct 4, 2023
9aba63e
Updated black config to work with pre-commit
antonpirker Oct 4, 2023
b625b2d
cleanup
antonpirker Oct 4, 2023
9c9f3d8
cleanup
antonpirker Oct 4, 2023
7cd1057
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
fdellekart Oct 6, 2023
2276ee4
refactor: gRPC code generation script to be run from project root
fdellekart Oct 6, 2023
119e415
test: Add test endpoint for unary-stream gRPC method
fdellekart Oct 6, 2023
c465dcf
fix: Typos in interceptors and status code of stream response stuck f…
fdellekart Oct 6, 2023
aec6c7c
test: Add test for async unary-stream client interceptor
fdellekart Oct 6, 2023
7b09a67
fix(gRPC): Make sure that server side interceptor does not break unar…
fdellekart Oct 6, 2023
6a19737
feat(gRPC): Ensure backwards compatibility in case gRPC interceptors …
fdellekart Oct 6, 2023
545f26b
test(gRPC): Add tests for currently unsupported RPC types
fdellekart Oct 9, 2023
dac5b60
fix(gRPC): aio integration was breaking unsupported RPC types
fdellekart Oct 9, 2023
9a97c9a
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 11, 2023
ff2ff75
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 11, 2023
63b912e
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 12, 2023
7cb78df
Small typing fix
antonpirker Oct 12, 2023
b0e6d19
Small linting fix
antonpirker Oct 12, 2023
b678001
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 6, 2023
ff9372b
Merge branch 'master' into pr/fdellekart/2369
antonpirker Nov 6, 2023
afacd37
Consistent naming (was my fault I guess)
antonpirker Nov 6, 2023
7be1306
Set nicer transaction name for async server interceptor
antonpirker Nov 7, 2023
27ab22f
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 8, 2023
2247336
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 8, 2023
f83e610
Make linter happy
antonpirker Nov 8, 2023
1271459
Make mypy happy
antonpirker Nov 8, 2023
dddfa41
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 89 additions & 2 deletions sentry_sdk/integrations/grpc/__init__.py
@@ -1,2 +1,89 @@
from .server import ServerInterceptor # noqa: F401
from .client import ClientInterceptor # noqa: F401
from typing import List, Optional, Sequence, Callable, ParamSpec
from functools import wraps

import grpc
from grpc import Channel, Server, intercept_channel
from grpc.aio import Channel as AsyncChannel
from grpc.aio import Server as AsyncServer

from sentry_sdk.integrations import Integration
from .client import ClientInterceptor
from .server import ServerInterceptor
from .aio.server import ServerInterceptor as AsyncServerInterceptor
from .aio.client import AsyncClientInterceptor
fdellekart marked this conversation as resolved.
Show resolved Hide resolved

P = ParamSpec("P")


def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]:
"Wrapper for synchronous secure and insecure channel."

@wraps(func)
def patched_channel(*args, **kwargs) -> Channel:
channel = func(*args, **kwargs)
return intercept_channel(channel, ClientInterceptor())

return patched_channel


def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]:
"Wrapper for asynchronous secure and insecure channel."

@wraps(func)
def patched_channel(
*args,
interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None,
**kwargs
) -> Channel:
interceptor = AsyncClientInterceptor()
interceptors = [interceptor, *(interceptors or [])]
return func(*args, interceptors=interceptors, **kwargs)

return patched_channel


def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]:
"""Wrapper for synchronous server."""

@wraps(func)
def patched_server(
*args, interceptors: Optional[List[grpc.ServerInterceptor]] = None, **kwargs
) -> Server:
server_interceptor = ServerInterceptor()
interceptors = [server_interceptor, *(interceptors or [])]
return func(*args, interceptors=interceptors, **kwargs)

return patched_server


def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]:
"""Wrapper for asynchronous server."""

@wraps(func)
def patched_aio_server(
*args, interceptors: Optional[List[grpc.ServerInterceptor]] = None, **kwargs
) -> Server:
server_interceptor = AsyncServerInterceptor(
find_name=lambda request: request.__class__
)
interceptors = [server_interceptor, *(interceptors or [])]
return func(*args, interceptors=interceptors, **kwargs)

return patched_aio_server


class GRPCIntegration(Integration):
identifier = "grpc"

@staticmethod
def setup_once() -> None:
import grpc

grpc.insecure_channel = _wrap_channel_sync(grpc.insecure_channel)
grpc.secure_channel = _wrap_channel_sync(grpc.secure_channel)

grpc.aio.insecure_channel = _wrap_channel_async(grpc.aio.insecure_channel)
grpc.aio.secure_channel = _wrap_channel_async(grpc.aio.secure_channel)

grpc.server = _wrap_sync_server(grpc.server)
grpc.aio.server = _wrap_async_server(grpc.aio.server)
1 change: 1 addition & 0 deletions sentry_sdk/integrations/grpc/aio/__init__.py
@@ -0,0 +1 @@
from .server import ServerInterceptor # noqa: F401
fdellekart marked this conversation as resolved.
Show resolved Hide resolved
56 changes: 56 additions & 0 deletions sentry_sdk/integrations/grpc/aio/client.py
@@ -0,0 +1,56 @@
from typing import Callable, Union

from grpc.aio import UnaryUnaryClientInterceptor
from grpc.aio._call import UnaryUnaryCall
from grpc.aio._interceptor import ClientCallDetails
fdellekart marked this conversation as resolved.
Show resolved Hide resolved
from google.protobuf.message import Message

from sentry_sdk import Hub
from sentry_sdk.consts import OP


class AsyncClientInterceptor(UnaryUnaryClientInterceptor):
fdellekart marked this conversation as resolved.
Show resolved Hide resolved
async def intercept_unary_unary(
self,
continuation: Callable[[ClientCallDetails, Message], UnaryUnaryCall],
client_call_details: ClientCallDetails,
request: Message,
) -> Union[UnaryUnaryCall, Message]:
hub = Hub.current
method = client_call_details.method

with hub.start_span(
op=OP.GRPC_CLIENT, description="unary unary call to %s" % method.decode()
) as span:
span.set_data("type", "unary unary")
span.set_data("method", method)

client_call_details = self._update_client_call_details_metadata_from_hub(
client_call_details, hub
)

response = await continuation(client_call_details, request)
status_code = await response.code()
span.set_data("code", status_code.name)

return response

@staticmethod
def _update_client_call_details_metadata_from_hub(
client_call_details: ClientCallDetails, hub: Hub
):
metadata = (
list(client_call_details.metadata) if client_call_details.metadata else []
)
for key, value in hub.iter_trace_propagation_headers():
metadata.append((key, value))

client_call_details = ClientCallDetails(
method=client_call_details.method,
timeout=client_call_details.timeout,
metadata=metadata,
credentials=client_call_details.credentials,
wait_for_ready=client_call_details.wait_for_ready,
)

return client_call_details
70 changes: 70 additions & 0 deletions sentry_sdk/integrations/grpc/aio/server.py
@@ -0,0 +1,70 @@
from functools import wraps

from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM
from sentry_sdk.utils import event_from_exception

if MYPY:
from collections.abc import Awaitable, Callable
from typing import Any


try:
import grpc
from grpc import HandlerCallDetails, RpcMethodHandler
from grpc.aio import ServicerContext
from grpc.experimental import wrap_server_method_handler
except ImportError:
raise DidNotEnable("grpcio is not installed")


class ServerInterceptor(grpc.aio.ServerInterceptor): # type: ignore
def __init__(self, find_name=None):
# type: (ServerInterceptor, Callable[[ServicerContext], str] | None) -> None
self._find_method_name = find_name or ServerInterceptor._find_name

super(ServerInterceptor, self).__init__()

async def intercept_service(self, continuation, handler_call_details):
# type: (ServerInterceptor, Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]], HandlerCallDetails) -> Awaitable[RpcMethodHandler]
handler = await continuation(handler_call_details)
return wrap_server_method_handler(self.wrapper, handler)

def wrapper(self, handler):
# type: (ServerInterceptor, Callable[[Any, ServicerContext], Awaitable[Any]]) -> Callable[[Any, ServicerContext], Awaitable[Any]]
@wraps(handler)
async def wrapped(request, context):
# type: (Any, ServicerContext) -> Any
name = self._find_method_name(context)
if not name:
return await handler(request, context)

hub = Hub(Hub.current)

transaction = Transaction.continue_from_headers(
dict(context.invocation_metadata()),
op=OP.GRPC_SERVER,
name=name,
source=TRANSACTION_SOURCE_CUSTOM,
)

with hub.start_transaction(transaction=transaction):
try:
return await handler(request, context)
except Exception as exc:
event, hint = event_from_exception(
exc,
mechanism={"type": "gRPC", "handled": False},
)
hub.capture_event(event, hint=hint)
raise

return wrapped

@staticmethod
def _find_name(context):
# type: (ServicerContext) -> str
return context._rpc_event.call_details.method.decode()
169 changes: 169 additions & 0 deletions tests/integrations/grpc/test_aio_grpc.py
@@ -0,0 +1,169 @@
from __future__ import absolute_import

import asyncio
import os

import grpc
import pytest
import pytest_asyncio
import sentry_sdk

from sentry_sdk import Hub, start_transaction
from sentry_sdk.consts import OP
from sentry_sdk.integrations.grpc import GRPCIntegration
from tests.integrations.grpc.grpc_test_service_pb2 import gRPCTestMessage
from tests.integrations.grpc.grpc_test_service_pb2_grpc import (
gRPCTestServiceServicer,
add_gRPCTestServiceServicer_to_server,
gRPCTestServiceStub,
)

AIO_PORT = 50052
AIO_PORT += os.getpid() % 100 # avoid port conflicts when running tests in parallel


@pytest.fixture(scope="function")
def event_loop(request):
"""Create an instance of the default event loop for each test case."""
loop = asyncio.new_event_loop()
yield loop
loop.close()


@pytest_asyncio.fixture(scope="function")
async def grpc_server(sentry_init, event_loop):
sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()])
server = grpc.aio.server()
server.add_insecure_port(f"[::]:{AIO_PORT}")
add_gRPCTestServiceServicer_to_server(TestService, server)

await event_loop.create_task(server.start())

try:
yield server
finally:
await server.stop(None)


@pytest.mark.asyncio
async def test_grpc_server_starts_transaction(capture_events, grpc_server):
events = capture_events()

async with grpc.aio.insecure_channel(f"localhost:{AIO_PORT}") as channel:
stub = gRPCTestServiceStub(channel)
await stub.TestServe(gRPCTestMessage(text="test"))

(event,) = events
span = event["spans"][0]

assert event["type"] == "transaction"
assert event["transaction_info"] == {
"source": "custom",
}
assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER
assert span["op"] == "test"


@pytest.mark.asyncio
async def test_grpc_server_continues_transaction(capture_events, grpc_server):
events = capture_events()

async with grpc.aio.insecure_channel(f"localhost:{AIO_PORT}") as channel:
stub = gRPCTestServiceStub(channel)

with sentry_sdk.start_transaction() as transaction:
metadata = (
(
"baggage",
"sentry-trace_id={trace_id},sentry-environment=test,"
"sentry-transaction=test-transaction,sentry-sample_rate=1.0".format(
trace_id=transaction.trace_id
),
),
(
"sentry-trace",
"{trace_id}-{parent_span_id}-{sampled}".format(
trace_id=transaction.trace_id,
parent_span_id=transaction.span_id,
sampled=1,
),
),
)

await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata)

(event, _) = events
span = event["spans"][0]

assert event["type"] == "transaction"
assert event["transaction_info"] == {
"source": "custom",
}
assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER
assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert span["op"] == "test"


@pytest.mark.asyncio
async def test_grpc_server_exception(sentry_init, capture_events, grpc_server):
sentry_init(traces_sample_rate=1.0)
events = capture_events()

async with grpc.aio.insecure_channel(f"localhost:{AIO_PORT}") as channel:
stub = gRPCTestServiceStub(channel)
try:
await stub.TestServe(gRPCTestMessage(text="exception"))
raise AssertionError()
except Exception:
pass

(event, _) = events

assert event["exception"]["values"][0]["type"] == "TestService.TestException"
assert event["exception"]["values"][0]["value"] == "test"
assert event["exception"]["values"][0]["mechanism"]["handled"] is False
assert event["exception"]["values"][0]["mechanism"]["type"] == "gRPC"


@pytest.mark.asyncio
async def test_grpc_client_starts_span(grpc_server, capture_events_forksafe):
events = capture_events_forksafe()

async with grpc.aio.insecure_channel(f"localhost:{AIO_PORT}") as channel:
stub = gRPCTestServiceStub(channel)
with start_transaction():
await stub.TestServe(gRPCTestMessage(text="test"))

events.write_file.close()
events.read_event()
local_transaction = events.read_event()
span = local_transaction["spans"][0]

assert len(local_transaction["spans"]) == 1
assert span["op"] == OP.GRPC_CLIENT
assert (
span["description"]
== "unary unary call to /grpc_test_server.gRPCTestService/TestServe"
)
assert span["data"] == {
"type": "unary unary",
"method": "/grpc_test_server.gRPCTestService/TestServe",
"code": "OK",
}


class TestService(gRPCTestServiceServicer):
class TestException(Exception):
def __init__(self):
super().__init__("test")

@classmethod
async def TestServe(cls, request, context): # noqa: N802
hub = Hub.current
with hub.start_span(op="test", description="test"):
pass

if request.text == "exception":
raise cls.TestException()

return gRPCTestMessage(text=request.text)