Skip to content

Commit

Permalink
Add integerations for socket and grpc (#1911)
Browse files Browse the repository at this point in the history
- The gRPC integration instruments all incoming requests and outgoing unary-unary, unary-stream grpc requests using grpcio channels. Use this integration to start or continue transactions for incoming grpc requests, create spans for outgoing requests, and ensure traces are properly propagated to downstream services.
- The Socket integration to create spans for dns resolves and connection creations.

---------

Co-authored-by: Anton Pirker <anton@ignaz.at>
  • Loading branch information
hossein-raeisi and antonpirker committed Mar 30, 2023
1 parent b98d727 commit 5d9cd4f
Show file tree
Hide file tree
Showing 19 changed files with 734 additions and 6 deletions.
4 changes: 4 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ extend-ignore =
# is a worse version of and conflicts with B902 (first argument of a classmethod should be named cls)
N804,
extend-exclude=checkouts,lol*
exclude =
# gRCP generated files
grpc_test_service_pb2.py
grpc_test_service_pb2_grpc.py
73 changes: 73 additions & 0 deletions .github/workflows/test-integration-grpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: Test grpc

on:
push:
branches:
- master
- release/**

pull_request:

# Cancel in progress workflows on pull_requests.
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

permissions:
contents: read

env:
BUILD_CACHE_KEY: ${{ github.sha }}
CACHED_BUILD_PATHS: |
${{ github.workspace }}/dist-serverless
jobs:
test:
name: grpc, python ${{ matrix.python-version }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 45

strategy:
fail-fast: false
matrix:
python-version: ["3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
os: [ubuntu-20.04]

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Setup Test Env
run: |
pip install codecov "tox>=3,<4"
- name: Test grpc
timeout-minutes: 45
shell: bash
run: |
set -x # print commands that are executed
coverage erase
./scripts/runtox.sh "py${{ matrix.python-version }}-grpc" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
coverage combine .coverage*
coverage xml -i
codecov --file coverage.xml
check_required_tests:
name: All grpc tests passed or skipped
needs: test
# Always run this, even if a dependent job failed
if: always()
runs-on: ubuntu-20.04
steps:
- name: Check for failures
if: contains(needs.test.result, 'failure')
run: |
echo "One of the dependent jobs have failed. You may need to re-run it." && exit 1
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,5 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-arq.*]
ignore_missing_imports = True
[mypy-grpc.*]
ignore_missing_imports = True
4 changes: 4 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class OP:
FUNCTION = "function"
FUNCTION_AWS = "function.aws"
FUNCTION_GCP = "function.gcp"
GRPC_CLIENT = "grpc.client"
GRPC_SERVER = "grpc.server"
HTTP_CLIENT = "http.client"
HTTP_CLIENT_STREAM = "http.client.stream"
HTTP_SERVER = "http.server"
Expand All @@ -83,6 +85,8 @@ class OP:
VIEW_RENDER = "view.render"
VIEW_RESPONSE_RENDER = "view.response.render"
WEBSOCKET_SERVER = "websocket.server"
SOCKET_CONNECTION = "socket.connection"
SOCKET_DNS = "socket.dns"


# This type exists to trick mypy and PyCharm into thinking `init` and `Client`
Expand Down
2 changes: 2 additions & 0 deletions sentry_sdk/integrations/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .server import ServerInterceptor # noqa: F401
from .client import ClientInterceptor # noqa: F401
82 changes: 82 additions & 0 deletions sentry_sdk/integrations/grpc/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable

if MYPY:
from typing import Any, Callable, Iterator, Iterable, Union

try:
import grpc
from grpc import ClientCallDetails, Call
from grpc._interceptor import _UnaryOutcome
from grpc.aio._interceptor import UnaryStreamCall
from google.protobuf.message import Message # type: ignore
except ImportError:
raise DidNotEnable("grpcio is not installed")


class ClientInterceptor(
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor # type: ignore
):
def intercept_unary_unary(self, continuation, client_call_details, request):
# type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome
hub = Hub.current
method = client_call_details.method

with hub.start_span(
op=OP.GRPC_CLIENT, description="unary unary call to %s" % method
) as span:
span.set_data("type", "unary unary")
span.set_data("method", method)

client_call_details = self._update_client_call_details_metadata_from_hub(
client_call_details, hub
)

response = continuation(client_call_details, request)
span.set_data("code", response.code().name)

return response

def intercept_unary_stream(self, continuation, client_call_details, request):
# type: (ClientInterceptor, Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]], ClientCallDetails, Message) -> Union[Iterator[Message], Call]
hub = Hub.current
method = client_call_details.method

with hub.start_span(
op=OP.GRPC_CLIENT, description="unary stream call to %s" % method
) as span:
span.set_data("type", "unary stream")
span.set_data("method", method)

client_call_details = self._update_client_call_details_metadata_from_hub(
client_call_details, hub
)

response = continuation(
client_call_details, request
) # type: UnaryStreamCall
span.set_data("code", response.code().name)

return response

@staticmethod
def _update_client_call_details_metadata_from_hub(client_call_details, hub):
# type: (ClientCallDetails, Hub) -> ClientCallDetails
metadata = (
list(client_call_details.metadata) if client_call_details.metadata else []
)
for key, value in hub.iter_trace_propagation_headers():
metadata.append((key, value))

client_call_details = grpc._interceptor._ClientCallDetails(
method=client_call_details.method,
timeout=client_call_details.timeout,
metadata=metadata,
credentials=client_call_details.credentials,
wait_for_ready=client_call_details.wait_for_ready,
compression=client_call_details.compression,
)

return client_call_details
64 changes: 64 additions & 0 deletions sentry_sdk/integrations/grpc/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM

if MYPY:
from typing import Callable, Optional
from google.protobuf.message import Message # type: ignore

try:
import grpc
from grpc import ServicerContext, HandlerCallDetails, RpcMethodHandler
except ImportError:
raise DidNotEnable("grpcio is not installed")


class ServerInterceptor(grpc.ServerInterceptor): # type: ignore
def __init__(self, find_name=None):
# type: (ServerInterceptor, Optional[Callable[[ServicerContext], str]]) -> None
self._find_method_name = find_name or ServerInterceptor._find_name

super(ServerInterceptor, self).__init__()

def intercept_service(self, continuation, handler_call_details):
# type: (ServerInterceptor, Callable[[HandlerCallDetails], RpcMethodHandler], HandlerCallDetails) -> RpcMethodHandler
handler = continuation(handler_call_details)
if not handler or not handler.unary_unary:
return handler

def behavior(request, context):
# type: (Message, ServicerContext) -> Message
hub = Hub(Hub.current)

name = self._find_method_name(context)

if name:
metadata = dict(context.invocation_metadata())

transaction = Transaction.continue_from_headers(
metadata,
op=OP.GRPC_SERVER,
name=name,
source=TRANSACTION_SOURCE_CUSTOM,
)

with hub.start_transaction(transaction=transaction):
try:
return handler.unary_unary(request, context)
except BaseException as e:
raise e
else:
return handler.unary_unary(request, context)

return grpc.unary_unary_rpc_method_handler(
behavior,
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)

@staticmethod
def _find_name(context):
# type: (ServicerContext) -> str
return context._rpc_event.call_details.method.decode()
89 changes: 89 additions & 0 deletions sentry_sdk/integrations/socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import socket
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import Integration

if MYPY:
from socket import AddressFamily, SocketKind
from typing import Tuple, Optional, Union, List

__all__ = ["SocketIntegration"]


class SocketIntegration(Integration):
identifier = "socket"

@staticmethod
def setup_once():
# type: () -> None
"""
patches two of the most used functions of socket: create_connection and getaddrinfo(dns resolver)
"""
_patch_create_connection()
_patch_getaddrinfo()


def _get_span_description(host, port):
# type: (Union[bytes, str, None], Union[str, int, None]) -> str

try:
host = host.decode() # type: ignore
except (UnicodeDecodeError, AttributeError):
pass

description = "%s:%s" % (host, port) # type: ignore

return description


def _patch_create_connection():
# type: () -> None
real_create_connection = socket.create_connection

def create_connection(
address,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT, # type: ignore
source_address=None,
):
# type: (Tuple[Optional[str], int], Optional[float], Optional[Tuple[Union[bytearray, bytes, str], int]])-> socket.socket
hub = Hub.current
if hub.get_integration(SocketIntegration) is None:
return real_create_connection(
address=address, timeout=timeout, source_address=source_address
)

with hub.start_span(
op=OP.SOCKET_CONNECTION,
description=_get_span_description(address[0], address[1]),
) as span:
span.set_data("address", address)
span.set_data("timeout", timeout)
span.set_data("source_address", source_address)

return real_create_connection(
address=address, timeout=timeout, source_address=source_address
)

socket.create_connection = create_connection


def _patch_getaddrinfo():
# type: () -> None
real_getaddrinfo = socket.getaddrinfo

def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
# type: (Union[bytes, str, None], Union[str, int, None], int, int, int, int) -> List[Tuple[AddressFamily, SocketKind, int, str, Union[Tuple[str, int], Tuple[str, int, int, int]]]]
hub = Hub.current
if hub.get_integration(SocketIntegration) is None:
return real_getaddrinfo(host, port, family, type, proto, flags)

with hub.start_span(
op=OP.SOCKET_DNS, description=_get_span_description(host, port)
) as span:
span.set_data("host", host)
span.set_data("port", port)

return real_getaddrinfo(host, port, family, type, proto, flags)

socket.getaddrinfo = getaddrinfo
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def get_file_text(file_name):
"fastapi": ["fastapi>=0.79.0"],
"pymongo": ["pymongo>=3.1"],
"opentelemetry": ["opentelemetry-distro>=0.35b0"],
"grpcio": ["grpcio>=1.21.1"]
},
classifiers=[
"Development Status :: 5 - Production/Stable",
Expand Down
11 changes: 6 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,20 +311,21 @@ def flush(timeout=None, callback=None):
monkeypatch.setattr(test_client.transport, "capture_event", append)
monkeypatch.setattr(test_client, "flush", flush)

return EventStreamReader(events_r)
return EventStreamReader(events_r, events_w)

return inner


class EventStreamReader(object):
def __init__(self, file):
self.file = file
def __init__(self, read_file, write_file):
self.read_file = read_file
self.write_file = write_file

def read_event(self):
return json.loads(self.file.readline().decode("utf-8"))
return json.loads(self.read_file.readline().decode("utf-8"))

def read_flush(self):
assert self.file.readline() == b"flush\n"
assert self.read_file.readline() == b"flush\n"


# scope=session ensures that fixture is run earlier
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("grpc")
11 changes: 11 additions & 0 deletions tests/integrations/grpc/grpc_test_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

package grpc_test_server;

service gRPCTestService{
rpc TestServe(gRPCTestMessage) returns (gRPCTestMessage);
}

message gRPCTestMessage {
string text = 1;
}

0 comments on commit 5d9cd4f

Please sign in to comment.