Kilian Cirera Sant

Using Phoenix PubSub without Phoenix

April 07, 2019

While working with Elixir it’s easy to stumble upon a situation where we want to enable some process to subscribe to updates from another process. For example, we might want a Phoenix channel to receive updates from a specific GenServer representing the state of a chat room.

One could build a one-off solution pretty easily for a single chat room. The chat room could keep a list of process ids in its state and expose a subscribe/0 function, using self/0 to obtain the id of the caller process:

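defmodule Chat.ChatRoom do
    use GenServer

    # Called by the process that wants to receive updates.
    def subscribe do
        GenServer.cast(__MODULE__, {:subscribe, self()})
    end

    # The subscriber list lives in the server state, so broadcasting
    # has to go through the server process as well.
    def broadcast(message) do
        GenServer.cast(__MODULE__, {:broadcast, message})
    end

    def handle_cast({:subscribe, pid}, state) do
        {:noreply, %{state | subscribers: [pid | state.subscribers]}}
    end

    def handle_cast({:broadcast, message}, state) do
        Enum.each(state.subscribers, &send(&1, message))
        {:noreply, state}
    end

    # ...
end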

To subscribe, a process can simply call Chat.ChatRoom.subscribe().

Note that in a real application we would also have to provide a means of unsubscribing.
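As a rough idea of what that could look like, here is a minimal sketch of a fuller variant of the chat room. It is only one possible approach and makes a few assumptions that are not in the example above: the server is registered under its module name, subscribe/0 and unsubscribe/0 are synchronous calls instead of casts, and subscribers are stored in a map together with a monitor reference so that crashed subscribers get dropped automatically.

```elixir
defmodule Chat.ChatRoom do
  use GenServer

  # Client API (assumes a single room registered under the module name).
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def subscribe, do: GenServer.call(__MODULE__, {:subscribe, self()})
  def unsubscribe, do: GenServer.call(__MODULE__, {:unsubscribe, self()})
  def broadcast(message), do: GenServer.cast(__MODULE__, {:broadcast, message})

  # Server callbacks. State maps each subscriber pid to its monitor reference.
  @impl true
  def init(:ok), do: {:ok, %{subscribers: %{}}}

  @impl true
  def handle_call({:subscribe, pid}, _from, state) do
    # Monitor each subscriber only once, even if it subscribes repeatedly.
    subscribers =
      Map.put_new_lazy(state.subscribers, pid, fn -> Process.monitor(pid) end)

    {:reply, :ok, %{state | subscribers: subscribers}}
  end

  def handle_call({:unsubscribe, pid}, _from, state) do
    {ref, subscribers} = Map.pop(state.subscribers, pid)
    # Stop monitoring and discard any :DOWN message already in the mailbox.
    if ref, do: Process.demonitor(ref, [:flush])
    {:reply, :ok, %{state | subscribers: subscribers}}
  end

  @impl true
  def handle_cast({:broadcast, message}, state) do
    state.subscribers |> Map.keys() |> Enum.each(&send(&1, message))
    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # A subscriber exited without unsubscribing; forget about it.
    {:noreply, %{state | subscribers: Map.delete(state.subscribers, pid)}}
  end
end
```

Even this small version has to deal with monitors and cleanup, which is part of why a ready-made solution becomes attractive.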

While the above works, it gets complicated once we decide to have multiple chat rooms, because now we need a separate process to manage subscriptions for each room. Outside of the chat room example, the same problem appears when multiple processes are supposed to emit information about the same subject.

Thankfully there is a drop-in but flexible solution: Phoenix PubSub. It comes with Phoenix, where it’s used for Channels, but it can be used independently.

Phoenix PubSub can work with different backends. It includes an adapter for PG2, which uses direct message passing between processes, and a Redis adapter that lets Redis handle the exchange of data is available as the separate phoenix_pubsub_redis package. It's also possible to build your own adapter.

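With Phoenix.PubSub the client can call Phoenix.PubSub.subscribe(ChatRoom.PubSub, room_name) and the server can broadcast with Phoenix.PubSub.broadcast!(ChatRoom.PubSub, room_name, message). Here ChatRoom.PubSub stands for whatever name the PubSub server was started with, and room_name is the topic, given as a string.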
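To make those two calls a bit more concrete, here is a small sketch of a round trip within a single process. The ChatRoom.Demo module and its round_trip/2 function are made up for illustration, and the code assumes a PubSub server named ChatRoom.PubSub is already running (see the setup steps).

```elixir
defmodule ChatRoom.Demo do
  # Hypothetical helper, not part of Phoenix.PubSub.
  # Assumes a PubSub server named ChatRoom.PubSub has already been started.
  def round_trip(room_name, message) do
    # The calling process becomes a subscriber of the topic.
    :ok = Phoenix.PubSub.subscribe(ChatRoom.PubSub, room_name)

    # broadcast!/3 raises if the broadcast fails instead of returning an error tuple.
    :ok = Phoenix.PubSub.broadcast!(ChatRoom.PubSub, room_name, message)

    # Subscribers get the broadcast as a regular process message. The
    # broadcaster is subscribed too, so it receives its own message here.
    receive do
      ^message -> {:ok, message}
    after
      1_000 -> {:error, :timeout}
    end
  end
end
```

In a GenServer or a Phoenix channel the message would arrive in handle_info/2 instead of a bare receive block.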

The setup is very simple:

  1. In your mix.exs file, add {:phoenix_pubsub, "~> 1.1.2"} to the deps:

    defp deps do
      [
        {:phoenix_pubsub, "~> 1.1.2"}
      ]
    end

  2. In the same file, add :phoenix_pubsub to the extra_applications:

    def application do
      [
        extra_applications: [:logger, :phoenix_pubsub],
        mod: {Some.Application, []}
      ]
    end

  3. Add Phoenix.PubSub to your supervisor:

    children = [
      {Phoenix.PubSub.PG2, name: Some.PubSub},
      {Some.Worker, {}}
    ]

  4. Create an empty Some.PubSub module:

    defmodule Some.PubSub do
    end

Going back to the chat example, the chat room channels would subscribe to updates from a GenServer. When users join or send messages, the channels would be responsible for sending that information to the GenServer, either by also broadcasting using Phoenix.PubSub.broadcast!(ChatRoom.PubSub, room_name, message) or by using a process registry, which I'll cover in a future post.
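For the broadcasting option, the GenServer side could look roughly like the sketch below. Everything specific in it is an assumption made for illustration: the Chat.RoomServer module, the {:new_message, user, text} and {:room_updated, messages} message shapes, and the separate ":events" topic, which keeps messages sent by channels apart from the updates the room sends back to them. It also assumes a PubSub server named ChatRoom.PubSub is running.

```elixir
defmodule Chat.RoomServer do
  use GenServer

  # Hypothetical room process; room_name is a string such as "lobby".
  def start_link(room_name) do
    GenServer.start_link(__MODULE__, room_name)
  end

  @impl true
  def init(room_name) do
    # Listen for events that channels broadcast for this room.
    :ok = Phoenix.PubSub.subscribe(ChatRoom.PubSub, events_topic(room_name))
    {:ok, %{room_name: room_name, messages: []}}
  end

  @impl true
  def handle_info({:new_message, user, text}, state) do
    # Newest message first.
    state = %{state | messages: [{user, text} | state.messages]}

    # Publish the updated room state to everyone subscribed to the room topic.
    Phoenix.PubSub.broadcast!(
      ChatRoom.PubSub,
      state.room_name,
      {:room_updated, state.messages}
    )

    {:noreply, state}
  end

  # Ignore anything else that lands in the mailbox.
  def handle_info(_other, state), do: {:noreply, state}

  defp events_topic(room_name), do: room_name <> ":events"
end
```

A channel would then subscribe to room_name to receive {:room_updated, messages}, and forward user input with Phoenix.PubSub.broadcast!(ChatRoom.PubSub, room_name <> ":events", {:new_message, user, text}).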

