Analyzing cross-service requests with Apache Beam.
After surveying current data technologies, I wanted to write a couple of simple programs in each to get a better feel for how they work. I decided to start with Apache Beam, since it aims to let you write programs that run on many of the other platforms I hope to look into, which should allow me to reuse a single program to evaluate a number of different engines.
As a toy project, I’ve picked analyzing cross-service requests in a hypothetical microservices architecture. I chose this problem for a couple of reasons. The first is that it doesn’t fit particularly well with fixed-size windows, and is more accurately modeled using the session windowing strategy described in the Google Dataflow paper. It’s also interesting because many companies already thread a request_id across all their requests, and it seemed like it would be an interesting validation of Apache Beam’s approach if it could successfully provide real-time request tracing analytics.
The setup
Probably the easiest way to follow along is to install Docker, check out the Git repository containing this code, and then use these commands to get a terminal where you can run the example:
git clone https://github.com/lethain/learning-beam
cd learning-beam
docker build -t "learning-beam" .
docker run -it learning-beam /bin/bash
cd /tmp
python traces.py --input logs/output.* --output output
cat output*
You can also do this with just a virtualenv, but debug at your own risk!
git clone https://github.com/lethain/learning-beam
cd learning-beam
virtualenv env
. ./env/bin/activate
pip install apache_beam
python traces.py --input logs/output.* --output output
cat output*
Submit a pull request or drop me a note if you run into issues!
Beyond configuring the environment, the other particulars relate to the synthetic data we’ve generated for this analysis. We’re generating log entries, represented in JSON, where each log entry is a span in our cross-service requests.
{
  "time": 0,
  "id": "236ed3a7-5e13-4b4f-81c4-9b0f46a1e37f",
  "trace": "b8c9e238-0310-492d-9a5d-137acd552354",
  "origin": {
    "service": "app",
    "server": "app-1"
  },
  "destination": {
    "service": "user",
    "server": "user-1"
  }
}
As a simplifying assumption, we’re using a logical clock that starts at 0 when our hypothetical infrastructure begins sending requests, and otherwise behaves like a normal clock (e.g. time 10 represents 10 seconds having passed since the infrastructure started). You can use the generate_logs.py tool to create sample logs, but it’ll be easier to rely on the pregenerated contents of the logs directory, which are grouped into five-second buckets.
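For reference, spans in this shape can be produced with something along these lines. This is a hypothetical sketch rather than the actual generate_logs.py from the repository, and the service names and trace length are made up for illustration:

import json
import random
import uuid

# Hypothetical span generator; the real generate_logs.py may differ in
# services, trace lengths, and how output files are bucketed.
SERVICES = ["frontend", "app", "user", "posts", "cache", "db", "queue", "search"]

def make_trace(start_time, length):
    "Yield JSON log lines for one trace, one span per logical second."
    trace_id = str(uuid.uuid4())
    for offset in range(length):
        origin, destination = random.sample(SERVICES, 2)
        yield json.dumps({
            "time": start_time + offset,
            "id": str(uuid.uuid4()),
            "trace": trace_id,
            "origin": {"service": origin, "server": origin + "-1"},
            "destination": {"service": destination, "server": destination + "-1"},
        })

if __name__ == "__main__":
    for line in make_trace(start_time=0, length=4):
        print(line)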
Combining spans into a trace
The goal of this job is to convert a series of log lines in the format of:
{
  "time": 0,
  "id": "236ed3a7-5e13-4b4f-81c4-9b0f46a1e37f",
  "trace": "b8c9e238-0310-492d-9a5d-137acd552354",
  "origin": {
    "service": "app",
    "server": "app-1"
  },
  "destination": {
    "service": "user",
    "server": "user-1"
  }
}
Into complete traces in the format of:
[ebcf] 6 spans: cache (t: 0) -> app (t: 1) -> posts (t: 2) -> ...
[b6a1] 5 spans: search (t: 0) -> db (t: 1) -> queue (t: 2) -> ...
[325a] 2 spans: cache (t: 0) -> frontend (t: 1)
The full code is up on Github, so I’ll focus on the two most interesting parts: defining the pipeline, and writing the DoFn that compiles the spans into trace summaries. Stripping out configuration, the entire pipeline definition is:
with beam.Pipeline(options=opts) as p:
    lines = p | ReadFromText(args.input, coder=JsonCoder())
    output = (lines
              | beam.Map(lambda x: (x['trace'], x))
              | beam.GroupByKey()
              | beam.ParDo(AssembleTrace())
              )
    output | WriteToText(args.output)
This shows us:
- creating a beam.Pipeline,
- reading in our logs in their JSON format (a minimal JsonCoder is sketched just after this list),
- transforming them into (key, value) pairs using the trace key (the request_id in this example),
- grouping the data by keys, in this case trace,
- using the AssembleTrace subclass of beam.DoFn, which we’ll cover next, to compile the grouped spans into a trace,
- outputting the resulting lines to text.
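The JsonCoder passed to ReadFromText isn’t shown above; the repository includes one, and while I won’t reproduce it exactly, a minimal version would look roughly like this sketch, assuming each input line is a standalone JSON document:

import json
import apache_beam as beam

# Minimal sketch of a coder that treats each text line as a JSON document;
# the version in the repository may differ.
class JsonCoder(beam.coders.Coder):
    "Encode and decode each line as JSON."
    def encode(self, value):
        return json.dumps(value).encode('utf-8')

    def decode(self, value):
        return json.loads(value)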
(The code on Github windows the data as well, but this example doesn’t make use of that functionality, so it’s strictly a no-op.)
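If you did want to group spans using the session windowing strategy mentioned earlier, a sketch would look something like the following; the 30-second gap is an arbitrary value I picked for illustration, not something taken from the repository:

import apache_beam as beam
from apache_beam.transforms import window

# Assign each span its logical timestamp, then group spans into session
# windows that close after 30 seconds of inactivity (an arbitrary gap
# chosen for illustration).
windowed = (lines
            | beam.Map(lambda x: window.TimestampedValue(x, x['time']))
            | beam.WindowInto(window.Sessions(gap_size=30))
            )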
The only other important code in this example is the AssembleTrace class, so let’s look there next:
class AssembleTrace(beam.DoFn):
    "DoFn for processing assembled traces."

    def fmt_span(self, span):
        "Format a span for joining."
        vals = (span['destination']['service'], span['time'])
        return "%s (t: %s)" % vals

    def process(self, element, window=beam.DoFn.WindowParam):
        "Take traces grouped by trace id and analyze the trace."
        trace = element[0]
        spans = list(element[1])
        spans.sort(key=lambda x: x['time'])
        services = " -> ".join([self.fmt_span(span) for span in spans])
        return ["[%s] %s spans: %s" % (trace, len(spans), services)]
The process method is the key, unpacking the key and grouped values, sorting the values, and then joining them into a string representation of a trace.
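As a quick sanity check (my own snippet, not part of the repository), you can call process directly with the (key, grouped values) shape that GroupByKey produces:

# Hand-built element in the (trace_id, iterable_of_spans) shape that
# GroupByKey emits; only the fields process actually reads are included.
element = ("325a", [
    {"time": 1, "destination": {"service": "frontend"}},
    {"time": 0, "destination": {"service": "cache"}},
])
print(AssembleTrace().process(element))
# ['[325a] 2 spans: cache (t: 0) -> frontend (t: 1)']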
That’s all there is to it: now you have a simple Beam program that constructs request traces from logs containing the spans. This is clearly a very simple example, and it’s not assembling the traces in real time, but altogether I was pretty impressed with how trivial it felt to write a somewhat useful program (including how few concepts I needed to understand to do it).
Next up, I hope to get this running and compiling spans on top of a streaming runner (probably Apache Flink!).
Mostly I worked off these resources:
- The Apache Beam Programming Guide, which is significantly more up-to-date than the WordCount Example Walkthrough, to the extent that the walkthrough will often tell you that things don’t exist in the Python SDK when the SDK documentation shows they do.
- For reading in the JSON log lines, I followed this example, and it worked as advertised.
- The Windowed wordcount example, for windowing and for an example of writing a beam.DoFn subclass.
Overall, I was pretty impressed with how easy Beam was to work with once I got over my annoyance with the overriding of the | operator!
I later wrote a Spark version of this job as a quick comparison point; it was even shorter and about equally simple:
import json
from pyspark.sql import SparkSession

def fmt_span(span):
    "Format a span for joining."
    vals = (span['destination']['service'], span['time'])
    return "%s (t: %s)" % vals

def build_trace(kv):
    trace, spans = kv
    spans = list(spans)
    spans.sort(key=lambda x: x['time'])
    services = " -> ".join([fmt_span(span) for span in spans])
    return "[%s] %s spans: %s" % (trace, len(spans), services)

def run():
    log_dir = 'logs/*'
    spark = SparkSession.builder.appName("RequestSessions").getOrCreate()
    lines = spark.read.json(log_dir).cache().rdd
    traces = lines.map(lambda x: (x['trace'], x)) \
                  .groupByKey() \
                  .map(build_trace)
    output = traces.collect()
    spark.stop()

if __name__ == '__main__':
    run()
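As written, run() collects the traces but never prints or persists them; if you want to see the results when running locally (e.g. after pip install pyspark), you could swap the collect line for something like this, which is my tweak rather than part of the original script:

    # Print each assembled trace instead of discarding the collected results.
    for line in traces.collect():
        print(line)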
I found it a bit simpler to avoid the parDo terminology from Beam, which feels like unnecessary conceptual load, but overall it’s pretty similar.