Hands On Review of the Dynamo Paper

November 30, 2009. Filed under erlang, distributed-systems

Recently I had the pleasure of reading the Amazon Dynamo paper, and was trying to jot down some structured notes about it, but was having trouble simply because the paper flits from one topic to another at a rather brisk pace.

I decided that a good way to record the ideas (as well as solidify them in my mind) was to go through the process of writing a distributed key-value store, and then incrementally add the enhancements discussed in the Dynamo paper. By the end of this series we'll have re-implemented most of the interesting ideas from Dynamo in a distributed Erlang system.

Each post will present an idea and then apply it to a sample distributed key-value store that evolves over the course of the series. (Some posts may contain several ideas, with the aim to keep each post long enough to deserve its own URI but short enough to be consumed in one reading.)

Why the Dynamo Paper?

Because of the huge number of topics covered. Some of the topics covered are virtual nodes, consistent hashing, Merkle trees, incremental scaling, vector clocks, gossip protocols, seeds and sloppy quorums. Any one of these topics could be a paper on its own, and as a result the 27 page Dynamo paper is forced to skim over many of the details.

However, a series of articles gives more leeway to explore these ideas; building a project adds a bit of a hands-on feel, which I find to be the most instructive method of presenting material.

Why Erlang?

Erlang is an excellent fit for designing a distributed system. This isn't particularly an ode to message passing, but much more an homage to the pg2 module and the ability to register and interact with processes running on other hosts as if they were running locally.

I've been enjoying experimenting with Clojure, and have been a long time Python fan, but neither--and very few programming languages in general--can abstract away operating across machines with the casual ease of Erlang.

This is an important message, so forgive me for stating it once more: for distributed systems the key advantages of Erlang are the OTP application platform and the Erlang virtual machine. Pattern matching, message passing and the functional paradigm are all powerful language features, but they are replaceable; OTP and the Erlang VM are not.[1]

Prerequisites?

The target audience for this series is the avid programmer, but not the avid Erlang programmer. To read through the series, you'll only need a bit of persistence. To work through the series you'll need to be familiar with Erlang syntax and have a working copy of erl installed, but specific familiarity with pg2 and similar modules isn't necessary.

Rather--beyond discussing the Dynamo implementation--a secondary goal of this series is to introduce as many significant concepts of implementing and deploying Erlang applications as possible, so it is very much intended to be an enlightening read for the Erlang neophyte.

Our Prototype

In this first entry we're going to put together the initial version of our distributed key-value store.

The initial interface we are defining is:

start(N) -> started.
stop() -> stopped.
get(Key) -> {ok, Value}.
get(Key, Timeout) -> {ok, Value}.
set(Key, Value) -> {ok, updated}.
set(Key, Value, Timeout) -> {ok, updated}.

Such that usage looks like:

2> kvs:start(5).
started
3> kvs:get(a).
{ok,undefined}
4> kvs:set(a, 10).
{ok,updated}
5> kvs:get(a).
{ok,10}
6> kvs:get(a).
{ok,10}
7> kvs:stop().
stopped

For the most part these function definitions won't change, but start will need some improvements as we add more configuration options. Now let's work through the initial implementations (which will change quite a bit over the series).

start and stop

We'll be relying heavily on pg2 throughout this series. pg2 is a module for creating global process groups, where processes register themselves and thus are easily discoverable from other processes.

The true value is that registered processes are not just discoverable to other processes on the same node, but to processes running on other nodes as well. Since nodes can be run on different physical machines, pg2 gives us the ability to scale across machines with a minimum of hassle.

When using pg2, the first step is to create a process group via pg2:create/1 (an idempotent operation, such that calling it on an existing group doesn't harm it), and afterward processes are added to it via pg2:join/2. Later, processes may be removed from the group via pg2:leave/2. (The process group itself may be deleted via pg2:delete/1.)

Thus, we can implement start/1 as follows (where the one parameter is an integer representing the number of store processes to create):

%% @doc create N processes in distributed key-value store
%% @spec start(integer()) -> started
start(N) ->
    pg2:create(kvs),
    %% lists:seq(1, N) yields exactly N elements, so N processes are spawned
    lists:foreach(fun(_) ->
                          pg2:join(kvs, spawn(kvs, store, [[]]))
                  end, lists:seq(1, N)),
    started.

A few things to note:

  1. We haven't described the kvs:store/1 function yet, so the above code won't yet compile successfully.
  2. spawn/3 is used to create processes using the specified function and parameters, and returns a process identifier.
  3. The second parameter of pg2:join/2 takes a process identifier, here the one returned by spawn/3.[2]
  4. You can run start/1 multiple times, and on multiple nodes, where all created processes would join the same process group. As such, this start function already supports utilizing processes across multiple physical hosts.

Even simpler than start/1, here is our implementation of stop/0:

%% @doc stop all pids in KVS process group
%% @spec stop() -> stopped
stop() ->
    lists:foreach(fun(Pid) ->
			  pg2:leave(kvs, Pid),
			  Pid ! stop
		  end, pg2:get_members(kvs)),
    stopped.

Notice that stop/0 will remove all processes from the kvs process group. That means it will remove processes created on other physical hosts. For the time being that is reasonable, but we'll probably want to create a more nuanced approach to stopping as the series progresses.[3]
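As a rough sketch of what a more selective stop could look like, the module below keeps only the pids that live on the current node and sends stop to those. The module and function names (local_stop_demo, local_pids/1, stop_local/1) are hypothetical and not part of kvs. It takes a plain list of pids so that it does not depend on pg2; in kvs the list would presumably come from pg2:get_members(kvs), and each pid would also need to leave the group.

```erlang
-module(local_stop_demo).
-export([local_pids/1, stop_local/1]).

%% Keep only the pids running on the node that executes this code.
%% node/1 returns the node a pid belongs to; node/0 returns the local node.
local_pids(Pids) ->
    [Pid || Pid <- Pids, node(Pid) =:= node()].

%% Send stop to each local pid and return how many were signalled.
%% Pids on other nodes are left untouched.
stop_local(Pids) ->
    Local = local_pids(Pids),
    lists:foreach(fun(Pid) -> Pid ! stop end, Local),
    length(Local).
```

On a single node every pid is local, so stop_local/1 behaves like the loop in stop/0 minus the pg2:leave/2 call.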

The other interesting aspect of stop/0 is that it shows the first example of message passing in this series.[4] The syntax for message passing in Erlang is

ReceiverPid ! TermToSend.

Where ReceiverPid is the process identifier for the receiver, and TermToSend is an arbitrary Erlang term. These are all legal examples of message passing:

Pid ! ok.
Pid ! {ok, [{a, 10}, {b, 20}]}.
Pid ! [a,b,c,d].
Pid ! dict:store(name, <<"Will">>, dict:new()).
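To watch a few of these terms actually arrive, here is a small sketch that sends them to the calling process itself and reads them back out of its mailbox. The module name send_demo and the function run/0 are made up for this illustration; the reference from make_ref/0 is only there so the receive ignores unrelated messages already sitting in the mailbox.

```erlang
-module(send_demo).
-export([run/0]).

%% Send several different terms to ourselves, then receive them in order.
run() ->
    Self = self(),
    Ref = make_ref(),
    Terms = [ok, {ok, [{a, 10}, {b, 20}]}, [a, b, c, d]],
    %% Tag each message with Ref so only our own messages match below.
    lists:foreach(fun(T) -> Self ! {Ref, T} end, Terms),
    %% One receive per term sent; messages between two processes
    %% (here the same process) are delivered in the order they were sent.
    [receive {Ref, T} -> T end || _ <- Terms].
```

Calling send_demo:run() returns the same three terms in the order they were sent.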

Now that we've written the start/1 and stop/0 functions, it's time to write the store/1 function which contains the real key-value store logic.

The Key-Value store/1

The store/1 function is the heart of the implementation. It maintains the key-value store, handles updating and propagating values, and handles retrieving values for clients.

In this first implementation we are making a couple of strategic decisions:

  • We are storing values in a property list, because it is very simple. It goes without saying that a property list is not a suitable data structure for large numbers of values: both reads and updates are O(N), scanning about N/2 entries on average and all N in the worst case. We'll address this later.
  • We are storing all data in every process. That is, we're doing replication instead of sharding. This is a temporary condition, which we'll improve on throughout the series.
  • Reading always reads from a single process, which would be appropriate if we verified all other processes had successfully updated before reporting that a write succeeded. We don't verify that yet, so this'll improve over time as well.

With those caveats in mind, here is our initial implementation of store/1:

%% @doc implementation of distributed key-value store
%% @spec store(proplist()) -> term()
%%       proplist = [{term(), term()}]
store(Data) ->
    receive
	{Sender, get, Key} ->
	    % client interface for retrieving values
	    Sender ! {self(), got, proplists:get_value(Key, Data)},
	    store(Data);
	{Sender, set, Key, Value} ->
	    % client interface for updating values
	    lists:foreach(fun(Pid) ->
				  Pid ! {self(), update, Key, Value}
			  end, pg2:get_members(kvs)),
	    Sender ! {self(), received, {set, Key, Value}},
	    store(Data);
	{Sender, update, Key, Value} ->
	    % sent to all nodes by first receiving node
	    Sender ! {self(), updated, Key, Value},
	    store([{Key, Value} | proplists:delete(Key, Data)]);
	{_Sender, updated, _Key, _Value} ->
	    store(Data);
	stop ->
	    ok
    end.
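The property-list handling inside store/1 can be tried on its own. The sketch below pulls the two expressions out into a hypothetical module (proplist_demo, with upsert/3 and lookup/2, none of which exist in kvs) so they can be called from the shell without starting any processes.

```erlang
-module(proplist_demo).
-export([upsert/3, lookup/2]).

%% Drop any existing entry for Key, then prepend the new pair.
%% This is the same expression the update clause of store/1 uses.
upsert(Key, Value, Data) ->
    [{Key, Value} | proplists:delete(Key, Data)].

%% Return the stored value, or the atom undefined when Key is absent.
%% This is the same call the get clause of store/1 uses.
lookup(Key, Data) ->
    proplists:get_value(Key, Data).
```

For example, upsert(a, 20, [{a, 10}]) yields [{a,20}], and lookup(b, [{a,20}]) yields undefined, which is why kvs:get/1 returns {ok,undefined} for a key that was never set.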

There are a number of aspects of store/1 worth discussing briefly:

  • This is the first occurrence of the receive keyword, which is the second half of using message passing in Erlang (the first half is the ! operator). receive and ! work together as follows:

    % in the Sender process
    Receiver ! {set, a, "b"}.
    % in the Receiver process
    receive
        {set, Key, Val} ->
            % no trailing comma: this is the last expression in the clause
            io:format("Received ~p => ~p~n", [Key, Val])
    end.
    

    Also note that the self/0 function returns the pid of the process where it is called, so it is frequently used when the sender wants the receiver to respond.

    % in Sender process
    Receiver ! {self(), get, name}.
    
    % in Receiver process
    receive
        {Sender, get, Key} ->
            Value = get_from_somewhere(Key),
            Sender ! {got, Key, Value}
    end.
    
  • pg2:get_members/1 takes a process group name and returns a list of all pids which are currently members. It and pg2:get_closest_pid/1 are the two mechanisms for getting a pid from a process group.

  • Note the distinction we're making between set and update: set is the external interface for updating values, and update is the internal interface for propagating values.

    (Also note that we're ignoring the updated message at this point.)

Now that the store/1 implementation is done, it's time to write some convenient wrappers to export from the kvs module, which simplify interacting with the store/1 function.

get and set Interfaces

Although it's possible to directly communicate with the processes, it takes a lot of knowledge to do so. You need to know they are registered in the kvs process group. You need to know the format of the messages they want. You also need to make sure that the interactions are synchronous (by default Erlang message passing is asynchronous, but we'll see below the standard pattern for synchronous message passing).

We can relieve these issues by writing interface functions for performing the two important functions in a key-value store: setting and retrieving.

First let's look at the implementation for get/1 and get/2.

%% @doc retrieve value for key
%% @spec get(term()) -> val() | timeout()
%%       (same return types as get/2 below)
get(Key) -> get(Key, ?TIMEOUT).

%% @doc retrieve value for key, with timeout
%% @spec get(term(), integer()) -> val() | timeout()
%%       val = {ok, term()} | {ok, undefined}
%%       timeout = {error, timeout}
get(Key, Timeout) ->
    Pid = pg2:get_closest_pid(kvs),
    Pid ! {self(), get, Key},
    receive
	{Pid, got, Value} ->
	    {ok, Value}
    after
	Timeout ->
	    {error, timeout}
    end.

A few new things appear in this example: first, pg2:get_closest_pid/1, which randomly selects a process in the specified process group, with preference for pids running on the same node. This is useful for spreading work across the pids in a process group.[5]

Second, note the pattern of sending a message and then using receive to block until the response arrives, which is frequently used for synchronous communication between two processes.
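Stripped of pg2 and the key-value logic, that pattern can be sketched as a tiny stand-alone module. Everything here (the sync_demo module, its ping/pong protocol, and call/3) is made up for illustration and is not part of kvs.

```erlang
-module(sync_demo).
-export([start/0, call/3, loop/0]).

%% Spawn a tiny server process that answers ping requests.
start() ->
    spawn(?MODULE, loop, []).

loop() ->
    receive
        {From, ping} ->
            %% Tag the reply with our own pid so the caller can match on it.
            From ! {self(), pong},
            loop();
        stop ->
            ok
    end.

%% Send Request to Pid, then block until Pid replies or Timeout ms elapse.
call(Pid, Request, Timeout) ->
    Pid ! {self(), Request},
    receive
        %% Pid is already bound, so only a reply from that process matches.
        {Pid, Reply} ->
            {ok, Reply}
    after Timeout ->
        {error, timeout}
    end.
```

With Pid = sync_demo:start(), the call sync_demo:call(Pid, ping, 500) returns {ok,pong}, while a request the server does not understand, such as sync_demo:call(Pid, unknown, 50), is never answered and returns {error,timeout}.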

The set/2 and set/3 functions are very similar to get/2; following the same pattern, they differ only in the message sent and the expected response.

%% @doc update value for key
%% @spec set(term(), term()) -> {ok, updated} | {error, timeout}
set(Key, Val) -> set(Key, Val, ?TIMEOUT).

%% @doc update value for key, with timeout
%% @spec set(term(), term(), integer()) -> {ok, updated} | {error, timeout}
set(Key, Val, Timeout) ->
    Pid = pg2:get_closest_pid(kvs),
    Pid ! {self(), set, Key, Val},
    receive
	{Pid, received, {set, Key, Val}} ->
	    {ok, updated}
    after
	Timeout ->
	    {error, timeout}
    end.

There shouldn't be anything in set/3 which we haven't encountered before, so we can move on to the last task in implementing this simplest form of the distributed key-value store.

kvs Module

Finally, we need to write the headers for the kvs module, which only takes a few lines.

-module(kvs).
-export([get/1, get/2, set/2, set/3, start/1, stop/0, store/1]).
-define(TIMEOUT, 500).

Note that we are defining TIMEOUT, which is the number of milliseconds to wait for a response. You might also want to change 500 to infinity, which means no timeout is used. That said, for now 500 milliseconds is functionally equivalent to infinity, so 500 seems like a reasonable default.[6]

In Closing

The full code written in this entry is available on GitHub. (Note that I am linking to a specific revision of the kvs.erl file, not the most recent version of the code.)

Now that we've written the infrastructure for this series, soon we'll be able to start examining the ideas in the Dynamo paper and incrementally building something that looks ever so slightly similar.

The next post is available and discusses adding durable writes and consistent reads.


  1. Okay, okay. Anything is replaceable. I guess we could write a distributed key-value store on top of Hadoop if we really wanted to. I'm sure that would be great fun.

  2. In more serious implementations, pg2:join/2 is usually called by an OTP gen_server, which adds itself to a process group on creation and removes itself on termination. For this example, dealing with spawned pids leads to much more concise code than dealing with the full gen_server stack, which is not without its idiosyncrasies.

  3. If you're wondering how this might be easily extended to only stop processes started on the local node, then take a look at the erlang:node/1 function, which returns the node where a particular process is running.

  4. Because of the aforementioned gen_servers, many if not most Erlang systems are written without using the explicit message passing operator (i.e. !). Instead, gen_server:call/2 and gen_server:cast/2 are used for synchronous and asynchronous communication respectively.

    At some point near the end of the series it will be worthwhile to convert the code to a gen_server, but early on I believe it will simply obscure the interesting ideas behind boilerplate.

  5. Note that there is no guarantee of randomness, and it will always assign work to local processes when any exist, so in certain pathological cases it is possible to achieve an extremely lopsided distribution of work using pg2:get_closest_pid/1.

    If you need guaranteed balancing, you'll need to write a wrapper on top of pg2:get_members/1 and either inspect

  6. Generally all -define statements for a module are kept in a .hrl file. Since we only have one definition, for the time being I felt it was simpler to include it in the same file, but in general this isn't a best practice.