Durable Writes & Consistent Reads

December 1, 2009. Filed under erlang, distributed-systems

After starting out with an extremely bare-bones implementation of a key-value store, it's time to make our first two incremental improvements in our quest towards Dynamo: implementing durable writes and consistent reads.

W, R and N

In our initial implementation, we immediately notified the client that an update was successful, and did not verify that the updates had actually been written in the remote nodes.

lists:foreach(fun(Pid) ->
        Pid ! {self(), update, Key, Value}
    end, pg2:get_members(kvs)),
Sender ! {self(), received, {set, Key, Value}},

While simple to implement, this approach is unreliable because it assumes the writes are successful. It could be that the other processes are down[1], but we would still dutifully inform the client that their change had been successfully recorded.

To resolve this weakness, we'll make reading and writing a one-phase commit (rather than the zero-phase commit we have now, aka firing blindly)[2]. After receiving a write request from a client, the receiving node will wait for the writing nodes to acknowledge success before responding to the client.

However, once we decide to perform a more sophisticated commit protocol, a few questions come up:

  • Knowing that we need to wait before informing the client of a successful write, how many nodes should we wait for?
  • In the same vein, when we are reading values, how many nodes should we query before returning the value to the client?

Rather than providing fixed answers, Dynamo makes these two values configurable (along with a third value, the number of nodes data is replicated to), and by doing so makes each Dynamo cluster highly customizable to the needs of its consumer.

These configurable values are:

  • N is the number of nodes where each piece of data is replicated (for our current implementation N is equal to the number of nodes in the system).
  • W is the number of nodes where each piece of data is written before notifying the user of success (our implementation is W := N, sort of [3]).
  • R is the number of nodes which are consulted for their value of a key (currently in our implementation R := 1).

Combining these values in different ways creates data-stores with extremely different properties. A system with W := N, R := 1 will write to every single node before notifying a user of success, and is a low-availability write system, as a single downed node will prevent further writes. However, by guaranteeing updates are written to every node, it is possible to get the latest value by reading from only a single node, making the system highly available for reads.

On the opposite end of the spectrum, a W := 1, R := N system would be highly available for writes, but with low availability for reads. If it's important to never reject a write, such inefficient reads might be justifiable.

For most users, the values of W and R will be somewhere between 1 and N. According to the Dynamo paper, Amazon finds N := 3, W := 2, R := 2 to be a reasonable balance between performance and reliability for most applications.
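To make the availability trade-off concrete, here's a tiny sketch (not part of the kvs code; the helper names are illustrative) of when reads and writes can complete, given the number of currently reachable nodes:

% Hypothetical helpers, not part of kvs: an operation can only
% complete once enough replicas have responded.
can_write(LiveNodes, W) -> LiveNodes >= W.
can_read(LiveNodes, R)  -> LiveNodes >= R.

% With N := 3 and one node down (2 reachable):
%   W := 3, R := 1  ->  can_write(2, 3) =:= false, can_read(2, 1) =:= true
%   W := 1, R := 3  ->  can_write(2, 1) =:= true,  can_read(2, 3) =:= false
%   W := 2, R := 2  ->  both still succeed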

Now let's take a look at extending our key-value store to respect configurable values of W and R (we'll have to take on N later).

Records, Configuration & kvs.hrl

In the previous post I mentioned that -defines are usually kept in separate files with the .hrl extension, and now it's time for us to start using one. In this case we'll be calling it kvs.hrl.

First, we'll be adding constant values for W and R (as well as moving over the TIMEOUT value),

-define(KVS_WRITES, 3).
-define(KVS_READS, 3).
-define(TIMEOUT, 500).

but more importantly we'll be creating an Erlang record to manage the increasing amounts of state maintained by kvs:store/1.

Erlang records are nothing more than tuples with some syntactic sugar that lets a human access the contained values without remembering the elements' ordering. For most purposes, the syntactic sugar really is the biggest gain from using records, but there is also a performance advantage over using a property list[4].

Our kvs_store record will start out looking like this:

-record(kvs_store, {data, pending_reads, pending_writes}).

We'll use data to store our key-value mappings, pending_reads to store reads which are still waiting for additional nodes' values before returning to the user, and pending_writes for writes which are waiting for confirmation from additional nodes before notifying the user of a successful write.

% a brief interlude for record syntax
Store = #kvs_store{data=[], pending_reads=[], pending_writes=[]},
#kvs_store{data=D, pending_reads=R, pending_writes=W} = Store,
Writes = Store#kvs_store.pending_writes,

get_value(Key, #kvs_store{data=Data}) ->
    proplists:get_value(Key, Data).

set_value(Key, Value, S=#kvs_store{data=Data}) ->
    S#kvs_store{data=[{Key, Value} | proplists:delete(Key, Data)]}.

We'll need to add an -include at the top of kvs.erl to take advantage of the record definition.

-include("kvs.hrl").

And we'll also need to update the kvs:start/1 function to use the record:

%% @doc create N nodes in distributed key-value store
%% @spec start(integer()) -> started
start(N) ->
    pg2:create(kvs),
    lists:foreach(fun(_) ->
                      Store = #kvs_store{data=[],
                                         pending_reads=[],
                                         pending_writes=[]},
                      pg2:join(kvs, spawn(kvs, store, [Store]))
                  end, lists:seq(1, N)),
    started.

With that done, now we can move on to implementing durable writes.

Durable Writes

The first step in our implementation of durable writes is to update the interface of the store/1 function (this change is necessary for the consistent read changes we make below as well). The function's arity will remain unchanged, but we'll be passing around our new kvs_store record instead of the old property list.

% old definition
store(Data) -> term().
% new definition
store(Store = #kvs_store{data=Data,
                         pending_reads=Reads,
                         pending_writes=Writes}) -> term().

The skeptical reader may wonder why we went through the bother of defining a record when we could have simply changed the function definition to

store(Data, Reads, Writes) -> term().

and been done with it.

A reason to prefer the record is that you only pull out the necessary data, and don't have to juggle unused values. This becomes particularly meaningful as a record grows from an arity of one to an arity of three, from an arity of three to an arity of six, and so on. In the end, however, it is a matter of style.
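To make the contrast concrete, here's a hypothetical clause (not part of kvs) written both ways; with the record, a clause that only cares about data can ignore the other fields entirely:

% record version: only the field being used is mentioned
touch(Key, Store = #kvs_store{data=Data}) ->
    Store#kvs_store{data=[{Key, touched} | proplists:delete(Key, Data)]}.

% multi-argument version: Reads and Writes have to be threaded
% through unchanged, by this clause and by every caller
touch(Key, Data, Reads, Writes) ->
    {[{Key, touched} | proplists:delete(Key, Data)], Reads, Writes}.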

Now that we've updated the function definition, it is time for us to start updating the receive block. We'll examine the set, update and updated messages independently, but keep in mind that they are all patterns matching against store/1's mailbox.

set, update and updated

Receiving a message matching the set pattern is the beginning of the update process, so we'll begin there.

Previously set would broadcast the updated key-value pair to all nodes and then immediately notify the user that the write was successful, but now we're going to track the number of acks received from writing nodes and wait until they pass the W threshold before notifying the user of a successful write.

{Sender, set, Key, Value} ->
    % client interface for updating values
    lists:foreach(fun(Pid) ->
		  Pid ! {self(), update, Sender, Key, Value}
	  end, pg2:get_members(kvs)),
    Writes2 = [{{Sender, Key}, ?KVS_WRITES} | Writes],
    store(Store#kvs_store{pending_writes=Writes2});

With these changes, each node maintains a property list in pending_writes which contains all writes that have been initiated but haven't yet met the W threshold for a durable write. The keys in pending_writes are a 2-tuple of the client process id (the pid which called the kvs:set/2 function) and the key being updated; the values are counters holding the number of additional acknowledgements required before the write threshold is met.

{Sender, update, Client, Key, Value} ->
    % sent to all nodes by first receiving node
    Sender ! {self(), updated, Client, Key, Value},
    store(Store#kvs_store{data=[{Key, Value} |
		proplists:delete(Key, Data)]});

As nodes receive an update, they notify the node which received the original set message that they have successfully updated the key's value.

{_Sender, updated, Client, Key, Value} ->
    Count = proplists:get_value({Client, Key}, Writes),
    case Count of
        undefined ->
            store(Store);
        0 ->
            Client ! {self(), received, {set, Key, Value}},
            store(Store#kvs_store{
                    pending_writes=proplists:delete({Client, Key}, Writes)});
        _ ->
            store(Store#kvs_store{
                    pending_writes=[{{Client, Key}, Count-1} |
                                    proplists:delete({Client, Key}, Writes)]})
    end;

As other nodes acknowledge a successful write, the corresponding write counter is decremented. Once it has been fully decremented, the Client pid is notified of a successful durable write, and the write is deleted from the pending_writes property list. Further acknowledgements for that update are ignored.

Consistent Reads

Implementing consistent reads follows a pattern similar to durable writes, but with the added complexity of reconciling inconsistent reads (i.e. what to do if one node says the value is 5 and another says the value is 16).

First we implement the read fanout, which sends reads to the other nodes and then adds the new read to pending_reads.

{Sender, get, Key} ->
    % client interface for retrieving values
    lists:foreach(fun(Pid) ->
            Pid ! {self(), retrieve, Sender, Key}
        end, pg2:get_members(kvs)),
    % ?KVS_READS is required # of nodes to read from
    % [] is used to collect read values
    Reads2 = [{{Sender, Key}, {?KVS_READS, []}} | Reads],
    store(Store#kvs_store{pending_reads=Reads2});

Next, each node sends its local value of the key back to the node which received the original get message.

{Sender, retrieve, Client, Key} ->
    Sender ! {self(), retrieved, Client, Key, proplists:get_value(Key, Data)},
    store(Store);

Finally, the logic for decrementing the counter in pending_reads and for collecting the values returned from the various nodes via retrieved messages.

{_Sender, retrieved, Client, Key, Value} ->
    case proplists:get_value({Client, Key}, Reads) of
        undefined ->
            store(Store);
        {0, Values} ->
            Freq = lists:foldr(fun(X, Acc) ->
                        case proplists:get_value(X, Acc) of
                            undefined -> [{X, 1} | Acc];
                            N -> [{X, N+1} | proplists:delete(X, Acc)]
                        end
                    end, [], Values),
            [{Popular, _} | _] = lists:reverse(lists:keysort(2, Freq)),
            Client ! {self(), got, Popular},
            store(Store#kvs_store{
                    pending_reads=proplists:delete({Client, Key}, Reads)});
        {Count, Values} ->
            store(Store#kvs_store{
                    pending_reads=[{{Client, Key}, {Count-1, [Value | Values]}} |
                        proplists:delete({Client, Key}, Reads)]})
    end;

Note that we're using an extremely weak algorithm for resolving inconsistent reads: we pick the most common value (if there are multiple equally common values, then we indirectly pick one based on the order of responses from the reading nodes). If we can bear with this unsightliness for the moment, the next entry will take a look at using various kinds of clocks to make real improvements in read consistency.
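To see the reconciliation in isolation, here's the same frequency count pulled out into a standalone function (a sketch for illustration; kvs itself does this inline in the retrieved clause):

% count how often each value occurs, then take the head of the list
% after sorting by count
most_common(Values) ->
    Freq = lists:foldr(fun(X, Acc) ->
                case proplists:get_value(X, Acc) of
                    undefined -> [{X, 1} | Acc];
                    N -> [{X, N+1} | proplists:delete(X, Acc)]
                end
            end, [], Values),
    [{Popular, _} | _] = lists:reverse(lists:keysort(2, Freq)),
    Popular.

% most_common([5, 16, 5]) =:= 5.
% most_common([5, 16]) could return either value, depending on how
% the tie falls out of the sort.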

Failing Reads and Writes

In addition to a weak algorithm for resolving inconsistencies, another flaw in our current implementation is the handling of reads and writes which fail or otherwise cannot complete.

Consider a system where W := 10, but there are only five available nodes due to a node failure. The write counter will never reach zero, and thus a response will never be sent to the user. That doesn't make for a very usable system.

A reasonable solution is to consider all pending operations older than some timeout value to have failed, and then to notify the user that the read/write was a failure.

Currently we're not storing when a set or get request comes in, so we'll need to extend the information recorded in pending_reads and pending_writes to include a timestamp.

First we need to write a utility function to convert the output of erlang:now/0 into seconds since epoch,

ts() ->
    {Mega, Sec, _} = erlang:now(),
    (Mega * 1000000) + Sec.

and next we need to update a number of the receive patterns, starting with the set pattern, which needs to be updated to store pending writes as a 2-tuple of the number of required writes and the time the request was received (previously it was only the number of required writes).

{Sender, set, Key, Value} ->
    lists:foreach(fun(Pid) ->
            Pid ! {self(), update, Sender, Key, Value}
        end, pg2:get_members(kvs)),
    Writes2 = [{{Sender, Key}, {?KVS_WRITES, ts()}} | Writes],
    store(Store#kvs_store{pending_writes=Writes2});

updated requires a change as well: rather than pulling out a bare counter, it now matches on the {Count, Timestamp} 2-tuple, and it preserves the timestamp when decrementing Count.

{_Sender, updated, Client, Key, Value} ->
    case proplists:get_value({Client, Key}, Writes) of
        undefined ->
            store(Store);
        {0, _Timestamp} ->
            Client ! {self(), received, {set, Key, Value}},
            store(Store#kvs_store{
                    pending_writes=proplists:delete({Client, Key}, Writes)});
        {Count, Timestamp} ->
            store(Store#kvs_store{
                    pending_writes=[{{Client, Key}, {Count-1, Timestamp}} |
                        proplists:delete({Client, Key}, Writes)]})
    end;

We need to make corresponding changes for reading, but we don't need to walk through all of them explicitly (feel free to examine the diff where those changes occurred, if you're interested); a brief sketch of the get side follows.
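As a rough sketch of what the read side looks like after the change (the real code is in the diff mentioned above), the get pattern now stores a 3-tuple of the counter, the collected values, and the timestamp, matching the shape filter_reads/1 expects below:

% sketch only; see the linked diff for the actual read-side changes
{Sender, get, Key} ->
    lists:foreach(fun(Pid) ->
            Pid ! {self(), retrieve, Sender, Key}
        end, pg2:get_members(kvs)),
    Reads2 = [{{Sender, Key}, {?KVS_READS, [], ts()}} | Reads],
    store(Store#kvs_store{pending_reads=Reads2});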

The last major change we need to make is adding an after block following the receive block in store/1, which will allow us to periodically check for expired reads and writes.

    after
	?KVS_POLL_PENDING ->
	    Writes2 = lists:filter(fun filter_writes/1, Writes),
	    Reads2 = lists:filter(fun filter_reads/1, Reads),
	    store(Store#kvs_store{pending_writes=Writes2,
                                  pending_reads=Reads2})
    end.

filter_writes/1 and filter_reads/1 detect pending operations which are taking too long to complete, notify the client that their request failed, and then remove the failed operation from the pending_writes and pending_reads lists, respectively.

filter_writes({{Client, _Key}, {_Count, Ts}}) ->
    Now = ts(),
    if Now > Ts + ?KVS_WRITE_TIMEOUT ->
	    Client ! {error, write_failed},
	    false;
       true  ->
	    true
    end.

filter_reads({{Client, _Key}, {_Count, _Values, Ts}}) ->
    Now = ts(),
    if Now > Ts + ?KVS_READ_TIMEOUT ->
	    Client ! {error, read_failed},
	    false;
       true  ->
	    true
    end.

It wasn't strictly necessary to implement these functions outside of the receive block, but the block is becoming rather sizable, and it'll be easier to modify if we carve it up a bit.

(This is also the first time we've seen the Erlang if block, whose guards I tend to like less than case's pattern matching. This is certainly a matter of style, but I prefer using case in all situations, except those which devolve into the snippet case X of true -> A; false -> B end;, which seems like trying a bit too hard to avoid if.)
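For comparison, here's what filter_writes/1 looks like rewritten with case; this is exactly the case X of true -> A; false -> B end shape mentioned above, which is why if arguably reads better here (a stylistic sketch, not the code kvs actually uses):

% same behaviour as filter_writes/1 above, expressed with case
filter_writes({{Client, _Key}, {_Count, Ts}}) ->
    case ts() > Ts + ?KVS_WRITE_TIMEOUT of
        true ->
            Client ! {error, write_failed},
            false;
        false ->
            true
    end.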

Finally, as the absolute last modification, we'll also need to add the timeout values to kvs.hrl.

-define(KVS_WRITE_TIMEOUT, 2).  % seconds
-define(KVS_READ_TIMEOUT, 1).   % seconds
-define(KVS_POLL_PENDING, 500). % ms

After making all of our changes, let's start kvs with only a single node and a configuration of W := 3, R := 3.

14> c(kvs).
{ok,kvs}
15> kvs:stop().
stopped
16> kvs:start(1).
started
17> kvs:get(a).
{error,read_failed}
18> kvs:set(b, 10).
{error,write_failed}

There is our successful handling of failed pending reads and writes. Excellent.

With that, our second batch of changes comes to a close; a snapshot of the updated code is available on GitHub.

In the next article we'll take a more principled stance on consistent reads by exploring the impact of using physical, logical and vector clocks to version and reconcile values.

The post should be up in a couple of days.


  1. It is possible for process groups to become contaminated with dead pids, especially if you are being slack (like we are, for the time being) and using spawned processes instead of gen_servers. Even with a gen_server, if you don't trap_exit, then it is entirely possible for a gen_server to terminate without calling its termination cleanup function.

    Finally, even if you properly set trap_exit on a gen_server, the termination cleanup function can fail to fire if the process is killed brutally (kill -9, etc).

    Rest assured, however, that it is possible to detect dead processes, using a combination of erlang:node/1, rpc:call/4 and erlang:is_process_alive/1 (erlang:is_process_alive/1 must be executed on the node where the process lives, which means you must first discover the node and then remotely check the process's liveliness); a sketch follows below.

    In systems where you anticipate the possibility of processes being brutally killed, it is necessary to design your system in such a way that it survives sending messages to dead pids. With gen_server:call you'll receive some kind of error (although it may not immediately point you to the underlying issue), but gen_server:cast and using ! probably won't raise any issue at all, so you'll either need a timeout to detect dropped messages, or to explicitly verify a process is alive before sending... with the timeout being much less error prone.
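    A minimal sketch of that liveness check (the helper name is made up for illustration): find the pid's node, then ask that node whether the process is still running.

    is_pid_alive(Pid) ->
        % node/1 tells us where the pid lives; is_process_alive/1 has to
        % run on that node, so we go through rpc:call/4
        Node = erlang:node(Pid),
        case rpc:call(Node, erlang, is_process_alive, [Pid]) of
            true -> true;
            _    -> false    % false, or {badrpc, _} when the node itself is down
        end.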

  2. This one-phase commit still suffers from a weakness, which is how to back out a write where a portion of the nodes fail. We'll need something along the lines of a two-phase commit to really address that adequately. This should come in a later entry. Which I guess I say a lot these days.

  3. This is only true to the extent that we can trust pg2:get_members/1 to contain all nodes, including all crashed or downed nodes. In a true W := N condition, if any nodes are crashed then all writes to the system would fail.

  4. There is also a penalty in migrating running code from a previous definition of a record to a new one, which requires hand-crafted transition code. In general, it's easier to bring a system down and then bring it back up again (starting with the new record definition) than attempting to migrate it, unless you really can't bring your processes down.

    In situations where you think you'll be frequently updating a record definition, consider taking the performance hit and just using a property list instead. No one ever woke up wanting to do a manual migration.
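    For a sense of what that hand-crafted transition code looks like, here's a hypothetical upgrade function (the old field layout is invented for illustration): since records are just tuples, state built against an older kvs_store definition arrives as a differently-shaped tuple and has to be rebuilt by hand.

    upgrade_store({kvs_store, Data}) ->
        % instance of a hypothetical older, single-field definition
        #kvs_store{data=Data, pending_reads=[], pending_writes=[]};
    upgrade_store(Store = #kvs_store{}) ->
        % already matches the current definition
        Store.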