
Chapter 2   Distributed programming

This chapter presents the distributed and mobile features of the join-calculus. The join-calculus language is specifically designed to provide a simple and well-defined model of distributed programming. In the previous chapter, we described the core concurrent language, assuming programs were running on a single machine. However, this need not be the case. Indeed, the join-calculus is entirely based on asynchronous message-passing, which is the basic operation of most distributed systems.

In this chapter, we describe support for execution on several machines and new primitives to control locality, migration, and failure. To this end, we interleave a description of the model with a series of examples that illustrate the use of these primitives.

2.1   The Distributed Model

The execution of join-calculus programs can be distributed among numerous heterogeneous machines; new machines may join or quit the computation. At any time, every process or expression is running on a given machine. However, they may migrate from one machine to another, under the control of the language. In this implementation, the runtime support consists of several Unix processes that communicate using TCP/IP over the network.

In the join-calculus, the execution of a process or an expression does not usually depend on its localization. Indeed, it is equivalent to run processes P and Q on two different machines, or to run process P | Q on a single machine. In particular, the scope for defined names and values is independent of localization: when a port name is known to some process, it can be used to form messages (either as destination or as content) without knowing whether it is locally or remotely defined. So far, locality is transparent, and programs can be written independently of where their chunks will actually be executed.

Of course, locality matters in some circumstances: side-effects such as printing values on the local terminal depend on the current machine; besides, efficiency can be dramatically affected because message-sending over the network takes much longer than local calls; finally, the termination of some underlying runtime will affect all its local processes. For all these reasons, locality is explicitly controlled within the join-calculus; it can be adjusted using migration. In contrast, resources such as definitions and processes are never silently relocated by the system.

The name-server

Because the join-calculus has lexical scoping, programs being executed on different machines do not initially share any port name; therefore, they would normally not be able to interact with one another. To bootstrap a distributed computation, it is necessary to exchange a few names, and this is achieved using a built-in library called the name server. Once this is done, these first names can be used to communicate some more names and to build more complex communication patterns.

The interface of the name server mostly consists of two functions to register and look up arbitrary values in a ``global table'' indexed by plain strings (see 5.8 for reference).

For instance, the following program contains two processes running in parallel. One of them locally defines some resource (a function f that squares integers) and registers it under the string ``square''. The other process is not within the scope of f; it looks up the value registered under the same string, locally binds it to sqr, then uses it to print something.

# spawn{ let f(x) = reply x*x in ns.register("square",f); } 
# spawn{ let sqr = ns.lookup("square") in print(sqr(2)); } 
-> 4
Of course, this makes sense only when the two processes are running as part of stand-alone programs on different machines, and this assumes that these processes share the same strings and conventions.
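
The register/lookup discipline can be pictured with a small model. The sketch below is written in Python rather than in the join-calculus; the class NameServer is a hypothetical stand-in for the built-in ns library, and it assumes a single process with an in-memory table (the behavior on a missing key is also an assumption).

```python
# Illustrative model only: NameServer is a hypothetical stand-in for the
# built-in ns library, holding the "global table" in a local dictionary.
class NameServer:
    def __init__(self):
        self._table = {}

    def register(self, key, value):
        # Associate an arbitrary value (here, a function) with a string key.
        self._table[key] = value

    def lookup(self, key):
        # Return the value registered under key (KeyError if absent; this
        # error behavior is an assumption of the model).
        return self._table[key]


ns = NameServer()

# First "process": defines f locally and publishes it.
def f(x):
    return x * x

ns.register("square", f)

# Second "process": f is out of scope here, so it goes through the table.
sqr = ns.lookup("square")
print(sqr(2))
```

The only thing the two sides share is the string key, which is why both programs must agree on strings and conventions beforehand.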

Running several programs in concert

The runtimes that participate in a distributed computation are launched as Unix commands, e.g. bytecode executables generated by the compiler. Each runtime is given its own join-calculus program, along with options that describe its distributed behavior.

By default, a runtime is in local mode and makes no attempt to communicate with peers. Otherwise, it can run either as a name server (flag -s) or as the client of another name server (flag -c). In both cases, the name server is specified by its IP address. When lookup or register is used for the first time in a client runtime, this runtime attempts to communicate with its server; later on, runtimes directly communicate with one another on demand, and there is no further difference between client and server runtimes.

The following example illustrates this with two machines. Let us assume that we rely on a single machine ``here.inria.fr'' for computing squares of integers; on this machine, we define a square function that also prints something when it is called (so that we can keep track of what is happening), and we register this function with key ``square'':

# let f(x) = 
#   print_string("["^ml.string_of_int(x)^"] ");
#   reply x*x
# 
# do ns.register("square",f)
On machine here.inria.fr, we compile the previous program (p.j) and we execute it in server mode:
here> jcc p.j -o p.out
here> ./p.out -s
We also write a program that relies on the previous machine to compute squares; this program first looks up the name registered by here.inria.fr, then performs some computations and reports their results.

# let sqr = ns.lookup("square")
# ;;
# let log(s,x) = 
#   print_string("q: "^s^"= "^ml.string_of_int(x)^"\n"); reply
# ;;
# let sum(s,n) = reply (if n = 0 then s else sum(s+sqr(n),n-1))
# ;;
# do log("sqr(3)",sqr(3))
# do log("sum(5)",sum(0,5))
On another machine there.inria.fr, we compile and run our second program (q.j) in client mode, using the address of our name server as parameter:
there> jcc  q.j -o q.out
there> ./q.out -c -host here.inria.fr
What is the output of this computation? Whenever a process defines new port names, this is done locally, that is, their guarded processes will be executed at the same place as the defining process. Here, every call to square in sqr(3) and within sum(5) will be evaluated as a remote function call to here.inria.fr. The actual localization of processes is revealed by the print statements: f (aliased as sqr on there.inria.fr) always prints on machine here, and log always prints on machine there, no matter where the messages are posted.

The result on machine here.inria.fr is:
-> [3] [5] [4] [3] [2] [1] 
while the result on machine there.inria.fr is:
-> q: sqr(3)= 9
-> q: sum(5)= 55
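
To see why the two terminals show these traces, the following Python sketch replays the example in a single process. It is only a model: the two lists here_terminal and there_terminal are hypothetical names standing for the terminals of the two machines, and the network is not simulated.

```python
# Illustrative model only: each "machine" is a list that collects what
# would be printed on its terminal.
here_terminal = []
there_terminal = []

def f(x):
    # Defined on machine here, so its side effect happens there.
    here_terminal.append("[%d]" % x)
    return x * x

sqr = f  # on machine there, sqr is just another name for f

def log(s, x):
    # Defined on machine there.
    there_terminal.append("q: %s= %d" % (s, x))

def sum_squares(s, n):
    # Same recursion as sum(s,n) in q.j.
    return s if n == 0 else sum_squares(s + sqr(n), n - 1)

log("sqr(3)", sqr(3))
log("sum(5)", sum_squares(0, 5))

print(" ".join(here_terminal))
print("\n".join(there_terminal))
```

Each function appends to the list of the machine where it was defined, regardless of where the call originates, which mirrors the rule that guarded processes run where their definition lives.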

2.2   Locations and Mobility

So far, the localization of processes and expressions is entirely static. In some cases, however, we would need a finer control. To compute the sum of squares in the previous example, each call to sqr within the loop resulted in two messages on the network (one for the request, and another one for the answer). It would be better to run the whole loop on the machine that actually computes squares. Yet, we would prefer not to modify the program running on the server every time we need to run a different kind of loop that involves numerous squares.

To this end, we introduce a unit of locality called ``location''.

A location contains a bunch of definitions and running processes ``at the same place''. Every location is given a name, and these location names are first-class values. They can be communicated as contents of messages, registered with the name server, and so on, exactly like port names. They can also be used in primitives that dynamically change the organization of locations.

2.2.1   Basic examples

Locations can be declared either locally or as a top-level statement. For instance, we create a new location named this_location:

# loc this_location
#   with  square(x) = reply x*x
#     and cubic(x)  = reply square(x)*x
#   init 
#     print(square(2));
# end
# 
# do print(cubic(2))
val this_location : location
val square : <int> -> <int>
val cubic : <int> -> <int>
-> 84
This declaration binds a location name this_location, and two port names square and cubic whose scope extends within the location and in the following statements. Here, it also starts a process {print(square(2));} in its init part. This process runs in the location, in parallel with the remaining part of the program. As a result, we can obtain either 84 or 48.
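
The two possible outputs come from the init process and the rest of the program running in parallel. A rough Python analogue, using threads in place of join-calculus processes (an assumption of this sketch, not how the runtime works), is:

```python
import threading

# Illustrative model only: two threads stand for the init process of the
# location and for the remaining part of the program.
output = []
lock = threading.Lock()

def emit(value):
    with lock:
        output.append(str(value))

def square(x):
    return x * x

def cubic(x):
    return square(x) * x

init_process = threading.Thread(target=lambda: emit(square(2)))
main_process = threading.Thread(target=lambda: emit(cubic(2)))
init_process.start()
main_process.start()
init_process.join()
main_process.join()

result = "".join(output)
print(result)  # either 48 or 84, depending on scheduling
```

Both prints always happen; only their relative order is left to the scheduler.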

Distributed computations are organized as trees of nested locations; every definition and every process is permanently attached to the location where it appears in the source program; a process can create new sublocations with an initial content (bindings and processes) and a fresh name. Once created, there is no way to place new bindings and processes in the location from the outside.

For instance, the following program defines three locations such that the locations named kitchen and living_room are sublocations of house. As regards the scopes of names, the locations kitchen, living_room and the ports cook, switch, on, off all have the same scope, which extends in the whole house location (between init and the last end); the location house has a larger scope, which would include whatever follows in the source file.
# loc house 
#   init
#     loc kitchen 
#       with cook() = print_string("cooking...\n"); reply
#       end
#     and living_room
#       with switch() | off() = print_string("music on\n"); on() | reply to switch
#        and switch() | on()  = print_string("music off\n"); off() | reply to switch
#       init off() 
#       end
#     in
#     switch(); cook(); switch();
#   end
val house : location
-> music on
-> cooking...
-> music off
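
The living_room definition behaves like a two-state machine whose state is the pending on() or off() message. The Python sketch below models only that behavior; the class LivingRoom and the trace list are hypothetical names, and locations are not modeled at all.

```python
# Illustrative model only: the pending on()/off() message is kept as a field.
trace = []

class LivingRoom:
    def __init__(self):
        self.state = "off"  # corresponds to the initial message off()

    def switch(self):
        if self.state == "off":
            # switch() | off() fires: consume off(), emit on()
            trace.append("music on")
            self.state = "on"
        else:
            # switch() | on() fires: consume on(), emit off()
            trace.append("music off")
            self.state = "off"

def cook():
    trace.append("cooking...")

living_room = LivingRoom()
living_room.switch()
cook()
living_room.switch()
print("\n".join(trace))
```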

2.2.2   Mobile Agents

While processes and definitions are statically attached to their location, locations can move from one place to another. Such migrations are triggered by a process inside of this location. As a result of the migration, the moving location becomes a sublocation of its destination location.

Notice that locations can be used for several purposes: as destination addresses, as mobile agents, or as a combination of the two.

Let us propose an agent-based variant of the previous example. On the square side, we create a new empty location ``here'', and we register it on the name-server; its name will be used as the destination address for our mobile agent.

# loc here end
# do ns.register("here",here)
On the client side, we create another location ``mobile'' that wraps the loop computation that should be executed on the square side; the process within mobile first gets the name here, then migrates its location inside of ``here''. Once this is done, it performs the actual computation.

# loc mobile 
#   init
#     let here = ns.lookup("here") in
#     go(here);
#     let sqr = ns.lookup("square") in
#     let sum(s,n) = 
#       reply (if n = 0 then s else sum(s+sqr(n),n-1)) in
#     let result = sum(0,5) in
#     print_string("q: sum(5)= "^ml.string_of_int(result)^"\n");
#   end
The go(here) primitive migrates the whole mobile location to machine here.inria.fr, as a sublocation of location here, then it returns. Afterwards, the whole computation (calls to the name server, to sqr and to sum) is local to here.inria.fr. There are only three messages exchanged between the two machines: one for the ns.lookup("here") request, one for the answer, and one for the migration.

Let us consider a variant of mobile that combines migration and remote communication:

# let sqr  = ns.lookup("square")
# let here = ns.lookup("here")
# 
# let log(s,x) =
#   { let r = ml.string_of_int(x) in
#     print_string("agent: "^s^" is "^r^"\n"); }
# 
# loc mobile
#   with quadric(x) = reply sqr(sqr(x))
#   and  sum(s,n,f) = reply (if n = 0 then s else sum(s+f(n),n-1,f))
#   init 
#     go(here); 
#     log("sum ( i^2 , i= 1..10 )",sum(0,10,sqr))
#   end
# 
# spawn log("sum ( i^4 , i= 1..10 )",sum(0,10,quadric)) 
As before, the mobile agent contains a process that first controls the migration, then performs some computation. Here, the location mobile is also used as a container for the definitions of quadric and sum. By scoping rules for such bindings, they can be used in the following expressions that remain on machine there.

Once the agent arrives on the square machine here, the calls to sum and quadric become remote calls, as if both functions had been defined and registered on machine here. Conversely, messages sent to log from the body of the mobile agent arrive on machine there where the result is printed.

As we run this program on machine there, we obtain the output:
-> agent: sum ( i^2 , i= 1..10 ) is 385
-> agent: sum ( i^4 , i= 1..10 ) is 25333
As regards locality, every repeated use of sqr is now performed on machine here. In the example, the computation of the two sums is entirely local once the agent arrives on machine here (one network datagram), which is much more efficient than the equivalent RPC-based program (that would send over sixty network datagrams).
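
The figures above can be checked with a small count. The Python sketch below evaluates the same two sums and charges two datagrams (request and reply) to each call to sqr, as an RPC-based version would; this cost model is an assumption of the sketch, and it deliberately ignores the name-server lookups and the calls to log, which add a few more datagrams.

```python
# Illustrative count only: every remote call to sqr is charged two datagrams
# (request and reply); lookups and calls to log are not counted.
remote_datagrams = 0

def sqr(x):
    global remote_datagrams
    remote_datagrams += 2
    return x * x

def quadric(x):
    return sqr(sqr(x))

def sum_over(s, n, f):
    # Same recursion as sum(s,n,f) in the agent.
    return s if n == 0 else sum_over(s + f(n), n - 1, f)

sum_of_squares = sum_over(0, 10, sqr)
sum_of_fourths = sum_over(0, 10, quadric)
print(sum_of_squares, sum_of_fourths, remote_datagrams)
```

Ten calls for the first sum and twenty for the second give thirty round trips, that is sixty datagrams for sqr alone.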

Remember that localization and scopes are independent in the join-calculus: an agent can perform exactly the same actions no matter where it is actually positioned in the location tree. If we forget about the difference between what is local and what is remote, our program produces the same result as a plain program where the location boundaries and the migration have been removed:
# let sqr  = ns.lookup("square")
# 
# let log(s,x) = { print_string(s); print(x); print_newline(); }
# 
# let quadric(x) = reply sqr(sqr(x))
# and sum(s,n,f) = reply (if n = 0 then s else sum(s+f(n),n-1,f))
# 
# spawn log("agent: sum ( i^2 , i= 1..10 ) is ",sum(0,10,sqr))
# spawn log("agent: sum ( i^4 , i= 1..10 ) is ",sum(0,10,quadric))
Apart from performance, both styles are equivalent. In particular, we can first write and test programs, then refine them to get a better tuning of locality.

Applets

The next example shows how to define ``applets''. An applet is a program that is downloaded from a remote server, then used locally. As compared to the previous examples, this migration operates the other way round. Here, the applet defines a reference cell with destructive reading:

# let cell(there) = 
# 
#   let log(s) = print_string("cell "^s^"\n"); reply in
# 
#   loc applet
#     with get() | some(x) = log("is empty"); none() | reply x to get
#     and  put(x) | none() = log("contains "^x); some(x) | reply to put
#     init 
#       go(there); none ()
#     end in
# 
#   reply get,put
# ;;
# do ns.register ("cell",cell)
Our applet has two states: either none() or some(s) where s is a string, and two methods get and put. Each time cell is called, it creates a new applet in its own location. Thus, numerous independent cells can be created and shipped to callers.

cell takes as argument the location (there) where the new cell should reside. This is achieved by the process go(there);none() that first performs the migration, then sends an internal message to activate the cell. Besides, cell defines a log function outside of the applet. The latter therefore remains on the server and, when called from within the applet on the client machine, keeps track of the usage of its cell. This is in contrast with applets à la Java: the location migrates with its code, but also with its communication capabilities unaffected.

We complement our example with a simplistic user that allocates and uses a local cell:

# let cell = ns.lookup ("cell") 
# 
# loc user
#   init
#     let get,put = cell(user) in
#     put("world");
#     put("hello, "^get()); 
#     print_string(get());
#   end
-> hello, world
On the server side, we get the trace:

-> cell contains world
-> cell is empty
-> cell contains hello, world
-> cell is empty
On the client side, there are no more go primitives in the applet after its arrival, and its name applet does not appear anywhere. Of course, some other host location may still move, but then it would carry the cell applet as a sublocation. As a result, the contents of the applet can be considered part of the host location, as if these contents had been defined locally in the beginning.
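
The server-side trace follows directly from the two join-patterns of the cell. The Python sketch below replays the user session in a single process; it models the none()/some(x) message as a field, omits the there argument and the migration, and uses assertions where the join-calculus version would simply block. The name server_log is hypothetical.

```python
# Illustrative model only: the none()/some(x) message is kept as a field,
# and server_log stands for what log prints on the server.
server_log = []

def log(s):
    server_log.append("cell " + s)

def cell():
    state = {"content": None}  # None plays the role of the message none()

    def put(x):
        # put(x) | none(): can only fire when the cell is empty
        assert state["content"] is None, "put would block on a full cell"
        log("contains " + x)
        state["content"] = x

    def get():
        # get() | some(x): can only fire when the cell is full
        assert state["content"] is not None, "get would block on an empty cell"
        x = state["content"]
        log("is empty")
        state["content"] = None
        return x

    return get, put

get, put = cell()
put("world")
put("hello, " + get())
client_output = get()
print(client_output)
```

Every get empties the cell, which is why the trace alternates between a contains line and an is empty line.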

Data-driven Migration

In the following examples, we consider large data structures that are distributed among several machines. Because of their relative sizes, it is better to have agents move from site to site as they use the data, rather than move the data or, even worse, access the data one bit at a time. Here, we are interested in defining a general iterator that takes a distributed data structure and applies a function to each of its basic components.

In practice, we use arrays as the building blocks of our data structure; the basic functions to allocate arrays and fill them (make), and to create a general function that applies a function over every value in the array (iter) could be defined by the following module table.j:

# let make(n,f) =
#   let a = array.create(n,0) in
#   let loop(m) = if m<n then ( array.set(a,m,f(m)); loop(m+1) ); reply in
#   loop(0); reply a
# ;;
# let iter(a) = 
#   let l = array.length(a) in
#   let loop(f,n) = if n < l then ( f(array.get(a,n)); loop(f,n+1) ); reply in
#   let i(f) = loop(f,0); reply in
#   reply i
# ;;
We now need to ``glue'' together several arrays. More precisely, we define an iterator that is consistent with locality: for each array, we move an agent that contains the function to apply inside of the array location, then we apply it, then we move to the next array, ...

Now, each part of the data structure is a pair (host location, iterator), and the mobility protocol consists of (1) migrating the function to apply inside the host location, and (2) calling the iterator with this function as argument.

Here is a simple example module (statistics.j) that collects data using this protocol and keeps its partial results as an internal message.

# let collect(loc_data,iter_data) =
# 
#   loc agent 
#     with state(n,s,s2) | f(x) = state(n+1,s+x,s2+x*x) | reply to f
#     and  state(n,s,s2) | done() | result() = reply n,s,s2 to result
#   init
#     go(loc_data); 
#     { state(0,0,0) | iter_data(f);done() }
#   end in
# 
#   let n,s,s2 = result() in
#   print_string(  "the size is "); print(n);
#   print_string(", the average is "); print(s/n);
#   print_string(", the variance is "); print((n*s2-s*s)/(n*n));
#   print_newline();
Here is the definition of a basic data structure that consists of one array:

# loc here end
# 
# let iter = 
#   let f(x) = reply 2*x+5 in
#   table.iter(table.make(100,f))
# 
# do ns.register("loc_a",here)
# do ns.register("iter_a",iter)
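
As a sanity check of the statistics computed by collect, the following Python sketch applies the same accumulation to this one array. It is a model under stated assumptions: migration is left out, the state(n,s,s2) message becomes a dictionary, iter_over is a hypothetical name for table.iter, and the arithmetic is assumed to be integer arithmetic.

```python
# Illustrative model only: migration is omitted and the join-pattern state
# message state(n,s,s2) is kept in a dictionary.
def make(n, f):
    # Allocate an array of size n and fill it with f(0), ..., f(n-1).
    return [f(m) for m in range(n)]

def iter_over(a):
    # Return a function that applies f to every value of the array.
    def apply(f):
        for x in a:
            f(x)
    return apply

def collect(iter_data):
    state = {"n": 0, "s": 0, "s2": 0}

    def f(x):
        state["n"] += 1
        state["s"] += x
        state["s2"] += x * x

    iter_data(f)
    n, s, s2 = state["n"], state["s"], state["s2"]
    # Integer arithmetic, following the formulas in the print statements.
    return n, s // n, (n * s2 - s * s) // (n * n)

iter_a = iter_over(make(100, lambda x: 2 * x + 5))
size, average, variance = collect(iter_a)
print("the size is %d, the average is %d, the variance is %d"
      % (size, average, variance))
```

For this single array the model reports a size of 100, an average of 104 and a variance of 3333.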
In order to build our structure in a compositional way, we use a merge function. This function takes two (location, iterator) pairs and returns a new such pair standing for the compound data structure (module merge.j):

# let merge(loc1,iter1,loc2,iter2) =
#   loc mobile 
#   with iter(f) = go(loc1); iter1(f); go(loc2); iter2(f); reply 
#   end in
#   reply mobile,iter
Thereby, we can assemble data by repeatedly calling the merge functions on miscellaneous chunks; this defines a structural tree whose leaves are basic arrays. At the same time, merge sets up the locations that are needed to traverse it locally.

While in our example basic arrays are stationary, this is not the case for the location of a compound structure: this location moves to each of its subcomponents in turn before applying its function to them.

For instance, if we consider the data structure built from three arrays in the following program,

# let itr_a = ns.lookup("iter_a") let loc_a = ns.lookup("loc_a")
# let itr_b = ns.lookup("iter_b") let loc_b = ns.lookup("loc_b")
# let itr_c = ns.lookup("iter_c") let loc_c = ns.lookup("loc_c")
# 
# open merge
# let loc_ab,  itr_ab   = merge(loc_a ,itr_a ,loc_b, itr_b)
# let loc_ab_c,itr_ab_c = merge(loc_ab,itr_ab,loc_c, itr_c) 
# 
# spawn statistics.collect(loc_ab_c, itr_ab_c)
we get some result on the machine that runs the program,

-> the size is 230, the average is 77, the variance is 3109
and the successive nestings of locations during the computation are:

 iter on a   ---->  iter on b  ---->  iter on c
-----------------------------------------------------
 a     b     c   |  a     b     c  |  a     b     c
 ab -->          |       ab        |       ab
 ab_c            |      ab_c -->   |            ab_c
 agent           |      agent      |            agent
Notice that migrations are delegated at each level of nesting: as we apply a function f, we put it in its own location (here, agent) and migrate it inside the data structure location (ab_c), which is a leaf of the location tree. Then, we repeatedly apply the function and migrate some location whose branch contains the location of f. At any stage, the branch of the location tree that contains the leaf agent is the inverse of a branch in the structural tree, and the subpart locations are the successive superlocations of the compound location.
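
The sequence of migrations in the table can be reproduced with a small Python model of merge. In this sketch a location is reduced to a name, go is replaced by recording a (moving location, destination) pair in the list moves, and the arrays hold arbitrary sample values; all of these are assumptions made for illustration.

```python
# Illustrative model only: a location is a name, and go() just records
# (moving location, destination) instead of migrating anything.
moves = []

def leaf(name, data):
    # A stationary array: returns (location name, iterator).
    def iterate(f):
        for x in data:
            f(x)
    return name, iterate

def merge(name, loc1, iter1, loc2, iter2):
    # name is passed explicitly here; in merge.j the location name is fresh.
    def iterate(f):
        moves.append((name, loc1))   # go(loc1)
        iter1(f)
        moves.append((name, loc2))   # go(loc2)
        iter2(f)
    return name, iterate

loc_a, itr_a = leaf("a", [1, 2])
loc_b, itr_b = leaf("b", [3])
loc_c, itr_c = leaf("c", [4, 5])
loc_ab, itr_ab = merge("ab", loc_a, itr_a, loc_b, itr_b)
loc_ab_c, itr_ab_c = merge("ab_c", loc_ab, itr_ab, loc_c, itr_c)

seen = []
moves.append(("agent", loc_ab_c))  # the agent first enters the structure
itr_ab_c(seen.append)
print(moves)
```

The recorded moves read: agent into ab_c, ab_c into ab, ab into a, ab into b, and finally ab_c into c, which matches the three columns of the table.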

2.3   Termination, Failures and Failure Recovery

As a matter of fact, parts of a distributed computation may fail (e.g., because of the physical crash of a machine). The simplest solution would be to abort the whole computation whenever this is detected, but this is not realistic in case numerous machines are involved. Rather, we would like our programs to detect such failures and take adequate measures (cleanly report the problem, abort related parts of the computation, make another attempt on a different machine, ...).

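Accordingly, the join-calculus provides an abstract model of failure and failure detection. This is reflected in terms of locations as follows: a location can run the primitive process halt(), which stops every process in this location and, recursively, in its sublocations; and a process can call fail(there), which returns only once the location named there has stopped.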

The semantics of the join-calculus guarantees that this is the only reason why parts of the computation may stop. Now, an executable running the program P on a fallible machine can be thought of as a system-defined location loc machine init P | crash();halt() end where crash may return at any time.

In our model, a machine can in particular remotely detect that another machine has stopped, once it knows the name of a location there. In practice, it is difficult to provide reliable failure detection, as this requires further assumptions on the network.

BEWARE: As it stands, this prototype implementation does not reliably detect this kind of failure: when a runtime terminates abnormally, the failure of its locations is not always detected.

Because locations may only fail as a whole, the programmer can define suitable units of failure, and even use the halt/fail primitives to control the computation. Notice that no silent recovery mechanism is provided; the programmer must figure out what to do in case of problems.

2.3.1   Basic examples

To begin with, we use simple examples that attempt to use a port name say inside of a fallible location to get messages printed. Because these calls may never return in case the locations stopped, we spawn them instead of waiting for their completion.

In this first example, location agent can stop at any time. After the failure occurred, we print some report message. We know for sure that the first say can only print something before the failure report, and that the second say cannot print anything.
# loc agent
#   with say(s) = print_string(s); reply
#   init halt()
#   end
# 
# spawn { say("it may work before.\n"); }
# 
# spawn { fail(agent); 
#         print_string("the location stopped\n");
#         say("it never works after\n"); }
val agent : location
val say : <string> -> <>
-> it may work before.
-> the location stopped
The following example is trickier. First, the agent does not halt itself; however, it migrates within a location that stops and this is a deadly move. Second, the halt() process can be triggered only from the outside by a normal message kill(). Thus we know that the first say always prints its message. Finally, as there is no halt in location agent, it can only stop because location failing halted, so that the completion of fail(agent) implies that failing has also stopped.
# loc failing
#   with kill() = halt()
#   end
# 
# loc agent
#   with say(s) = print_string(s); reply
#   init go(failing);
#   end 
# 
# spawn { say("it always works.\n"); kill() }
# 
# spawn { say("it may work before.\n"); }
# 
# spawn { fail(agent); 
#         print_string("both locations stopped.\n");
#         say("it never works after.\n"); }
val failing : location
val kill : <>

val agent : location
val say : <string> -> <>
-> it always works.
-> it may work before.
-> both locations stopped.
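
The way a halt propagates through nested locations can be sketched in Python. The class Location below is hypothetical; failure detection is reduced to reading a flag instead of a blocking call to fail, and the treatment of a move into an already halted location is an assumption of the model.

```python
# Illustrative model only: Location is a hypothetical class, and failure
# detection is a plain flag test rather than a blocking fail(...) call.
class Location:
    def __init__(self, name):
        self.name = name
        self.parent = None
        self.children = []
        self.halted = False

    def go(self, destination):
        # The moving location becomes a sublocation of its destination.
        if self.parent is not None:
            self.parent.children.remove(self)
        self.parent = destination
        destination.children.append(self)
        if destination.halted:
            # Assumption of this model: entering a halted location is fatal.
            self.halt()

    def halt(self):
        # Halting stops the location and, recursively, its sublocations.
        self.halted = True
        for child in self.children:
            child.halt()


failing = Location("failing")
agent = Location("agent")
agent.go(failing)
before = agent.halted
failing.halt()  # what the message kill() triggers
print(before, agent.halted, failing.halted)
```

Since agent has no halt of its own, observing that it has stopped is enough to conclude that failing has stopped too.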

2.3.2   Watching for Failures

We now move on to some more realistic use of failure-detection; we first consider a function that encapsulates a session with mobility and potential failures.

There is usually no need to explicitly halt locations that have completed their task (the garbage-collector should take care of them). However, in some cases we would like to be sure that no immigrant location is still running locally.

Let us assume that job is a remote function within location there that may create mobile sublocations and migrate them to the caller's site. To this end, the caller should supply a host location, as in the previous examples. How can we make sure that job is not using this location to run other agents after the call completes? This is handled using a new temporary location box for each call, and halting it once the function call has completed.

# let safe(job,arg,success,failure) =
# 
#   loc box
#     with kill()  = halt()
#     and  start() = reply job(box,arg)  end in
# 
#   let done(x)   | running() = done(x) | kill()
#   and done(x)   | failed()  = success(x)
#   and running() | failed()  = failure() in
# 
#   done(start()) | running() | fail(box); failed()
val safe : <<location * 'a> -> <'b> * 'a * <'b> * <>>
Our supervising protocol either returns a result on success, or a signal on failure. In either case, the message guarantees that no alien computation may take place afterward.

Initially, there is a message running(), and the control definition waits for either some result on done(x) or some failure detection on failed(). Whatever its definition is, the job process can create and move locations inside of the box, and eventually return some value to the start process within the box. Once this occurs, done forwards the reply to the control process, and the first join-pattern is triggered. In this case, the running() message is consumed and eventually replaced by a failed() message (once the kill() message is handled, the box gets closed, and the fail guard in the control process is triggered, releasing a message on failed).

At this stage, we know for sure that no binding or computation introduced by job remains on the caller's machine, and we can return the value as if a plain RPC had occurred.

This ``wrapper'' is actually quite general. Once a location-passing convention is chosen, the safe function does not depend on the actual computation performed by job (its arguments, its results, and even the way it uses locations are parametric here). We could refine this example further to transform unduly long calls to job into failure (by sending a failed() message after an external timeout), to give some more control to the caller (adding an abort message),...
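
The two possible runs of the control definition can be traced with a small Python model. In this sketch the pending control messages are kept in a set, the three join-patterns are tried in order, and the box is reduced to a boolean; the function supervise and its parameters are hypothetical names, and real scheduling and concurrency are not modeled.

```python
# Illustrative model only: pending holds the names of the control messages,
# and box_halted stands for the state of the temporary location box.
def supervise(job_returns, value=None):
    pending = {"running"}            # the initial message running()
    box_halted = False
    steps = []

    if job_returns:
        pending.add("done")          # start() replied: done(value) is sent
    else:
        box_halted = True            # the box stopped before any reply
        pending.add("failed")        # fail(box) returns, failed() is sent

    while True:
        if {"done", "running"} <= pending:
            # done(x) | running() = done(x) | kill()
            pending.remove("running")
            steps.append("kill")
            box_halted = True        # kill() halts the box ...
            pending.add("failed")    # ... so fail(box) releases failed()
        elif {"done", "failed"} <= pending:
            # done(x) | failed() = success(x)
            steps.append("success")
            return steps, box_halted, value
        else:
            # running() | failed() = failure()
            steps.append("failure")
            return steps, box_halted, None

ok = supervise(True, 42)
ko = supervise(False)
print(ok, ko)
```

In both runs the box is halted before success or failure is signalled, which is the guarantee the wrapper is meant to provide.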

2.3.3   Recovering from partial failures

We finally come back to distributed loops. We give an example of a program that uses the CPU of whatever machine is available to compute the sum 1 + 2 + ... + 999. Basically, we only assume that the iterations can be computed in any order; we cut the loop into small chunks, and we distribute a chunk to every available machine. The program also takes care of potential failures: when a machine fails, its chunk is aborted and given to another machine.

# let size = 1000
# let chunk = 200
# 
# let join(name,there) =
#   loc mobile 
#   init
#     let start(i,done) = 
#       let loop(u,s) = if u<(i+1)*chunk then { loop(u+1,s+u) } else { done(s) } in
#       loop(i*chunk,0) in
#     go(there); 
#     worker(name,mobile,start)
#   end in
#   print_string(name^" joins the party\n");
# 
# and job(i) | worker(name,there,start) =
#   print_string(name^","^ml.string_of_int(i*chunk)^"\n");
#   let once() | done(s)  = add(s) | worker(name,there,start)
#   and once() | failed() = print_string(name^" went down\n"); job(i) in
#   once() | start(i,done) | fail(there);failed()
# 
# and result(n,s) | add(ds) =
#   let s' = s + ds in
#   if n > 0 then { result(n-1,s') }
#   else { print_string("The sum is "^ml.string_of_int(s')^"\n"); }
# 
# do ns.register("join",join)
# 
# spawn 
#   result(size/chunk-1,0)
# | let jobs(n) = job(n) | if n>0 then { jobs(n-1) } in jobs(size/chunk-1)
The actual work is performed in the mobile locations, once they reach the locations there provided by joining machines. Messages job(i) partition the work to be done. Each remote computation concludes either with a done(s) or failed() message; in the latter case, the aborted job is re-issued. The resulting sum is accumulated as a message on result.

The client is not specific to our computation at all; indeed, its only contribution is a location where others may place their sublocations.

# loc worker init
#   let join =  ns.lookup("join") in 
#   join("reliable",worker)
# end
In the following, we explicitly model an unreliable task force, as a variant of the previous program that also has a ``time bomb'' which eventually stops the joining location:

# let delay = ml.int_of_string(mlsys.getenv("DELAY")) 
# let name  = mlsys.getenv("NAME")
# 
# loc unreliable_worker init
#   let join =  ns.lookup("join") in 
#   let tictac(n) = if n = 0 then { halt() } else { tictac(n-1) } in 
#   join(name,unreliable_worker) | tictac(delay)
# end
We start three of them with various parameters, for instance with the command lines:

DELAY=300   NAME=fallible ./work.out -c &
DELAY=500   NAME=dubious  ./work.out -c &
DELAY=30000 NAME=reliable ./work.out -c &
./distributed_loop.out -s
and we look at the output of the last command (the main loop program):

-> fallible joins the party
-> fallible,0
-> fallible,200
-> fallible went down
-> dubious joins the party
-> dubious,200
-> dubious,400
-> dubious,600
-> reliable joins the party
-> reliable,800
-> dubious went down
-> reliable,600
-> The sum is 499500
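
The recovery scheme can be imitated by a sequential Python simulation. The sketch below is only a model: workers are tried one after the other instead of running concurrently, and each worker is given a hypothetical budget of chunks it completes before going down (None for a worker that never fails). If every worker failed while jobs remained, the model would return a partial sum.

```python
from collections import deque

SIZE = 1000
CHUNK = 200

def chunk_sum(i):
    # Same range as loop(i*chunk, 0): i*CHUNK .. (i+1)*CHUNK - 1
    return sum(range(i * CHUNK, (i + 1) * CHUNK))

def run(workers):
    # workers: list of (name, number of chunks completed before going down);
    # None means the worker never fails. These budgets are hypothetical.
    jobs = deque(range(SIZE // CHUNK))
    total = 0
    log = []
    for name, budget in workers:
        completed = 0
        while jobs:
            i = jobs.popleft()
            log.append("%s,%d" % (name, i * CHUNK))
            if budget is not None and completed == budget:
                log.append(name + " went down")
                jobs.append(i)       # the aborted job is re-issued
                break
            total += chunk_sum(i)
            completed += 1
    return total, log

total, log = run([("fallible", 1), ("dubious", 2), ("reliable", None)])
print("\n".join(log))
print("The sum is %d" % total)
```

Because an aborted chunk contributes nothing until it is completed by another worker, the final sum does not depend on which workers fail, as long as one of them survives.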
