Planche 1
Concurrency 2
From shared memory
to synchronization
by communication on
channels
Jean-Jacques Lévy (INRIA - Rocq)


MPRI concurrency course with:
Pierre-Louis Curien (PPS)
Eric Goubault (CEA)
James Leifer (INRIA - Rocq)
Catuscia Palamidessi (INRIA - Futurs)

Planche 2

Plan

Planche 3

Readers and Writers (1/6)

A shared resource is concurrently read or modified.
Planche 4

Les lecteurs et les écrivains (2/6)

En lecture, on a nReaders simultanés (nReaders > 0).

En écriture, on a nReaders = -1.
PROCEDURE AcquireShared() =
BEGIN
  LOCK m DO
    WHILE nReaders = -1 DO
      Thread.Wait(m, c);
    END;
    ++ nReaders;
  END;
END AcquireShared;

PROCEDURE ReleaseShared() =
BEGIN
  LOCK m DO
    -- nReaders;
    IF nReaders = 0 THEN
      Thread.Signal(c);
  END:
END ReleaseShared;
PROCEDURE AcquireExclusive() =
BEGIN
  LOCK m DO
    WHILE nReaders != 0 DO
      Thread.Wait(m, c);
    END;
    nReaders := -1;
  END;
END AcquireExclusive;

PROCEDURE ReleaseExclusive() =
BEGIN
  LOCK m DO
   nReaders := 0;
   Thread.Broadcast(c);
  END;
END ReleaseExclusive;

Planche 5

Les lecteurs et les écrivains (3/6)

Broadcast réveille trop de processus se retrouvant immédiatement bloqués. Avec deux conditions cR et cW, on a un contrôle plus fin:
PROCEDURE AcquireShared() =
BEGIN
  LOCK m DO
    ++ nWaitingReaders;
    WHILE nReaders = -1 DO
      Thread.Wait(m, cR);
    END;
    -- nWaitingReaders;
    ++ nReaders;
  END;
END AcquireShared;

PROCEDURE ReleaseShared() =
BEGIN
  LOCK m DO
    -- nReaders;
    IF nReaders = 0 THEN
      Thread.Signal(cW);
  END:
END ReleaseShared;
PROCEDURE AcquireExclusive() =
BEGIN
  LOCK m DO
    WHILE nReaders != 0 DO
      Thread.Wait(m, cW);
    END;
    nReaders := -1;
  END;
END AcquireExclusive;

PROCEDURE ReleaseExclusive() =
BEGIN
  LOCK m DO
   nReaders := 0;
   IF nWaitingReaders > 0 THEN
     Thread.Broadcast(cR);
   ELSE
     Thread.Signal(cW);
  END;
END ReleaseExclusive;

Planche 6

Les lecteurs et les écrivains (4/6)

Exécuter signal à l'intérieur d'une section critique n'est pas très efficace. Avec un seul processeur, ce n'est pas un problème car les réveillés passent dans l'état prêt attendant la disponibilité du processeur.

Avec plusieurs processeurs, le processus réveillé peut retomber rapidement dans l'état bloqué, tant que le verrou n'est pas relaché.

