Learning Elixir's Supervisors

Sep 16, 2017

One of the things I struggled with when building our first Elixir application was getting our Rabbit code to be properly supervised and resilient to failure. The pattern we generally follow is 1 connection and N workers, where each worker has a channel derived from the connection. The workers, which can be publishers (we normally only have 1) or consumers (we normally have many), are dependent on the connection but independent of each other.

My first iteration worked, but wasn't the most elegant. I've since had a chance to revisit the code and thought it would make for good writing (and perhaps good reading).

Given the dependencies described above, the first thing we can do is build a supervisor tree:

defmodule App.Queue.Supervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    children = [
      worker(App.Queue.Connection, []),
      supervisor(App.Queue.WorkerSupervisor, [])
    ]

    supervise(children, strategy: :rest_for_one)
  end
end

defmodule App.Queue.WorkerSupervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    children = [
      worker(App.Queue.Publisher, []),
      worker(App.Queue.Consumers.Spice, []),
      worker(App.Queue.Consumers.Saiyans, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

These are split up so that the correct strategy can be applied to each group. Starting from the bottom, if one worker is forced to restart, there's no reason to also restart the other workers. Thus, we pick the one_for_one strategy. The rest_for_one strategy that we use in our first Supervisor means that if one process fails and is restarted, all processes defined after it are also restarted. For this reason, we put the connection first. If the App.Queue.Connection process is restarted, then the App.Queue.WorkerSupervisor will also be restarted. But the reverse isn't true.

If we were to put all of the above in a single supervisor and kept the rest_for_one strategy, it would mean that the Saiyans consumer would be restarted whenever the Spice consumer was. This isn't what we want.
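To see the difference concretely, here is a minimal sketch that uses three plain Agents as stand-ins for the connection and the two consumers. The Demo.RestForOne module and the :conn, :spice and :saiyans ids are made up for illustration, and the sketch uses the map-based child specs from newer Elixir versions rather than worker/2. It puts all three children in a single rest_for_one supervisor, kills one of them, and reports which children came back with a new pid.

```elixir
defmodule Demo.RestForOne do
  @ids [:conn, :spice, :saiyans]

  # Starts a flat :rest_for_one supervisor, kills `victim`, and returns the ids
  # of the children that were restarted (in start order).
  def run(victim) do
    children =
      for id <- @ids do
        # Hypothetical stand-in: an Agent instead of a real connection or consumer
        %{id: id, start: {Agent, :start_link, [fn -> id end]}}
      end

    {:ok, sup} = Supervisor.start_link(children, strategy: :rest_for_one)
    before = pids(sup)

    Process.exit(before[victim], :kill)
    later = wait_for_restart(sup, victim, before[victim])

    for id <- @ids, before[id] != later[id], do: id
  end

  defp pids(sup) do
    for {id, pid, _type, _modules} <- Supervisor.which_children(sup), into: %{}, do: {id, pid}
  end

  # Polls the supervisor until the victim shows up with a new pid
  defp wait_for_restart(sup, victim, old_pid) do
    now = pids(sup)

    if is_pid(now[victim]) and now[victim] != old_pid do
      now
    else
      Process.sleep(10)
      wait_for_restart(sup, victim, old_pid)
    end
  end
end

IO.inspect(Demo.RestForOne.run(:spice))
# => [:spice, :saiyans]
IO.inspect(Demo.RestForOne.run(:conn))
# => [:conn, :spice, :saiyans]
```

Killing :spice also restarts :saiyans, which is exactly the coupling between consumers that the separate WorkerSupervisor avoids.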

The above is a start, but now we need to dig into our actual processes. The first one that we'll look at, and also the most critical, is the App.Queue.Connection process. We'll be using the amqp library for Elixir. The examples that follow will expose you to the parts of its API that matter for this post.

A naïve implementation of our connection worker could look like:

defmodule App.Queue.Connection do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    # returns {:ok, conn} on success and {:error, err} on failure
    AMQP.Connection.open("amqp://....")
  end
end

The issue with this code is that if the connection fails, nothing happens until we try to use it. The supervisor is monitoring our process, not the connection. We could change the code to do:

def init(_) do
  {:ok, conn} = AMQP.Connection.open("amqp://....")
  Process.link(conn.pid)
  {:ok, conn}
end

By linking our Connection process to the underlying connection, we ensure that when the underlying connection dies, our connection process will also die. At first glance, this might seem like the right approach. Our process will die, the supervisor will restart it and it'll restart each worker which can use the new connection to get new channels. However, it probably isn't what you want.
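The propagation itself is easy to see in isolation. The following is a small sketch, not the post's actual modules: Demo.Linked and Demo.LinkDemo are hypothetical names, and a sleeping process plays the role of the underlying connection. The GenServer links to that process in init/1, and we then watch what happens to the GenServer when the process is killed.

```elixir
defmodule Demo.Linked do
  use GenServer

  # start/1 rather than start_link/1, so the calling process is not linked to the server
  def start(target), do: GenServer.start(__MODULE__, target)

  @impl true
  def init(target) do
    # Same idea as Process.link(conn.pid) in the post
    Process.link(target)
    {:ok, target}
  end
end

defmodule Demo.LinkDemo do
  # Kills the linked process and returns the exit reason of the GenServer,
  # or :still_alive if the GenServer survives for a second.
  def run do
    # Hypothetical stand-in for the underlying connection process
    target = spawn(fn -> Process.sleep(:infinity) end)
    {:ok, server} = Demo.Linked.start(target)
    ref = Process.monitor(server)

    Process.exit(target, :kill)

    receive do
      {:DOWN, ^ref, :process, ^server, reason} -> reason
    after
      1_000 -> :still_alive
    end
  end
end

IO.inspect(Demo.LinkDemo.run())
# => :killed
```

The GenServer never gets a chance to react: because it doesn't trap exits, the exit signal from the linked process terminates it directly, and it is then up to a supervisor to restart it.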

By default, a supervisor tolerates 3 restarts within 5 seconds. If its children need to be restarted more often than that, the supervisor itself, in our case App.Queue.Supervisor, terminates. This cascades up: the parent of App.Queue.Supervisor will try to restart that supervisor, and will itself terminate if it exceeds the same 3 in 5 threshold. This goes all the way up to the application.

It's common for such connection failures, whether they're to a queue, database or web service, to happen quickly. Failing 3 times in 5 seconds? We're likely to fail 1000 times in a second, taking down our entire app. Sadly, there's no backoff functionality in supervisors. In some cases, maybe you really do want to crash the entire app, but I think that for many apps, retrying indefinitely is a more desirable behaviour.

One quick solution could be to specify very high max_restarts and very low max_seconds parameters along with the strategy. But I'd like to explore something a little more elegant.
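For reference, that quick solution might look something like the sketch below. The numbers are purely illustrative, Demo.TolerantSupervisor is a hypothetical module, and an Agent stands in for App.Queue.Connection. It uses Supervisor.init/2 from newer Elixir versions; the same :max_restarts and :max_seconds options can be passed to supervise/2 alongside the strategy. The helper function kills the child repeatedly and checks whether the supervisor is still standing.

```elixir
defmodule Demo.TolerantSupervisor do
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    children = [
      # Hypothetical stand-in for App.Queue.Connection: an Agent that does nothing
      %{id: :conn, start: {Agent, :start_link, [fn -> :connected end]}}
    ]

    # Illustrative numbers: give up only after more than 1_000 restarts within 1 second
    Supervisor.init(children, strategy: :rest_for_one, max_restarts: 1_000, max_seconds: 1)
  end

  # Kills the child `times` times, waiting for each restart, and returns
  # whether the supervisor is still alive afterwards.
  def survives_crashes?(sup, times) do
    Enum.each(1..times, fn _ ->
      old = child_pid(sup)
      Process.exit(old, :kill)
      await_restart(sup, old)
    end)

    Process.alive?(sup)
  end

  defp child_pid(sup) do
    [{:conn, pid, _type, _modules}] = Supervisor.which_children(sup)
    pid
  end

  # Polls the supervisor until the child shows up with a new pid
  defp await_restart(sup, old) do
    case child_pid(sup) do
      pid when is_pid(pid) and pid != old ->
        :ok

      _ ->
        Process.sleep(5)
        await_restart(sup, old)
    end
  end
end

{:ok, sup} = Demo.TolerantSupervisor.start_link()
IO.inspect(Demo.TolerantSupervisor.survives_crashes?(sup, 10))
# => true
```

With the default limits, ten crashes in quick succession would have taken the supervisor down after the fourth one.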

Rather than linking to the underlying connection process, we can monitor it with Process.monitor/1. Instead of the termination bubbling up to our process, we'll receive a message. Consider:

def init(_) do
  {:ok, connect()}
end

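# assumes `require Logger` at the top of the module
defp connect() do
  case AMQP.Connection.open("amqp://....") do
    {:ok, conn} ->
      # ask to be sent a :DOWN message when the connection process dies
      Process.monitor(conn.pid)
      conn  # this will be our state
    {:error, err} ->
      Logger.error("failed to connect: #{inspect err}")
      :timer.sleep(1000)
      connect() # keep trying
  end
end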

# we get this message because of the call to monitor we did
def handle_info({:DOWN, _, :process, _pid, reason}, _) do
  Logger.error("queue connection is down: #{inspect reason}")
  {:noreply, connect()}
end

Rather than terminating, we handle the :DOWN notification and try to reconnect, waiting 1 second between attempts. Essentially, we're taking over the Supervisor's opinionated restart strategy: retrying indefinitely and applying whatever backoff algorithm we want.
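If you want to play with this pattern without a running Rabbit, here is a self-contained sketch of the same idea. Everything in it is a stand-in: Demo.FakeBroker is a hypothetical replacement for the AMQP library whose "connections" are just sleeping processes, a :persistent_term flag decides whether opening a connection succeeds, and the retry delay is shortened to 100 ms. Demo.Connection follows the monitor-and-reconnect shape from above.

```elixir
defmodule Demo.FakeBroker do
  # Hypothetical stand-in for the AMQP library, so the sketch runs without RabbitMQ.
  # A flag in :persistent_term plays the role of "is the broker reachable?".
  def set_reachable(reachable?), do: :persistent_term.put({__MODULE__, :up}, reachable?)

  def open do
    if :persistent_term.get({__MODULE__, :up}, true) do
      # The "connection" is just a process that lives until it is killed
      {:ok, %{pid: spawn(fn -> Process.sleep(:infinity) end)}}
    else
      {:error, :econnrefused}
    end
  end
end

defmodule Demo.Connection do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
  def current, do: GenServer.call(__MODULE__, :current)

  @impl true
  def init(_), do: {:ok, connect()}

  @impl true
  def handle_call(:current, _from, conn), do: {:reply, conn, conn}

  # Delivered because connect/0 monitors the connection's pid
  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, _old_conn) do
    {:noreply, connect()}
  end

  # Blocks this process until a connection is open, retrying every 100 ms
  defp connect do
    case Demo.FakeBroker.open() do
      {:ok, conn} ->
        Process.monitor(conn.pid)
        conn

      {:error, _reason} ->
        Process.sleep(100)
        connect()
    end
  end
end

{:ok, _pid} = Demo.Connection.start_link()
first = Demo.Connection.current()

Demo.FakeBroker.set_reachable(false)
Process.exit(first.pid, :kill)      # the monitor turns this into a :DOWN message
Process.sleep(250)                  # Demo.Connection should now be in its retry loop
Demo.FakeBroker.set_reachable(true)

second = Demo.Connection.current()  # answered once the retry loop succeeds
IO.inspect(second.pid != first.pid and Process.alive?(second.pid))
```

Note that Demo.Connection itself never dies here. It stays up, unresponsive while it retries, and comes back holding a fresh connection.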

We could apply the same strategy to our workers, having each monitor its channel's process. Alternatively, I want to show why, in this case, it's OK to link rather than monitor:

defmodule App.Queue.Publisher do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:ok, channel} = App.Queue.Connection.open_channel()
    Process.link(channel.pid)
    {:ok, channel}
  end
end

We'll look at open_channel in a moment, but as you read it, I want you to try to figure out why the above is safe. If Rabbit goes down, why won't this fail 3 times in 5 seconds?

defmodule App.Queue.Connection do
  ...

  def open_channel() do
    GenServer.call(__MODULE__, :open_channel)
  end

  def handle_call(:open_channel, _from, conn) do
    {:reply, AMQP.Channel.open(conn), conn}
  end
  
  ...
end

Do you see it? It's a little tricky to follow, so let's walk through it. Let's assume the app is running when suddenly our connection to Rabbit dies. This causes our channel to terminate (in Rabbit, a channel dies when its connection dies), which in turn causes our Publisher to terminate (since it was linked to the channel process). The WorkerSupervisor detects this and restarts the Publisher. As part of its startup, in its init/1 function, the process asks the Connection for a channel. This is a blocking call and one of two things will happen. Either a channel will be returned, or the call will time out (after the default 5 seconds, since we didn't specify a timeout when we called GenServer.call).

Why will it time out? Because so long as our link to Rabbit is down, the Connection process is going to be in its connect loop and can't answer any calls. If the connection isn't established, pending calls will time out. If the connection is re-established, a channel is returned. With a timeout of 5 seconds, each worker will fail at most once every 5 seconds, which keeps us within the supervisor's limits.
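The timeout behaviour can be reproduced with a tiny sketch. Demo.BusyServer and Demo.Caller are hypothetical names: the server sleeps in handle_continue/2 as a stand-in for the connect loop, and the caller catches the exit that GenServer.call raises on timeout, only so that we can inspect it. The workers in this post don't catch it, which is why they crash and get restarted.

```elixir
defmodule Demo.BusyServer do
  use GenServer

  def start_link(busy_ms), do: GenServer.start_link(__MODULE__, busy_ms)

  @impl true
  def init(busy_ms) do
    # handle_continue/2 runs before any call is served
    {:ok, busy_ms, {:continue, :block}}
  end

  @impl true
  def handle_continue(:block, busy_ms) do
    # Stand-in for the connect loop: the process can't answer calls while it sleeps
    Process.sleep(busy_ms)
    {:noreply, busy_ms}
  end

  @impl true
  def handle_call(:open_channel, _from, state) do
    {:reply, {:ok, :fake_channel}, state}
  end
end

defmodule Demo.Caller do
  # Returns the server's reply, or :timed_out if no reply arrives within `timeout` ms
  def open_channel(server, timeout) do
    try do
      GenServer.call(server, :open_channel, timeout)
    catch
      :exit, {:timeout, _call} -> :timed_out
    end
  end
end

{:ok, server} = Demo.BusyServer.start_link(300)

IO.inspect(Demo.Caller.open_channel(server, 100))
# => :timed_out
IO.inspect(Demo.Caller.open_channel(server, 1_000))
# => {:ok, :fake_channel}
```

The first call gives up after 100 ms while the server is still busy. The second one waits long enough for the server to become responsive and gets its reply.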

Hopefully this not only provides some practical code for keeping Rabbit connections and channels alive, but also illustrates how supervisors, process linking and process monitoring work and can be leveraged.