Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC integration and aio interceptors #2369

Merged
merged 63 commits into from Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
c341091
feat: Add interceptor for async gRPC server
Apr 28, 2023
a554078
feat: Add exception handling to async gRPC interceptor
Apr 28, 2023
0296157
feat: Add gRPC integration with monkeypatch for synchronous client si…
fdellekart Aug 26, 2023
dfb8726
feat: Add patch with gRPC server side interceptors for grpc.server
fdellekart Aug 26, 2023
aaf705e
test: Add tests to verify gRPC integration does not break other inter…
fdellekart Aug 27, 2023
155e998
feat: Add monkeypatch for async server to gRPC integrations
fdellekart Aug 27, 2023
0b487e1
feat(grpc): Add async unary unary client interceptor
fdellekart Sep 1, 2023
ce73a36
feat(grpc): Add monkeypatching for async channels to integration
fdellekart Sep 1, 2023
96c2c91
test(grpc): Add test for aio client integration
fdellekart Sep 1, 2023
4103929
refactor: Avoid unnecessary code duplication in grpc integration
fdellekart Sep 10, 2023
a8ddcf6
fix: gRPC async metadata can be tuple although differently typed
fdellekart Sep 13, 2023
1861aa8
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Sep 18, 2023
9f1bc88
refactor(grpc): consistent naming and imports
fdellekart Sep 22, 2023
fac3ee3
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Sep 27, 2023
b43784f
Added pytest-asyncio to test deps
antonpirker Sep 27, 2023
34bad4c
Added types to linter requirements
antonpirker Sep 27, 2023
5a89d9b
Made mechanism type lowercase
antonpirker Sep 27, 2023
3c493bc
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Sep 29, 2023
8341a0d
fix: typing of async gRPC integration
fdellekart Sep 29, 2023
69704d4
feat(gRPC): Add async unary-stream interceptor
fdellekart Sep 29, 2023
53bb01c
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 2, 2023
4cde781
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 2, 2023
ccacf0a
Trying to make typing work in older Python versions
antonpirker Oct 2, 2023
c1c0be7
The tests need the types.
antonpirker Oct 2, 2023
1dff2a7
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 2, 2023
4c71549
Fixed typo
antonpirker Oct 2, 2023
c734463
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
antonpirker Oct 2, 2023
b0b80fc
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 3, 2023
d2e1bbe
Cleaned up tests
antonpirker Oct 3, 2023
d4ffa1f
Prevent flake8 from checking generated files
antonpirker Oct 3, 2023
d737830
Tell black to not check auto generated files.
antonpirker Oct 3, 2023
02cfbde
Added pyproject.toml to right directory
antonpirker Oct 3, 2023
4bf2c83
Fixed some linting
antonpirker Oct 3, 2023
0fb680e
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
fdellekart Oct 3, 2023
2437f87
test(gRPC): Add test for unary stream client interceptor
fdellekart Oct 3, 2023
30ada2f
refactor(gRPC): Remove external APIs from server side interceptor
fdellekart Oct 3, 2023
8b3e7e6
Cleaned up tests
antonpirker Oct 4, 2023
9aba63e
Updated black config to work with pre-commit
antonpirker Oct 4, 2023
b625b2d
cleanup
antonpirker Oct 4, 2023
9c9f3d8
cleanup
antonpirker Oct 4, 2023
7cd1057
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
fdellekart Oct 6, 2023
2276ee4
refactor: gRPC code generation script to be run from project root
fdellekart Oct 6, 2023
119e415
test: Add test endpoint for unary-stream gRPC method
fdellekart Oct 6, 2023
c465dcf
fix: Typos in interceptors and status code of stream response stuck f…
fdellekart Oct 6, 2023
aec6c7c
test: Add test for async unary-stream client interceptor
fdellekart Oct 6, 2023
7b09a67
fix(gRPC): Make sure that server side interceptor does not break unar…
fdellekart Oct 6, 2023
6a19737
feat(gRPC): Ensure backwards compatibility in case gRPC interceptors …
fdellekart Oct 6, 2023
545f26b
test(gRPC): Add tests for currently unsupported RPC types
fdellekart Oct 9, 2023
dac5b60
fix(gRPC): aio integration was breaking unsupported RPC types
fdellekart Oct 9, 2023
9a97c9a
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 11, 2023
ff2ff75
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 11, 2023
63b912e
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Oct 12, 2023
7cb78df
Small typing fix
antonpirker Oct 12, 2023
b0e6d19
Small linting fix
antonpirker Oct 12, 2023
b678001
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 6, 2023
ff9372b
Merge branch 'master' into pr/fdellekart/2369
antonpirker Nov 6, 2023
afacd37
Consistent naming (was my fault I guess)
antonpirker Nov 6, 2023
7be1306
Set nicer transaction name for async server interceptor
antonpirker Nov 7, 2023
27ab22f
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 8, 2023
2247336
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 8, 2023
f83e610
Make linter happy
antonpirker Nov 8, 2023
1271459
Make mypy happy
antonpirker Nov 8, 2023
dddfa41
Merge branch 'master' into feat/grpc-aio-integration
antonpirker Nov 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Expand Up @@ -11,6 +11,7 @@ repos:
rev: 22.6.0
hooks:
- id: black
exclude: ^(.*_pb2.py|.*_pb2_grpc.py)

