Callbacks Are An Informal Pipeline

March 7, 2010. Filed under javascript, node-js

At work for the last eight months or so I've been working on workflow software. The simplest example of a workflow is the pipeline: take a task, run it, pass the output to the next task as input, run it, pass... and on it goes. Last night, while trying to refactor the evolving complexity of a Node.js application, I finally realized the very strong parallel between writing a workflow and chaining a long series of callbacks.

At work I've been using an Erlang gen_fsm to guide work through the pipeline (along with failure recovery at each step), but for this Node.js case I'd ideally like something more lightweight, since it will need to model at least one flow for each view in the webapp I am working on. For example, here is one of the first flows I am trying to model, for creating a new project (some of the particularities are related to using Redis as a backend and redis-node-client as an interface, but this is a generic problem for callback-based pipelines):

get new unique id key from a counter
create new project using unique key as part of key
add the new key to a list of projects which is stored in a second key

If the above flow is successful, then we want to finish with a show success flow:

get projects for user
render "project_created" template
send back to user

If the above flow fails in any step, then we want to finish with a show failure flow:

get projects for user
render "project_failed" template
send back to user

So the question is, how to make it easy to compose these various flows? By agreeing on a standard interface for workflow segments this is actually pretty simple. All segments are a function with this skeleton:

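var a_segment = function(workflow_funs, ctx, param1, param2) {
    // do this segment's work, then hand off to the remaining segments
    // (workflow.run is the assumed name of the runner exported by workflow.js)
    workflow.run(workflow_funs, ctx, [1, 2, 3, 4]);
};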

The first parameter is always the remaining segments in the workflow, ctx is a dictionary which is passed into the workflow and is available to all segments (it can be modified along the pipeline), and then each segment can pass any number of additional parameters to the next segment (subsequent segments are free to use or ignore the additional parameters).
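To make the convention concrete, here is a minimal self-contained sketch. The run helper is a local stand-in for the workflow module, and the double_it and record segments are hypothetical names invented purely for illustration.

```javascript
// Local stand-in for the workflow module: call the first remaining segment
// with the rest of the segments, the shared ctx, and any extra parameters.
var run = function(funs, ctx, params) {
    if (funs && funs[0])
        funs[0].apply(this, [funs.slice(1), ctx].concat(params));
};

// Hypothetical segment: notes its input in ctx and passes the doubled value on.
var double_it = function(funs, ctx, n) {
    ctx.seen.push(n);
    run(funs, ctx, [n * 2]);
};

// Hypothetical final segment: stores its input in ctx.
// Running an empty list of segments is a no-op, so the pipeline ends here.
var record = function(funs, ctx, n) {
    ctx.result = n;
    run(funs, ctx, []);
};

var ctx = { seen: [] };
run([double_it, double_it, record], ctx, [3]);
console.log(ctx.seen, ctx.result); // [ 3, 6 ] 12
```

Each segment only ever deals with "the rest of the pipeline" as a single value, which is what keeps the nesting flat.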

Here are three more realistic workflow segments. First, an example of retrieving the next unique identifier from a Redis counter.

var redis = require("../redis-node-client/redisclient"),
    client = new redis.Client(),
    workflow = require("./workflow");

var unique_id = function(funs, ctx, key) {
    client.connect(function() {
        client.incr(key, function(err, value) {
            // remember the new id under the counter's key, then continue
            ctx[key] = value;
            workflow.run(funs, ctx, [err, value]);
        });
    });
};

This workflow concept relies on each of the segments calling workflow.run when its work is complete. This is useful because you can write workflows while only needing to keep track of a single callback at a time (it gets really hard to keep nested callbacks straight in your mind, especially when it comes time to modify them).

Next let's write a segment which renders a Mustache template using Mu.

var workflow = require("./workflow"),
    Mu = require('../Mu/lib/mu');
Mu.templateRoot = './templates';

var render_template = function(funs, ctx) {
    Mu.render(ctx.template, ctx, {}, function(err, output) {
        if (err) {
            throw err;
        } else {
            // collect the rendered chunks, then pass the full page onward
            var buffer = '';
            output.addListener('data', function (c) { buffer += c; })
            .addListener('end', function () {
                workflow.run(funs, ctx, [buffer]);
            });
        }
    });
};

Finally, let's write a workflow segment which sends a response to an http client.

var http_response = function(funs, ctx, body) {
    if (ctx.headers === undefined) ctx.headers = {};
    if (ctx.headers['Content-Type'] === undefined) ctx.headers['Content-Type'] = "text/html";
    if (ctx.status_code === undefined) ctx.status_code = 200;
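    ctx.res.writeHead(ctx.status_code, ctx.headers);
    ctx.res.write(body); // send the rendered body passed in by the previous segment
    ctx.res.close();
    workflow.run(funs, ctx);
};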

Now let's create an HTTP server which responds to requests by creating a new unique key, renders a template and then returns it to the user.

var http = require("http"),
    workflow = require("./workflow");

// note that this requires a mustache template at ./templates/create_project
// which should be something like "project is {{name}}, unique is {{unique_project}}"

http.createServer(function(req, res) {
    req.addListener('end', function() {
        var ctx = { req:req, res:res, name:"my project", template:"create_project"};
        workflow.run([unique_id, render_template, http_response], ctx, ["unique_project"]);
    });
}).listen(8000); // the port is an arbitrary choice

Note that ["unique_project"] is the parameter passed into the unique_id function to populate its key parameter, and then that response is stored into the ctx dictionary for the key unique_project. http_response renders the value passed into its body parameter, which is passed in by render_template.

The really unmagical function which supports this kind-of-goodness is four simple lines (the real magic is just deciding on an adequate convention and following it, meta-programming be damned).

// workflow.js (the export name `run` is assumed)
exports.run = function(funs, ctx, params) {
    if (funs && funs[0]) 
        funs[0].apply(this, [funs.slice(1), ctx].concat(params));
};
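As a rough sketch of how the create-project flow and its success and failure endings from earlier in the post might be composed with this runner: the in-memory store stands in for Redis, and next_id, create_project, branch and pick_template are hypothetical names invented for this example, not part of the code above.

```javascript
// The runner from workflow.js, inlined so the sketch is self-contained.
var run = function(funs, ctx, params) {
    if (funs && funs[0])
        funs[0].apply(this, [funs.slice(1), ctx].concat(params));
};

// In-memory stand-in for Redis (an assumption made for illustration only).
var store = { counter: 0, projects: [] };

// Step 1: get a new unique id from a counter.
var next_id = function(funs, ctx) {
    store.counter += 1;
    run(funs, ctx, [null, store.counter]);
};

// Steps 2 and 3: create the project and add its key to the project list,
// or pass an error along when ctx.name is missing.
var create_project = function(funs, ctx, err, id) {
    if (!ctx.name) return run(funs, ctx, [new Error("project needs a name")]);
    store["project:" + id] = ctx.name;
    store.projects.push("project:" + id);
    run(funs, ctx, [null]);
};

// Branching segment: ignores whatever remains and starts either the
// success flow or the failure flow, depending on the error parameter.
var branch = function(on_success, on_failure) {
    return function(funs, ctx, err) {
        ctx.error = err || null;
        run(err ? on_failure : on_success, ctx, []);
    };
};

// Builds a segment that selects the template a later segment would render.
var pick_template = function(name) {
    return function(funs, ctx) {
        ctx.template = name;
        run(funs, ctx, []);
    };
};

var show_success = [pick_template("project_created")];
var show_failure = [pick_template("project_failed")];
var create_flow = [next_id, create_project, branch(show_success, show_failure)];

var good_ctx = { name: "my project" };
var bad_ctx = {};
run(create_flow, good_ctx, []);
run(create_flow, bad_ctx, []);
console.log(good_ctx.template, bad_ctx.template); // project_created project_failed
```

Because flows are just arrays of functions, the success and failure endings can be built once and reused by any flow that needs them.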

I'm still playing around with this idea and a few others while trying to write a relatively complex Node.js application. I think there is something here, although this first iteration is extremely rough.

As an ending note, it is definitely possible to approach this problem in terms of a more sophisticated finite state machine (like the Erlang gen_fsm implementation I briefly mentioned above), but that kind of dynamic is definitely more complex to work with, and often it isn't necessary. In particular, taking advantage of JavaScript exceptions will make it possible to create FSM-like complexity in the scenarios that require it without requiring FSM-like complexity in the simple ones.
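One way the exception idea might look, as a sketch rather than a finished design: run_or_recover is a hypothetical helper (not part of workflow.js) that diverts to a failure flow when a segment throws. Note the assumption that segments throw synchronously; an exception raised later inside an asynchronous callback would not be caught by this try block.

```javascript
// The runner from workflow.js, inlined so the sketch is self-contained.
var run = function(funs, ctx, params) {
    if (funs && funs[0])
        funs[0].apply(this, [funs.slice(1), ctx].concat(params));
};

// Hypothetical helper: run a flow, and if any segment throws synchronously,
// record the error in ctx and run the failure flow instead.
var run_or_recover = function(funs, failure_funs, ctx, params) {
    try {
        run(funs, ctx, params);
    } catch (err) {
        ctx.error = err;
        run(failure_funs, ctx, []);
    }
};

// Example segments (names invented for illustration).
var parse_body = function(funs, ctx, raw) {
    run(funs, ctx, [JSON.parse(raw)]); // JSON.parse throws on malformed input
};
var save = function(funs, ctx, data) {
    ctx.saved = data;
    run(funs, ctx, []);
};
var report_failure = function(funs, ctx) {
    ctx.status_code = 400;
    run(funs, ctx, []);
};

var good = {}, broken = {};
run_or_recover([parse_body, save], [report_failure], good, ['{"name":"my project"}']);
run_or_recover([parse_body, save], [report_failure], broken, ['{oops']);
console.log(good.saved, broken.status_code);
```

The simple flows never mention failure at all; only the call site that cares about recovery has to name a failure flow.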