Hands On Review of the Dynamo Paper
Recently I had the pleasure of reading the Amazon Dynamo paper, and was trying to jot down some structured notes about it, but was having trouble simply because the paper flits from one topic to another at a rather brisk pace.
I decided that a good way to record the ideas (as well as solidify them in my mind) was to go through the process of writing a distributed key-value store, and then incrementally add the enhancements discussed in the Dynamo paper. By the end of this series we'll have re-implemented most of the interesting ideas from Dynamo in a distributed Erlang system.
Each post will present an idea and then apply it to a sample distributed key-value store that evolves over the course of the series. (Some posts may contain several ideas, with the aim to keep each post long enough to deserve its own URI but short enough to be consumed in one reading.)
Why the Dynamo Paper?
Because of the huge number of topics covered: virtual nodes, consistent hashing, merkle trees, incremental scaling, vector clocks, gossip protocols, seeds and sloppy quorums. Any one of these topics could be a paper on its own, and as a result the 27-page Dynamo paper is forced to skim over many of the details.
However, a series of articles gives more leeway to explore these ideas; building a project adds a bit of a hands-on feel, which I find to be the most instructive method of presenting material.
Why Erlang?
Erlang is an excellent fit for designing a distributed system. This isn't particularly an ode to message passing, but much more an homage to the pg2 module and the ability to register and interact with processes running on other hosts as if they were running locally.
I've been enjoying experimenting with Clojure, and have been a long-time Python fan, but neither--and very few programming languages in general--can abstract away operating across machines with the casual ease of Erlang.
This is an important message, so forgive me for stating it once more: for distributed systems the key advantages of Erlang are the OTP application platform and the Erlang virtual machine. Pattern matching, message passing and the functional paradigm are all powerful language features, but they are replaceable; the OTP and the Erlang VM are not.1
Prerequisites?
The target audience for this series is the avid programmer, but not the avid Erlang programmer. To read through the series, you'll only need a bit of persistence. To work through the series you'll need to be familiar with the Erlang syntax and have a working copy of erl installed, but specific familiarity with modules like pg2 isn't necessary.
Rather--beyond discussing the Dynamo implementation--a secondary goal of this series is to introduce as many significant concepts of implementing and deploying Erlang applications as possible, so it is very much intended to be an enlightening read for the Erlang neophyte.
Our Prototype
In this first entry we're going to put together the initial version of our distributed key-value store.
The initial interface we are defining is:
start(N) -> started.
stop() -> stopped.
get(Key) -> {ok, Value}.
get(Key, Timeout) -> {ok, Value}.
set(Key, Value) -> {ok, updated}.
set(Key, Value, Timeout) -> {ok, updated}.
Such that usage looks like:
2> kvs:start(5).
started
3> kvs:get(a).
{ok,undefined}
4> kvs:set(a, 10).
{ok,updated}
5> kvs:get(a).
{ok,10}
6> kvs:get(a).
{ok,10}
7> kvs:stop().
stopped
For the most part these function definitions won't change, but start
will need
some improvements as we add more configuration options. Now let's work through
the initial implementations (which will change quite a bit over the series).
start and stop
We'll be relying heavily on pg2 throughout this series. pg2 is a module for creating global process groups, where processes register themselves and thus are easily discoverable from other processes. The true value is that registered processes are not just discoverable to other processes on the same node, but to processes running on other nodes as well. Since nodes can be run on different physical machines, pg2 gives us the ability to scale across machines with a minimum of hassle.
When using pg2, the first step is to create a process group via pg2:create/1 (an idempotent operation, such that calling it on an existing group doesn't harm it), and afterward processes are added to it via pg2:join/2. Later, processes may be removed from the group via pg2:leave/2. (The process group itself may be deleted via pg2:delete/1.)
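To make that lifecycle concrete, here is a minimal shell session (the group name demo and the pid shown are illustrative):
1> pg2:create(demo).
ok
2> pg2:join(demo, self()).
ok
3> pg2:get_members(demo).
[<0.33.0>]
4> pg2:leave(demo, self()).
ok
5> pg2:delete(demo).
ok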
Thus, we can implement start/1 as follows (where the one parameter is an integer representing the number of processes to create):
%% @doc create N processes in distributed key-value store
%% @spec start(integer()) -> started
start(N) ->
    pg2:create(kvs),
    lists:foreach(fun(_) ->
        pg2:join(kvs, spawn(kvs, store, [[]]))
    end, lists:seq(1, N)),
    started.
A few things to note:
- We haven't described the kvs:store/1 function yet, so the above code won't yet compile successfully. spawn/3 is used to create processes using the specified function and parameters, and returns a process identifier.
- The second parameter of pg2:join/2 takes a process identifier, such as the one returned by spawn/3.2
- You can run start/1 multiple times, and on multiple nodes, where all created processes would join the same process group. As such, this start function already supports utilizing processes across multiple physical hosts, as illustrated below.
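To illustrate that last point, here is what a two node session might look like (the node names, hostname, and counts are illustrative; this assumes kvs is compiled on both nodes and that the nodes share an Erlang cookie):

$ erl -sname one
(one@host)1> kvs:start(3).
started

$ erl -sname two
(two@host)1> net_adm:ping('one@host').
pong
(two@host)2> kvs:start(3).
started
(two@host)3> length(pg2:get_members(kvs)).
6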
Even simpler than start/1, here is our implementation of stop/0:
%% @doc stop all pids in kvs process group
%% @spec stop() -> stopped
stop() ->
    lists:foreach(fun(Pid) ->
        pg2:leave(kvs, Pid),
        Pid ! stop
    end, pg2:get_members(kvs)),
    stopped.
Notice that stop/0 will remove all processes from the kvs process group. That means it will remove processes created on other physical hosts. For the time being that is reasonable, but we'll probably want to create a more nuanced approach to stopping as the series progresses.3
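For instance, a local-only variant might filter the group's members by node before stopping them. The stop_local/0 function below is just an illustrative sketch and is not part of the module:

%% @doc stop only the pids running on the local node (sketch)
stop_local() ->
    Local = [Pid || Pid <- pg2:get_members(kvs), node(Pid) =:= node()],
    lists:foreach(fun(Pid) ->
        pg2:leave(kvs, Pid),
        Pid ! stop
    end, Local),
    stopped.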
The other interesting aspect of stop/0 is that it shows the first example of message passing in this series.4 The syntax for message passing in Erlang is
ReceiverPid ! TermToSend.
Where ReceiverPid
is the process identifier for the receiver, and TermToSend
is an arbitrary Erlang term. These are all legal examples of message passing:
Pid ! ok.
Pid ! {ok, [{a, 10}, {b, 20}]}.
Pid ! [a,b,c,d].
Pid ! dict:store(name, <<"Will">>, dict:new()).
Now that we've written the start/1
and stop/0
functions,
it's time to write the store/1
function which contains the real
key-value store logic.
The Key-Value store/1
The store/1
function is the heart of the implementation. It maintains the
key-value store, handles updating and propagating values, and handles retrieving
values for clients.
In this first implementation we are making a couple of strategic decisions:
- We are storing values in a property list, because it is very simple (see the short refresher below). It goes without saying that a property list is not a suitable data structure for large numbers of values (average complexity of O(N/2) for both reads and updates, worst case complexity of O(N)). We'll address this later.
- We are storing all data in every process. That is, we're doing replication instead of sharding. This is a temporary condition, which we'll improve on throughout the series.
- Reading always reads from a single process, which would be appropriate if we verified all other processes had successfully updated before reporting that a write succeeded. This'll improve over time as well.
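As a quick refresher on the property list operations we'll lean on, here is a short shell session (the keys and values are illustrative):

1> Data = [{a, 1}, {b, 2}].
[{a,1},{b,2}]
2> proplists:get_value(a, Data).
1
3> proplists:get_value(missing, Data).
undefined
4> [{a, 10} | proplists:delete(a, Data)].
[{a,10},{b,2}]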
With those caveats in mind, here is our initial implementation of store/1
:
%% @doc implementation of distributed key-value store
%% @spec store(proplist()) -> term()
%%       proplist = [{term(), term()}]
store(Data) ->
    receive
        {Sender, get, Key} ->
            % client interface for retrieving values
            Sender ! {self(), got, proplists:get_value(Key, Data)},
            store(Data);
        {Sender, set, Key, Value} ->
            % client interface for updating values
            lists:foreach(fun(Pid) ->
                Pid ! {self(), update, Key, Value}
            end, pg2:get_members(kvs)),
            Sender ! {self(), received, {set, Key, Value}},
            store(Data);
        {Sender, update, Key, Value} ->
            % sent to all nodes by first receiving node
            Sender ! {self(), updated, Key, Value},
            store([{Key, Value} | proplists:delete(Key, Data)]);
        {_Sender, updated, _Key, _Value} ->
            store(Data);
        stop ->
            ok
    end.
There are a number of aspects of store/1
worth discussing briefly:
- This is the first occurrence of the receive keyword, which is the second half of using message passing in Erlang (the first half is the ! operator). receive and ! work together as follows:

  % in the Sender process
  Receiver ! {set, a, "b"}.

  % in the Receiver process
  receive
      {set, Key, Val} ->
          io:format("Received ~p => ~p~n", [Key, Val])
  end.

- Also note that the self/0 function returns the pid of the process where it is called, so it is frequently used when the sender wants the receiver to respond:

  % in the Sender process
  Receiver ! {self(), get, name}.

  % in the Receiver process
  receive
      {Sender, get, Key} ->
          Value = get_from_somewhere(Key),
          Sender ! {got, Key, Value}
  end.

- pg2:get_members/1 takes a process group name and returns a list of all pids which are currently members. It, along with pg2:get_closest_pid/1, are the two mechanisms for getting a pid from a process group.

- Note the distinction we're making between set and update: set is the external interface for updating values, and update is the internal interface for propagating values. (Also note that we're ignoring the updated message at this point.)
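To make the set/update flow concrete, here is roughly what happens when a client sets a key in a group of three store processes (the pids are illustrative):
- the client sends {ClientPid, set, a, 10} to one member, say <0.40.0>;
- <0.40.0> sends {<0.40.0>, update, a, 10} to every member of the kvs group, including itself;
- <0.40.0> replies {<0.40.0>, received, {set, a, 10}} to the client;
- each member handles its update message by storing {a, 10} and replying {Pid, updated, a, 10} to <0.40.0>;
- <0.40.0> ignores those updated acknowledgements, for now.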
Now that the store/1
implementation is done,
it's time to write some convenient wrappers to
export from the kvs
module, which simplify
interacting with the store/1
function.
get and set Interfaces
Although it's possible to directly communicate with the processes,
it takes a lot of knowledge to do so. You need to know they are registered
in the kvs
process group. You need to know the format of the
messages they want. You also need to make sure that the interactions
are synchronous (by default Erlang message passing is asynchronous,
but we'll see below the standard pattern for synchronous message passing).
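For example, driving store/1 by hand from the shell looks something like this (the pids shown are illustrative):

1> kvs:start(2).
started
2> [Pid | _] = pg2:get_members(kvs).
[<0.40.0>,<0.41.0>]
3> Pid ! {self(), set, color, red}.
{<0.33.0>,set,color,red}
4> flush().
Shell got {<0.40.0>,received,{set,color,red}}
ok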
We can relieve these issues by writing interface functions for performing the two important functions in a key-value store: setting and retrieving.
First let's look at the implementation for get/1 and get/2.
%% @doc retrieve value for key
%% @spec get(term()) -> val() | timeout()
%%       val = {ok, term()} | {ok, undefined}
%%       timeout = {error, timeout}
get(Key) -> get(Key, ?TIMEOUT).

%% @doc retrieve value for key, with timeout
%% @spec get(term(), integer()) -> val() | timeout()
%%       val = {ok, term()} | {ok, undefined}
%%       timeout = {error, timeout}
get(Key, Timeout) ->
    Pid = pg2:get_closest_pid(kvs),
    Pid ! {self(), get, Key},
    receive
        {Pid, got, Value} ->
            {ok, Value}
    after
        Timeout ->
            {error, timeout}
    end.
A few new things appear in this example: first, pg2:get_closest_pid/1
, which
randomly selects a process in the specified process group, with preference for pids
running on the same node. This is useful for evenly distributing work across the pids
in a process group.5
Second, note the pattern of sending a message and then blocking in a receive for the response, which is frequently used for synchronous communication between two processes.
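That pattern is general enough to pull out into a helper. The call/3 function below is only an illustrative sketch of the pattern and is not part of the kvs module (it assumes the receiver tags its reply with its own pid):

%% @doc send Request to Pid and wait up to Timeout for a tagged reply (sketch)
call(Pid, Request, Timeout) ->
    Pid ! {self(), Request},
    receive
        {Pid, Reply} ->
            {ok, Reply}
    after
        Timeout ->
            {error, timeout}
    end.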
The set/2
and set/3
functions are very similar to get/2
;
following the same pattern, they differ only in the message sent
and the expected response.
%% @doc update value for key
%% @spec set(term(), term()) -> {ok, updated} | {error, timeout}
set(Key, Val) -> set(Key, Val, ?TIMEOUT).

%% @doc update value for key, with timeout
%% @spec set(term(), term(), integer()) -> {ok, updated} | {error, timeout}
set(Key, Val, Timeout) ->
    Pid = pg2:get_closest_pid(kvs),
    Pid ! {self(), set, Key, Val},
    receive
        {Pid, received, {set, Key, Val}} ->
            {ok, updated}
    after
        Timeout ->
            {error, timeout}
    end.
There shouldn't be anything in set/3
which we haven't encountered before,
so we can move on to the last task in implementing this simplest form of
the distributed key-value store.
kvs Module
Finally, we need to write the headers for the kvs
module, which only
takes a few lines.
-module(kvs).
-export([get/1, get/2, set/2, set/3, start/1, stop/0, store/1]).
-define(TIMEOUT, 500).
Note that we are defining TIMEOUT
, which is the number of milliseconds to
wait for a response. You might also want to change 500
to infinity
, which
means no timeout is used. That said, for now 500 milliseconds is functionally equivalent
to infinity, so 500 seems like a reasonable default.6
In Closing
The full code written in this entry is available on GitHub. (Note that I am linking to a specific revision of the kvs.erl
file,
not the most recent version of the code.)
Now that we've written the infrastructure for this series, soon we'll be able to start examining the ideas in the Dynamo paper and incrementally building something that looks ever so slightly similar.
The next post is available and discusses adding durable writes and consistent reads.
1. Okay, okay. Anything is replaceable. I guess we could write a distributed key-value store on top of Hadoop if we really wanted to. I'm sure that would be great fun.
2. In more serious implementations, pg2:join/2 is usually called by an OTP gen_server, which adds itself to a process group on creation and removes itself on termination. For this example, dealing with spawned pids leads to much more concise code than dealing with the full gen_server stack, which is not without its idiosyncrasies.

3. If you're wondering how this might be easily extended to only stop processes started on the local node, then take a look at the erlang:node/1 function, which returns the node where a particular process is running.

4. Because of the aforementioned gen_servers, many if not most Erlang systems are written without using the explicit message passing operator (i.e. !). Instead, gen_server:call/2 and gen_server:cast/2 are used for synchronous and asynchronous communication respectively. At some point near the end of the series it will be worthwhile to convert the code to a gen_server, but early on I believe it will simply obscure the interesting ideas behind boilerplate.

5. Note that there is no guarantee of randomness, and it will always prefer pids on the local node, so in certain pathological cases it is possible to achieve an extremely lopsided distribution of work using pg2:get_closest_pid/1. If you need guaranteed balancing, you'll need to write a wrapper on top of pg2:get_members/1 and either inspect

6. Generally all -define statements for a module are kept in a .hrl file. Since we only have one definition, for the time being I felt it was simpler to include it in the same file, but in general this isn't a best practice.