A Couple of Clojure Agent Examples
I have used Erlang daily at work for nearly a year, so when I started looking into Clojure, agents were one of the first features I examined. Initially, I was rather confused.1
The Erlang conception of an actor is a process with a mailbox which dequeues and processes messages individually. The actor process contains all the logic for processing messages, and the behavior can range from storing a message in an internal queue to ignoring the message entirely.
-module(agent).
-export([add/1, test/0]).

add(N) ->
    receive
        {add, M} -> add(N+M);
        _ -> add(N)
    end.

test() ->
    Adder = spawn(agent, add, [0]),
    Adder ! {add, 10}.
On the other hand, Clojure's agent inverts ownership of message handling logic: callers submit functions to an agent, which stores them in its mailbox and then executes them in order.
The agent itself has state, but no logic.
(def x (agent 0))
(defn increment [c n] (+ c n))
(send x increment 5) ; @x -> 5
(send x increment 10) ; @x -> 15
Using a Clojure agent is more akin to operating on a data structure than interacting with a service.2 This makes sense, as the Clojure agent is a mechanism for sequencing operations on a particular instance of a data structure. In distributed computing, it turns out that having a concise and easily grokked mechanism for sequencing modifications is extremely useful.3
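To make the "state, but no logic" idea concrete, here is a minimal sketch (the names events, record and counter are made up for illustration). The agents only hold a value; all the behavior lives in the functions sent to them, and those functions are applied one at a time.

```clojure
;; The agent holds a vector; the function we send decides what happens to it.
(def events (agent []))

(defn record [history event]   ; hypothetical helper, not from the examples below
  (conj history event))

;; Actions sent from one thread to one agent run in the order they were sent.
(doseq [e [:open :write :close]]
  (send events record e))

(await events)                 ; block until the queued actions have run
(prn @events)                  ; [:open :write :close]

;; Many threads can send to the same agent without losing updates,
;; because the agent applies the actions one after another.
(def counter (agent 0))

(let [senders (doall (for [_ (range 100)]
                       (future (send counter inc))))]
  (doseq [s senders] @s))      ; wait until every send has been dispatched

(await counter)
(prn @counter)                 ; 100
```

If you run this as a script rather than at the REPL, you will probably want to call (shutdown-agents) at the end so the JVM can exit.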
Now let's look at a few fairly straightforward examples of using Clojure agents.
Sequencing Writes to a File
Many Java classes are not designed to be thread-safe, but using an agent can make it possible to safely use such classes in a multi-threaded environment.
This example writes to a shared log file, where many actors are tracking their progress without mutual awareness, and without assuming the java.io.BufferedWriter class is thread-safe.
(ns logger (:import (java.io BufferedWriter FileWriter)))

(let [wtr (agent (BufferedWriter. (FileWriter. "agent.log")))]
  (defn log [msg]
    (letfn [(write [out msg]
              (.write out msg)
              out)]
      (send wtr write msg)))
  (defn close []
    (send wtr #(.close %))))

(log "test\n")
(log "another line\n")
(close)
Note that the write function's final return value is the reference to the BufferedWriter. This is significant because an agent's value is set to the return value of the function sent via send. Thus, if write were defined as
(defn write [out msg] (.write out msg))
then only the first write would work; afterwards the value of the agent would become nil, and subsequent calls to log would attempt to call .write on nil and thus fail with an error.
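The return-value rule is easy to see in isolation. The following is a small sketch that swaps the file for an in-memory java.io.StringWriter so it can be run without touching disk; the names good, bad and write-keep are hypothetical and only exist for this illustration.

```clojure
(import '(java.io StringWriter))

;; Returns the writer, so the agent keeps holding it after each action.
(defn write-keep [^StringWriter out ^String msg]
  (.write out msg)
  out)

(def good (agent (StringWriter.)))
(send good write-keep "a")
(send good write-keep "b")
(await good)
(prn (str @good))              ; "ab"

;; Returns whatever .write returns, which is nil for a void Java method,
;; so the agent's value is replaced by nil after the first action.
(def bad (agent (StringWriter.)))
(send bad (fn [^StringWriter out ^String msg] (.write out msg)) "a")
(await bad)
(prn @bad)                     ; nil
```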
(A slightly more in-depth look at writing to files in Clojure.)
Parallel HTTP Fetches
Although I described agents above purely as a way to sequence operations, that sequencing is asynchronous, so multiple agents with queued messages can make use of multiple cores (a single agent runs its tasks one at a time, so they can't be split across cores).
(ns parallel-fetch
  (:import (java.io InputStream InputStreamReader BufferedReader)
           (java.net URL HttpURLConnection)))

(defn get-url [url]
  (let [conn (.openConnection (URL. url))]
    (.setRequestMethod conn "GET")
    (.connect conn)
    (with-open [stream (BufferedReader.
                        (InputStreamReader. (.getInputStream conn)))]
      (.toString (reduce #(.append %1 %2)
                         (StringBuffer.) (line-seq stream))))))

(defn get-urls [urls]
  (let [agents (doall (map #(agent %) urls))]
    (doseq [agent agents] (send-off agent get-url))
    (apply await-for 5000 agents)
    (doall (map #(deref %) agents))))

(prn (get-urls '("http://lethain.com" "http://willarson.com")))
Note the use of send-off instead of send: send-off is preferred for functions that block on IO, send for those that are CPU-bound. (await-for blocks the calling thread until every action dispatched so far to the given agents has completed, or until the timeout value, specified in milliseconds, is reached.)
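If you want to try the send-off and await-for combination without hitting the network, here is a rough sketch that stands in a Thread/sleep for the blocking IO. The function slow-square and the vars workers and finished? are invented for this example.

```clojure
;; Pretend this blocks on IO for a while before producing a result.
(defn slow-square [n]
  (Thread/sleep 200)
  (* n n))

;; One agent per input, so the blocking calls can overlap.
(def workers (mapv agent [1 2 3 4]))

(doseq [w workers]
  (send-off w slow-square))

;; await-for returns logical false if the timeout expired first.
(def finished? (apply await-for 5000 workers))

(prn finished?)                ; true
(prn (mapv deref workers))     ; [1 4 9 16]
```

Checking the return value of await-for is worthwhile in real code: on a timeout, deref simply returns whatever each agent currently holds, which may still be the original input.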
Removed several lines here after helpful comment by Steve Gilardi clarified semantics of apply.
Agents as a Message Relay
In this final example, we relay a message through a chain of N agents. We'll also use an agent outside of the chain to log progress through the chain. This logging technique makes it possible to check progress through the relay while only retaining a reference to the first agent in the chain, and is a fairly common pattern in distributed systems (although usually you'll be using something with more in common with RabbitMQ than a single process).
(ns agents-queue)

(def logger (agent (list)))

(defn log [msg]
  (send logger #(cons %2 %1) msg))

(defn create-relay [n]
  (letfn [(next-agent [previous _] (agent previous))]
    (reduce next-agent nil (range 0 n))))

(defn relay [relay msg]
  (letfn [(relay-msg [next-actor hop msg]
            (cond (nil? next-actor) (log "finished relay")
                  :else (do (log (list hop msg))
                            (send next-actor relay-msg (+ hop 1) msg))))]
    (send relay relay-msg 0 msg)))

(relay (create-relay 10) "hello")
(Thread/sleep 5000)
(prn @logger)

; output from running script is:
; ("finished relay" (8 "hello") (7 "hello")
;  (6 "hello") (5 "hello") (4 "hello")
;  (3 "hello") (2 "hello") (1 "hello")
;  (0 "hello"))
Probably the most interesting aspect of this example is that agents hold other agents as their state and send work on to them, which creates a fully asynchronous programming model.
Further, although this is a rather convoluted example in itself, the message relay is actually the simplest version of a work pipeline where each agent performs some manipulation on the incoming message before passing it further down the line.
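As a rough sketch of what such a pipeline might look like (not a production design), each stage below is an agent holding a transformation function and a reference to the next stage. The names make-stage, process, pipeline and result are all hypothetical, and a promise is used only so the final value can be read back.

```clojure
(def result (promise))

;; A stage is an agent whose state is its function plus the next stage (or nil).
(defn make-stage [f downstream]
  (agent {:f f :downstream downstream}))

;; Apply this stage's function, then hand the output to the next stage.
;; Returning stage-state keeps the agent's value unchanged.
(defn process [stage-state value]
  (let [{:keys [f downstream]} stage-state
        out (f value)]
    (if downstream
      (send downstream process out)
      (deliver result out))
    stage-state))

;; Built back to front: the first function listed becomes the last stage.
(def pipeline
  (reduce (fn [downstream f] (make-stage f downstream))
          nil
          [str #(* % 2) inc]))

(send pipeline process 5)            ; inc -> 6, double -> 12, str -> "12"
(prn (deref result 5000 :timeout))   ; "12"
```

Holding the function in the agent's state is one design choice among several; you could just as well send a different function to each stage, which stays closer to the relay above.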
And, that's it for the time being. Hopefully these examples provide a bit of a starting point for using Clojure agents. Let me know if you run into any issues.
1. However, I don't want to appear to propagate the misconception that simply having agents or actors makes a language Erlang-like: it doesn't. Or maybe it does, but no more than not optimizing tail-recursion makes Clojure a C-like language.
Erlang is a nice language, but Scala and Clojure are nice languages as well. The key difference is the Erlang OTP platform, which benefits heavily from some of the properties of the Erlang VM, such that it wouldn't be trivial to reimplement equivalent functionality at the library layer.
That said, you can take a lot of the great ideas from the Erlang OTP platform and apply them to Scala or Clojure.
2. It also means some concepts, like an Erlang actor having the ability to ignore messages, are no longer applicable.
3. An Erlang actor can also be used to sequence operations on data, but the ability to incorporate logic into the actor means that it can also do much more; it can hide implementation details, validate incoming data, reject requests from unapproved processes, etc.