- repo: https://github.com/pycqa/flake8
rev: 5.0.4
Expand Down
1 change: 1 addition & 0 deletions linter-requirements.txt
Expand Up @@ -2,6 +2,7 @@ mypy
black
flake8==5.0.4 # flake8 depends on pyflakes>=3.0.0 and this dropped support for Python 2 "# type:" comments
types-certifi
types-protobuf
types-redis
types-setuptools
pymongo # There is no separate types module.
Expand Down
10 changes: 10 additions & 0 deletions pyproject.toml
@@ -0,0 +1,10 @@
[tool.black]
# 'extend-exclude' excludes files or directories in addition to the defaults
extend-exclude = '''
# A regex preceded with ^/ will apply only to files and directories
# in the root of the project.
(
.*_pb2.py # exclude autogenerated Protocol Buffer files anywhere in the project
| .*_pb2_grpc.py # exclude autogenerated Protocol Buffer files anywhere in the project
)
'''
154 changes: 152 additions & 2 deletions sentry_sdk/integrations/grpc/__init__.py
@@ -1,2 +1,152 @@
from .server import ServerInterceptor # noqa: F401
from .client import ClientInterceptor # noqa: F401
from functools import wraps

import grpc
from grpc import Channel, Server, intercept_channel
from grpc.aio import Channel as AsyncChannel
from grpc.aio import Server as AsyncServer

from sentry_sdk.integrations import Integration
from sentry_sdk._types import TYPE_CHECKING

from .client import ClientInterceptor
from .server import ServerInterceptor
from .aio.server import ServerInterceptor as AsyncServerInterceptor
from .aio.client import (
SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor,
)
from .aio.client import (
SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor,
)

from typing import Any, Optional, Sequence

# Hack to get new Python features working in older versions
# without introducing a hard dependency on `typing_extensions`
# from: https://stackoverflow.com/a/71944042/300572
if TYPE_CHECKING:
from typing import ParamSpec, Callable

Check warning on line 27 in sentry_sdk/integrations/grpc/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/grpc/__init__.py#L27

Added line #L27 was not covered by tests
else:
# Fake ParamSpec
class ParamSpec:
def __init__(self, _):
self.args = None
self.kwargs = None

# Callable[anything] will return None
class _Callable:
def __getitem__(self, _):
return None

# Make instances
Callable = _Callable()

P = ParamSpec("P")


def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]:
"Wrapper for synchronous secure and insecure channel."

@wraps(func)
def patched_channel(*args: Any, **kwargs: Any) -> Channel:
channel = func(*args, **kwargs)
if not ClientInterceptor._is_intercepted:
ClientInterceptor._is_intercepted = True
return intercept_channel(channel, ClientInterceptor())
else:
return channel

Check warning on line 56 in sentry_sdk/integrations/grpc/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/grpc/__init__.py#L56

Added line #L56 was not covered by tests

return patched_channel


def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]:
@wraps(func)
def patched_intercept_channel(
channel: Channel, *interceptors: grpc.ServerInterceptor
) -> Channel:
if ClientInterceptor._is_intercepted:
interceptors = tuple(
[
interceptor
for interceptor in interceptors
if not isinstance(interceptor, ClientInterceptor)
]
)
else:
interceptors = interceptors

Check warning on line 75 in sentry_sdk/integrations/grpc/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/grpc/__init__.py#L75

Added line #L75 was not covered by tests
return intercept_channel(channel, *interceptors)

return patched_intercept_channel # type: ignore


def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]:
"Wrapper for asynchronous secure and insecure channel."

