A GenServer can block senders without blocking itself
I was discussing a data processing/aggregation problem with Ben Olive recently, and he told me about a GenServer feature I didn’t know about: returning {:noreply, new_state} from handle_call/3. It is right there in the GenServer documentation for handle_call/3.
The problem
You have many Elixir processes sending messages, and you want to combine one message from each of many processes into a batch. Only after the batch has been processed successfully should the individual processes continue their work.
Where I was stuck
I was stuck thinking about only the basics of GenServer.cast and GenServer.call. If I let the processes use GenServer.cast, which hands control back to them immediately, they will keep sending messages without backpressure. If I make the individual processes use GenServer.call, I thought the GenServer had to answer each caller from handle_call/3 before moving on, so it could not process messages from other senders to batch them up.
What I didn’t know
In the primitives that OTP is built on, there is no synchronous blocking communication between processes. One process sends a message to another process, and that send is asynchronous. That is all there is. GenServer.call appears synchronous because the sender attaches a unique reference to its message and then blocks itself, waiting for a reply that carries the same reference.
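To make that concrete, here is a minimal sketch of a call built from nothing but send and receive. The module name ManualCall and the :manual_call message tag are made up for this illustration. The real GenServer.call also monitors the server and uses its own internal message format, so treat this as a simplified imitation rather than the actual implementation.

```elixir
defmodule ManualCall do
  # Hypothetical module: imitates the request/reply handshake of GenServer.call.

  def call(server, request, timeout \\ 5_000) do
    # The unique reference lets us pick our reply out of the mailbox
    ref = make_ref()
    send(server, {:manual_call, {self(), ref}, request})

    # The caller blocks here; the send above was asynchronous
    receive do
      {^ref, reply} -> reply
    after
      timeout -> exit(:timeout)
    end
  end

  def reply({pid, ref}, reply) do
    # Echo the reference back so the caller can match on it
    send(pid, {ref, reply})
    :ok
  end
end

# A bare process that answers a single request by doubling the number
server =
  spawn(fn ->
    receive do
      {:manual_call, from, n} -> ManualCall.reply(from, n * 2)
    end
  end)

result = ManualCall.call(server, 21)
IO.inspect(result)
```

Nothing forces the server to answer straight away. The caller waits in receive until a message with its reference arrives, no matter which process sends it or when.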
The solution
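The receiver can choose to continue doing other work and send the reply later: handle_call/3 returns {:noreply, new_state}, keeps the from value it was given, and GenServer.reply/2 delivers the answer when the time comes. Until then the sender stays blocked and waiting, while the receiver processes other messages. It can reply when appropriate or even hand from to another process and let that process send the reply.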
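The last option, letting another process send the reply, could look roughly like the sketch below. DelegatingServer and slow_square are hypothetical names invented for this example, and the 100 ms sleep merely stands in for real work.

```elixir
defmodule DelegatingServer do
  # Hypothetical example module, not part of the Blocky project.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def slow_square(server, n), do: GenServer.call(server, {:slow_square, n})

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:slow_square, n}, from, state) do
    # Hand `from` to another process, which replies once the work is done
    Task.start(fn ->
      Process.sleep(100)
      GenServer.reply(from, n * n)
    end)

    # The caller stays blocked, but this server is free for the next message
    {:noreply, state}
  end
end

{:ok, server} = DelegatingServer.start_link()

# Three callers block at the same time; the server accepts each request
# without waiting for the earlier ones to be answered
results =
  1..3
  |> Enum.map(fn n -> Task.async(fn -> DelegatingServer.slow_square(server, n) end) end)
  |> Enum.map(&Task.await/1)

IO.inspect(results)
```

Because the server returns {:noreply, state} immediately, the three simulated jobs overlap instead of running one after another.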
Example Code
mix new blocky --sup
lib/blocky/producer.ex
defmodule Blocky.Producer do
  use GenServer

  def start_link(id) do
    GenServer.start_link(__MODULE__, %{id: id, number: 1})
  end

  def init(state) do
    # Kick off the produce loop as soon as the process is up
    send(self(), :produce)
    {:ok, state}
  end

  def handle_info(:produce, %{id: id, number: number} = state) do
    # Queue the next round, then block in push/1 until the consumer replies
    send(self(), :produce)
    # IO.inspect(id, label: "producing 🛢")
    Blocky.Consumer.push(number)
    {:noreply, %{state | number: number + 1}}
  end
end
lib/blocky/consumer.ex
defmodule Blocky.Consumer do
  @wait_to_aggregate 1000
  use GenServer

  def push(number) do
    GenServer.call(__MODULE__, {:push_number, number})
  end

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    send(self(), :aggregate)
    {:ok, %{senders: [], numbers: []}}
  end

  def handle_call({:push_number, number}, from, state) do
    new_state =
      state
      |> Map.update!(:numbers, fn numbers -> [number | numbers] end)
      |> Map.update!(:senders, fn senders -> [from | senders] end)

    # blocks the sender
    {:noreply, new_state}
    # releases the sender
    # {:reply, :ok, new_state}
  end

  def handle_info(:aggregate, %{senders: senders, numbers: numbers}) do
    Process.send_after(self(), :aggregate, @wait_to_aggregate)
    IO.inspect("Adding #{Enum.count(numbers)} numbers ✅")
    IO.inspect("Total is #{Enum.sum(numbers)} 🧮")
    reply_to_senders(senders)
    {:noreply, %{senders: [], numbers: []}}
  end

  defp reply_to_senders(senders) do
    for sender <- senders do
      GenServer.reply(sender, :ok)
    end
  end
end
lib/blocky/application.ex
defmodule Blocky.Application do
  use Application

  @number_of_producers 10

  @impl true
  def start(_type, _args) do
    # One child spec per producer, each with a unique id
    producers =
      for id <- 1..@number_of_producers do
        %{
          id: id,
          start: {Blocky.Producer, :start_link, [id]}
        }
      end

    # Start the consumer first so the producers can call it right away
    children = [Blocky.Consumer | producers]
    opts = [strategy: :one_for_one, name: Blocky.Supervisor]

    # Return the supervisor's {:ok, pid} as the result of the callback
    Supervisor.start_link(children, opts)
  end
end
Run it
mix run --no-halt
Notice that the consumer has time to handle messages from all 10 producers and add them up before they are allowed to send more.
"Adding 0 numbers ✅"
"Total is 0 🧮"
"Adding 10 numbers ✅"
"Total is 10 🧮"
"Adding 10 numbers ✅"
"Total is 20 🧮"
"Adding 10 numbers ✅"
"Total is 30 🧮"
If we comment out the :noreply and choose to send a reply right away, like this:
lib/blocky/consumer.ex
    # blocks the sender
    # {:noreply, new_state}
    # releases the sender
    {:reply, :ok, new_state}
Now the producers bury us in messages, which is not what we wanted:
"Adding 0 numbers ✅"
"Total is 0 🧮"
"Adding 584734 numbers ✅"
"Total is 17151906968 🧮"
"Adding 451791 numbers ✅"
"Total is 36628491013 🧮"
"Adding 539047 numbers ✅"
"Total is 70417648445 🧮"