Irrational Exuberance!

Storing Bounded Timeboxes in Redis

April 7, 2011. Filed under python, redis

So, you're trying to write a simple report system for tracking error rates in your system. And, hey--who knew?--you've decided to use Python and Redis. It'll be a breeze!

Or will it?

Installation

First, install and start Redis on your local machine.

curl http://redis.googlecode.com/files/redis-2.2.4.tar.gz > redis.tar.gz
tar -xzvf redis.tar.gz
cd redis-2.2.4
make
./src/redis-server

Let's test that worked real quickly...

./src/redis-cli set a b
./src/redis-cli get a

If the second command returned b, then you're all set. OK, awesome. Now we just need to get redis-py installed.

virtualenv --no-site-packages zset
cd zset
. ./bin/activate
pip install redis

Check that worked...

:::pycon
>>> import redis
>>> cli = redis.Redis()
>>> cli.get("a")
'b'

Excellent, now we're ready to get back to bounded timeboxes.

What Are Bounded Timeboxes?

Contrary to common expectations it isn't a DeLorean, and I might have just made the term up, but anyway, let's move past that. In this post a "timebox" is something which contains activity for a period of time, like "100 Diggs one minute ago" or "200 tweets two minutes ago". "Bounded" comes into play because you only want to store a certain number of timeboxes. An example will make this more concrete.

You're storing the number of pageviews for each URL for each minute. You've decided to do this by having a sorted set per URL, where the key is the timestamp in seconds divided by sixty (rounded down, so every second in the same minute lands in the same bucket) and the value is the number of pageviews in that minute. Writing to and reading from this sorted set would look like:

:::python
import time, redis
PAGEVIEW_BUCKET = "pv.%s"

def increment_pageview(url):
    cli = redis.Redis()
    # whole minutes since the epoch identify the bucket
    bucket = int(time.time()) // 60
    # redis-py 3.0+ argument order: zincrby(name, amount, value)
    cli.zincrby(PAGEVIEW_BUCKET % url, 1, bucket)

def get_pageviews(url, bucket):
    cli = redis.Redis()
    return cli.zscore(PAGEVIEW_BUCKET % url, bucket)

This is reasonable and all, and allows us to find the minutes where the story received the most (or least) pageviews,

:::python
# get top 10 buckets by num pageviews, include num pageviews
cli.zrevrange(PAGEVIEW_BUCKET % url, 0, 9, withscores=True)

# get bottom 10 buckets by num pageviews, don't include num pageviews
cli.zrange(PAGEVIEW_BUCKET % url, 0, 9)

but it doesn't support keeping only the last N time buckets, nor getting the number of pageviews in those last buckets.
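To pin down exactly what we're after before bringing Redis back in, here's a minimal sketch in plain Python with no Redis involved. The `BoundedTimeboxes` class and its method names are made up for illustration; it assumes buckets are whole minutes since the epoch and that `max_buckets` is at least 1.

```python
import time


class BoundedTimeboxes:
    """In-memory model of the goal: per-minute counts, newest N kept."""

    def __init__(self, max_buckets):
        self.max_buckets = max_buckets  # assumed to be >= 1
        self.counts = {}  # minute bucket -> number of events

    def increment(self, now=None):
        # whole minutes since the epoch identify the bucket
        now = time.time() if now is None else now
        bucket = int(now) // 60
        self.counts[bucket] = self.counts.get(bucket, 0) + 1
        # drop the oldest buckets once we hold more than the limit
        for old in sorted(self.counts)[:-self.max_buckets]:
            del self.counts[old]

    def recent(self, limit):
        # newest buckets first, paired with their counts
        newest = sorted(self.counts, reverse=True)[:limit]
        return [(b, self.counts[b]) for b in newest]


boxes = BoundedTimeboxes(max_buckets=2)
for ts in (0, 30, 60, 120, 125):
    boxes.increment(now=ts)
print(boxes.recent(10))  # prints [(2, 2), (1, 1)]
```

Bucket 0 got two hits but was pushed out once a third bucket showed up. Whatever we build on Redis should behave like this, just shared between processes.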

Using only one sorted set, if you want to prune you'd need to retrieve all the keys, sort them in Python and then prune. For a small enough set, that's probably going to be a performant--albeit slightly soul crushing--solution.

:::python
MAX_BUCKETS = 10

def increment_pageview(url):
    cli = redis.Redis()
    # whole minutes since the epoch identify the bucket
    bucket = int(time.time()) // 60
    url_key = PAGEVIEW_BUCKET % url
    # redis-py 3.0+ argument order: zincrby(name, amount, value)
    cli.zincrby(url_key, 1, bucket)
    num_keys = cli.zcard(url_key)
    if num_keys > MAX_BUCKETS:
        # fetch every (bucket, pageviews) pair in the sorted set
        all_keys = cli.zrange(url_key, 0, -1, withscores=True)
        # note that older buckets are always lower ranking ints,
        # although they are being stored as strings, so we have
        # to coerce them into ints
        sorted_keys = sorted((int(a), b) for a, b in all_keys)
        for key, score in sorted_keys[:len(sorted_keys) - MAX_BUCKETS]:
            cli.zrem(url_key, key)

Yes, it works. But, dammit, it's hard to love, and it's also pushing data structure manipulation from Redis into the application. In this case we might not care because we don't have to worry about any races (all the cleanup operations will be idempotent), but we can absolutely do better.

We can do better, that is, if by better we mean: "uses twice the memory but is conceptually cleaner and has better performance with large sorted sets."

WATCH, MULTI and EXEC

The answer is to use two sorted sets. The first contains the same data as before, and the second stores each bucket with the bucket's own name (its timestamp) as the value, so it sorts by time rather than by pageviews. That lets us perform the same operations as above in addition to removing older timeboxes.

We'll also need to use a few relatively uncommon Redis commands to keep two sorted sets synchronized in the face of concurrent requests:

  • WATCH allows you to make a transaction conditional: if another client modifies a value you're watching, then the dependent transaction will fail. (You watch A, use GET A to fetch its value, then do MULTI, INCRBY A 100, EXEC; if another client modifies A after you watched it and before your transaction completed on the Redis server, the transaction will fail, otherwise it will increment.)
  • MULTI allows you to queue multiple commands into a single transaction. The queued commands run back to back, with no other client's commands interleaved. (Redis does not roll back earlier commands if a later one errors while running.) All WATCH commands should be issued before MULTI is issued.
  • EXEC executes all commands queued since MULTI.
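If the WATCH semantics feel slippery, a toy model may help. This is only a sketch of the idea (optimistic locking with a version check), not how Redis implements it; `ToyStore` and its `watch` and `execute` methods are names invented for this example.

```python
class WatchError(Exception):
    """Raised when a watched key changed before the transaction ran."""


class ToyStore:
    """Hypothetical in-memory store that versions every key."""

    def __init__(self):
        self.data = {}
        self.versions = {}

    def set(self, key, value):
        self.data[key] = value
        self.versions[key] = self.versions.get(key, 0) + 1

    def watch(self, key):
        # remember the version we saw; this plays the role of WATCH
        return self.versions.get(key, 0)

    def execute(self, key, seen_version, commands):
        # plays the role of EXEC: refuse to run if the key moved on
        if self.versions.get(key, 0) != seen_version:
            raise WatchError(key)
        for command in commands:
            command(self)


store = ToyStore()
store.set("a", 1)

# no interference: the queued write is applied
seen = store.watch("a")
value = store.data["a"]
store.execute("a", seen, [lambda s: s.set("a", value + 100)])
after_first = store.data["a"]

# another client writes between WATCH and EXEC: the transaction aborts
seen = store.watch("a")
store.set("a", 0)  # the competing write
try:
    store.execute("a", seen, [lambda s: s.set("a", 999)])
    aborted = False
except WatchError:
    aborted = True
```

The first transaction goes through; the second is rejected and the competing write wins, which is exactly when a real client would loop around and try again.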

These pieces make it possible to perform fairly complex transactional operations with Redis. They also make it possible to increase performance by sending multiple commands in a batch.

redis-py uses pipelines to expose WATCH, MULTI and EXEC. Here is us popping two items off of a sorted set with extreme prejudice (it'll fail if there aren't enough items, but otherwise it'll bang on the server until it is able to read the low end of a sorted set and remove the items read before another client writes to that sorted set):

:::python
cli = redis.Redis()
# WATCH is issued on the pipeline so that it shares a
# connection with the MULTI/EXEC that follows
with cli.pipeline() as pipe:
    while True:
        try:
            pipe.watch("a")
            # while watching, commands run immediately
            (first, second) = pipe.zrange("a", 0, 1)
            # after multi(), commands are queued until execute()
            pipe.multi()
            pipe.zrem("a", first).zrem("a", second).execute()
            break
        except redis.exceptions.WatchError:
            pass

If you have a great deal of write-contention then this approach is going to thrash the server, but it absolutely will be transactional. Okay, let's put the same ideas to work to build our bounded timeboxes.

:::python
import time, redis
PAGEVIEW_BUCKET = "pv.%s"
TIME_BUCKET = "tb.%s"
MAX_BUCKETS = 1000

def increment_pageview(url):
    cli = redis.Redis()
    # whole minutes since the epoch identify the bucket
    bucket = int(time.time()) // 60
    pv_key = PAGEVIEW_BUCKET % url
    # redis-py 3.0+ argument order: zincrby(name, amount, value)
    cli.zincrby(pv_key, 1, bucket)

    # ensure that the bucket's KV exists in TIME_BUCKET
    tb_key = TIME_BUCKET % url
    with cli.pipeline() as pipe:
        pipe.watch(tb_key)
        time_bucket_exists = pipe.zscore(tb_key, bucket)
        if time_bucket_exists is None:
            try:
                pipe.multi()
                # redis-py 3.0+ takes a {member: score} mapping
                pipe.zadd(tb_key, {bucket: bucket})
                pipe.execute()
            except redis.exceptions.WatchError:
                # somebody else already initialized the key,
                # we don't need to initialize it again
                pass

    # trim any excess values: everything older than the
    # newest MAX_BUCKETS buckets
    to_rem = cli.zrevrange(tb_key, MAX_BUCKETS, -1)
    if to_rem:
        # we aren't watching any values, so no need to catch WatchError
        pipe = cli.pipeline()
        for old_bucket in to_rem:
            pipe.zrem(tb_key, old_bucket).zrem(pv_key, old_bucket)
        pipe.execute()

def recent_buckets(url, offset, limit):
    "Get the most recent time buckets and their pageviews for a URL."
    cli = redis.Redis()
    tb_key = TIME_BUCKET % url
    pv_key = PAGEVIEW_BUCKET % url

    # note that we are watching the time bucket key,
    # not the pageview key, so this watch will only fail
    # if executed while a call to ``increment_pageview``
    # adds or trims one of the buckets, so we don't expect
    # frequent failures here, but we can rest assured
    # we will never return data for a bucket whose data
    # we are no longer storing
    with cli.pipeline() as pipe:
        while True:
            try:
                pipe.watch(tb_key)
                # end value is inclusive, so have to do offset+limit-1
                # for traditional offset/limit expectations
                recent = pipe.zrevrange(tb_key, offset, offset + limit - 1)
                pipe.multi()
                for bucket in recent:
                    pipe.zscore(pv_key, bucket)
                scores = pipe.execute()
                return list(zip(recent, scores))
            except redis.exceptions.WatchError:
                pass

What a concise improvement! No? Well, I suppose it's fairly complicated for what we want to accomplish, but I'll be damned if it doesn't meet the requirements for most analytics systems which want to maintain a fixed number of time buckets. And you probably learned something about the WATCH, MULTI and EXEC commands.
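One loose end: a report wants times, not bucket ids. Here's a small sketch for turning `(bucket, pageviews)` pairs back into something readable. It assumes buckets are whole minutes since the epoch and that members may come back from Redis as bytes or strings; `bucket_start` and `label_buckets` are hypothetical helper names, not part of redis-py.

```python
from datetime import datetime, timezone


def bucket_start(bucket):
    # hypothetical helper: minute bucket id -> UTC start of that minute
    return datetime.fromtimestamp(int(bucket) * 60, tz=timezone.utc)


def label_buckets(pairs):
    # pairs are (bucket, pageviews); a missing score counts as zero
    return [(bucket_start(b).isoformat(), int(views or 0)) for b, views in pairs]


# sample data shaped like a list of (bucket, pageviews) pairs
for label, views in label_buckets([(b"21700000", 3.0), (b"21699999", None)]):
    print(label, views)
```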

As always, let me know what I could have done better in this tutorial or where I've made mistakes! Thank you for reading!