Storing Bounded Timeboxes in Redis
So, you’re trying to write a simple report system for tracking error rates in your system. And, hey–who knew?–you’ve decided to use Python and Redis. It’ll be a breeze!
Or will it?
First, installing and starting Redis on your local machine.

    curl http://redis.googlecode.com/files/redis-2.2.4.tar.gz > redis.tar.gz
    tar -xzvf redis.tar.gz
    cd redis-2.2.4
    make
    ./src/redis-server

Let’s test that worked real quickly…

    ./src/redis-cli set a b
    ./src/redis-cli get a

If the second command returned `b`, then you’re all set. OK, awesome.
Now we just need to get the Python Redis client (redis-py) installed in a fresh virtualenv:

    virtualenv --no-site-packages zset
    cd zset
    . ./bin/activate
    pip install redis

Check that worked…

    :::pycon
    >>> import redis
    >>> cli = redis.Redis()
    >>> cli.get("a")
    'b'

Excellent, now we’re ready to get back to bounded timeboxes.
What Are Bounded Timeboxes?
Contrary to common expectations it isn’t a DeLorean, and I might have just made the term up, but anyway, let’s move past that. In this post a “timebox” is something which contains activity for a period of time, like “100 Diggs one minute ago” or “200 tweets two minutes ago”. “Bounded” comes into play because you only want to store a certain number of timeboxes. An example will make this more concrete.
You’re storing the number of pageviews for each URL for each minute. You’ve decided to do this by having a sorted set per URL, where the member is the minute number (the timestamp in seconds integer-divided by sixty) and the score is the number of pageviews in that minute. Writing and reading from this sorted set would be like:

    :::python
    import time, redis

    PAGEVIEW_BUCKET = "pv.%s"

    def increment_pageview(url):
        cli = redis.Redis()
        # integer division gives the minute number; ``% 60`` would
        # only give the second within the current minute
        bucket = int(time.time()) // 60
        # redis-py >= 3.0 argument order: zincrby(name, amount, value)
        cli.zincrby(PAGEVIEW_BUCKET % url, 1, bucket)

    def get_pageviews(url, bucket):
        cli = redis.Redis()
        return cli.zscore(PAGEVIEW_BUCKET % url, bucket)

This is reasonable and all, and allows us to find the minutes where the story received the most (or least) pageviews,

    :::python
    # get top 10 buckets by num pageviews, include num pageviews
    cli.zrevrange(PAGEVIEW_BUCKET % url, 0, 9, withscores=True)
    # get bottom 10 buckets by num pageviews, don't include num pageviews
    cli.zrange(PAGEVIEW_BUCKET % url, 0, 9)

but it doesn’t support keeping only the last `N` time buckets, nor getting the number of pageviews in those last buckets.
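To make “the last `N` time buckets” concrete: assuming each bucket is identified by its minute number (the Unix timestamp integer-divided by sixty), working out which buckets are worth keeping is plain arithmetic. The helper names below are made up for this sketch and don’t touch Redis at all.

```python
import time

SECONDS_PER_BUCKET = 60  # assumption: one bucket per minute


def bucket_for(timestamp):
    """Map a Unix timestamp (in seconds) to its minute bucket."""
    return int(timestamp) // SECONDS_PER_BUCKET


def last_n_buckets(timestamp, n):
    """Return the ids of the n most recent buckets, newest first."""
    newest = bucket_for(timestamp)
    return [newest - i for i in range(n)]


if __name__ == "__main__":
    now = time.time()
    print(bucket_for(now), last_n_buckets(now, 3))
```

Anything in the sorted set whose bucket id isn’t in that list is a candidate for pruning.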
Using only one sorted set, if you want to prune you’d need to retrieve all the keys, sort them in Python and then prune. For a small enough set, that’s probably going to be a performant–albeit slightly soul crushing–solution.

    :::python
    import time, redis

    PAGEVIEW_BUCKET = "pv.%s"
    MAX_BUCKETS = 10

    def increment_pageview(url):
        cli = redis.Redis()
        bucket = int(time.time()) // 60
        url_key = PAGEVIEW_BUCKET % url
        # redis-py >= 3.0 argument order: zincrby(name, amount, value)
        cli.zincrby(url_key, 1, bucket)
        num_keys = cli.zcard(url_key)
        if num_keys > MAX_BUCKETS:
            # fetch every (bucket, pageviews) pair in the set
            all_keys = cli.zrange(url_key, 0, -1, withscores=True)
            # note that older buckets are always lower ranking ints,
            # although they are being stored as strings, so we have
            # to coerce them into ints
            sorted_keys = sorted((int(a), b) for a, b in all_keys)
            for key, score in sorted_keys[:num_keys - MAX_BUCKETS]:
                cli.zrem(url_key, key)

Yes, it works. But, damnit, it’s hard to love, and it’s also pushing data structure manipulation from Redis into the application. In this case we might not care because we don’t have to worry about any races (all the cleanup operations will be idempotent), but we can absolutely do better.
We can do better, that is, if by better we mean: “uses twice the memory but is conceptually cleaner and has better performance with large sorted sets.”
The answer is to use two sorted sets. The first contains the same data as before, and the second stores each bucket’s name as both member and score, which keeps it ordered by time and allows us to perform the same operations as above in addition to removing older timeboxes.
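Before bringing any Redis commands into it, here is a rough in-memory model of the idea, with plain Python dicts standing in for the two sorted sets. The class and method names are invented for illustration; it only shows how the time-ordered index lets us find and drop the oldest timeboxes, and it says nothing about concurrency.

```python
class BoundedTimeboxes:
    """Toy stand-in for the two sorted sets (not Redis, not thread-safe)."""

    def __init__(self, max_buckets):
        self.max_buckets = max_buckets
        self.pageviews = {}   # bucket -> pageview count (first sorted set)
        self.time_index = {}  # bucket -> bucket, scored by time (second set)

    def _newest_first(self):
        # order buckets by their time score, most recent first
        return sorted(self.time_index, key=self.time_index.get, reverse=True)

    def increment(self, bucket, amount=1):
        self.pageviews[bucket] = self.pageviews.get(bucket, 0) + amount
        self.time_index[bucket] = bucket
        # drop everything past the max_buckets most recent buckets,
        # from both structures so they stay in sync
        for old in self._newest_first()[self.max_buckets:]:
            del self.time_index[old]
            del self.pageviews[old]

    def recent(self, offset, limit):
        """Return (bucket, pageviews) pairs, newest first."""
        window = self._newest_first()[offset:offset + limit]
        return [(b, self.pageviews[b]) for b in window]
```

The memory cost is visible here too: every bucket id is stored twice, once per structure.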
We’ll also need to use a few relatively uncommon Redis commands to keep two sorted sets synchronized in the face of concurrent requests:
- `WATCH` allows you to make a transaction conditional: if another client modifies a value you’re watching, then the dependent transaction will fail. (For example, you might `WATCH A`, then `GET A` to fetch its value, then do `MULTI`, `INCRBY A 100`, `EXEC`; if another client modifies `A` after you watched it and before your transaction completed on the Redis server, the transaction will fail, otherwise it will increment.)
- `MULTI` allows you to queue multiple commands into a single transaction. Either all of them will be executed or none of them will. All `WATCH` commands should be issued before `MULTI`.
- `EXEC` executes all commands queued since `MULTI`.
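
To get a feel for how these three commands interact, here is a toy in-memory model of the check-and-set behaviour. It is emphatically not how Redis is implemented, and `ToyStore`, `exec_` and the `interfere` hook are all invented for this sketch; it just mimics the rule that the transaction aborts if a watched key changed after it was watched.

```python
class WatchError(Exception):
    """Raised when a watched key changed before exec (toy version)."""


class ToyStore:
    def __init__(self):
        self.data = {}
        self.versions = {}  # key -> number of writes so far

    def set(self, key, value):
        self.data[key] = value
        self.versions[key] = self.versions.get(key, 0) + 1

    def get(self, key):
        return self.data.get(key)

    def watch(self, key):
        # remember the version we saw; the token stands in for WATCH state
        return (key, self.versions.get(key, 0))

    def exec_(self, watched, commands):
        """Apply queued (key, value) writes only if the watched key is unchanged."""
        key, seen_version = watched
        if self.versions.get(key, 0) != seen_version:
            raise WatchError(key)
        for k, v in commands:
            self.set(k, v)


def increment_by(store, key, amount, interfere=None):
    """Watch, read, queue the write, exec; retry on conflict.

    Returns the number of attempts it took.
    """
    attempts = 0
    while True:
        attempts += 1
        watched = store.watch(key)
        current = store.get(key) or 0
        if interfere is not None and attempts == 1:
            interfere()  # simulate another client writing mid-transaction
        try:
            store.exec_(watched, [(key, current + amount)])
            return attempts
        except WatchError:
            continue
```

Calling `increment_by` with an `interfere` callback that writes to the same key forces one failed attempt followed by a successful retry, which is the same shape as the retry loops further down.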
These pieces make it possible to perform fairly complex transactional operations with Redis. They also make it possible to increase performance by sending multiple commands in a batch.
redis-py uses pipelines to expose `MULTI` and `EXEC`. Here is us popping two items off of a sorted set with extreme prejudice (it’ll fail if there aren’t enough items, but otherwise it’ll bang on the server until it is able to read the first two items of a sorted set and remove them before another client writes to that sorted set):
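
    :::python
    import redis

    cli = redis.Redis()
    # WATCH has to share a connection with MULTI/EXEC, so in
    # current redis-py it is issued on the pipeline object
    with cli.pipeline() as pipe:
        while True:
            try:
                pipe.watch("a")
                # while watching, commands execute immediately
                (first, second) = pipe.zrange("a", 0, 1)
                pipe.multi()
                pipe.zrem("a", first).zrem("a", second)
                pipe.execute()
                break
            except redis.exceptions.WatchError:
                pass
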
If you have a great deal of write-contention then this approach is going to thrash the server, but it absolutely will be transactional. Okay, let’s put the same ideas to work to build our bounded timeboxes.

    :::python
    import time, redis

    PAGEVIEW_BUCKET = "pv.%s"
    TIME_BUCKET = "tb.%s"
    MAX_BUCKETS = 1000

    def increment_pageview(url):
        cli = redis.Redis()
        bucket = int(time.time()) // 60
        pv_key = PAGEVIEW_BUCKET % url
        # redis-py >= 3.0 argument order: zincrby(name, amount, value)
        cli.zincrby(pv_key, 1, bucket)

        # ensure that the bucket's KV exists in TIME_BUCKET
        tb_key = TIME_BUCKET % url
        with cli.pipeline() as pipe:
            # WATCH must share a connection with MULTI/EXEC,
            # so it is issued on the pipeline
            pipe.watch(tb_key)
            if pipe.zscore(tb_key, bucket) is None:
                try:
                    pipe.multi()
                    # redis-py >= 3.0 takes a {member: score} mapping
                    pipe.zadd(tb_key, {bucket: bucket})
                    pipe.execute()
                except redis.exceptions.WatchError:
                    # somebody else touched the key first; we assume
                    # they initialized this bucket, so we don't
                    # initialize it again
                    pass

        # trim any excess values: everything ranked past the
        # MAX_BUCKETS most recent buckets
        to_rem = cli.zrevrange(tb_key, MAX_BUCKETS, -1)
        if to_rem:
            # we aren't watching any values, so no need catch WatchError
            cli.pipeline().zrem(tb_key, *to_rem).zrem(pv_key, *to_rem).execute()

    def recent_buckets(url, offset, limit):
        "Get the most recent time buckets and their pageviews for a URL."
        cli = redis.Redis()
        tb_key = TIME_BUCKET % url
        pv_key = PAGEVIEW_BUCKET % url
        # note that we are watching the time bucket key,
        # not the pageview key, so this watch will only fail
        # if executed while a call to ``increment_pageview``
        # adds or trims one of the buckets, so we don't expect
        # frequent failures here, but we can rest assured
        # we will never return data for a bucket whose data
        # we are no longer storing
        with cli.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(tb_key)
                    # end value is inclusive, so have to do -1 for
                    # traditional offset/limit expectations
                    buckets = pipe.zrevrange(tb_key, offset, offset + limit - 1)
                    pipe.multi()
                    for bucket in buckets:
                        pipe.zscore(pv_key, bucket)
                    scores = pipe.execute()
                    return list(zip(buckets, scores))
                except redis.exceptions.WatchError:
                    pass

What a concise improvement! No? Well, I suppose it’s fairly complicated for
what we want to accomplish, but I’ll be damned if it doesn’t meet the
requirements for most analytics systems which want to maintain a fixed
number of time buckets. And you probably learned something.
As always, let me know what I could have done better in this tutorial or where I’ve made mistakes! Thank you for reading!