PersistentConnectionProvider UX Improvements #3397

Open
fselmo opened this issue May 15, 2024 · 3 comments


@fselmo
Collaborator

fselmo commented May 15, 2024

The newer PersistentConnectionProvider implementations, mostly the WebSocketProvider, seem to be getting a lot of use and a lot of good feedback is coming back from users, whether through issues or Discord. It's time to start thinking about some good UX abstractions that can help facilitate subscription management, re-connection logic, and anything else that might make sense to bake into the library.

Some ideas that have been brewing:

  • Configurable re-connection logic. This should be able to be turned off and also configured for different exceptions, etc.
  • Storing subscription request information along with the params (I believe this is already done in the request_processor._request_information_cache) and re-subscribing with new subscription ids when the re-connection is triggered.
  • Handling subscriptions via specific callbacks. Could this be an argument to the w3.eth.subscribe method, perhaps? It would likely be managed within a SubscriptionManager class attached to persistent connection providers.
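
To make the first idea concrete, here is a minimal sketch of a policy object that decides whether to retry after a given exception. Everything in it is an assumption for illustration: `ReconnectPolicy`, `connect_with_retry`, and the `connect` callable are hypothetical names, not part of web3.py.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class ReconnectPolicy:
    """Hypothetical settings object; not an existing web3.py class."""

    enabled: bool = True  # set to False to turn re-connection off
    max_attempts: int = 5
    base_delay: float = 0.5  # seconds; doubled after each failed attempt
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, OSError)


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]], policy: ReconnectPolicy
) -> T:
    """Call `connect` until it succeeds or the policy says to give up."""
    attempts = policy.max_attempts if policy.enabled else 1
    for attempt in range(attempts):
        try:
            return await connect()
        except policy.retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts, or re-connection is turned off
            await asyncio.sleep(policy.base_delay * 2**attempt)
    raise RuntimeError("max_attempts must be at least 1")
```

Exceptions that are not listed in `retry_on` propagate immediately, which is one way to configure behaviour per exception type.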

If you have UX improvements, pain points, or any comments on anything discussed in this issue, please feel free to join the discussion below!

@fselmo fselmo pinned this issue May 15, 2024
@DefiDebauchery
Contributor

DefiDebauchery commented May 15, 2024

You hit on the two that I'm most interested in! As a user, I don't want to have to manage process_subscriptions().

Not specific to the Persistent Provider itself, but as far as overall UX - and to tack onto the mention of .subscribe(), do you think it would be worthwhile to add optional kwargs to the method instead of (or in addition to) relying on LogsSubscriptionArg?

    async def subscribe(
        self,
        subscription_type: SubscriptionType,  # Meh
        subscription_arg: Optional[
            Union[
                LogsSubscriptionArg,  # logs, optional filter params
                bool,  # newPendingTransactions, full_transactions
            ]
        ] = None,
        callback: Optional[Callable] = None,  # Sneaking that in here
        *,
        address: Optional[
            Union[
                Address,
                ChecksumAddress,
                ENS,
                Sequence[Union[Address, ChecksumAddress, ENS]]
            ],
        ] = None,
        topics: Optional[Sequence[Union[HexStr, Sequence[HexStr]]]] = None,
    ) -> HexStr:
        ...

This will certainly require some additional sanity-checking, and may even require separate methods for subscriptions between newHeads and logs (et al; which wouldn't be terrible, as I don't necessarily love subscription_type), but it may be more expressive to have a defined set of arguments, while still allowing people to use pre-packaged log structures.
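
The sanity-checking mentioned above could look roughly like the sketch below. `build_subscription_arg` is a hypothetical helper, not a web3.py function, and plain `str` and `dict` stand in for the library's address and filter types. It merges the keyword filters into the positional argument and rejects combinations that make no sense.

```python
from typing import Any, Dict, Optional, Sequence, Union


def build_subscription_arg(
    subscription_type: str,
    subscription_arg: Optional[Union[Dict[str, Any], bool]] = None,
    *,
    address: Optional[Union[str, Sequence[str]]] = None,
    topics: Optional[Sequence[Union[str, Sequence[str]]]] = None,
) -> Optional[Union[Dict[str, Any], bool]]:
    """Merge keyword filters into the positional argument (illustrative only)."""
    if address is None and topics is None:
        return subscription_arg  # nothing to merge; pass through unchanged
    if subscription_type != "logs":
        raise ValueError("address/topics only apply to 'logs' subscriptions")
    if subscription_arg is not None and not isinstance(subscription_arg, dict):
        raise TypeError("a 'logs' subscription_arg must be a dict of filter params")
    merged = dict(subscription_arg or {})
    for key, value in (("address", address), ("topics", topics)):
        if value is None:
            continue
        if key in merged:
            raise ValueError(f"{key!r} given both positionally and as a keyword")
        merged[key] = value
    return merged
```

A pre-packaged filter dict and keyword arguments can then be combined, as long as they do not set the same key twice.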

@clover-es

As a web3.py user, I expected a class abstraction such as SubscriptionManager that would allow me to:

  • Handle the RPC connection and task lifecycle.
    • Provide a high-level asyncio coroutine, to be added as a task, that connects and processes active subscriptions.
    • Gracefully cancel the subscription-processing task when asyncio.CancelledError is raised.
  • Handle several subscriptions.
    • Subscribe to and unsubscribe from different topics without reconnecting to or disconnecting from the websocket.
    • For example, the subscribe() method would accept an optional callback function to be called when a message is received for that subscription.
    • If no callback is passed to subscribe(), SubscriptionManager could fall back to a general callback given in __init__().

This is a quick draft of such a class (named SubscriptionHandler in the code) that I shared in Discord:
import asyncio

from typing import Dict

from eth_typing import HexStr
from web3 import AsyncWeb3, WebSocketProvider
from web3.types import SubscriptionType
from websockets import ConnectionClosedError, ConnectionClosed


class SubscriptionHandler:
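    def __init__(self, wss_url):
        self.wss_url = wss_url
        # Set by process_subscriptions() once a connection is established
        self.w3_socket = None
        # Callbacks keyed by subscription id. Created per instance so that
        # separate handlers do not share one class-level dict.
        self.callbacks: Dict[HexStr, callable] = {}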

    async def process_subscriptions(self) -> None:
        """
        Performs the websocket connection and processes the subscriptions and calls the callbacks
        :return: None
        """
        async for self.w3_socket in AsyncWeb3(WebSocketProvider(self.wss_url)):
            try:
                async for message in self.w3_socket.socket.process_subscriptions():
                    try:
                        callback = self.callbacks[message["subscription"]]
                    except KeyError:
                        # A missing dict key raises KeyError, not ValueError:
                        # either the message has no "subscription" field or
                        # no callback is registered for that id.
                        print(f"No callback found for message: {message}")
                        continue
                    callback(message["result"])
            except (ConnectionClosedError, ConnectionClosed) as e:
                continue
            except asyncio.CancelledError:
                print("Cancelling subscriptions")
                for sub_id in self.callbacks.keys():
                    await self.w3_socket.eth.unsubscribe(sub_id)
                break

    async def subscribe(
        self, callback: callable, event_type: SubscriptionType, **event_params
    ) -> HexStr:
        """
        Subscribes to the given event type with the given callback.
        Must be called while process_subscriptions() task is running

        :param callback: The function to call when the event is received
        :param event_type: The event type to subscribe to
        :param event_params: Additional parameters to pass to the subscription
        :return: The subscription ID
        """
        if self.is_connected():
            # Pass None rather than an empty dict when no filter params are given
            sub_id = await self.w3_socket.eth.subscribe(
                event_type, event_params or None
            )
            print(f"Subscribed to {sub_id}")
            self.callbacks[sub_id] = callback
            return sub_id
        else:
            raise RuntimeError(
                "Websocket connection not established, it's not possible to subscribe"
            )

    async def unsubscribe(self, sub_id: HexStr) -> None:
        """
        Unsubscribes from a subscription identified by sub_id.
        Must be called while process_subscriptions() task is running

        :param sub_id: The subscription ID to unsubscribe from
        :return: None
        """
        if self.is_connected():
            await self.w3_socket.eth.unsubscribe(sub_id)
            self.callbacks.pop(sub_id)
        else:
            raise RuntimeError(
                "Websocket connection not established, it's not possible to unsubscribe"
            )

    def is_connected(self) -> bool:
        return self.w3_socket is not None
And usage of the class itself:
def callback_logs(message):
    print(f"New log received: {message}")


def callback_heads(message):
    print(f"New header received: {message}")


async def main():
    subs_handler = SubscriptionHandler("wss://eth.drpc.org")
    # Connects to the RPC wss
    sub_task = asyncio.create_task(subs_handler.process_subscriptions())
    # Waits for the connection to be established
    while not subs_handler.is_connected():
        await asyncio.sleep(1)
    # Subscribes to desired events
    await subs_handler.subscribe(
        callback_logs, "logs", address="0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc"
    )
    new_heads_id = await subs_handler.subscribe(callback_heads, "newHeads")
    try:
        await asyncio.sleep(10)
        # Unsubscribe from new heads after 10 seconds (test unsubscribe)
        await subs_handler.unsubscribe(new_heads_id)
        # Keep running until cancelled; sleep so the loop does not spin the CPU
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        sub_task.cancel()
        await sub_task


if __name__ == "__main__":
    asyncio.run(main())
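
One gap in the draft above: after a ConnectionClosed the loop reconnects, but the callbacks stay keyed by the old subscription ids and nothing is re-subscribed. Below is a rough sketch of how the original requests could be stored and re-issued, which is the second idea from the issue description. `SubscriptionRegistry` is a hypothetical name, and the `subscribe` callable passed to it is an assumption standing in for something like `w3.eth.subscribe`.

```python
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple

SubscribeFn = Callable[[str, Optional[dict]], Awaitable[str]]
Callback = Callable[[Any], None]


class SubscriptionRegistry:
    """Hypothetical helper: remembers how each subscription was requested."""

    def __init__(self, subscribe: SubscribeFn) -> None:
        self._subscribe = subscribe
        # current subscription id -> (event type, params, callback)
        self._requests: Dict[str, Tuple[str, Optional[dict], Callback]] = {}

    async def add(
        self, event_type: str, params: Optional[dict], callback: Callback
    ) -> str:
        sub_id = await self._subscribe(event_type, params)
        self._requests[sub_id] = (event_type, params, callback)
        return sub_id

    async def resubscribe_all(self) -> Dict[str, str]:
        """Re-issue every stored request; return a map of old id -> new id."""
        old_requests, self._requests = self._requests, {}
        id_map: Dict[str, str] = {}
        for old_id, (event_type, params, callback) in old_requests.items():
            id_map[old_id] = await self.add(event_type, params, callback)
        return id_map

    def dispatch(self, message: dict) -> bool:
        """Route a message to its callback; return False if the id is unknown."""
        request = self._requests.get(message.get("subscription"))
        if request is None:
            return False
        request[2](message["result"])
        return True
```

In a loop like the one above, `resubscribe_all()` would be awaited once per new connection, before messages are processed again. The returned id map lets callers that kept an old subscription id translate it to the new one.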

@jimtje

jimtje commented May 24, 2024

I think it would be great if the logic and the client could be separated. That would future-proof the library: if the underlying libraries change in ways the maintainers here can't control, the interface here can still be maintained. It would also allow better integration with other async libraries (trio, curio, even gevent, although it's not technically async) and with different clients (httpx being the most prominent, but also trio-websocket or any AnyIO-based client). People have hacked together their own implementations of their stack (myself included), and the result is that I end up using multiple web3 clients. That is fine for personal use (even if it may involve transactions on the level of a small country's GDP in terms of value transacted), but it is not ideal for people who haven't been writing code to interact with web3 nodes constantly. For them, the abstraction is not merely a set of shortcuts but perhaps the primary way to learn how one interacts with web3 programmatically.

This might be a slightly longer-term project and cover more than just PersistentConnectionProvider, although that is certainly part of it. Luckily, libraries like AnyIO would help quite a bit, and there isn't really a deadline per se. I don't know how many people use other clients and implementations for most of the stack, but I suspect that httpx at least has a fair number of users, which felt unlikely as recently as the first year of COVID. I think web3 adoption is only going to increase, and a library that reduces friction for maintainers, contributors, and users in the long run would be helpful for a lot of people.
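
As a rough illustration of that separation, the sketch below keeps the request-building logic behind a small transport interface so the network library can be swapped out. All names here (`Transport`, `JsonRpcClient`, `EchoTransport`) are hypothetical and not web3.py APIs; the in-memory transport only returns canned data in place of a real httpx or websocket client.

```python
import itertools
import json
from typing import Any, List, Protocol


class Transport(Protocol):
    """Hypothetical interface: anything that can carry a request payload."""

    async def send(self, payload: str) -> str: ...


class JsonRpcClient:
    """Request-building logic that knows nothing about the network library."""

    def __init__(self, transport: Transport) -> None:
        self._transport = transport
        self._ids = itertools.count(1)

    async def request(self, method: str, params: List[Any]) -> Any:
        payload = {
            "jsonrpc": "2.0",
            "id": next(self._ids),
            "method": method,
            "params": params,
        }
        response = json.loads(await self._transport.send(json.dumps(payload)))
        if "error" in response:
            raise RuntimeError(response["error"])
        return response["result"]


class EchoTransport:
    """In-memory stand-in for a real client; echoes the method name back."""

    async def send(self, payload: str) -> str:
        request = json.loads(payload)
        return json.dumps(
            {"jsonrpc": "2.0", "id": request["id"], "result": request["method"]}
        )
```

A transport backed by httpx, trio-websocket, or an AnyIO stream would only need to provide the same `send` coroutine. The logic layer would stay unchanged.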


5 participants