Storing Bounded Timeboxes in Redis
So, you’re trying to write a simple report system for tracking error rates in your system. And, hey–who knew?–you’ve decided to use Python and Redis. It’ll be a breeze!
Or will it?
Installation
First, let's install and start Redis on your local machine.
curl http://redis.googlecode.com/files/redis-2.2.4.tar.gz > redis.tar.gz
tar -xzvf redis.tar.gz
cd redis-2.2.4
make
./src/redis-server
Let's quickly test that it worked…
./src/redis-cli set a b
./src/redis-cli get a
If the second command returned `b`, then you're all set. OK, awesome.
Now we just need to get `redispy` installed.
virtualenv --no-site-packages zset
cd zset
. ./bin/activate
pip install redis
Check that worked…
:::pycon
>>> import redis
>>> cli = redis.Redis()
>>> cli.get("a")
'b'
Excellent, now we’re ready to get back to bounded timeboxes.
What Are Bounded Timeboxes?
Contrary to common expectations, it isn't a DeLorean, and I might have just made the term up, but let's move past that. In this post a "timebox" is something which contains activity for a period of time, like "100 Diggs one minute ago" or "200 tweets two minutes ago". "Bounded" comes into play because you only want to store a certain number of timeboxes. An example will make this more concrete.
You're storing the number of pageviews for each URL for each minute. You've decided to do this by having a sorted set per URL, where each member is a timestamp rounded down to the start of its minute and its score is the number of pageviews in that minute. Writing to and reading from this sorted set would look like:
:::python
import time, redis

PAGEVIEW_BUCKET = "pv.%s"

def increment_pageview(url):
    cli = redis.Redis()
    # round the timestamp down to the start of its minute
    now = int(time.time())
    bucket = now - (now % 60)
    cli.zincrby(PAGEVIEW_BUCKET % url, bucket, 1)

def get_pageviews(url, bucket):
    cli = redis.Redis()
    return cli.zscore(PAGEVIEW_BUCKET % url, bucket)
This is reasonable and all, and allows us to find the minutes where a URL received the most (or least) pageviews,
:::python
# get top 10 buckets by num pageviews, include num pageviews
cli.zrevrange(PAGEVIEW_BUCKET % url, 0, 9, withscores=True)
# get bottom 10 buckets by num pageviews, don't include num pageviews
cli.zrange(PAGEVIEW_BUCKET % url, 0, 9)
but it doesn't support keeping only the last N timebuckets, nor getting the number of pageviews in those last buckets.
Using only one sorted set, if you want to prune you'd need to retrieve all the members, sort them in Python, and then prune. For a small enough set, that's probably going to be a performant (albeit slightly soul crushing) solution.
:::python
MAX_BUCKETS = 10

def increment_pageview(url):
    cli = redis.Redis()
    now = int(time.time())
    bucket = now - (now % 60)
    url_key = PAGEVIEW_BUCKET % url
    cli.zincrby(url_key, bucket, 1)
    num_keys = cli.zcard(url_key)
    if num_keys > MAX_BUCKETS:
        all_keys = cli.zrange(url_key, 0, -1, withscores=True)
        # note that older buckets are always lower ints,
        # although they are being stored as strings, so we
        # have to coerce them back into ints before sorting
        sorted_keys = sorted((int(member), score) for member, score in all_keys)
        for member, score in sorted_keys[:num_keys - MAX_BUCKETS]:
            cli.zrem(url_key, member)
Yes, it works. But, dammit, it's hard to love, and it's also pushing datastructure manipulation from Redis into the application. In this case we might not care because we don't have to worry about any races (all the cleanup operations are idempotent), but we can absolutely do better.
We can do better, that is, if by better we mean: “uses twice the memory but is conceptually cleaner and has better performance with large sorted sets.”
WATCH, MULTI and EXEC
The answer is to use two sorted sets. The first contains the same data as before; the second uses each bucket's timestamp as both its member and its score, allowing us to perform the same operations as above in addition to finding and removing the oldest timeboxes.
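To make the layout concrete, here's a rough sketch (with a made-up URL and timestamp) of what one pageview does to the two keys, using the `pv.%s` and `tb.%s` key patterns from the code below:

# pageview counts: member is the bucket, score is the count
./src/redis-cli zincrby pv.example.com/story 1 1303862400
# bucket index: member and score are both the bucket timestamp
./src/redis-cli zadd tb.example.com/story 1303862400 1303862400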
We’ll also need to use a few relatively uncommon Redis commands to keep two sorted sets synchronized in the face of concurrent requests:
- WATCH allows you to make a transaction conditional: if another client modifies a value you're watching, then the dependent transaction will fail. (You `WATCH A`, use `GET A` to fetch its value, then do `MULTI`, `INCRBY A 100`, `EXEC`; if another client modifies `A` after you watched it and before your transaction completes on the Redis server, the transaction will fail; otherwise it will increment. There's a sketch of this after the list.)
- MULTI allows you to queue multiple commands into a single transaction. Either all of them will succeed or none of them will. All `WATCH` commands should be issued before `MULTI` is issued.
- EXEC executes all commands queued since `MULTI`.
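In raw redis-cli terms, that conditional increment looks something like this sketch of a session where another client modified `A` between the `WATCH` and the `EXEC`, so the transaction is aborted and `EXEC` returns nil:

redis> WATCH A
OK
redis> GET A
"5"
redis> MULTI
OK
redis> INCRBY A 100
QUEUED
redis> EXEC
(nil)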
These pieces make it possible to perform fairly complex transactional operations with Redis. They also make it possible to increase performance by sending multiple commands in a batch.
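For the batching half of that claim, here's a minimal redispy sketch (no `WATCH` involved): the three queued commands travel to the server together, and their replies come back as one list.

:::python
import redis

cli = redis.Redis()
pipe = cli.pipeline()     # by default the batch is also wrapped in MULTI/EXEC
pipe.set("hits", 0)
pipe.incr("hits")
pipe.incr("hits")
results = pipe.execute()  # one round trip to the server; returns [True, 1, 2]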
redispy uses pipelines to expose `MULTI` and `EXEC`. Here is us popping two items off of a sorted set with extreme prejudice (it'll fail if there aren't enough items, but otherwise it'll bang on the server until it manages to read the first two items of the sorted set and remove them before another client writes to that sorted set):
:::python
cli = redis.Redis()
while True:
    try:
        cli.watch("a")
        (first, second) = cli.zrange("a", 0, 1)
        cli.pipeline().zrem("a", first).zrem("a", second).execute()
        break
    except redis.exceptions.WatchError:
        pass
If you have a great deal of write-contention then this approach is going to thrash the server, but it absolutely will be transactional. Okay, let’s put the same ideas to work to build our bounded timeboxes.
:::python
import time, redis

PAGEVIEW_BUCKET = "pv.%s"
TIME_BUCKET = "tb.%s"
MAX_BUCKETS = 1000

def increment_pageview(url):
    cli = redis.Redis()
    # round the timestamp down to the start of its minute
    now = int(time.time())
    bucket = now - (now % 60)
    pv_key = PAGEVIEW_BUCKET % url
    cli.zincrby(pv_key, bucket, 1)

    # ensure that the bucket exists in TIME_BUCKET
    tb_key = TIME_BUCKET % url
    cli.watch(tb_key)
    time_bucket_exists = cli.zscore(tb_key, bucket)
    if not time_bucket_exists:
        try:
            cli.pipeline().zadd(tb_key, bucket, bucket).execute()
        except redis.exceptions.WatchError:
            # somebody else already initialized the key,
            # we don't need to initialize it again
            pass

    # trim any excess buckets (the oldest ones), removing their
    # pageview data along with their entry in the time bucket index
    to_rem = cli.zrevrange(tb_key, MAX_BUCKETS, -1)
    for old_bucket in to_rem:
        # we aren't watching any values, so no need to catch WatchError
        cli.pipeline().zrem(tb_key, old_bucket).zrem(pv_key, old_bucket).execute()

def recent_buckets(url, offset, limit):
    "Get the most recent time buckets and their pageviews for a URL."
    cli = redis.Redis()
    tb_key = TIME_BUCKET % url
    pv_key = PAGEVIEW_BUCKET % url
    # note that we are watching the time bucket key,
    # not the pageview key, so this watch will only fail
    # if executed while a call to ``increment_pageview``
    # trims one of the buckets; we don't expect
    # frequent failures here, but we can rest assured
    # we will never return data for a bucket whose data
    # we are no longer storing
    while True:
        try:
            cli.watch(tb_key)
            # the end value is inclusive, so we subtract one for
            # traditional offset/limit expectations
            buckets = cli.zrevrange(tb_key, offset, offset + limit - 1)
            pipe = cli.pipeline()
            for bucket in buckets:
                pipe.zscore(pv_key, bucket)
            scores = pipe.execute()
            return zip(buckets, scores)
        except redis.exceptions.WatchError:
            pass
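For a quick sanity check, here's roughly what calling these against a hypothetical URL might look like (the bucket timestamp and count are, of course, made up):

:::pycon
>>> increment_pageview("example.com/story")
>>> recent_buckets("example.com/story", 0, 10)
[('1303862400', 1.0)]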
What a concise improvement! No? Well, I suppose it’s fairly complicated for
what we want to accomplish, but I’ll be damned if it doesn’t meet the
requirements for most analytics systems which want to maintain a fixed
number of time buckets. And you probably learned something
about the `WATCH`, `MULTI` and `EXEC` commands.
As always, let me know what I could have done better in this tutorial or where I’ve made mistakes! Thank you for reading!