@wraps(func)
def patched_channel(
*args: P.args,
interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None,
**kwargs: P.kwargs,
) -> Channel:
sentry_interceptors = [
AsyncUnaryUnaryClientInterceptor(),
AsyncUnaryStreamClientIntercetor(),
]
interceptors = [*sentry_interceptors, *(interceptors or [])]
return func(*args, interceptors=interceptors, **kwargs) # type: ignore

return patched_channel # type: ignore


def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]:
"""Wrapper for synchronous server."""

@wraps(func)
def patched_server(
*args: P.args,
interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
**kwargs: P.kwargs,
) -> Server:
interceptors = [
interceptor
for interceptor in interceptors or []
if not isinstance(interceptor, ServerInterceptor)
]
server_interceptor = ServerInterceptor()
interceptors = [server_interceptor, *(interceptors or [])]
return func(*args, interceptors=interceptors, **kwargs) # type: ignore

return patched_server # type: ignore


def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]:
"""Wrapper for asynchronous server."""

@wraps(func)
def patched_aio_server(
*args: P.args,
interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
**kwargs: P.kwargs,
) -> Server:
server_interceptor = AsyncServerInterceptor()
interceptors = [server_interceptor, *(interceptors or [])]
return func(*args, interceptors=interceptors, **kwargs) # type: ignore

return patched_aio_server # type: ignore


class GRPCIntegration(Integration):
identifier = "grpc"

@staticmethod
def setup_once() -> None:
import grpc

grpc.insecure_channel = _wrap_channel_sync(grpc.insecure_channel)
grpc.secure_channel = _wrap_channel_sync(grpc.secure_channel)
grpc.intercept_channel = _wrap_intercept_channel(grpc.intercept_channel)

grpc.aio.insecure_channel = _wrap_channel_async(grpc.aio.insecure_channel)
grpc.aio.secure_channel = _wrap_channel_async(grpc.aio.secure_channel)

grpc.server = _wrap_sync_server(grpc.server)
grpc.aio.server = _wrap_async_server(grpc.aio.server)
2 changes: 2 additions & 0 deletions sentry_sdk/integrations/grpc/aio/__init__.py
@@ -0,0 +1,2 @@
from .server import ServerInterceptor # noqa: F401
fdellekart marked this conversation as resolved.
Show resolved Hide resolved
from .client import ClientInterceptor # noqa: F401
91 changes: 91 additions & 0 deletions sentry_sdk/integrations/grpc/aio/client.py
@@ -0,0 +1,91 @@
from typing import Callable, Union, AsyncIterable, Any

from grpc.aio import (
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
ClientCallDetails,
UnaryUnaryCall,
UnaryStreamCall,
)
from google.protobuf.message import Message

from sentry_sdk import Hub
from sentry_sdk.consts import OP


class ClientInterceptor:
@staticmethod
def _update_client_call_details_metadata_from_hub(
client_call_details: ClientCallDetails, hub: Hub
) -> ClientCallDetails:
metadata = (
list(client_call_details.metadata) if client_call_details.metadata else []
)
for key, value in hub.iter_trace_propagation_headers():
metadata.append((key, value))

client_call_details = ClientCallDetails(
method=client_call_details.method,
timeout=client_call_details.timeout,
metadata=metadata,
credentials=client_call_details.credentials,
wait_for_ready=client_call_details.wait_for_ready,
)

return client_call_details


class SentryUnaryUnaryClientInterceptor(ClientInterceptor, UnaryUnaryClientInterceptor): # type: ignore
async def intercept_unary_unary(
self,
continuation: Callable[[ClientCallDetails, Message], UnaryUnaryCall],
client_call_details: ClientCallDetails,
request: Message,
) -> Union[UnaryUnaryCall, Message]:
hub = Hub.current
method = client_call_details.method

with hub.start_span(
op=OP.GRPC_CLIENT, description="unary unary call to %s" % method.decode()
) as span:
span.set_data("type", "unary unary")
span.set_data("method", method)

client_call_details = self._update_client_call_details_metadata_from_hub(
client_call_details, hub
)

response = await continuation(client_call_details, request)
status_code = await response.code()
span.set_data("code", status_code.name)

return response


class SentryUnaryStreamClientInterceptor(
ClientInterceptor, UnaryStreamClientInterceptor # type: ignore
):
async def intercept_unary_stream(
self,
continuation: Callable[[ClientCallDetails, Message], UnaryStreamCall],
client_call_details: ClientCallDetails,
request: Message,
) -> Union[AsyncIterable[Any], UnaryStreamCall]:
hub = Hub.current
method = client_call_details.method

