The Ultimate Guide to Clojure Concurrency
But how can you learn to rock your cores?
The answer is not as simple as it seems. You've got a lot of choices in Clojure. Concurrency is not a simple topic. I'm going to lay out the basics of concurrency. Then I'll go through the options you've got in Clojure one at a time. Consider this list to be a catalog of tools you have to solve the problems you're facing.
Table of Contents
- Catalog of Primitives
- Delay - I may not need to calculate this
- Promise - I'll check back here for the answer
- Future - Please calculate this in another thread
- A word about deref - What's in the box?
- Atom - Keeping a single value consistent over time
- Ref - Keep multiple values in a consistent relationship
- Var - Redefine stuff as you program
- Agent - Queue up work to aggregate a value
- Lock - Keep others out while you're in
- core.async - Workers with conveyor belts
- Threads - Use your cores
- Executor Service - Run lots of tasks in a thread pool
- Fork/Join - Break your work into chunks, do the chunks on your cores, put them back together
- Manifold - Abstractions for data sources
- pmap - Simple parallelism with one more letter
- Reducers - Transformation pipelines in parallel
Ready to go?
A common question is "What is the difference between concurrency and parallelism?" If it's confusing you, don't worry. They're almost the same and the distinction is mostly academic. However, you're probably already used to thinking about concurrency and parallelism. We actually encounter the difference all the time in the real world.
Let's look at an example: bank tellers.
Imagine you walk into a bank. You see a row of tellers and a line of people waiting. How many bank transactions can happen at the same time? Easy. It's the same as the number of tellers. The parallelism of the bank is how many things can happen at the exact same time. If there are four tellers, four things can happen at the same time.
Even though there are only four tellers, all of those clients waiting in line will be helped before the end of the day. They're basically competing over the scarce resource of the teller's time. But because of the concurrency system set up (the queue), they all know that their business will be handled in a timely manner.
Want another example?
How about a bathroom shared among roommates? How many people can use it at once? One. But by using a lock, the eight roommates can share it.
We can increase the parallelism by adding bathrooms. If you've got ten bathrooms but only eight roommates, we've actually got more parallelism than concurrency. The work of using the bathroom can't really be broken down any more to take advantage of those two extra rooms. You can't go to the bathroom faster by using two bathrooms.
Let's go back down to one bathroom.
With eight roommates sharing one bathroom, can you imagine maybe one roommate doesn't get their fair share of bathroom time? Maybe they are slower running to the bathroom when it's vacant. Those roommates could use a better system for sharing that bathroom than a simple lock. This is where concurrency primitives come in. Concurrency primitives are simple tools that you can use to build concurrent systems that have the properties you need to solve your problem.
Want some real-world examples of concurrency primitives?
- Conveyor belts
We'll see more in a bit, where we'll catalog the ones that you get with Clojure.
But first, we saw how to increase parallelism. But how do you increase concurrency? We already know you can't increase concurrency in the bathroom example. The "unit of work" can't be split up any more. One person can't use two bathrooms at the same time!
But you can do it in the bank example.
Let's say you have 100 bank tellers (100 parallelism) and 10 clients in line. Obviously, no one will have to wait--except for those 90 tellers who are not busy! Is there anything we can do to keep them busy and make the clients' bank experience go faster?
Yes. Because if you ask those clients, each one has multiple transactions. One person has four checks to deposit. Another wants to do three separate transfers, etc. The bank has set up their work so that each transaction is entirely independent. You can hand each teller a check to deposit (with the deposit slip) and all of your work can be done at the same time. One way to increase concurrency is to break down the work to be done into independent chunks. You'd be surprised how much can be broken up without much work.
Let's say you have to fetch 20 web pages. Each GET request is independent of the others, so you can do them all at the same time. No problem.
That's an easy example of independent work. We'll get deeper into what it means to be "independent" later, because there are different types of independence.
But that's for later.
Let's dive into the computer world and see the basics that Clojure and the JVM give us.
In one computer, the parallelism is the number of cores. Each core can execute instructions independently and run at full speed. The JVM does not give you direct access to these cores. You have to go through the operating system, which gives you processes and threads. These are both concurrency mechanisms so that different programs can share the limited number of cores.
The JVM lets you create native operating system threads. All of the threads share memory.
So let's look at this more closely:
As far as the JVM is concerned, you can have unlimited threads. And they're all able to read and write to the same memory. Obviously, all of those threads will need some help working together.
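Here's a minimal sketch of those two facts: the JVM can tell you how many cores it sees, and every thread you start reads and writes the same memory. The names core-count, shared, and threads are mine, made up for this illustration.

```clojure
;; Number of cores the JVM can see (the parallelism of this machine).
(def core-count (.availableProcessors (Runtime/getRuntime)))

;; A plain Java array: mutable memory that every thread can reach.
(def shared (long-array 4))

;; Start four threads; each one writes to its own slot of the shared array.
(def threads
  (mapv (fn [i]
          (doto (Thread. (fn [] (aset shared i (long (* i 10)))))
            (.start)))
        (range 4)))

;; Wait for every thread to finish before reading.
(doseq [^Thread t threads] (.join t))

(println "cores:" core-count "shared:" (vec shared))
```

Each thread writes to a different slot here, so nothing can collide. The trouble starts when two threads want the same slot, which is what the rest of the tools are for.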
So let's give them some help!
Let's get into the concurrency tools we've got in Clojure.
The first one is kind of not that obvious because it's everywhere in Clojure. But it's one of those important things that makes all of the other concurrency stuff possible.
What is it?
The immutable data structures. Clojure uses immutable data by default. Why is this important? Let's imagine a bank where you could reuse checks. Let's write all the information in pencil. That could work, with a lot of discipline. Or you could make checks one-time-use (you create one, use it, then throw it away).
Or what if we wrote the amount of money in your account in chalk on a chalkboard. Whenever you deposit a check, we just erase the old number and write the new number. Sounds great, right?
No! It sounds terrible. Change banks immediately!
Or how about a bathroom shared by eight people with no lock! You can do it. Just be disciplined. Knock every time. What could go wrong? Start looking for a new place to live (or run to the hardware store to buy a lock).
It sounds ridiculous, but that's basically what we do all the time when we use mutable data structures. You can still write concurrent systems, but your job is harder and your success depends on discipline instead of easy rules like "wait your turn in line".
So let's all just take a moment and appreciate the calming and concurrent nature of immutable data structures.
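If you want to see it in code, here's a small sketch. The account map is a made-up example, not something from a real bank.

```clojure
(def account {:owner "Jane" :balance 100})

;; update returns a new map; the original is left untouched.
(def after-deposit (update account :balance + 50))

(println account)        ; still shows a balance of 100
(println after-deposit)  ; shows a balance of 150
```

Any thread holding on to account can keep reading it safely. Nobody can erase the chalkboard out from under it.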
Now we can move on to the next tool that really helps us.
The next thing that really helps with concurrency is using pure functions. Clojure doesn't help much with this, except that it makes it easy to make pure functions. But you don't get pure functions by default. And Clojure can't tell you whether a function is pure.
What is a pure function?
A pure function gives you the same return value for the same inputs all the time. You can call it whenever you want as many times as you want and you'll always get the same return value.
Wait! What things can make a function return different things?
What if the function reads a global mutable variable?
What if the function asks for user input?
What if the function fetches something from the network?
What if the function does different things depending on the time?
Or what if the function generates a different random number each time?
Basically, they can't depend on anything that changes. The time changes. The network changes. The user changes.
There's one more thing about pure functions you have to know about: not only can they not depend on changing stuff, pure functions can't change anything themselves.
Let's look at some examples of changing stuff:
Printing to the screen.
Posting to a web server.
Writing to disk.
Changing a global variable.
Sending an email.
And the classic: launching a missile. 🚀
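To make the difference concrete, here's a rough sketch of one pure function and two impure ones. All three functions are hypothetical examples I made up for this comparison.

```clojure
;; Pure: the result depends only on the arguments.
(defn total-cents [price-cents quantity]
  (* price-cents quantity))

;; Impure: reads the clock, so the same argument can give different answers.
(defn millis-since [start-ms]
  (- (System/currentTimeMillis) start-ms))

;; Impure: changes something outside itself (it prints to the screen).
(defn announce-total! [price-cents quantity]
  (println "Total:" (total-cents price-cents quantity))
  (total-cents price-cents quantity))
```

Calling total-cents twice is harmless. Calling announce-total! twice prints twice, which is exactly the kind of thing you have to watch for.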
Why are pure functions important?
Because they're easy to understand. They don't depend on the history of the program. They always act the same. They're like a rock solid friend--you can always trust them.
But there's another reason.
Many concurrency primitives will sometimes call your function multiple times. They need to. Why?
Let's look at an example.
Imagine you're working with a stock market agent. You've got some money and want to invest. The agent comes to you, tells you how much money you have and the current stock price. You do some mental arithmetic and write down your order. The agent runs back to the market but there's so much competition, the price changes before the order gets put in. So the agent comes back.
And what do they ask?
The same question, but with different arguments. Here's your budget now, here's the price now. You plug them into the same mental function, and get a new order.
Sometimes it happens. Things change between when you calculate a thing and when the thing needs to happen. So you recalculate. But it's better to give that agent an order first in case it gets in and recalculate if it doesn't. That's called "optimistic".
What would happen if your function wasn't pure? It would get called many times and have more than one effect. It would print out something many times instead of once. Or it might launch two missiles instead of one. Oops. Keep it pure, folks.
Okay, there's just one more idea before we get to the catalog of primitives. It's an important idea, but I understand if you want to skip ahead. In fact, skip around as much as you want!
We talked before about being able to break up tasks into smaller tasks so you can get more concurrency and take advantage of more cores. How small do things need to be?
The short answer is "as small as possible". Why?
Let's say we have twenty people who have to carry 100 rocks of different sizes up a hill. They've each got a backpack. How do you distribute the rocks among the twenty backpacks so you can carry them all? It's like a game of Tetris. You could spend all day trying to figure out how to fit them all optimally. Most of the time, you end up with some rocks that just won't fit. So you move them around a little, trying to find some extra space somewhere. But it's a lot of work and most of the time you can't fit them all in. This is a well-known kind of problem, closely related to the Knapsack Problem (with many backpacks, it's usually called bin packing).
What's the solution?
Break the rocks up. If you break them into gravel, you can literally pour them into the backpacks. And it's really easy. You don't have to think about it. But then you get to the end, and some still don't fit.
What's the solution?
Break them up even more! If you break them into sand, there's less empty space between them. And you can fill them up even more.
What does this have to do with concurrency?
It turns out that the same thing happens with tasks on your cores. If you have big tasks, you have to spend a lot of time figuring out which core would run which task to optimize the execution. But with tiny tasks, you don't think at all. You just pour them in wherever they fit. And more will fit! So break up your jobs into independent tasks and you will be able to run on more cores.
There's one caveat, though.
Sometimes you can break your task up into such small pieces, it's not worth the overhead of sending it to another core. For instance, adding two numbers is really fast on a computer. You might as well do it in the current thread instead of sending it to another core. When you send it to another core, you have to make a message, put it in a queue, and wait for the queue to process. It's not worth it! For many parallel execution examples, you'll see simple operations like addition used, just to make it easy to understand, even if you wouldn't really use that example in real life. However, in real-world calculations, you often have the opposite problem, which is that tasks are too big. And those could be broken up more.
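Here's one way the trade-off could look in code. It's only a sketch: expensive-step is a hypothetical stand-in for real work, and the chunk size is a knob you'd tune yourself. It uses future, which is covered in the catalog.

```clojure
(defn expensive-step
  "Stand-in for real work on one item (hypothetical)."
  [x]
  (* x x))

(defn sum-in-chunks
  "Splits xs into chunks of chunk-size, sums each chunk in its own future,
  then adds the partial sums together."
  [chunk-size xs]
  (let [chunks   (partition-all chunk-size xs)
        ;; mapv is eager, so every future starts before we wait on any of them
        partials (mapv (fn [chunk]
                         (future (reduce + (map expensive-step chunk))))
                       chunks)]
    (reduce + (map deref partials))))

(println (sum-in-chunks 1000 (range 10000)))
```

With a chunk size of 1 you pay the queuing overhead for every single multiplication. With a chunk size of 10000 you're back to one big rock on one core. Somewhere in between is usually where you want to be.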
So what does independent mean?
There are different types of independence. We've already looked at one type of independence. Pure functions are independent of time. It doesn't matter how many times you call them or when you call them. They always give the same answer. So that's one independence.
Well, another one we could look at is order independence. For instance, if I need to fetch 100 different websites, it doesn't really matter what order I do them in. It does matter when I do the requests, because I could make the request when the server is down, or the page changes over time.
But the order doesn't matter. The answer from server A does not depend on the answer from server B. I could do them in any order.
And that's really important.
Let's look at an example. What if you're shopping at a grocery store with ten checkout lines? If you finish shopping before another person, does that mean you're going to finish checking out before them? No! If you're anything like me, you tend to choose the wrong line. That person who finished after you gets lucky and you watch them finish before you even started.
Ugh. It sucks.
But let's look at it analytically: each checkout is independent of the order. You still get the same answer. Everyone got their food. Everyone got paid. You got the same result. But what wasn't independent? You can't check out before you shop!
Order independence is really strong. You can't always guarantee it, but when you can, you have a lot of options for breaking up that work. In this case, there are ten checkout centers, each with their own queue. The queues can move independently.
Mathematicians call order independence commutativity. Here's an example:
a + b + c = b + a + c
That is, addition is commutative. Why does commutativity matter? Because you can't always guarantee the order. If you divide up tasks among four cores, you can't guarantee they'll finish in the same order. If you need order, you won't be able to divide the tasks up.
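You can check this at the REPL. This little sketch (the amounts are made up) compares an operation that is commutative with one that isn't.

```clojure
(def amounts [5 20 1 100 42])

;; Addition is commutative: any ordering gives the same total.
(def same-total?
  (= (reduce + amounts)
     (reduce + (reverse amounts))
     (reduce + (sort amounts))))

;; Subtraction is not: reordering changes the answer.
(def same-difference?
  (= (reduce - amounts)
     (reduce - (reverse amounts))))

(println same-total? same-difference?) ; prints: true false
```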
Another type of independence?
Grouping. Grouping is a little hard to explain, so I'll need a nice example. I hope I won't disappoint.
Let's say I want to build the longest horizontal lego tower in the known universe. So to get started, I lay out all the legos in the order I want them. The color pattern, that is, the order is important. However, if I keep the order of the pieces, does it matter which ones I connect first? Not really. I'll get the same tower. Mathematicians call this "grouping". It's like in this equation:
a + (b + c) = (a + b) + c
The grouping is the parentheses. The letters are in the same order, but the operations happen differently. It's a subtle but important difference. Mathematicians call this associativity.
Why is this important?
Well, if I'm really going to make the longest lego tower in the known universe, I need to get some help. After laying out the legos, I can get some friends to help. All I have to do is to roughly divide up sections of legos among my friends, we can all connect up our legos (maintaining order!), then connect up the sections when we're done.
Because I can group them however I want and get the same answer, it lets me be flexible dividing up the work. I know I'll be able to put the pieces back together and get the same answer.
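Here's the lego tower as a sketch, with strings standing in for bricks. Joining strings is a nice example because it's associative (grouping doesn't matter) but not commutative (order does). The names are mine.

```clojure
(def bricks ["red" "blue" "blue" "green" "red" "yellow" "blue"])

;; One builder joins every brick, left to right.
(def solo (reduce str bricks))

;; Friends each join a section (keeping the order), then the sections are joined.
(def sections (partition-all 3 bricks))
(def team (reduce str (map #(reduce str %) sections)))

(println (= solo team)) ; prints: true
```

Try changing the section size. You get the same tower as long as nobody shuffles the bricks.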
There are more types of independence, like idempotence. Idempotence means that if something happens twice (or more), it's the same as it happening once. So, for instance, if you add the same value to a set twice, it's the same as it happening once. Why is this important?
Well, here's another example. You're probably familiar with it.
Have you ever been on a site where you buy something and after you click "Buy" it yells at you:
"Don't click the buy button twice! It will charge you twice!"
Wouldn't the site be better with less yelling and more idempotence? Yes. The "submit payment" operation should be idempotent so you don't have to yell at the nice people who are trying to give you money. If you accidentally click twice, it should only charge you once.
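Here's a small sketch of both ideas. The payment part is hypothetical: I'm assuming each purchase carries an order id, so a repeated click lands on the same entry instead of creating a new charge.

```clojure
;; Adding to a set is idempotent.
(def once  (conj #{} :order-17))
(def twice (conj (conj #{} :order-17) :order-17))

;; Hypothetical payment recorder: charges are keyed by order id, so a
;; repeated "Buy" click overwrites the same entry instead of adding one.
(def charges (atom {}))

(defn submit-payment! [order-id amount]
  (swap! charges assoc order-id amount))

(submit-payment! :order-17 30)
(submit-payment! :order-17 30) ; accidental second click

(println (= once twice) (count @charges)) ; prints: true 1
```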
Wouldn't it be great if you could write some code that calculates a value, but never run the code if no one wants the value? And wouldn't it also be great if the value was only calculated once, no matter how many threads need the value?
There's a Clojure function called delay. It creates an object called a Delay. A Delay ensures that some code is run either zero times or one time. It's run zero times if the Delay is never derefed. And it's run once if it is derefed, regardless of how many times it's derefed.
Here's an example:
;; make a delay that does some calculation
(def the-answer (delay (* 100 (long-calculation 45))))

;; that long calculation has not been done at this point

(when need-the-answer?
  ;; the thread will block until the answer is calculated
  (println (deref the-answer)))

;; if you don't need the answer, it was never run
Now, that's a nice way of avoiding work in a single thread if you're going to use the work some of the time but not all of the time. Delays are also useful for a shared resource between threads.
;; make a delay that initializes a shared resource
;; if we don't put it in a Delay, the resource will be initialized during compilation
(def resource (delay (initialize-shared-resource)))

;; start 100 threads that use the resource
(defn -main []
  (dotimes [x 100]
    (doto (Thread. (fn []
                     (let [resource @resource]
                       ;; use the resource here
                       ...)))
      .start)))
The magic is that only one of those threads will initialize it. The rest of the threads will block waiting for it.
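If you'd like to watch that happen, here's a sketch you can run. The counter and the sleep are my additions: init-count records how many times the body runs, and the sleep stands in for a slow initializer.

```clojure
(def init-count (atom 0))

;; Hypothetical slow initializer; it counts how many times it runs.
(def resource
  (delay
    (swap! init-count inc)
    (Thread/sleep 100)
    {:connection "ready"}))

;; Twenty threads all ask for the resource at about the same time.
(def threads
  (mapv (fn [_] (doto (Thread. (fn [] @resource)) (.start)))
        (range 20)))

;; Wait for all of them to finish.
(doseq [^Thread t threads] (.join t))

(println @init-count (realized? resource)) ; prints: 1 true
```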
Thanks to Dan Lebrero for explaining how Delay is useful for concurrency.
Clojure has a function called promise. A Promise is like a coupon for a back rub that you get from your significant other. It's not a present itself, but instead a Promise to give you a present later. promise will make a coupon for a value you promise to deliver some time in the future. Usually you are going to calculate that value in another thread.
Here's how you would use it.
;; make a Promise
(def the-answer (promise))

;; start working on the promise in a new thread
(doto (Thread. (fn []
                 ;;; do a lot of work
                 ;;; ...
                 ;;; then deliver on your promise
                 (deliver the-answer 42)))
  .start)

;; in the original thread, do your own work
;; ...

;; then get the answer
;; (this will block if it's not there)
(println (deref the-answer))
Promises are a simple way to communicate between threads. One Thread calculates the answer. The other waits for it. (We'll look at how to create Threads later.)
You can create as many Promises as you want. But each one can only get one answer. Once the answer is delivered, it can't be delivered again.
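Here's a quick sketch of that last rule. A second deliver doesn't throw. It's simply ignored and returns nil.

```clojure
(def answer (promise))

(deliver answer 42)                         ; first delivery wins
(def second-try (deliver answer :too-late)) ; ignored; returns nil

(println @answer second-try) ; prints: 42 nil
```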
A Future is similar to a Promise. The main difference is that Futures will evaluate an expression for you in another Thread. You don't have to set the thread up. Here's how the same thing works using future:
(def the-answer
  (future
    ;; do a lot of work in a new thread
    ;; ...
    ;; then deliver the answer
    ;; the value of the last expression is delivered
    42))

;; in the main thread, do some work
;; ...

;; then get the answer (which blocks until the answer is done)
(println (deref the-answer))
One thing that trips up people new to Futures is that they swallow exceptions. If the code you run in the Future throws an exception, you won't hear about it until you deref it. When you deref it, the exception will be thrown again in the current thread.
Let's look at some code that demonstrates this:
;; the Exception gets thrown but stored in the Future
(def f (future (throw (Exception. "Hello from the future!"))))

(deref f) ;; this will throw the Exception
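One detail worth knowing if you want to catch it: on the JVM, what deref throws is a java.util.concurrent.ExecutionException that wraps your original exception as its cause. Here's a sketch of unwrapping it. The names failing and message are mine.

```clojure
(import 'java.util.concurrent.ExecutionException)

(def failing
  (future (throw (IllegalStateException. "Hello from the future!"))))

(def message
  (try
    @failing
    (catch ExecutionException e
      ;; the original exception is the cause of the wrapper
      (.getMessage (.getCause e)))))

(println message) ; prints: Hello from the future!
```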
We've seen deref a couple of times now. And we're going to see it a bunch more times. So it's worth spending a minute or two more on it.

It stands for dereference. You see, both Promises and Futures are types of references. They're not the values themselves. They're pointers to the values. They're like boxes where the answer will be when the calculation is finished. You call deref on it and it gets whatever is in the box. We can also abbreviate deref by prepending a @ to the reference, like this:
@reference ;; same as (deref reference)
Promises and Futures are references that might not be complete yet. In Clojure speak, when a promise has been delivered, we say it is realized. There's even a function called realized? to check if it has an answer. You can check if there's something in the box before you block getting the answer.

Whenever you block, it's also important sometimes to limit the amount of time you wait. Like in real life, you may want to say "if I don't hear from you by noon, I'm going to lunch by myself". You can use a deref that has a timeout in it.
;; wait four seconds (4,000 ms)
;; if we don't have an answer by then, deref will return :cheeseburger
(deref the-answer 4000 :cheeseburger)
Only blocking variants of references can use the timeout version. For instance, Atoms, which we'll see real soon (I promise) always have a value. So there's no need to wait, ever.
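Here's a short sketch that puts realized? and the timeout together. I'm using a Promise called slow-answer (my name) and a 50 ms timeout so it runs quickly.

```clojure
(def slow-answer (promise))

(println (realized? slow-answer))              ; prints: false
(println (deref slow-answer 50 :cheeseburger)) ; gives up after 50 ms, prints: :cheeseburger

(deliver slow-answer 42)

(println (realized? slow-answer))              ; prints: true
(println (deref slow-answer 50 :cheeseburger)) ; prints: 42
```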
Ok, let me deliver on that promise. 😉
Atoms are maybe the most popular concurrency primitive in Clojure right now. Don't quote me on that, because I don't have any data. But I find that I use them quite a lot more than other things.
Why do I use them so much?
Because they capture the essence of sharing a single piece of information.
Let's say I'm working with nine other people. We are collecting donations from people and sharing out how much we've collected. To work together, we keep a running sum on a chalkboard. John's job is to tweet out every five minutes how much money we have. He glances over at the chalkboard, sees the number "42", and tweets out this:
"We have collected a total of $42!!"
Except that's wrong. Jane was in the middle of writing out the number 42,332. She had to glance down to remember the last three digits, just when John was checking the board.
It sounds dumb when people do it, but this is how computers work. They're dumb. If you say "check the board and tweet out what you see", that's what you get.
But Atoms are a solution.
What's the real problem? The deep root of the problem is that you can write partial answers on the board. If you could write the whole number at once, this never would have happened. Atoms ensure that you can only write consistent values.
An Atom holds a current value. That value has to be valid. You then send it a pure function which calculates the next value. The Atom guarantees that anybody derefing the Atom always gets either the old value or the new value, and never anything in between.
Here's another problem. It's one you're probably familiar with:
Let's say Jane and Jim both collect $10 at about the same time. They both run back to the chalkboard and see this value:

$70,400

Great! They both pull out their calculators, type in the number, then add 10. Jane finishes first, erases the chalkboard, and writes $70,410. She runs back out to get more donations. Then Jim erases the board. And writes $70,410. The same number!
They both were following a correct algorithm. Get the number, add your additions, then write it on the board. But when two people are involved, they need to coordinate a little better. Again, this is dumb when people are doing it, but it's what happens when threads share memory without coordination.
Let's see an example of using Atoms to prevent this problem.
;; create an Atom with initially no money
(def donation-count (atom 0))

;; start 9 people collecting money (9 threads)
(dotimes [_ 9]
  (doto (Thread. (fn []
                   ;; wait three seconds
                   (Thread/sleep 3000)
                   ;; go collect $1
                   (swap! donation-count inc)
                   ;; do it again
                   (recur)))
    .start))

;; start one person tweeting
(doto (Thread. (fn []
                 ;; wait 100 seconds
                 (Thread/sleep 100000)
                 (tweet (str "We collected $" @donation-count " total!"))))
  .start)
There's our friend deref again. An Atom is a reference to a value, and derefing it gives you the value. But what's swap!?

swap! is the function for modifying the current value of the Atom. Let's look at the arguments:
(swap! donation-count ;; the Atom
       inc)           ;; the function
swap! takes the current value of the Atom, calls the function on it (in this case, inc), and sets the new value. However, just before setting the new value, it checks to make sure the old value is still in there. If it's different, it starts over. It calls the function again with the new value it found. It keeps doing this until it finally writes the value. Because it can check the value and write it in one go, it's an atomic operation, hence the name.
That means the function can get called multiple times. That means it needs to be a pure function. Another thing is that you can't control the order of the function calls. If multiple threads are swapping to an Atom at the same time, order is out the window. So make sure your functions are independent of order, like we talked about before.
For instance, incrementing (inc) is independent of order. After all incs are in, you'll have the same answer, regardless of order.
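You can actually watch swap! retry. In this sketch I break the "keep it pure" rule on purpose: counting-inc (my name) bumps a Java AtomicLong every time it's called, so retries show up as extra calls. How many extra calls you see depends on your machine and on timing.

```clojure
(import 'java.util.concurrent.atomic.AtomicLong)

(def counter (atom 0))
(def calls   (AtomicLong. 0)) ; counts every call to the update function

(defn counting-inc
  "Deliberately impure: it records each call so retries become visible."
  [n]
  (.incrementAndGet ^AtomicLong calls)
  (inc n))

;; 8 threads each apply 10,000 swaps.
(def threads
  (mapv (fn [_]
          (doto (Thread. (fn [] (dotimes [_ 10000] (swap! counter counting-inc))))
            (.start)))
        (range 8)))

(doseq [^Thread t threads] (.join t))

(println "final value:" @counter "function calls:" (.get ^AtomicLong calls))
```

The final value is always 80000, because every swap lands exactly once. The number of function calls can be higher than that whenever threads collide and retry.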
So, with all of those requirements, what benefits do you get from an Atom?
Let me explain.
Atoms give you a very important guarantee: you can look at the current value of the Atom (with deref) at any time and know that, at that time, the value was current. It was valid. It might not be current forever, but it was a valid answer. For instance, in our donations example, the value in the Atom is always the sum of some subset of donations. Any time you check the Atom, it was a valid count of money.
Let's look a bit more at the swap! function. We were calling it with inc, which adds one. What if you want to add more than just $1 each time?
(swap! donation-count (fn [x] (+ x 10)))
Okay, see, we can add 10, or whatever number you want. But this is such a common case that there's a shortcut. See, you're calling + with the current value of the Atom (here called x) and a second argument (10). Here's the same thing but using the shortcut:
(swap! donation-count + 10)
This shortcut trips people up. The arguments are in a funny order, but we can take it step by step:
(swap! donation-count ;; the Atom
       +              ;; the function
       ;;             ;; current value of Atom goes here!
       10             ;; the second argument
       ;;             ;; the third argument
       ;;             ;; the fourth argument, etc
       )
I've used semicolons up there as placeholders for arguments. But we can see what's going on there: the current value of the Atom gets put as the first argument.
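The same trick works with any function, which is handy when the Atom holds a map. Here's a sketch with a made-up donations map:

```clojure
(def donations (atom {:total 0 :count 0}))

;; (swap! a f x y) calls (f current-value x y)
(swap! donations update :total + 10)       ; (update current :total + 10)
(swap! donations update :count inc)        ; (update current :count inc)
(swap! donations assoc :last-donor "Jane") ; (assoc current :last-donor "Jane")

(println @donations)
```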
Okay, I'm going to wipe the sweat off my brow. Because next up we're looking at the biggest, most powerful reference type in Clojure.
Listen, back in 2008/2009, everybody was talking about Refs. Why? Because they are Clojure's Software Transactional Memory (STM), which was a hot topic back then and one of the things that made Clojure special. After a few years, it turned out that Atoms, which are much simpler, are good enough for most purposes. But the STM is still in there and people do use it.
When should you use it?
If we look at an Atom, it only holds one value. That value can be complex (a nested collection, for instance), but it's just one value. If you have two Atoms, there's no way to make sure they stay in relationship. For example, you can't make one Atom always be twice the value of another. Why? Because some thread could read the Atoms after you change one but before you change the other.
Refs solve this problem. You can read from and modify multiple Refs inside of a transaction. Observers on the outside of the transaction cannot see the intermediate values. Nice!
Basically, Refs let you read and write to two chalkboards without any time passing between the first chalkboard and the second.
Let's look at some code:
(def total-donations (ref 0))
(def count-donations (ref 0))

;; start 9 people collecting money
(dotimes [_ 9]
  (doto (Thread. (fn []
                   ;; go collect $10
                   ;; ...
                   (dosync
                     ;; record $10
                     (alter total-donations + 10)
                     ;; record one donation
                     (alter count-donations inc))
                   ;; do it again
                   (recur)))
    .start))

;; start one person tweeting the total
(doto (Thread. (fn []
                 ;; wait 100 seconds
                 (Thread/sleep 100000)
                 (tweet (str "We collected $" @total-donations " total!"))))
  .start)

;; start one person tweeting the average
(doto (Thread. (fn []
                 ;; wait 100 seconds
                 (Thread/sleep 100000)
                 (when (pos? @count-donations)
                   (tweet (str "Average donation: $"
                               (double (dosync
                                         (/ @total-donations @count-donations))))))))
  .start)
dosync means "do synchronized". It means that everything in there is within a transaction. Transactions give you some guarantees: if any funny business happens in that transaction (an exception is thrown, for example), the transaction will be aborted like it never happened. Inside of that transaction, you have a consistent view of all of the Refs you dereference. Any changes you make are visible to you from the inside, but not to others outside of the transaction, until the transaction completes. It's like you're in a little time bubble.
As a cost for these guarantees, you have to obey a few rules. First, no side effects in a transaction. The transaction can be run multiple times. Also notice that like Atoms, you can't guarantee the order.
Also notice that instead of using swap!, you use alter. But it's very similar. You can even do the extra arguments thing like we do above.
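Here's a smaller sketch you can run to check the "consistent relationship" claim yourself. The helper names record-donation! and snapshot are mine. Writers keep adding $10 donations while a reader takes snapshots, and every snapshot should show a total of exactly ten times the count.

```clojure
(def total-donations (ref 0))
(def count-donations (ref 0))

(defn record-donation! [amount]
  (dosync
    (alter total-donations + amount)
    (alter count-donations inc)))

;; Read both Refs in one transaction, so the two values always match.
(defn snapshot []
  (dosync {:total @total-donations :count @count-donations}))

;; 4 threads record 1,000 donations of $10 each.
(def writers
  (mapv (fn [_] (future (dotimes [_ 1000] (record-donation! 10))))
        (range 4)))

;; Meanwhile, take 1,000 snapshots and check the relationship in each one.
(def consistent?
  (every? (fn [{t :total n :count}] (= t (* 10 n)))
          (repeatedly 1000 snapshot)))

(run! deref writers) ; wait for the writers to finish
(println consistent? (snapshot))
```

If snapshot read the two Refs without the dosync, a donation could land between the two reads and the numbers could disagree.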
We need to talk about Vars a little. But before I do, let me say this: you almost never use Vars directly as a concurrency primitive. So I'm not going to go deep into them. But they're important. We have to talk about them.
Whenever you define a variable using def or defn, you create a Var. They're references like Refs and Atoms. That means they're mutable. You can change the value of a Var. And we use that all the time while we're doing interactive development. We define a function, we realize it's not quite right, so we redefine it. That redefinition changes what's called the root value of the Var.
In addition to the root value, Vars can have a different value per-thread. This lets different threads use different dynamic scopes with the same Var.
Vars are one of those things that recede into the background. You rarely use them explicitly. They are there only to support interactive development and dynamic scope.
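Just so you've seen it once, here's a sketch of a per-thread value. The Var has to be marked ^:dynamic, and binding sets the value for the current thread only. The names *currency* and show are made up for this example.

```clojure
(def ^:dynamic *currency* "USD") ; the root value

(defn show [amount] (str amount " " *currency*))

(def in-binding
  (binding [*currency* "EUR"]    ; a value for this thread, in this block only
    (show 10)))

;; A plain Thread started inside the binding does not inherit it.
;; It sees the root value.
(def from-other-thread (promise))

(binding [*currency* "EUR"]
  (doto (Thread. (fn [] (deliver from-other-thread (show 10))))
    (.start)))

(println in-binding @from-other-thread (show 10)) ; prints: 10 EUR 10 USD 10 USD
```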
For some reason, whenever you mention Agents, people think of Actors, which are found in Erlang. They're definitely not the same. An Actor receives messages, does some work based on which message they receive, then listens for more messages. Agents, on the other hand, hold state like an Atom or a Ref.
It's best to compare Agents to Atoms. Like Atoms, Agents are uncoordinated. You can't modify two Agents with any kind of guarantees. The difference is all about which thread does the work. When you swap! an Atom, the processing happens in the current thread. The thread keeps retrying the computation until it successfully saves to the Atom (or throws an exception). But everything you do to an Atom happens in the current thread.

send on an Agent, on the other hand, runs the computation on another thread. The call to send returns immediately after adding a job to a work queue that will be processed by a thread pool. So it's like an Atom, but stuff happens on another thread.
Let's look at the interface to Agent. send is the Agent equivalent of swap! or alter. Like I said, send will process the computation in a thread pool. That pool has a fixed number of threads, so you use it for quick tasks, like adding or string concatenation. If your task will take a long time--a long computation or I/O--you could quickly overwhelm all of the threads in the pool and keep them too busy to take on more work. If your function does do lots of work, Clojure gives you send-off, which runs each task in its own thread (from a pool that grows as needed). Use that if you're doing I/O or a lot of computation.
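Here's a tiny sketch of the two side by side. The Agent named log and the 200 ms sleep (standing in for slow I/O) are my own inventions for the example.

```clojure
(def log (agent []))

;; Quick, CPU-only update: fine for send's fixed-size pool.
(send log conj :started)

;; Slow update (the sleep stands in for I/O): use send-off so it doesn't
;; tie up one of send's threads.
(send-off log (fn [entries]
                (Thread/sleep 200)
                (conj entries :fetched)))

(await log)     ; block until both actions have run
(println @log)  ; prints: [:started :fetched]
```

Both actions go through the same Agent, so they still run one at a time, in the order this thread sent them. send versus send-off only decides which pool does the work.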
Let's write a simple summing function that stores the answer in an Agent.
    (def sum (agent 0)) ;; create an agent initialized to 0
    (def numbers [0 9 3 4 5 5 4 44 4 2 5 6 7 775 ...])

    (doseq [x numbers]
      (send sum + x)) ;; add x to the current value

    (await sum) ;; wait until all sent actions are done
    (println @sum) ;; should have the answer
It might be easy to think that this happens in parallel. Even though it's happening on multiple threads, it's not parallel. Each Agent has its own queue of tasks, and they are done in the order they are received, one at a time. So it's not parallel. If you want parallelism with Agents, you have to have many Agents.
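A quick way to convince yourself of the ordering is to queue up actions whose order matters. This is only a sketch, and `event-log` is a made-up name. Actions sent to one Agent from one thread run one at a time, in the order they were sent.

```clojure
;; event-log is a hypothetical Agent holding a vector
(def event-log (agent []))

;; queue up five actions from this thread
(doseq [i (range 5)]
  (send event-log conj i))

(await event-log)     ; wait for the queue to drain
(println @event-log)  ; [0 1 2 3 4]
```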
How can we make this sum parallel? Easy. Just make multiple Agents in a loop.
    ;; make 10 agents initialized to zero
    (def sums (map agent (repeat 10 0)))
    (def numbers (range 1000000)) ;; one million numbers

    ;; loop through all numbers and round-robin the agents
    (doseq [[x agent] (map vector numbers (cycle sums))]
      (send agent + x))

    ;; wait at most 10 seconds
    (apply await-for 10000 sums)

    ;; sum up the answers in all ten agents
    (println (apply + (map deref sums)))
Of course, summing is just an example. It's probably not worth queuing up a task that just adds (addition is faster than queuing). But this shows how to do any kind of work in parallel.
In this example, all agents get the same number of tasks. What happens if some tasks take longer than others? That means that some Agents will be idle while others are still working. How can you prevent that?
The answer, predictably, is to add another level of indirection.
Instead of round-robin, we should add our numbers to a queue that the agents pull from when they're ready for more work.
    ;; make 10 agents initialized to zero
    (def sums (map agent (repeat 10 0)))
    (def numbers (agent (range 1000000))) ;; one million numbers in an agent

    (defn dequeue-and-add [sum-agent]
      (letfn [(add [current-sum x]
                ;; do the addition
                (let [new-sum (+ current-sum x)]
                  ;; when we're done, schedule the next dequeue
                  (send numbers dequeue)
                  ;; return the new value of the Agent
                  new-sum))
              (dequeue [xs]
                ;; check if there's more to do
                (when (not (empty? xs))
                  ;; send the first number to the Agent
                  (send sum-agent add (first xs)))
                ;; return the other numbers for other Agents
                (rest xs))]
        (send numbers dequeue)))

    ;; start all 10 Agents working
    (doseq [sum-agent sums]
      (dequeue-and-add sum-agent))

    ;; wait for all the numbers to be cleared from the queue
    (loop []
      (when (seq @numbers)
        (Thread/sleep 1000)
        (recur)))

    ;; sum up the answers in all ten agents
    (println (apply + (map deref sums)))
We're using a hand off pattern. In dequeue-and-add, dequeue takes a number from the numbers Agent and hands it to a summing Agent, where add adds it to the sum and schedules the next dequeue. We can't use await in this one because the Agents use a hand off pattern. There will be times when there are no tasks queued on an Agent because it is waiting for another task to finish on another Agent. We have to be clever and wait for the queue to process. You should probably add a timeout to that, in case the queue never does empty.
What about errors?
Well, that's really interesting. If your task throws an exception, it gets stored inside the Agent (not as the Agent's state). Once there's an exception, no more tasks will be processed. You can check if an Agent has failed by calling agent-error on it. It will return the exception (or nil if there isn't one). Then you can clear it with restart-agent. You can also have the Agent automatically handle its own errors with set-error-handler!: pass it a function of two arguments (the agent and the exception) and you can handle the error.
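Here's a sketch of that error life cycle using `agent-error` and `restart-agent` from clojure.core, plus the `:error-handler` option of `agent`. The names `counter`, `safe-counter`, and `errors` are made up for the example.

```clojure
(def counter (agent 0))

;; this action throws, so the Agent ends up in a failed state
(send counter (fn [n] (/ n 0)))

;; sending to a failed Agent throws, and await sends an action of its own,
;; so poll for the error instead of awaiting
(while (nil? (agent-error counter))
  (Thread/sleep 10))

(println (class (agent-error counter)))  ; the stored exception's class
(println @counter)                       ; state is untouched: 0

;; clear the error and give the Agent a fresh state
(restart-agent counter 0)
(send counter inc)
(await counter)
(println @counter)                       ; 1

;; alternatively, supply a handler when creating the Agent; in that case
;; the error mode defaults to :continue and later actions still run
(def errors (atom []))
(def safe-counter
  (agent 0 :error-handler
         (fn [_agent ^Throwable ex]
           (swap! errors conj (.getMessage ex)))))

(send safe-counter (fn [n] (/ n 0)))  ; handled, Agent keeps going
(send safe-counter inc)
(await safe-counter)
(println @safe-counter (count @errors))  ; 1 1
```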
Agents are really flexible, but as you can see, the code can get kind of complicated. For doing stuff in parallel, I'd probably use other options, like reducers (for pure functions).
Agents are nice for things where you have a value that needs to be accumulated over time and you want to do a calculation on it in another thread. They can also do side effects in the function you send them, because they're queued up instead of competing.
Locks are the traditional, old-school way of coordinating access to resources. Like a lock on your bathroom, software locks make sure only one person is using that resource (the bathroom) at the same time. We call that mutual exclusion: if I'm in the bathroom, you can't be in here. I exclude you and you exclude me. While a thread has a lock, it can act like it's the only thread that has that resource. It's a very low-level way of coordinating, but sometimes that's exactly what you want.
All JVM objects have a lock built in. You usually don't notice, but it's how Java implements the synchronized keyword. So all you need to create a lock is to create an Object.
Let's solve an actual problem using locks. If you have many threads all printing to the console at the same time, very often you'll see that the lines are mixed up. Two threads that print at exactly the same time will send their characters at the same time, and the line is just messed up.
    (defn log [& args]
      (apply println args))

    ;; Thread 1
    (log "INFO 2017-4-29: Starting database connection.")

    ;; Thread 2
    (log "WARNING 2017-4-29: Cannot find configuration file, using defaults.")
Instead of the following, which is what you want:
    INFO 2017-4-29: Starting database connection.
    WARNING 2017-4-29: Cannot find configuration file, using defaults.
You get something like this:
INFO 20WARN17-4-29: StartingING 2017-4-29: Cannot find configuration file, database connection.
The characters from the two
printlns got mixed up. They were being
sent to standard out at the same time from two different Threads.
What's the solution?
Lock an object so that only one Thread can be in some code at the same time.
    ;; construct an Object just for its lock
    (def log-lock (Object.))

    (defn log [& args]
      (locking log-lock
        (apply println args)))
Now you can call
log in many different Threads. The lines will
always come out okay.
Like I said,
locking is a low-level tool. I don't know if I've ever
used it in production. But if you have some resource shared by
multiple Threads, this is a simple way to let them share it safely.
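The same idea works for any shared mutable thing, not just the console. As a rough sketch, here is a counter that is deliberately not thread-safe on its own (a `volatile!` updated with `vswap!`) made safe by wrapping the update in `locking`. The names `counter`, `counter-lock`, and `safe-inc!` are made up for the example.

```clojure
;; a deliberately non-atomic counter: vswap! alone is not safe across threads
(def counter (volatile! 0))
(def counter-lock (Object.))

(defn safe-inc! []
  ;; only one Thread at a time can be inside this locking block
  (locking counter-lock
    (vswap! counter inc)))

;; start 8 Threads that each increment 10,000 times, then wait for them all
(let [threads (doall (repeatedly 8 #(Thread. (fn [] (dotimes [_ 10000] (safe-inc!))))))]
  (doseq [^Thread t threads] (.start t))
  (doseq [^Thread t threads] (.join t)))

(println @counter) ; 80000
```

In real code an Atom would be the simpler choice for a counter. The point here is only that the lock makes the read-then-write step exclusive.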
If you've written Clojure, you've probably heard of core.async. core.async is a great library for doing parallel processing. The reason is simple: it's a very lightweight way to break down tasks and communicate between the tasks.
You create a task by using the
go macro. All of
the code inside of a
go block will be executed in a
Process. Processes are lighter weight than Threads and so are better
for breaking stuff up even more than you normally would with Threads.
Processes can communicate using lightweight queues called Channels. The patterns are endless, but just as an example, you could create one Process that puts values onto a Channel and another Process that consumes them. Channels ensure that Processes wait for each other and that each value on the Channel is delivered only once.
Let's look at some code:
    (require '[clojure.core.async :as async])

    ;; create a channel with a buffer of up to 100 values
    (def number-chan (async/chan 100))

    ;; Atom where we keep the sum
    (def sum (atom 0))

    ;; start 100 go processes taking numbers from number-chan
    ;; and adding them to the sum Atom
    (dotimes [_ 100]
      (async/go
        (loop []
          (let [number (async/<! number-chan)]
            (swap! sum + number))
          (recur))))

    ;; create a go process that adds the numbers 0-1 million
    ;; to the channel
    (async/go
      (doseq [x (range 1000000)]
        (async/>! number-chan x)))
You take from a Channel with the function <! and put to a Channel with >!. If you try to
take and there is no value, your Process will "park" and wait for a
value. If you put and there is no room for new values, your Process
will "park" and wait for room on the Channel. Because things wait, it
means you can coordinate. You can say "Let's each go do some work and
meet back here when we're done."
This is just a very short introduction to a very big topic. You can learn more about core.async from these resources:
- Clojure core.async: a presentation by Rich Hickey at Strange Loop.
- Clojure core.async: my course on the topic.
- core.async Patterns: an advanced course where you learn interesting patterns for using core.async.
- Mastering Concurrent Processes with core.async: The excellent chapter from Clojure for the Brave and True.
You can create Threads in Java very easily by constructing a java.lang.Thread. Java has the concept of a Runnable, which is an interface with a method called run() that returns nothing. If you want to run something in another thread, you pass a Runnable to the constructor of a Thread, then start it. Luckily, Clojure has thought of this. You can use a function of zero arguments directly as a Runnable.
    ;; create a Thread
    (def thread (Thread. (fn [] ;; 0 arguments
                           ;; this will run in a new Thread
                           (println 1 2 3))))

    ;; the thread won't run until you start it
    (.start thread)
Threads are easy to use in Java, but there are some things to keep in mind. First of all, Threads cannot be stopped from the outside. The only way to stop a Thread is for the function you pass it to reach the end of execution. That means if you create an infinite loop, that Thread will run forever. Just be aware!
So how do you stop a JVM Thread? You have to code it to watch for a signal. Maybe you set up a Promise that it checks. When the Promise is delivered, the Thread will stop executing.
Which brings us to the second point: there is no built-in way to communicate with a Thread. Threads share memory, so they have access to all of the objects in scope. So you can use something like a Promise or a core.async channel.
Finally, there's no way to communicate out of the Thread. The return value of your function is discarded. If you do need to get a value out, you can put the value in a Promise, or otherwise store it in a Ref or Atom. For heavy-duty communication between Threads, consider core.async.
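Putting those three points together, here's one way it could look. This is only a sketch: `stop-signal`, `result`, and `worker` are made-up names. One Promise tells the Thread to stop and another carries the answer back out.

```clojure
(def stop-signal (promise))   ; delivered when we want the Thread to stop
(def result (promise))        ; the Thread delivers its answer here

(def worker
  (Thread.
   (fn []
     (loop [ticks 0]
       (if (realized? stop-signal)
         ;; communicate out, then let the function end so the Thread stops
         (deliver result ticks)
         (do (Thread/sleep 10)
             (recur (inc ticks))))))))

(.start worker)
(Thread/sleep 100)            ; let it run for a moment
(deliver stop-signal :stop)   ; ask it to stop
(.join worker)                ; wait for the Thread to finish
(println "ticks before stopping:" @result)
```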
Wait. There's one more thing:
The JVM won't exit until all non-daemon Threads are finished. When you're running Threads and you want to shut down the JVM, you have to call System.exit(). In Clojure, that looks like this:
(System/exit 0) ;; 0 means success, it's the Unix exit code
- Defining and Starting a Thread: A tutorial from Oracle.
- Java Threads: A lesson in my JVM course about starting and stopping threads.
The Java standard library contains something called an
ExecutorService. If you have a bunch of similar tasks that need to
be run in parallel,
ExecutorService is your friend.
What does it do?
You set up a thread pool and a queue feeding that thread pool. Those make up your ExecutorService. You hand the ExecutorService "tasks". The tasks get queued up and pulled off by the threads in the pool, which execute them in parallel.
Tasks are just instances of Callable. Since in Clojure functions are Callable, it's super easy to just pass it functions of zero arguments.
Let's see some code:
    (import 'java.util.concurrent.ExecutorService)
    (import 'java.util.concurrent.Executors)

    ;; create a thread pool with 4 threads
    (def service (Executors/newFixedThreadPool 4))

    ;; submit a task and save the Future
    ;; we need the type hints to tell it to use the Callable version
    (def f (.submit ^ExecutorService service
                    ^Callable (fn []
                                ;; do some work
                                ...)))

    ;; block on the Future
    (println @f)
Clojure Futures are also run in a similar way. If you need custom control of the thread pool, you can do it yourself with an ExecutorService.
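For the "bunch of similar tasks" case, a sketch like the following might help. It uses `invokeAll` from the standard `ExecutorService` interface to submit a whole collection of tasks at once. The names `pool`, `tasks`, and `results` are made up, and the chunk size of 250 is arbitrary.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

(def pool (Executors/newFixedThreadPool 4))

;; hypothetical tasks: each zero-argument function sums one slice of numbers
(def tasks
  (for [chunk (partition-all 250 (range 1000))]
    (fn [] (reduce + chunk))))

;; invokeAll takes a collection of Callables, blocks until all are done,
;; and returns the Futures in the same order as the tasks
(def results
  (mapv #(.get ^Future %) (.invokeAll ^ExecutorService pool tasks)))

(println results)             ; [31125 93625 156125 218625]
(println (reduce + results))  ; 499500

;; pool threads are not daemon threads, so shut the pool down when finished
(.shutdown ^ExecutorService pool)
```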
Another system built into the Java standard library is called Fork/Join. Fork/Join is a way to break up a job into tiny tasks that get distributed to all of your cores. Fork/Join then reassembles the pieces into an answer.
Let's write our own summation system again.
    (import 'java.util.concurrent.RecursiveTask)
    (import 'java.util.concurrent.ForkJoinPool)

    (def pool (ForkJoinPool.))

    (defn summation [numbers]
      (proxy [RecursiveTask] []
        (compute []
          (if (<= (count numbers) 512)
            ;; if the vector is small enough,
            ;; we just reduce over them
            (reduce + 0 numbers)
            ;; otherwise, we split the vector roughly in two
            ;; and recursively run two more tasks
            (let [half (quot (count numbers) 2)
                  f1 (summation (subvec numbers 0 half))
                  f2 (summation (subvec numbers half))]
              ;; do half the work in a new thread
              (.fork f2)
              ;; do the other half in this thread and combine
              (+ (.compute f1) (.join f2)))))))

    (defn sum [numbers]
      (.invoke pool (summation (vec numbers))))

    (def answer (sum (range 1000000)))
If we sum a great big list of numbers, it will keep getting split in half until each piece is 512 items or fewer. When we split, we call .fork on one half, which will queue up the task for another thread. Meanwhile, since we have the thread, we can continue computing the first half. .join will block on the second half that is executing in another thread.
I know this isn't the best way to sum numbers, since addition doesn't really care about order. But it is a good way to split up work done on an associative operation. You will need to tune the size of the work done in one thread to make it worth it to fork. In general, you probably will want to use Clojure reducers for this kind of work. See below.
The interesting thing about Manifold is that it provides another level of indirection which captures the essence of Futures, Promises, core.async Channels, and even RxJava and ReactiveStreams.
If you're going to be doing a lot of asynchronous communication between your threads, Manifold could be for you.
I had to include pmap just for completeness. It's a parallel implementation of clojure.core/map. However, you should note that it's very naïve. It is still lazy, like regular map, so it won't begin executing until a value is needed. It keeps just ahead of what has been consumed. However, the function you pass it does get run in other Threads. If it is computationally intensive, you may want to try pmap. Add one more letter and it makes your code parallel.
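A rough way to see the effect is to time map against pmap with a slow function. In this sketch, `slow-square` is a made-up stand-in for something expensive, and the timings in the comments are only what I'd expect on a multi-core machine, not measurements.

```clojure
;; slow-square is a hypothetical stand-in for an expensive function
(defn slow-square [x]
  (Thread/sleep 100)
  (* x x))

;; doall forces the lazy sequences so that time measures the real work
(time (doall (map slow-square (range 8))))   ; roughly 8 x 100 ms
(time (doall (pmap slow-square (range 8))))  ; should be noticeably less

;; results come back in the same order as the input
(def squares (doall (pmap slow-square (range 8))))
(println squares) ; (0 1 4 9 16 25 36 49)
```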
Clojure reducers is a great library for executing things in parallel. Under the hood, it uses Fork/Join. It also is custom-tuned to work with all of Clojure's built-in data structures.
To execute things in parallel with reducers, use clojure.core.reducers/fold. It is like reduce except it has two functions: one for reducing and one for combining. The collection you pass it will be broken into chunks. Each chunk will be reduced with the reducing function. The results are combined using the combining function.
    (require '[clojure.core.reducers :as r])

    (def numbers (vec (range 1000000)))

    ;; sum numbers in parallel
    ;; when combining function and reducing function are the same,
    ;; you can use this arity
    (r/fold + numbers)

    ;; sum only even numbers
    ;; r/filter does not create intermediate lists
    (r/fold + (r/filter even? numbers))
The reducers library comes with Clojure. The main benefit is that
fold can run using Fork/Join, and so efficiently use many cores. In
addition, the standard sequence operations like
cat (concatenate) do not create intermediate
sequences as they do with the
clojure.core versions. They are
specifically built to work in parallel using Fork/Join.
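When the reducing and combining functions differ, you pass both to fold. Here's a small sketch that counts the even numbers. The function name `count-even` is made up for the example. The combining function is also called with no arguments to get the starting value for each chunk, which is why `+` works well here.

```clojure
(require '[clojure.core.reducers :as r])

(def numbers (vec (range 1000000)))

;; reducing function: folds one element into a chunk's running count
(defn count-even [acc x]
  (if (even? x) (inc acc) acc))

;; combining function: + merges the counts from two chunks, and (+)
;; with no arguments supplies the starting value 0 for each chunk
(def even-count (r/fold + count-even numbers))

(println even-count) ; 500000
```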