Background

Parallel is a library for spawning processes on a cluster of machines, and passing typed messages between them. The aim is to make using another process as easy as possible. Parallel was built to take advantage of multicore computers in OCaml, which can't use threads for parallelism due to its non-reentrant runtime.

Introduction

Parallel is built on top of Async, an OCaml library that provides cooperative concurrency. So what do we want an Async interface to parallel computations to look like? Since a Deferred already captures the concept of some action starting now and returning a future result, we just want a function that runs that action in parallel, like this:

  val run
    :  ?where:[`Local | `On of string | `F of (unit -> string)]
    -> thunk:(unit -> 'a Deferred.t)
    -> 'a Deferred.t

So what exactly does run do?

  • run creates a new process on the machine specified by [where]
  • run starts [thunk] in that process
  • run waits for [thunk] to finish and returns its result to the caller
  • [thunk] may also call run if it wants to

The above function is actually ALL we need; we can build anything we want on top of it. We could, for example, pass a closure that returns Deferred.never and start up two-way communication with the new process, say via an address we agree on before the closure is sent to the other process. In practice Parallel provides a few more things.

  val spawn
    :  ?where:[`Local | `On of string | `F of (unit -> string)]
    -> (('a, 'b) Hub.t -> 'c Deferred.t)
    -> (('a, 'b) Channel.t * ('c, string) Result.t Deferred.t) Deferred.t

Spawn is run's more featureful cousin. The closure is started up on another process and passed a typed Hub; the caller is given the closure's result and a typed Channel that is connected to that Hub. A Channel is a connection to a Hub, and a Hub may have many Channels connected to it at any given time. A very important feature of Channels is that they are mobile: they can be passed between processes, and they remain connected to their Hub. They can either be passed by lexical capture prior to spawn or run, or they can be sent over a Channel. Let's see a small example.

Ping Pong

  open Core.Std
  open Async.Std
  open Parallel.Std

  (* Runs in the worker process: answer every Ping with a Pong. *)
  let worker h =
    Pipe.iter_without_pushback (Hub.listen_simple h) ~f:(fun (id, `Ping) ->
      Hub.send h id `Pong)
    >>| fun () -> `Done

  (* Runs in the main process: ping the worker once a second for a minute. *)
  let main () =
    Parallel.spawn ~where:Parallel.random worker >>> fun (c, _res) ->
    let rec loop () =
      Channel.write c `Ping;
      Channel.read c >>> fun `Pong ->
      Clock.after (sec 1.) >>> loop
    in
    loop ();
    Clock.after (sec 60.) >>> fun () -> Shutdown.shutdown 0

  let () =
    Parallel.init ~cluster:
      {Cluster.master_machine = Unix.gethostname ();
       worker_machines = ["host0"; "host1"]} ();
    main ();
    never_returns (Scheduler.go ())

Let's take this program apart. The function worker is the worker process, which will listen to its hub for Ping messages (id is the name of the client that sent the message) and respond to the sender with a Pong message. Meanwhile, the main function will start up one process using spawn and have it run the worker function. It will then write a Ping message to that process every second, and read the returned Pong message. After 60 seconds the main function will call shutdown. The toplevel action at the bottom first calls Parallel.init, and defines the cluster of three machines (the master, host0 and host1). Notice that main's Parallel.spawn is given a ~where argument that will randomly pick one of the machines for the worker to run on. The toplevel action then starts main, and then the Async scheduler.

The most important thing to say about this program is that to the compiler it looks just like any other program. So the type checker will check the types, and as a result it will enforce the protocol, to the extent possible. The other important point is that when main dies, after the shutdown call, the framework will ensure that all the worker processes (on all machines) are killed as well.

Implementation Notes and Gotchas

There are three kinds of processes involved in a program that uses Parallel:

  • the main process
  • the master process
  • worker processes

Parallel dynamically creates a worker process to service each call to [run].

The OS process tree looks like:

| main
|    master
|      worker1
|      ...
|      workerN

As far as the OS is concerned, all workers are children of the master. However, from the perspective of Parallel, the topology is more structured. Each worker process is created on behalf of its "owner" process, which is either the main process or another worker process. One can think of the main and worker processes as arranged in a tree different from the OS tree, in which there is an edge from each process to its owner (the main process has no owner).

Parallel uses OCaml's [Marshal] library to serialize OCaml values to and from strings so that they can be sent over unix sockets between processes. For example, the [f] supplied to [run] is marshaled and sent from the process that calls [run] to the worker process that will run [f]. Most, but not all, values can be marshaled. Examples of values that can't be marshaled include C allocated abstract tagged values, and custom blocks with no serialize/deserialize method.
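The marshaling behaviour described above can be tried with the standard library alone. The following is a minimal sketch, not Parallel's own code: the helper names roundtrip_closure and can_marshal are made up for illustration. It round-trips a closure through a string, which needs the Marshal.Closures flag, and shows that a custom block without serialization functions (here the standard out_channel stdout) is rejected.

```ocaml
(* Round-trip a closure through a string, much as a closure must be
   serialized before it can be shipped to another process.
   [roundtrip_closure] and [can_marshal] are hypothetical helper names. *)
let roundtrip_closure (f : unit -> int) : unit -> int =
  (* The Closures flag is required for functional values. *)
  let payload = Marshal.to_string f [ Marshal.Closures ] in
  Marshal.from_string payload 0

(* [can_marshal v] is false when Marshal rejects [v]. *)
let can_marshal v =
  match Marshal.to_string v [ Marshal.Closures ] with
  | (_ : string) -> true
  | exception Invalid_argument _ -> false

let () =
  let base = 40 in
  (* The closure captures [base]; the captured value travels with it. *)
  let f = roundtrip_closure (fun () -> base + 2) in
  Printf.printf "closure result: %d\n" (f ());
  (* An out_channel is a custom block without serialization functions. *)
  Printf.printf "can marshal stdout: %b\n" (can_marshal stdout)
```

Note that a marshaled closure can only be read back by the same program binary, which is why this style of closure passing works between a master and the workers created from it.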

The main process and all worker processes have a socket connected to the master process. The master process's sole job is to service requests that are sent to these sockets, which can ask it to create a new worker process. As the master process receives requests, it does what each request asks, and then sends a response back via the socket to the client that made the request.

Each worker process has a socket connected to its owner process. This socket initially receives the [f] that the worker is to run, and is ultimately used to send the result back from the worker to the owner.

Here are the steps involved in implementing [run f]. There are three processes involved.

  • R = the process calling [run]
  • M = the master process
  • W = the worker process running the task

The steps are:

  1. R asks M to create W
  2. M forks W
  3. M tells R about W
  4. R sends [f] to W to run
  5. W runs [f]
  6. W sends the result of [f] to R
  7. M notices W has exited, and cleans up

When there are multiple machines in a cluster, each machine has a master process, and all the workers know about all master processes. When a process wants to run a worker on a given machine, it looks up the address of that machine's master process in its table before performing step 1; everything after that is exactly the same as in the steps above.
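Steps 4 to 6 of [run f] can be imitated on a single machine with the standard Unix and Marshal modules. This is only a sketch under simplifying assumptions, not how Parallel is implemented: there is no master process (the caller forks the worker itself, which is safe here only because no threads have been started), pipes stand in for sockets, and the name run_in_child is hypothetical. It needs the unix library and OCaml 4.12 or later for Unix._exit.

```ocaml
(* Hypothetical sketch: send a closure to a child process, run it there,
   and read the marshaled result back. Link with the unix library. *)
let run_in_child (f : unit -> 'a) : 'a =
  let to_child_r, to_child_w = Unix.pipe () in
  let from_child_r, from_child_w = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
    (* W: receive [f], run it, send the result back, then exit. *)
    Unix.close to_child_w;
    Unix.close from_child_r;
    let ic = Unix.in_channel_of_descr to_child_r in
    let oc = Unix.out_channel_of_descr from_child_w in
    let g : unit -> 'a = Marshal.from_channel ic in
    Marshal.to_channel oc (g ()) [];
    flush oc;
    (* _exit skips at_exit handlers, so inherited buffers are not flushed twice. *)
    Unix._exit 0
  | pid ->
    (* R: send [f] to W, then wait for the result. *)
    Unix.close to_child_r;
    Unix.close from_child_w;
    let oc = Unix.out_channel_of_descr to_child_w in
    let ic = Unix.in_channel_of_descr from_child_r in
    Marshal.to_channel oc f [ Marshal.Closures ];
    flush oc;
    let result : 'a = Marshal.from_channel ic in
    (* Reap the child, standing in for the master's clean up in step 7. *)
    ignore (Unix.waitpid [] pid);
    close_out oc;
    close_in ic;
    result

let () = Printf.printf "%d\n" (run_in_child (fun () -> 6 * 7))
```

The result type is not checked at the process boundary; as with any use of Marshal, the annotation on the receiving side is trusted.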

Channel Passing

When a channel is passed from one process to another, the open socket is not actually passed. The API makes this pretty transparent, since any API call will reconnect the channel, but it is useful to be aware of what is really going on; if you aren't, you may create a race condition. For example, if I spawn a worker connected to a hub I have, and then immediately send something, it may or may not arrive, because the worker may not have had time to connect and receive it. A better strategy is to wait for the worker to say hello, and then send the data.

Channel passing also means that though you created only one channel from a given hub, you can end up with as many connections (client ids) as workers who got hold of that channel. You can address them all individually, or you can always use send_to_all if you really want to model a hub as a kind of shared bus.

Stdout and Stderr

stdout and stderr will be forwarded back to the master machine. This can cause some interleaving if you print a lot of messages, but generally works reasonably well. So printf debugging can work normally, even in a parallel program spanning multiple machines.

Some things to avoid marshaling

Monitor.t, Pcre.regexp, Writer.t, Reader.t, and similar kinds of objects shouldn't be depended upon to marshal correctly. Pcre.regexp is a custom block, and doesn't implement marshal/unmarshal, so it won't work. Monitor.t, Writer.t, and Reader.t, because of their complex nature, generally tow the entire async scheduler along with them, and because of that they will fail if any job on the scheduler queue has a custom object (e.g. regexp, or other C object) that can't be marshaled. You also can't marshal functions you've dynamically loaded (e.g. with ocaml plugin, though I hear this will be fixed soonish).

Processes don't share memory!

The library can make it look easy to create and use a process on some other machine maybe halfway around the world, but even still it is another process. All the normal boundaries associated with that apply, so you can't expect global variables you set in one worker process to affect another. For a large parallel program that is a good thing.
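The same boundary can be seen with two plain Unix processes. The sketch below does not use Parallel at all; it forks a child with the standard Unix module (link with the unix library, OCaml 4.12 or later for Unix._exit) and shows that a write to a global in the child is invisible to the parent. The names counter and parent_view_after_child_write are made up for the example.

```ocaml
(* A global that both processes can see, each in its own address space. *)
let counter = ref 0

(* Returns the parent's view of [counter] after a child has mutated its
   own copy. Hypothetical helper, for illustration only. *)
let parent_view_after_child_write () =
  match Unix.fork () with
  | 0 ->
    (* Child: this write only touches the child's copy of the heap. *)
    counter := 42;
    Unix._exit 0
  | pid ->
    (* Parent: wait for the child, then read our own, untouched copy. *)
    ignore (Unix.waitpid [] pid);
    !counter

let () =
  Printf.printf "parent sees counter = %d\n" (parent_view_after_child_write ())
```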

Shared things

Because of the way Parallel works, with the master process being an image of a very early state of one's program and workers forked from the master, it is usually not possible to share big static things in the way one might do in C using fork. Moreover, it isn't necessarily the win you might expect. If you know that Unix only copies pages on write when a process forks, you would expect sharing to be a win. But the garbage collector ruins that completely, because as it scans it will write to EVERY page, causing a copy on write fault to copy the page, so you'll end up with a non shared copy of that big static thing in every process anyway. The best you can probably do is have one process own it and expose it with a query interface. Moreover, if you're running on multiple machines that IS the best you can do, so you may as well get used to it.

Why Not Just Fork!?

The unix savvy among you may ask, what the heck are you doing with master processes and closure passing, just fork! Oh how that would make life easier, but alas, it really isn't possible. Why? You can't write Async without threads, because the Unix API doesn't provide an asynchronous system call for every operation, meaning if you need to do something that might block, you must do it in a thread. And the list of stuff that might block is long and crippling. Want to read from a file without blocking for SECONDS? Sorry! Not without a thread you don't. But once you've started a thread, all bets are off if you fork. POSIX says that only the forking thread continues in the child, all the other threads are dead, and that the child may safely call only async-signal-safe functions until it execs. OK, fine you say, let them die. But their mutexes, semaphores and condition variables are in whatever state they were in the moment you forked, that is to say, any state at all. Unfortunately this means that having created a thread that does anything meaningful (e.g. calls into libc), if you fork, all bets are off as to what happens in that child process. A dead thread may, for example, hold the lock around the C heap, which would mean that any call into libc would deadlock trying to allocate memory (oops), that'd ruin your day. Trust me, if Parallel could work in some simpler way we'd adopt it quickly!

Say I Want To Have a Giant Shared Matrix

The parallelism model implemented is strictly message passing. Shared memory isn't implemented, but there are various hacks you could use to make this work (e.g. implement it yourself). Bigarray already allows mmapping files, so in theory even a cluster of machines could all mmap a giant file and use/mutate it.
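As a rough illustration of the mmap approach on a single machine, the sketch below maps one file twice as a shared float matrix using Unix.map_file (available since OCaml 4.06; link with the unix library). It is not part of Parallel, and the helper name map_matrix and the temporary file are assumptions made for the example. In a real program each mapping would live in a different process, and any synchronization between writers would be up to you.

```ocaml
(* Map [path] as a shared rows x cols matrix of floats. The file is
   created and grown if needed. [map_matrix] is a hypothetical helper. *)
let map_matrix path rows cols =
  let fd = Unix.openfile path [ Unix.O_RDWR; Unix.O_CREAT ] 0o600 in
  let shared = true in
  let ga =
    Unix.map_file fd Bigarray.float64 Bigarray.c_layout shared [| rows; cols |]
  in
  (* The mapping stays valid after the descriptor is closed. *)
  Unix.close fd;
  Bigarray.array2_of_genarray ga

let () =
  let path = Filename.temp_file "matrix" ".bin" in
  (* Two independent mappings of the same file, as two processes would have. *)
  let writer = map_matrix path 4 4 in
  let reader = map_matrix path 4 4 in
  writer.{1, 2} <- 3.5;
  Printf.printf "reader sees %.1f\n" reader.{1, 2};
  Sys.remove path
```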

Why Can't I Use Async Before Parallel.init?

By default Parallel.init does a check that you haven't created any threads, and that you haven't made any use of Async. The threads check is mandatory, but the Async check can be turned off by setting [fail_if_async_has_been_initialized] to false. Why is this check the default? Well, in general you can't initialize Async libraries before calling Parallel.init and expect them to work in the child process. The reason is that the Async scheduler is thrown away in the child process before calling Scheduler.go. This is out of necessity: there is no way we can know what state the scheduler is in at the moment we fork, and it would be quite unfortunate if it were in a bad state, or worse, if there were jobs on the queue that got run in all workers as soon as they called Scheduler.go. But as a result of this, any asyncy thing you create before Parallel.init won't work in worker processes. For example, say you initialize the log module before Parallel.init expecting to use it in the workers. It won't work, since all of its state (loops, writers, etc) is invalid in the worker processes. The check exists to make sure people are aware of this, and to make sure it only happens if they really know it's ok.

What CWD Will Worker Machine Processes Have?

If the CWD of the master exists on the worker machine, and you have permission to enter it, then Parallel will switch to that directory before starting the master process; otherwise it will chdir to /.