Oban Monitoring with Plugins

At work, we are currently implementing a payments processing service in Elixir. Our system relies on background processing to reconcile the status of payments in our system with the status recorded by our bank partner. We chose Oban as our background processing framework. Oban seemed quite mature, and its use of Postgres meant that we didn’t need to add another element to our infrastructure.

Oban provides a great deal of instrumentation out of the box. In particular, it emits telemetry events when jobs are enqueued, started, and stopped. These events allow us to monitor the age of jobs as they are executed. Beyond job age, it is also important to monitor the depth of each queue, that is, how many jobs are sitting in each state. Oban does not expose this information directly.

Oban uses plugins to extend its functionality. Oban does not document how to implement your own plugins, but based on our inspection of the built-in ones, building one seems easy enough. A plugin is simply a module that starts a process through start_link. This means you can use an Agent, a GenServer, or any number of other OTP behaviors as an Oban plugin.
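As a rough illustration of that shape, here is a minimal sketch of an Agent-based plugin. The module name Example.Plugin.Noop is hypothetical, and the sketch assumes Oban needs nothing beyond start_link and the child spec that use Agent generates; Oban versions other than the one we inspected may expect more.

```elixir
defmodule Example.Plugin.Noop do
  # Hypothetical plugin: it does no work and only remembers its options.
  use Agent

  @spec start_link(Keyword.t()) :: Agent.on_start()
  def start_link(opts) do
    # Fall back to the module name when no :name option is given.
    name = Keyword.get(opts, :name, __MODULE__)

    # Keep the options as a map so they can be inspected later.
    Agent.start_link(fn -> Map.new(opts) end, name: name)
  end
end

# Started by hand here; in an application Oban's supervisor would do this.
{:ok, pid} = Example.Plugin.Noop.start_link(interval: 1_000)
IO.inspect(Agent.get(pid, & &1))
```

Started by hand like this, the Agent simply holds the options it was given, which is enough to confirm that the module can be supervised like any other process.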

When Oban starts, it adds these plugins to its supervision tree and passes its own configuration as an option when calling start_link. You can pass your own configuration for your plugin in the Oban configuration as shown below:

config :worker, Oban,
  repo: Ecto.Repo, # replace with your application's repo module
  plugins: [
    {Worker.Plugin.QueueLength, interval: :timer.seconds(10)}
  ],
  queues: [
    status_queue: 1,
    lifecycle_queue: 1
  ]
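To make the hand-off concrete, here is a small sketch of the options our plugin ends up reading, based on how its init function uses them: Oban's configuration under :conf and our own setting under :interval. A plain map stands in for the real Oban configuration struct, Worker.Repo is a hypothetical repo module, and the 30-second fallback is purely illustrative.

```elixir
# Hypothetical stand-in for the keyword list passed to start_link.
# A plain map replaces the Oban configuration struct for illustration.
opts = [
  conf: %{repo: Worker.Repo, queues: [status_queue: 1, lifecycle_queue: 1]},
  interval: :timer.seconds(10)
]

# Plugin-specific setting, with an illustrative fallback if it is omitted.
interval = Keyword.get(opts, :interval, :timer.seconds(30))

# Queue names as strings, matching how they are compared in the jobs table query.
queue_names =
  opts[:conf]
  |> Map.get(:queues)
  |> Keyword.keys()
  |> Enum.map(&Atom.to_string/1)

IO.inspect(interval)     # prints 10000
IO.inspect(queue_names)  # prints ["status_queue", "lifecycle_queue"]
```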

The plugin we implemented for monitoring queue depth is a very simple GenServer. It repeatedly sends a poll message to itself using Process.send_after. Each time the process polls, it queries the state of the oban_jobs table through the repo defined in the Oban configuration and emits events through :telemetry.execute.

The full plugin is shown below:

defmodule Worker.Plugin.QueueLength do
  use GenServer

  require Ecto.Query

  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)

    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl GenServer
  def init(opts) do
    repo =
      opts[:conf]
      |> Map.get(:repo)

    queues =
      opts[:conf]
      |> Map.get(:queues)
      |> Keyword.keys()
      |> Enum.map(&Atom.to_string/1)

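    # Default to a 10-second poll so a missing :interval does not crash init/1.
    interval = Keyword.get(opts, :interval, :timer.seconds(10))
    state = %{repo: repo, queues: queues, interval: interval}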
    schedule_poll(state)
    {:ok, state}
  end

  defp schedule_poll(%{interval: interval}) do
    Process.send_after(self(), :poll, interval)
  end

  @impl GenServer
  def handle_info(:poll, %{repo: repo, queues: queues} = state) do
    Enum.each(queues, &emit_for_queue(repo, &1))

    schedule_poll(state)
    {:noreply, state}
  end

  defp emit_for_queue(repo, queue_name) do
    queue_counts =
      Oban.Job
      |> Ecto.Query.where(queue: ^queue_name)
      |> Ecto.Query.group_by([j], j.state)
      |> Ecto.Query.select([j], {j.state, count(j.id)})
      |> repo.all()
      |> Enum.into(%{})

    # Measurements map each job state to its count; metadata names the queue.
    :telemetry.execute(
      [:oban, :queue_stats],
      queue_counts,
      %{"queue_name" => queue_name}
    )
  end
end

With our plugin running, we can now register :telemetry handlers for these events. Our handler sends the queue depth to our metrics system, where we can configure alerts on it.
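A handler for these events might look roughly like the sketch below. The module name Worker.QueueStatsHandler, the handler id, and the metric name oban.queue_depth are all hypothetical, and the log call is only a placeholder for whatever metrics client you use. The sketch assumes the :telemetry library is available, which it is in any project that depends on Oban.

```elixir
defmodule Worker.QueueStatsHandler do
  # Hypothetical handler module; names and metric format are illustrative.
  require Logger

  @event [:oban, :queue_stats]

  # Call once at application start to subscribe to the plugin's events.
  def attach do
    :telemetry.attach("worker-queue-stats", @event, &__MODULE__.handle_event/4, nil)
  end

  # The plugin puts per-state counts in the measurements and the queue
  # name in the metadata under the "queue_name" key.
  def handle_event(@event, measurements, %{"queue_name" => queue}, _config) do
    for {name, value, tags} <- to_metrics(measurements, queue) do
      # Placeholder: swap this for a call to your metrics client.
      Logger.info("#{name}=#{value} #{inspect(tags)}")
    end

    :ok
  end

  # Pure helper: turns one event into a list of {name, value, tags} tuples.
  def to_metrics(measurements, queue) do
    for {state, count} <- measurements do
      {"oban.queue_depth", count, [queue: queue, state: state]}
    end
  end
end
```

Keeping the conversion in a pure function such as to_metrics makes the handler easy to test without starting Oban or a database.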

We were very pleased to find that the Oban plugin system let us implement this, but we were disappointed that we could not find any documentation on how to roll our own plugin. I hope that we can open-source our plugin [1] and that increased usage of third-party plugins might lead to more documentation of the Oban plugin system.

  [1] That being said, the entirety of the plugin is above. Feel free to treat that code snippet as MIT licensed.