with hub.start_span(
op=OP.GRPC_CLIENT, description="unary stream call to %s" % method.decode()
) as span:
span.set_data("type", "unary stream")
span.set_data("method", method)

client_call_details = self._update_client_call_details_metadata_from_hub(
client_call_details, hub
)

response = await continuation(client_call_details, request)
# status_code = await response.code()
# span.set_data("code", status_code)

return response
94 changes: 94 additions & 0 deletions sentry_sdk/integrations/grpc/aio/server.py
@@ -0,0 +1,94 @@
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM
from sentry_sdk.utils import event_from_exception

if MYPY:
from collections.abc import Awaitable, Callable
from typing import Any

Check warning on line 10 in sentry_sdk/integrations/grpc/aio/server.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/grpc/aio/server.py#L9-L10

Added lines #L9 - L10 were not covered by tests


try:
import grpc
from grpc import HandlerCallDetails, RpcMethodHandler
from grpc.aio import ServicerContext
except ImportError:
raise DidNotEnable("grpcio is not installed")

Check warning on line 18 in sentry_sdk/integrations/grpc/aio/server.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/grpc/aio/server.py#L17-L18

Added lines #L17 - L18 were not covered by tests


class ServerInterceptor(grpc.aio.ServerInterceptor): # type: ignore
def __init__(self, find_name=None):
# type: (ServerInterceptor, Callable[[ServicerContext], str] | None) -> None
self._find_method_name = find_name or self._find_name

super(ServerInterceptor, self).__init__()

async def intercept_service(self, continuation, handler_call_details):
# type: (ServerInterceptor, Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]], HandlerCallDetails) -> Awaitable[RpcMethodHandler]
self._handler_call_details = handler_call_details
handler = await continuation(handler_call_details)

if not handler.request_streaming and not handler.response_streaming:
handler_factory = grpc.unary_unary_rpc_method_handler

async def wrapped(request, context):
# type: (Any, ServicerContext) -> Any
name = self._find_method_name(context)
if not name:
return await handler(request, context)

Check warning on line 40 in sentry_sdk/integrations/grpc/aio/server.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/grpc/aio/server.py#L40

Added line #L40 was not covered by tests

hub = Hub.current

# What if the headers are empty?
transaction = Transaction.continue_from_headers(
dict(context.invocation_metadata()),
op=OP.GRPC_SERVER,
name=name,
source=TRANSACTION_SOURCE_CUSTOM,
)

with hub.start_transaction(transaction=transaction):
try:
return await handler.unary_unary(request, context)
except Exception as exc:
event, hint = event_from_exception(
exc,
mechanism={"type": "grpc", "handled": False},
)
hub.capture_event(event, hint=hint)
raise

elif not handler.request_streaming and handler.response_streaming:
handler_factory = grpc.unary_stream_rpc_method_handler

async def wrapped(request, context): # type: ignore
# type: (Any, ServicerContext) -> Any
async for r in handler.unary_stream(request, context):
yield r

elif handler.request_streaming and not handler.response_streaming:
handler_factory = grpc.stream_unary_rpc_method_handler

async def wrapped(request, context):
# type: (Any, ServicerContext) -> Any
response = handler.stream_unary(request, context)
return await response

elif handler.request_streaming and handler.response_streaming:
handler_factory = grpc.stream_stream_rpc_method_handler

async def wrapped(request, context): # type: ignore
# type: (Any, ServicerContext) -> Any
async for r in handler.stream_stream(request, context):
yield r

return handler_factory(
wrapped,
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)

def _find_name(self, context):
return self._handler_call_details.method
7 changes: 5 additions & 2 deletions sentry_sdk/integrations/grpc/client.py
Expand Up @@ -11,14 +11,16 @@
from grpc import ClientCallDetails, Call
from grpc._interceptor import _UnaryOutcome
from grpc.aio._interceptor import UnaryStreamCall
from google.protobuf.message import Message # type: ignore
from google.protobuf.message import Message
except ImportError:
raise DidNotEnable("grpcio is not installed")


class ClientInterceptor(
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor # type: ignore
):
_is_intercepted = False

def intercept_unary_unary(self, continuation, client_call_details, request):
# type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome
hub = Hub.current
Expand Down Expand Up @@ -57,7 +59,8 @@ def intercept_unary_stream(self, continuation, client_call_details, request):
response = continuation(
client_call_details, request
) # type: UnaryStreamCall
span.set_data("code", response.code().name)
# Setting code on unary-stream leads to execution getting stuck
# span.set_data("code", response.code().name)

return response

Expand Down