Irrational Exuberance!

Analyzing cross-service requests with Apache Beam.

November 26, 2017. Filed under pythoninfrastructuredata

After doing a survey of current data technologies, I wanted to write a couple simple programs in each to get a better feel of how they work. I decided to start with Apache Beam as it aims to allow you to write programs to run on many of the other platforms I hope to look into, which will hopefully allow me to reuse a single program for evaluating a number of different engines.

As a toy project, I've picked analyzing cross-service requests in a hypothetical microservices architecture. I chose this problem for a couple of reasons, the first of which is that it doesn't fit particularly well with fixed-sized windows, and is more accurately modeled using the session windowing strategy described in the Google Dataflow paper. It's also interesting because many companies already thread a request_id across all their requests, and it seemed like it would be an interesting validation of Apache Beam's approach if it can successfully provide real-time request tracing analytics.

The set up

Probably the easiest way to follow along is to install Docker, checkout the Git repository containing this code, and then follow these commands to get into a terminal where you can run the commands:

git clone https://github.com/lethain/learning-beam
cd learning-beam
docker build -t "learning-beam" .
docker run -it learning-beam /bin/bash
cd /tmp
python traces.py --input logs/output.* --output output
cat output*

You can also do this with just a virtualenv, but debug at your own risk!

git clone https://github.com/lethain/learning-beam
cd learning-beam
virtualenv env
. ./env/bin/activate
pip install apache_beam
python traces.py --input logs/output.* --output output
cat output*

Submit a pull request or drop me a note if you run into issues!


Beyond configuring the environment, the other particulars relate to the synthetic data we've generated for this analysis. We're generating log entries, represented in JSON, where each log entry is a span in our cross-service requests.

{
  "time": 0,
  "id": "236ed3a7-5e13-4b4f-81c4-9b0f46a1e37f",
  "trace": "b8c9e238-0310-492d-9a5d-137acd552354",
  "origin": {
    "service": "app",
    "server": "app-1",
  },
  "destination": {
    "service": "user",
    "server": "user-1",
  }
}

As a simplifying assumption, we're using a logical clock that starts at 0 when our hypothetical infrastructure begins sending requests, and otherwise behaves as a normal clock (e.g. time 10 represents 10 seconds have passed since the infrastructure started).

You can use the generate_logs.py tool to create sample logs, but it'll be easier to rely on the pregenerated contents of the logs directory, which are grouped into five second buckets.

Combining spans into a trace

The goal of this job is to convert a series of log lines in format of:

{
  "time": 0,
  "id": "236ed3a7-5e13-4b4f-81c4-9b0f46a1e37f",
  "trace": "b8c9e238-0310-492d-9a5d-137acd552354",
  "origin": {
    "service": "app",
    "server": "app-1",
  },
  "destination": {
    "service": "user",
    "server": "user-1",
  }
}

Into complete traces in the format of:

[ebcf] 6 spans: cache (t: 0) -> app (t: 1) -> posts (t: 2) -> ...
[b6a1] 5 spans: search (t: 0) -> db (t: 1) -> queue (t: 2) -> ...
[325a] 2 spans: cache (t: 0) -> frontend (t: 1)

The full code is up on Github, so I'll focus on two interesting parts: defining the pipeline, and writing the DoFn function to compile the spans into the span summaries.

Stripping out configuration, the total pipeline definition is:

with beam.Pipeline(options=opts) as p:
    lines = p | ReadFromText(args.input, coder=JsonCoder())
    output = (lines
              | beam.Map(lambda x: (x['trace'], x))
              | beam.GroupByKey()
              | beam.ParDo(AssembleTrace())
   )
   output | WriteToText(args.output)

This shows us:

  1. creating a beam.Pipeline,
  2. reading in our logs (in their JSON format),
  3. transforming them into (key, value) pairs using the trace key (the request_id in this example),
  4. grouping the data by keys, in this case trace,
  5. using the AssembleTrace subclass of beam.DoFn, which we'll cover next, to compile the grouped spans into a trace,
  6. outputting the resulting lines to text.

(The code in Github shows us windowing the data as well, but this example doesn't make use of that functionality, so it's strictly a no-op.)

The only other important code in this example is the AssembleTrace class, so let's look there next:

class AssembleTrace(beam.DoFn):
    "DoFn for processing assembled traces."
        def fmt_span(self, span):
            "Format a span for joining."
            vals = (span['destination']['service'], span['time'])
            return "%s (t: %s)" % vals

        def process(self, element, window=beam.DoFn.WindowParam):
            "Take traces grouped by trace id and analyze the trace."
            trace = element[0]
            spans = list(element[1])
            spans.sort(key=lambda x: x['time'])
            services = " -> ".join([self.fmt_span(span) for span in spans])
            return ["[%s] %s spans: %s" % (trace, len(spans), services)]

The process method is the key, unpacking the key and grouped values, sorting the values, and then joining them into a string representation of a trace.

That's all there is to it: now you have a simple Beam program that constructs request traces from logs containing the spans. This is clearly a very simple example, and it's not constructing the spans in real-time, but altogether I was pretty impressed with how trivial it felt to write a somewhat useful program (including how few concepts I needed to understand to do it).

Next up, I hope to get this running and compiling spans on top of a streaming runner (probably Apache Flink!).


Mostly I worked off these resources:

Overall, I was pretty impressed with how easy Beam as to work with once I got over my annoyance with the overriding of the | operator!