Load Balancing Across Erlang Process Groups

September 12, 2009. Filed under erlang

In an application we're building at work, we walk data through a series of steps, transforming it at each step. This is an Erlang system, and each step has a process group populated with one or more processes (each of which is a gen_server). Any member of a given process group can serve any file, so abstractly the logic is along the lines of:

Data = <<"data">>,
Steps = [stage_1_group, stage_2_group, stage_3_group],
%% foldl visits the groups in list order: stage 1, then 2, then 3.
lists:foldl(fun(Group, Acc) ->
    Pid = pg2:get_closest_pid(Group),
    gen_server:call(Pid, {process, Acc})
  end, Data, Steps).

This makes a very flexible pipeline for processing data, but as we moved from implementing functionality to running load tests, we noticed situations where work was distributed unevenly across a process group. With high enough load, one pid in a group might acquire a queue of twenty jobs while another had an empty queue.

The culprit here is pg2:get_closest_pid/1 which has a simple algorithm for picking pids:

  1. Randomly choose a local process if one exists,
  2. Otherwise randomly choose a remote process.
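
To make the two rules concrete, here is a minimal sketch of that selection logic. It is not pg2's actual source: closest_pid/1 and random_member/1 are hypothetical names, and the function takes a plain list of pids rather than a group name. Note that nothing in it looks at how busy a process is, which is why queues can grow unevenly.

```erlang
-module(closest_pid_sketch).
-export([closest_pid/1]).

%% Hypothetical helper, not pg2's actual source: prefer a random member
%% on the local node, otherwise fall back to a random member anywhere.
closest_pid([]) ->
    {error, empty_process_group};
closest_pid(Members) ->
    case [Pid || Pid <- Members, node(Pid) =:= node()] of
        []    -> random_member(Members);
        Local -> random_member(Local)
    end.

%% Pick one element of a non-empty list uniformly at random.
random_member(Pids) ->
    lists:nth(rand:uniform(length(Pids)), Pids).
```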

We had an impromptu discussion on how to resolve this, and we came up with a (to my mind) fairly novel solution. All our gen_server calls are synchronous, which means that the length of a process's message queue is equal to the number of unprocessed jobs assigned to it.
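
A small sketch of that observation, using plain messages as a stand-in for gen_server calls (the module name, demo/0, and the {job, N} message shape are made up for illustration): a worker that never picks up its jobs reports them all in message_queue_len.

```erlang
-module(queue_len_sketch).
-export([demo/0]).

%% Spawn a worker that only ever waits for `stop`, send it three jobs it
%% never picks up, and return the length of its message queue.
demo() ->
    Worker = spawn(fun() -> receive stop -> ok end end),
    [Worker ! {job, N} || N <- [1, 2, 3]],
    {message_queue_len, Len} = erlang:process_info(Worker, message_queue_len),
    Worker ! stop,
    Len.
```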

Thus, we can simply iterate through the process group and choose the process with the shortest queue. Instead of get_closest_pid/1, we can write get_best_pid/1. (Sorry for the awful name. My other choice is get_unencumbered_pid/1, but that may be a bit long.)

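get_best_pid(Group) ->
  Members = pg2:get_members(Group),
  %% Pair each pid with its current message queue length.
  %% Note: erlang:process_info/2 only works for pids on the local node.
  Members1 = lists:map(fun(Pid) ->
      [{message_queue_len, Messages}] = erlang:process_info(Pid, [message_queue_len]),
      {Pid, Messages}
    end, Members),
  %% Sort by queue length and take the least loaded pid.
  case lists:keysort(2, Members1) of
    [{Pid, _} | _] -> Pid;
    [] -> {error, empty_process_group}
  end.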

The previous example can then be rewritten by replacing pg2:get_closest_pid/1 with get_best_pid/1, and it now performs load balancing within every process group. This is a potentially hairy problem, but Erlang makes the solution quite clean.

As an added bonus, a much respected coworker pointed out that you could use the stack_size value from erlang:process_info/2 as a tie-breaker between processes with the same number of queued jobs. The reasoning there is that a process which is currently processing data wouldn't necessarily have additional messages queued, and thus queue length alone can't distinguish between an idling process and a processing one.
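
Here is a minimal sketch of that tie-breaker, under a few assumptions: it takes a list of local pids instead of a group name, load/1 is a hypothetical helper, and members that have died are skipped. Whether stack_size reliably separates a busy process from an idle one is worth checking under your own load tests.

```erlang
-module(best_pid_sketch).
-export([get_best_pid/1]).

%% Hypothetical variant: pick the pid with the shortest message queue,
%% using stack_size to break ties between equally short queues.
get_best_pid(Members) ->
    Loads = [{load(Pid), Pid} || Pid <- Members],
    %% Tuples sort element by element: queue length first, then stack size.
    case lists:sort([L || {Load, _} = L <- Loads, Load =/= dead]) of
        [{_, Pid} | _] -> Pid;
        []             -> {error, empty_process_group}
    end.

%% erlang:process_info/2 returns undefined for a process that has exited.
load(Pid) ->
    case erlang:process_info(Pid, [message_queue_len, stack_size]) of
        [{message_queue_len, Q}, {stack_size, S}] -> {Q, S};
        undefined -> dead
    end.
```

Sorting on the {QueueLen, StackSize} pair keeps queue length as the primary criterion, so the tie-breaker only matters when two processes have the same number of queued jobs.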