Skip to content

Commit

Permalink
Support client id in pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
kennethito committed Mar 6, 2024
1 parent 6dabeaf commit 3a54184
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 7 deletions.
8 changes: 7 additions & 1 deletion lib/redix/connector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,13 @@ defmodule Redix.Connector do
end
end

defp sync_command(transport, socket, command, timeout) do
@spec sync_command(
:ssl | :gen_tcp,
:gen_tcp.socket() | :ssl.sslsocket(),
[String.t()],
integer()
) :: {:ok, any} | {:error, any}
def sync_command(transport, socket, command, timeout) do
with :ok <- transport.send(socket, Redix.Protocol.pack(command)),
do: recv_response(transport, socket, &Redix.Protocol.parse/1, timeout)
end
Expand Down
12 changes: 12 additions & 0 deletions lib/redix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -416,4 +416,16 @@ defmodule Redix.PubSub do
when is_binary(patterns) or is_list(patterns) do
:gen_statem.call(conn, {:punsubscribe, List.wrap(patterns), subscriber})
end

@doc """
Gets the redis `CLIENT ID` associated with a connection
## Examples
iex> Redix.PubSub.client_id(conn)
{:ok, 123}
"""
@spec client_id(connection()) :: {:ok, integer()} | {:error, any()}
def client_id(conn) do
:gen_statem.call(conn, :client_id)
end
end
35 changes: 29 additions & 6 deletions lib/redix/pubsub/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Redix.PubSub.Connection do
:backoff_current,
:last_disconnect_reason,
:connected_address,
:client_id,
subscriptions: %{},
monitors: %{}
]
Expand All @@ -28,14 +29,14 @@ defmodule Redix.PubSub.Connection do
data = %__MODULE__{opts: opts, transport: transport}

if opts[:sync_connect] do
with {:ok, socket, address} <- Connector.connect(data.opts, _conn_pid = self()),
:ok <- setopts(data, socket, active: :once) do
with {:ok, socket, address, client_id} <- connect(data) do
data = %__MODULE__{
data
| socket: socket,
last_disconnect_reason: nil,
backoff_current: nil,
connected_address: address
connected_address: address,
client_id: client_id
}

{:ok, :connected, data}
Expand Down Expand Up @@ -104,8 +105,7 @@ defmodule Redix.PubSub.Connection do
end

def disconnected(:internal, :connect, data) do
with {:ok, socket, address} <- Connector.connect(data.opts, _conn_pid = self()),
:ok <- setopts(data, socket, active: :once) do
with {:ok, socket, address, client_id} <- connect(data) do
:telemetry.execute([:redix, :connection], %{}, %{
connection: self(),
connection_name: data.opts[:name],
Expand All @@ -118,7 +118,8 @@ defmodule Redix.PubSub.Connection do
| socket: socket,
last_disconnect_reason: nil,
backoff_current: nil,
connected_address: address
connected_address: address,
client_id: client_id
}

{:next_state, :connected, data, {:next_event, :internal, :handle_connection}}
Expand Down Expand Up @@ -193,6 +194,12 @@ defmodule Redix.PubSub.Connection do
{:keep_state, data}
end

def disconnected({:call, from}, :client_id, data) do
:ok = :gen_statem.reply(from, {:error, :disconnected})

{:keep_state, data}
end

def connected(:internal, :handle_connection, data) do
if map_size(data.subscriptions) > 0 do
case resubscribe_after_reconnection(data) do
Expand Down Expand Up @@ -224,6 +231,12 @@ defmodule Redix.PubSub.Connection do
end
end

def connected({:call, from}, :client_id, data) do
:ok = :gen_statem.reply(from, {:ok, data.client_id})

{:keep_state, data}
end

def connected(:info, {transport_closed, socket}, %__MODULE__{socket: socket} = data)
when transport_closed in [:tcp_closed, :ssl_closed] do
disconnect(data, transport_closed, _handle_disconnection? = true)
Expand Down Expand Up @@ -561,6 +574,16 @@ defmodule Redix.PubSub.Connection do
defp key_for_target(:psubscribe, pattern), do: {:pattern, pattern}
defp key_for_target(:punsubscribe, pattern), do: {:pattern, pattern}

defp connect(%__MODULE__{opts: opts, transport: transport} = data) do
timeout = Keyword.fetch!(opts, :timeout)

with {:ok, socket, address} <- Connector.connect(opts, _conn_pid = self()),
{:ok, client_id} <- Connector.sync_command(transport, socket, ["CLIENT", "ID"], timeout),
:ok <- setopts(data, socket, active: :once) do
{:ok, socket, address, client_id}
end
end

defp setopts(data, socket, opts) do
inets_mod(data.transport).setopts(socket, opts)
end
Expand Down
5 changes: 5 additions & 0 deletions test/redix/pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ defmodule Redix.PubSubTest do
assert info[:fullsweep_after] == fullsweep_after
end

test "client_id should be available after start_link/2" do
{:ok, pid} = PubSub.start_link(port: @port)
assert match?({:ok, client_id} when is_number(client_id), PubSub.client_id(pid))
end

test "subscribe/unsubscribe flow", %{pubsub: pubsub, conn: conn} do
# First, we subscribe.
assert {:ok, ref} = PubSub.subscribe(pubsub, ["foo", "bar"], self())
Expand Down

0 comments on commit 3a54184

Please sign in to comment.