Il vaut mieux faire signal à l'extérieur de la section critique.
(Ce qu'on ne fait jamais !!)
PROCEDURE ReleaseShared() =
BEGIN
  VAR doSignal: BOOLEAN;
  LOCK m DO
    -- nReaders;
    doSignal := nReaders = 0;
  END:
  IF doSignal THEN
    Thread.Signal(cW);
END ReleaseShared;

Planche 7

Les lecteurs et les écrivains (5/6)

Des blocages inutiles sont possibles (avec plusieurs processeurs) sur le Broadcast de fin d'écriture.

Comme avant, on peut le sortir de la section critique.

Si plusieurs lecteurs sont réveillés, un seul prend le verrou. Mieux vaut faire signal en fin d'écriture, puis refaire signal en fin d'accès partagé pour relancer les autres lecteurs.
PROCEDURE AcquireShared() =
BEGIN
  LOCK m DO
    ++ nWaitingReaders;
    WHILE nReaders = -1 DO
      Thread.Wait(m, cR);
    END;
    -- nWaitingReaders;
    ++ nReaders;
  END;
  Thread.Signal(cR);
END AcquireShared;
PROCEDURE ReleaseExclusive() =
BEGIN
  LOCK m DO
   nReaders := 0;
   IF nWaitingReaders > 0 THEN
     Thread.Signal(cR);
   ELSE
     Thread.Signal(cW);
  END;
END ReleaseExclusive;


Planche 8

Les lecteurs et les écrivains (6/6)

Famine possible d'un écrivain en attente de fin de lecture. La politique d'ordonnancement des processus peut aider. On peut aussi logiquement imposer le passage d'un écrivain.
PROCEDURE AcquireShared() =
BEGIN
  LOCK m DO
    ++ nWaitingReaders;
    IF nWaitingWriters > 0 THEN
      Thread.Wait(m, cR);
    WHILE nReaders = -1 DO
      Thread.Wait(m, cR);
    END;
    -- nWaitingReaders;
    ++ nReaders;
  END;
  Thread.Signal(cR);
END AcquireShared;
PROCEDURE AcquireExclusive() =
BEGIN
  LOCK m DO
    ++nWaitingWriters;
    WHILE nReaders != 0 DO
      Thread.Wait(m, cW);
    END;
    --nWaitingWriters;
    nReaders := -1;
  END;
END AcquireExclusive;



Contrôler finement la synchronisation peut être complexe.


Planche 9

Les 5 philosophes (1/7)

Planche 10

Les 5 philosophes (2/7)



  VAR s: ARRAY [0..4] OF MUTEX;

  PROCEDURE Philosophe (i: CARDINAL) =
  BEGIN
    (* penser *)
    Thread.Acquire(s[i]);
    Thread.Acquire(s[(i+1) MOD 5]);
    (* manger *)
    Thread.Release(s[i]);
    Thread.Release(s[(i+1) MOD 5]);
  END Philosophe;

Sureté, mais interblocage.

Planche 11

Les 5 philosophes (3/7)



  VAR f: ARRAY 0..4 OF CARDINAL = {2, 2, 2, 2, 2};
      manger: ARRAY 0..4 OF Thread.Condition;

  PROCEDURE Philosophe (i: CARDINAL) =
  BEGIN
    WHILE true DO
      (* penser *)
      PrendreFourchettes(i);
      (* manger *)
      RelacherFourchettes(i);
    END;
  END Philosophe;
Planche 12

Les 5 philosophes (4/7)


  BEGIN
    LOCK m DO
      WHILE f[i] != 2 DO
        Thread.Wait (m, manger[i]);
      -- f[(i-1) MOD 5]; -- f[(i+1) MOD 5];
    END:
  END PrendreFourchettes;

  PROCEDURE RelacherFourchettes (i: CARDINAL) =
  BEGIN
    VAR g := (i-1) MOD 5, d := (i+1) MOD 5;
    ++ f[g]; ++ f[d];
    IF f[d] = 2 THEN
      Thread.Signal (manger[d]);
    IF f[g] = 2 THEN
      Thread.Signal (manger[g]);
  END RelacherFourchettes;

Planche 13

Les 5 philosophes (5/7)

Planche 14

Les 5 philosophes (6/7)

Planche 15

Les 5 philosophes (7/7)

Exercice 1 Programmer cette solution des 5 philosophes avec les seuls Thread.Wait, Thread.Signal et Thread.Broadcast.

Planche 16

Communication channels (1/2)

Planche 17

Communication channels (2/2)


Planche 18

Concurrent ML

The Event library in Ocaml implements [Reppy] SML library.
sig
  type 'a channel
  val new_channel : unit -> 'a Event.channel
  type 'a event
  val send : 'a Event.channel -> 'a -> unit Event.event
  val receive : 'a Event.channel -> 'a Event.event
  val choose : 'a Event.event list -> 'a Event.event
  val wrap : 'a Event.event -> ('a -> 'b) -> 'b Event.event
  val guard : (unit -> 'a Event.event) -> 'a Event.event
  val sync : 'a Event.event -> 'a
  val select : 'a Event.event list -> 'a
  ...
end

Planche 19

Updatable storage cell (1/4)


open Event;;

type 'a request =  GET | PUT of 'a;;

type 'a cell = {
  reqCh: 'a request channel;
  replyCh: 'a channel;
  }  ;;

let send1 c msg = sync (send c msg) ;;
let receive1 c =  sync (receive c) ;;

let get c =   send1 c.reqCh GET; receive1 c.replyCh ;;
let put c x = send1 c.reqCh (PUT x) ;;

let cell x =
  let reqCh = new_channel() in
  let replyCh = new_channel() in
  let rec loop x = match (receive1 reqCh) with
    GET -> send1 replyCh x ; loop x
  | PUT x' -> loop x'
  in
  Thread.create loop x ;
  {reqCh = reqCh; replyCh = replyCh} ;;

Planche 20

Updatable storage cell (2/4)


open Event;;

type 'a cell = {
  getCh: 'a channel;
  putCh: 'a channel;
  }  ;;

let get c =
  sync (receive c.getCh);;

let put c x =
  sync (send c.putCh x);;

let cell x =
  let getCh = new_channel() in
  let putCh = new_channel() in
  let rec loop x = select [
    wrap (send getCh x) (function () -> loop x);
    wrap (receive putCh) loop
  ]
  in
  Thread.create loop x ;
  {getCh = getCh; putCh = putCh} ;;

Planche 21

Updatable storage cell (3/4)


open Event;;

type 'a sem = {
  getCh: 'a channel;
  putCh: 'a channel;
  }  ;;

let get c =  sync (receive c.getCh);;
let put c x =  sync (send c.putCh x);;

let sem () =
  let getCh = new_channel() in
  let putCh = new_channel() in
  let rec loop x =
    let putEvt = wrap (receive putCh) (function y -> loop (Some y)) in
    match x with
      None ->  sync putEvt
    | Some y ->
      let getEvt = wrap (send getCh y) (function () -> loop x) in
      select [ getEvt; putEvt ]
  in
  Thread.create loop None ;
  {getCh = getCh; putCh = putCh} ;;

Planche 22

Updatable storage cell (4/4)

loop(None) = Put(y) · loop(Somey)
loop(Somey) =
Get
á y ñ · Put(z) · loop(Somez)
  + Put(z) · loop(Somez)
 
Sem(x)
def
=
 
loop(Somex)
 
  ß  
 
Sem(x) =
Get
· Put(y) · Sem(y)
  + Put(y) · Sem(y)

Exercice 2 Give a program for generalized semaphores. Find which equation is true for them.


Planche 23

Conclusion


This document was translated from LATEX by HEVEA.