Oban Monitoring
Published on Sep 12, 2020 by dix.
At work, we are currently implementing a payments processing service in Elixir. Our system relies on background processing to reconcile the status of payments in our system with their status at our partner bank. We chose Oban as our background processing framework: it seemed quite mature, and because it stores its jobs in Postgres, we didn't need to add another component to our infrastructure.
Oban provides a great deal of instrumentation out of the box. In particular, it emits telemetry events when jobs are enqueued, started, and stopped, which lets us monitor the age of jobs as they are executed. In addition to job age, it is also important to monitor queue depth, that is, how many jobs are waiting in each queue. Oban does not expose this information directly.
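As a rough illustration of what "job age" means here, the following is a minimal sketch that computes how long a job has been waiting since it became eligible to run. It assumes you already have the job's scheduled_at timestamp as a DateTime (Oban.Job has a scheduled_at field); the module and function names are hypothetical, and the current time is a parameter so the function is easy to test.

```elixir
defmodule Worker.JobAge do
  # Hypothetical helper, not part of Oban: milliseconds a job has waited
  # since its scheduled_at time. `now` defaults to the current UTC time
  # but can be passed explicitly to keep the function deterministic.
  @spec age_ms(DateTime.t(), DateTime.t()) :: integer()
  def age_ms(%DateTime{} = scheduled_at, %DateTime{} = now \\ DateTime.utc_now()) do
    # DateTime.diff/3 returns now - scheduled_at in the requested unit.
    DateTime.diff(now, scheduled_at, :millisecond)
  end
end
```

For example, a job scheduled at 10:00:00 and observed at 10:00:02.500 has an age of 2500 ms.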
Oban uses plugins to extend its functionality. Although Oban does not document how to implement your own plugins, our inspection of the built-in plugins suggests that it is easy enough to build one. A plugin is simply a module that starts a process via start_link. This means you can use an Agent, a GenServer, or any other OTP behavior as an Oban plugin.
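To make the "any OTP process will do" point concrete, here is a minimal sketch of an Agent-based plugin that simply holds on to the options it was started with. It assumes, based on the plugin code later in this post, that Oban's configuration arrives under a :conf key and that a :name may be passed; the module name is hypothetical and nothing here is documented Oban API.

```elixir
defmodule Worker.Plugin.ConfigHolder do
  # Hypothetical minimal plugin: all Oban needs is a module whose
  # start_link/1 starts a process, so an Agent is enough.
  use Agent

  @spec start_link(Keyword.t()) :: Agent.on_start()
  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    # Keep the full option list (including :conf) as the Agent's state.
    Agent.start_link(fn -> opts end, name: name)
  end

  # Returns the configuration that was passed in under :conf.
  def conf(name \\ __MODULE__), do: Agent.get(name, &Keyword.get(&1, :conf))
end
```

Because `use Agent` defines child_spec/1, the module can be placed in a supervision tree (Oban's or your own) like any other child.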
When Oban starts, it adds these plugins to its supervision tree and passes its own configuration as an option when calling start_link. You can supply configuration for your own plugin in the Oban configuration, as shown below:
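```elixir
config :worker, Oban,
  # Your application's Ecto repo; Worker.Repo is a placeholder name
  # (Ecto.Repo itself is the behaviour, not a repo).
  repo: Worker.Repo,
  plugins: [
    {Worker.Plugin.QueueLength, interval: :timer.seconds(10)}
  ],
  queues: [
    status_queue: 1,
    lifecycle_queue: 1
  ]
```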
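Since the :interval value flows straight from this config into the plugin, it can be worth validating it up front rather than letting Process.send_after fail later. A sketch of a hypothetical helper (not part of Oban) that applies a 10-second default and rejects non-positive or non-integer values:

```elixir
defmodule Worker.Plugin.Opts do
  # Hypothetical default, matching the value used in the config above.
  @default_interval :timer.seconds(10)

  # Extracts the polling interval (in ms) from the plugin options,
  # raising early if the configured value is unusable.
  @spec interval!(Keyword.t()) :: pos_integer()
  def interval!(opts) do
    case Keyword.get(opts, :interval, @default_interval) do
      ms when is_integer(ms) and ms > 0 ->
        ms

      other ->
        raise ArgumentError,
              "expected :interval to be a positive integer, got: #{inspect(other)}"
    end
  end
end
```

Raising in the plugin's init means a bad configuration fails loudly when Oban starts, instead of surfacing as a crash on the first poll.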
The plugin we implemented for monitoring queue depth is a very simple GenServer. It repeatedly sends a :poll message to itself using Process.send_after. On each poll, it queries the oban_jobs table through the repo defined in the Oban configuration and emits the per-state job counts for each queue through :telemetry.execute.
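The polling loop on its own can be sketched without Oban, Ecto, or Postgres by replacing the query and telemetry call with a caller-supplied function. The :on_poll option and the module name are assumptions made purely so the example runs standalone.

```elixir
defmodule Worker.Poller do
  # Standalone sketch of the send_after polling pattern described above.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl GenServer
  def init(opts) do
    state = %{
      interval: Keyword.fetch!(opts, :interval),
      # Hypothetical stand-in for "query oban_jobs and emit telemetry".
      on_poll: Keyword.fetch!(opts, :on_poll)
    }

    schedule_poll(state)
    {:ok, state}
  end

  @impl GenServer
  def handle_info(:poll, state) do
    state.on_poll.()
    # Re-arm the timer only after the work finishes, so slow polls never overlap.
    schedule_poll(state)
    {:noreply, state}
  end

  defp schedule_poll(%{interval: interval}) do
    Process.send_after(self(), :poll, interval)
  end
end
```

Scheduling the next poll after the current one completes means a slow query delays the following poll rather than piling up :poll messages, which :timer.send_interval/2 would not guarantee.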
Here is the complete plugin:
```elixir
defmodule Worker.Plugin.QueueLength do
  use GenServer

  require Ecto.Query

  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl GenServer
  def init(opts) do
    # Oban passes its own config under :conf; reuse its repo and queue names.
    conf = opts[:conf]
    repo = Map.get(conf, :repo)

    queues =
      conf
      |> Map.get(:queues)
      |> Keyword.keys()
      |> Enum.map(&Atom.to_string/1)

    # Fall back to 10 seconds if no :interval is configured.
    interval = Keyword.get(opts, :interval, :timer.seconds(10))

    state = %{repo: repo, queues: queues, interval: interval}
    schedule_poll(state)
    {:ok, state}
  end

  defp schedule_poll(%{interval: interval}) do
    Process.send_after(self(), :poll, interval)
  end

  @impl GenServer
  def handle_info(:poll, %{repo: repo, queues: queues} = state) do
    Enum.each(queues, &emit_for_queue(repo, &1))
    schedule_poll(state)
    {:noreply, state}
  end

  # Counts jobs per state for one queue and emits them as a telemetry event,
  # with measurements such as %{"available" => 3, "executing" => 1}.
  defp emit_for_queue(repo, queue_name) do
    queue_counts =
      Oban.Job
      |> Ecto.Query.where(queue: ^queue_name)
      |> Ecto.Query.group_by([j], j.state)
      |> Ecto.Query.select([j], {j.state, count(j.id)})
      |> repo.all()
      |> Enum.into(%{})

    :telemetry.execute([:oban, :queue_stats], queue_counts, %{"queue_name" => queue_name})
  end
end
```
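One caveat with the GROUP BY query: it only returns states that currently have at least one job, so an empty queue yields an empty measurements map rather than zeros, and a metrics backend may keep showing the last non-zero value. A sketch of a hypothetical helper that fills in missing states and derives a single depth figure follows. The list of states is an assumption based on Oban 2.x and should be checked against your Oban version; counting available, scheduled, and retryable jobs as "waiting" is our own choice, not something Oban defines.

```elixir
defmodule Worker.QueueCounts do
  # Assumed Oban job states (Oban 2.x); verify against your version.
  @states ~w(available scheduled executing retryable completed discarded cancelled)

  # States treated as "waiting" when computing depth (our choice, not Oban's).
  @waiting ~w(available scheduled retryable)

  # Start every known state at zero, then overlay the counts from the query.
  @spec normalize(map()) :: map()
  def normalize(counts) when is_map(counts) do
    @states
    |> Map.new(&{&1, 0})
    |> Map.merge(counts)
  end

  # Sum of the counts for the waiting states.
  @spec depth(map()) :: non_neg_integer()
  def depth(counts) do
    counts |> Map.take(@waiting) |> Map.values() |> Enum.sum()
  end
end
```

In the plugin, the map produced by Enum.into(%{}) could be passed through normalize/1 before calling :telemetry.execute.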
With our plugin running, we can now register :telemetry handlers for these events. Our handler sends the queue depth to our metrics system, where we can configure alerts on it.
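A handler for the [:oban, :queue_stats] event might look like the following sketch. The report function carried in the handler config is a hypothetical stand-in for whatever metrics client you use (StatsD, Prometheus, and so on), and the metric naming scheme is only an example.

```elixir
defmodule Worker.QueueStatsHandler do
  # Sketch of a :telemetry handler for the plugin's [:oban, :queue_stats] event.
  # `report` is a hypothetical (name, value) callback supplied via handler config.
  @spec handle_event([atom()], map(), map(), map()) :: :ok
  def handle_event([:oban, :queue_stats], measurements, %{"queue_name" => queue}, %{report: report}) do
    # One gauge per job state, e.g. "oban.queue.status_queue.available".
    Enum.each(measurements, fn {state, count} ->
      report.("oban.queue.#{queue}.#{state}", count)
    end)
  end
end
```

With the telemetry library available, the handler would be attached once at application start, for example with :telemetry.attach("queue-stats-reporter", [:oban, :queue_stats], &Worker.QueueStatsHandler.handle_event/4, %{report: &MyMetrics.gauge/2}), where MyMetrics.gauge/2 is a hypothetical metrics call.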
We were very pleased to find that Oban's plugin system gave us a way to implement this, but we were disappointed that we could not find any documentation on how to roll our own plugin. I hope that we can open-source our plugin[1] and that increased usage of third-party plugins leads to more documentation of the Oban plugin system.
Footnotes:
[1] That being said, the entirety of the plugin is above. Feel free to treat that code snippet as MIT licensed.