Durable Writes & Consistent Reads
After starting out with an extremely bare-bones implementation of a key-value store, it's time to make our first two incremental improvements in our quest towards Dynamo: implementing durable writes and consistent reads.
W, R and N
In our initial implementation, we immediately notified the client that an update was successful, and did not verify that the updates had actually been written to the remote nodes.
lists:foreach(fun(Pid) ->
    Pid ! {self(), update, Key, Value}
end, pg2:get_members(kvs)),
Sender ! {self(), received, {set, Key, Value}},
While simple to implement, this approach is unreliable because it assumes the writes are successful. It could be that the other processes are down [1], but we would still dutifully inform the client that their change had been successfully recorded.
To resolve this weakness, we'll make reading and writing a one-phase commit (rather than the zero-phase commit we have now, a.k.a. firing blindly) [2]. After receiving a write request from a client, the receiver will wait for the writing nodes to acknowledge success before responding to the client.
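Sketched in terms of the messages we'll actually send below, the flow we're aiming for looks roughly like this (the "coordinator" is simply whichever store process received the client's set):

%% client      -> coordinator : {Client, set, Key, Value}
%% coordinator -> every node  : {Coordinator, update, Client, Key, Value}
%% every node  -> coordinator : {Node, updated, Client, Key, Value}
%% coordinator -> client      : {Coordinator, received, {set, Key, Value}}, once enough acks arrive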
However, once we decide to perform a more sophisticated commit protocol, a few questions come up:
- Knowing that we need to wait before informing the client of a successful write, how many nodes should we wait for?
- In the same vein, when we are reading values, how many nodes should we query before returning the value to the client?
Rather than providing fixed answers, Dynamo makes these two values configurable (along with a third value, the number of nodes data is replicated to), and by doing so makes each Dynamo cluster highly customizable to the needs of its consumer.
These configurable values are:
- N is the number of nodes to which each piece of data is replicated (for our current implementation N is equal to the number of nodes in the system).
- W is the number of nodes to which each piece of data is written before notifying the user of success (our implementation is W := N, sort of [3]).
- R is the number of nodes which are consulted for their value of a key (currently our implementation has R := 1).
Combining these values in different ways creates data-stores with extremely different properties.
A system with W := N, R := 1 will write to every single node before notifying a user of success, and is a low-availability write system, since a single downed node will prevent further writes. However, by guaranteeing updates are written to every node, it is possible to get the latest value by reading from only a single node, making the system highly available for reads.
On the opposite end of the spectrum, a W := 1, R := N system would be highly available for writes, but with low availability for reads. If it's important to never reject a write, such inefficient reads might be justifiable.
For most users, the values of W and R will be somewhere between 1 and N.
According to the Dynamo paper, Amazon finds N := 3, W := 2, R := 2 to be a reasonable balance between performance and reliability for most applications.
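For a sense of what those knobs look like in code, here is a hypothetical version of the kvs.hrl constants we'll define below, set to Amazon's recommended values. The KVS_REPLICAS macro is purely illustrative, since we won't make N configurable until a later post.

%% hypothetical settings mirroring N := 3, W := 2, R := 2;
%% KVS_REPLICAS is illustrative only, N isn't configurable in this post.
-define(KVS_REPLICAS, 3).
-define(KVS_WRITES, 2).
-define(KVS_READS, 2).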
Now let's take a look at extending our key-value store to respect configurable values of W and R (we'll have to take on N later).
Records, Configuration & kvs.hrl
In the previous post I mentioned that -defines are usually kept in separate files with the .hrl extension, and now it's time for us to start using one. In this case we'll be calling it kvs.hrl.
First, we'll be adding constant values for W and R (as well as moving over the TIMEOUT value),
-define(KVS_WRITES, 3).
-define(KVS_READS, 3).
-define(TIMEOUT, 500).
but more importantly we'll be creating an Erlang record to manage the increasing amount of state maintained by kvs:store/1.
Erlang records are nothing more than tuples with some syntactic sugar to make it possible for a human to access the contained values without remembering the elements' ordering. For most purposes, the syntactic sugar really is the biggest gain from using records, but there is a performance advantage versus using a property list [4].
Our kvs_store record will start out looking like this:
-record(kvs_store, {data, pending_reads, pending_writes}).
We'll use data to store our key-value mappings, pending_reads to store reads which are still waiting for additional nodes' values before returning to the user, and pending_writes for writes which are waiting for confirmation from additional nodes before notifying the user of a successful write.
% a brief interlude for record syntax

% creating an instance, pattern matching out fields, and accessing one field
Store = #kvs_store{data=[], pending_reads=[], pending_writes=[]},
#kvs_store{data=D, pending_reads=R, pending_writes=W} = Store,
Writes = Store#kvs_store.pending_writes,

% using records in function heads and updating a single field
get_value(Key, #kvs_store{data=Data}) ->
    proplists:get_value(Key, Data).

set_value(Key, Value, S=#kvs_store{data=Data}) ->
    S#kvs_store{data=[{Key, Value} | proplists:delete(Key, Data)]}.
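If you want to poke at the record from the shell, a session along these lines should work (hypothetical output; rr/1 loads record definitions so the shell can print and pattern match them):

1> rr("kvs.hrl").
[kvs_store]
2> S = #kvs_store{data=[], pending_reads=[], pending_writes=[]}.
#kvs_store{data = [],pending_reads = [],pending_writes = []}
3> S2 = S#kvs_store{data=[{a, 1}]}.
#kvs_store{data = [{a,1}],pending_reads = [],pending_writes = []}
4> S2#kvs_store.data.
[{a,1}]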
We'll need to add an -include at the top of kvs.erl to take advantage of the record definition.
-include("kvs.hrl").
And we'll also need to update the kvs:start/1 function to use the record:
%% @doc create N nodes in distributed key-value store
%% @spec start(integer()) -> started
start(N) ->
    pg2:create(kvs),
    lists:foreach(fun(_) ->
        Store = #kvs_store{data=[],
                           pending_reads=[],
                           pending_writes=[]},
        pg2:join(kvs, spawn(kvs, store, [Store]))
    end, lists:seq(1, N)),
    started.
With that done, now we can move on to implementing durable writes.
Durable Writes
The first step in our implementation of durable writes is to update the interface of the store/1 function (this change is necessary for the consistent read changes we make below as well). The function's arity will remain unchanged, but we'll be passing around our new kvs_store record instead of the old property list.
% old definition
store(Data) -> term().

% new definition
store(Store = #kvs_store{data=Data,
                         pending_reads=Reads,
                         pending_writes=Writes}) -> term().
The skeptical reader may wonder why we went through the bother of defining a record when we could have simply changed the function definition to
store(Data, Reads, Writes) -> term().
and been done with it.
A reason to prefer the record is that you only pull out the necessary data, and don't have to juggle unused values. This becomes particularly meaningful as a record grows from an arity of one to an arity of three, from an arity of three to an arity of six, and so on. In the end, however, it is a matter of style.
Now that we've updated the function definition, it is time for us to start updating the receive block. We'll examine the set, update and updated messages independently, but keep in mind that they are all patterns matching against store/1's mailbox.
set, update and updated
Receiving a set message is the beginning of the update process, so we'll begin there. Previously set would broadcast the updated key-value pair to all nodes and then immediately notify the user that the write was successful, but now we're going to track the number of acks received from writing nodes and wait until they pass the W threshold before notifying the user of a successful write.
{Sender, set, Key, Value} ->
    % client interface for updating values
    lists:foreach(fun(Pid) ->
        Pid ! {self(), update, Sender, Key, Value}
    end, pg2:get_members(kvs)),
    Writes2 = [{ {Sender, Key}, ?KVS_WRITES} | Writes],
    store(Store#kvs_store{pending_writes=Writes2});
With these changes, all nodes maintain a property list in pending_writes which contains all writes which have been initiated but haven't yet surpassed the W threshold for a durable write. The keys in pending_writes are a 2-tuple of the client process id (the pid which called the kvs:set/2 function) and the key being updated; the values are counters which contain the number of additional acknowledgements required before the write threshold is surpassed.
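Concretely, if a client with pid <0.42.0> (a made-up pid for illustration) calls kvs:set(a, 1) while ?KVS_WRITES is 3, the coordinating node's pending_writes will briefly look like:

%% one in-flight write: the key is {ClientPid, Key},
%% the value is the number of acknowledgements still needed
[{ {<0.42.0>, a}, 3}]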
{Sender, update, Client, Key, Value} ->
    % sent to all nodes by first receiving node
    Sender ! {self(), updated, Client, Key, Value},
    store(Store#kvs_store{data=[{Key, Value} |
                                proplists:delete(Key, Data)]});
As nodes receive an update, they notify the node which received the original set message that they have successfully updated the key's value.
{_Sender, updated, Client, Key, Value} ->
    Count = proplists:get_value({Client, Key}, Writes),
    case Count of
        undefined ->
            % write already completed; ignore further acknowledgements
            store(Store);
        0 ->
            % enough nodes have acknowledged; notify the client
            Client ! {self(), received, {set, Key, Value}},
            store(Store#kvs_store{
                pending_writes=proplists:delete({Client, Key}, Writes)});
        _ ->
            % still waiting on acknowledgements; decrement the counter
            store(Store#kvs_store{
                pending_writes=[{ {Client, Key}, Count-1} |
                                proplists:delete({Client, Key}, Writes)]})
    end;
As other nodes acknowledge a successful write, the corresponding write counter is decremented. Once it has been fully decremented, the Client pid is notified of a successful durable write and the write is deleted from the pending_writes property list. Further acknowledgements for that update are ignored.
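For context, the client side isn't changing in this post, but given the messages above, kvs:set/2 presumably looks something like the following sketch (the real version lives in the previous post's kvs.erl and may differ in its details):

%% a sketch, not the actual implementation: pick any store process as the
%% coordinator, send it the set, and wait for either the durable-write
%% acknowledgement or the write_failed error we add towards the end of
%% this post. The real client may also use its own timeout here.
set(Key, Value) ->
    Pid = pg2:get_closest_pid(kvs),
    Pid ! {self(), set, Key, Value},
    receive
        {_Node, received, {set, Key, Value}} ->
            ok;
        {error, write_failed} ->
            {error, write_failed}
    end.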
Consistent Reads
Implementing consistent reads follows a pattern similar to durable writes, but with the added complexity of reconciling inconsistent reads (i.e. what to do if one node says the value is 5 and another says the value is 16).
First we implement the read fanout, which sends reads to the other nodes and then adds the new read to pending_reads.
{Sender, get, Key} ->
    % client interface for retrieving values
    lists:foreach(fun(Pid) ->
        Pid ! {self(), retrieve, Sender, Key}
    end, pg2:get_members(kvs)),
    % ?KVS_READS is required # of nodes to read from
    % [] is used to collect read values
    Reads2 = [{ {Sender, Key}, {?KVS_READS, []}} | Reads],
    store(Store#kvs_store{pending_reads=Reads2});
Next, each node sends its local value for the key back to the node which received the original get message.
{Sender, retrieve, Client, Key} ->
    Sender ! {self(), retrieved, Client, Key, proplists:get_value(Key, Data)},
    store(Store);
Finally, the logic for decrementing the counter in pending_reads and for collecting the values returned from the various nodes via retrieve messages.
{_Sender, retrieved, Client, Key, Value} ->
    case proplists:get_value({Client, Key}, Reads) of
        undefined ->
            % read already completed; ignore further responses
            store(Store);
        {0, Values} ->
            % tally how often each value was returned
            Freq = lists:foldr(fun(X, Acc) ->
                case proplists:get_value(X, Acc) of
                    undefined -> [{X, 1} | Acc];
                    N -> [{X, N+1} | proplists:delete(X, Acc)]
                end
            end, [], Values),
            % reply with the most frequently seen value
            [{Popular, _} | _ ] = lists:reverse(lists:keysort(2, Freq)),
            Client ! {self(), got, Popular},
            store(Store#kvs_store{
                pending_reads=proplists:delete({Client, Key}, Reads)});
        {Count, Values} ->
            store(Store#kvs_store{
                pending_reads=[{ {Client, Key}, {Count-1, [Value | Values]}} |
                               proplists:delete({Client, Key}, Reads)]})
    end;
Note that we're using an extremely weak algorithm for resolving inconsistent reads: we pick the most common value (and if there are multiple most common values, we indirectly pick one based on the order of responses from the reading nodes). If we can bear with this unsightliness for the moment, the next entry will look at using various kinds of clocks to get real improvements in read consistency.
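To see how arbitrary the tie-breaking is, consider a contrived tally where two nodes answered 5 and two answered 16 (values made up for illustration). lists:keysort/2 is stable, so equal counts keep their existing order, and the "most popular" value depends entirely on which responses happened to arrive first.

%% two values tied at two votes each
Freq = [{5, 2}, {16, 2}],
%% keysort/2 is stable; reversing a sort on equal keys just flips their order,
%% so the winner is whichever value ended up later in Freq
[{Popular, _} | _] = lists:reverse(lists:keysort(2, Freq)),
Popular.  % => 16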
Failing Reads and Writes
In addition to a weak algorithm for resolving inconsistencies, another flaw in our current implementation is the handling of reads and writes which fail or otherwise cannot complete.
Consider a system where W := 10, but there are only five available nodes due to a node failure. The write counter will never reach zero, and thus a response will never be sent to the user. That doesn't make for a very usable system.
A reasonable solution is to consider all pending operations older than some timeout value to have failed, and then to notify the user that the read/write was a failure.
Currently we're not storing when a set or get request comes in, so we'll need to extend the information recorded in pending_reads and pending_writes to include a timestamp.
First we need to write a utility function to convert the output of erlang:now/0 into seconds since the epoch,
ts() ->
    {Mega, Sec, _} = erlang:now(),
    (Mega * 1000000) + Sec.
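(A small aside: erlang:now/0 has since been deprecated in newer Erlang/OTP releases; if you're following along on one of those, an equivalent ts/0 could be written as below.)

%% equivalent on modern OTP releases, where erlang:now/0 is deprecated
ts() ->
    erlang:system_time(second).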
and next we need to update a number of the receive patterns, starting with the set pattern, which needs to be updated to create pending writes as a 2-tuple of the number of required writes and the time the request was received (previously it was only the number of required writes).
{Sender, set, Key, Value} ->
    lists:foreach(fun(Pid) ->
        Pid ! {self(), update, Sender, Key, Value}
    end, pg2:get_members(kvs)),
    Writes2 = [{ {Sender, Key}, {?KVS_WRITES, ts()}} | Writes],
    store(Store#kvs_store{pending_writes=Writes2});
updated requires a change as well, to extract Count properly from the new 2-tuple, as well as properly formatting the value when decrementing Count.
{_Sender, updated, Client, Key, Value} ->
    case proplists:get_value({Client, Key}, Writes) of
        undefined ->
            store(Store);
        {0, _Timestamp} ->
            Client ! {self(), received, {set, Key, Value}},
            store(Store#kvs_store{
                pending_writes=proplists:delete({Client, Key}, Writes)});
        {Count, Timestamp} ->
            store(Store#kvs_store{
                pending_writes=[{ {Client, Key}, {Count-1, Timestamp}} |
                                proplists:delete({Client, Key}, Writes)]})
    end;
We need to make corresponding changes for reading, but we probably don't need to walk through those explicitly (feel free to examine the diff where those changes occurred, if you're interested).
The last major change we need to make is adding an after block following the receive block in store/1, which will allow us to periodically check for expired reads and writes.
after
    ?KVS_POLL_PENDING ->
        Writes2 = lists:filter(fun filter_writes/1, Writes),
        Reads2 = lists:filter(fun filter_reads/1, Reads),
        store(Store#kvs_store{pending_writes=Writes2,
                              pending_reads=Reads2})
end.
filter_writes/1 and filter_reads/1 detect pending operations which are taking too long to complete, notify the client that their request failed, and then remove the failed operation from the pending_writes and pending_reads lists respectively.
filter_writes({ {Client, _Key}, {_Count, Ts}}) ->
    Now = ts(),
    if Now > Ts + ?KVS_WRITE_TIMEOUT ->
            Client ! {error, write_failed},
            false;
       true ->
            true
    end.

filter_reads({ {Client, _Key}, {_Count, _Values, Ts}}) ->
    Now = ts(),
    if Now > Ts + ?KVS_READ_TIMEOUT ->
            Client ! {error, read_failed},
            false;
       true ->
            true
    end.
It wasn't strictly necessary to implement these functions outside of the receive block, but it is becoming rather sizable, and it'll be easier to modify if we carve it up a bit. (This is also the first time we've seen the Erlang if block, whose guards I tend to like less than case's pattern matching. This is certainly a matter of style, but I prefer using case in all situations except those which devolve into the snippet case X of true -> A; false -> B end;, which seems like trying a bit too hard to avoid if.)
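For comparison, here is what filter_writes/1 looks like rewritten with case, i.e. the "devolved" form mentioned above; the behaviour is identical, it's purely a matter of taste.

%% the same check as filter_writes/1, expressed with case instead of if
filter_writes({ {Client, _Key}, {_Count, Ts}}) ->
    case ts() > Ts + ?KVS_WRITE_TIMEOUT of
        true ->
            Client ! {error, write_failed},
            false;
        false ->
            true
    end.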
Finally, as the absolute last modification, we'll also need to add the timeout values to kvs.hrl.
-define(KVS_WRITE_TIMEOUT, 2). % seconds
-define(KVS_READ_TIMEOUT, 1). % seconds
-define(KVS_POLL_PENDING, 500). % ms
After making all of our changes, let's start kvs with only a single node and a configuration of W := 3, R := 3.
14> c(kvs).
{ok,kvs}
15> kvs:stop().
stopped
16> kvs:start(1).
started
17> kvs:get(a).
{error,read_failed}
18> kvs:set(b, 10).
{error,write_failed}
There is our successful handling of failed pending reads and writes. Excellent.
With that, our second batch of changes comes to a close; a snapshot of the updated code is available on GitHub.
In the next article we'll take a more principled stance on consistent reads by exploring the impact of using physical, logical and vector clocks to version and reconcile values.
The post should be up in a couple of days.
[1] It is possible for process groups to become contaminated with dead pids, especially if you are being slack (like we are, for the time being) and using spawned processes instead of gen_servers. Even with a gen_server, if you don't trap_exit, then it is entirely possible for a gen_server to terminate without calling its termination cleanup function. Finally, even if you properly set trap_exit on a gen_server, the termination cleanup function can fail to fire if the process is killed brutally (kill -9, etc.). Rest assured, however, that it is possible to detect dead processes, using a combination of erlang:node/1, rpc:call/3 and erlang:is_process_alive/1 (erlang:is_process_alive/1 must be executed on the node where the process lives, which means you must first discover the node and then remotely check its liveness). In systems where you anticipate the possibility of processes being brutally killed, it is necessary to design your system in such a way that it survives sending messages to dead pids (with gen_server:call you'll receive some kind of error, although it may not immediately lead you to understand the issue, but gen_server:cast and ! probably won't raise any issue, so you'll either need a timeout to detect dropped messages, or to explicitly verify a process is alive before sending... with the timeout being much less error prone).

[2] This one-phase commit still suffers from a weakness, which is how to back out a write when a portion of the nodes fail. We'll need something along the lines of a two-phase commit to really address that adequately. This should come in a later entry. Which I guess I say a lot these days.

[3] This is only true to the extent that we can trust pg2:get_members/1 to contain all nodes, including all crashed or downed nodes. In a true W := N configuration, if any nodes are crashed then all writes to the system would fail.

[4] There is also a penalty in migrating running code from a previous definition of a record to a new one, which requires hand-crafted transition code. In general, it's easier to bring a system down and then bring it back up again (starting with the new record definition) than attempting to migrate it, unless you really can't bring your processes down. In situations where you think you'll be frequently updating a record definition, consider taking the performance hit and just using a property list instead. No one ever woke up wanting to do a manual migration.