Problema del produttore/consumatore

In informatica, il problema del produttore-consumatore (conosciuto anche con il nome di problema del buffer limitato o bounded-buffer problem) è un esempio classico di sincronizzazione tra processi. Il problema descrive due processi, uno produttore (in inglese producer) ed uno consumatore (consumer), che condividono un buffer comune, di dimensione fissata. Compito del produttore è generare dati e depositarli nel buffer in continuo. Contemporaneamente, il consumatore utilizzerà i dati prodotti, rimuovendoli di volta in volta dal buffer. Il problema è assicurare che il produttore non elabori nuovi dati se il buffer è pieno, e che il consumatore non cerchi dati se il buffer è vuoto.

La soluzione per il produttore è sospendere la propria esecuzione se il buffer è pieno; non appena il consumatore avrà prelevato un elemento dal buffer, esso "sveglierà" il produttore, che ricomincerà quindi a riempire il buffer. Allo stesso modo, il consumatore si sospenderà se il buffer è vuoto; non appena il produttore avrà scaricato dati nel buffer, risveglierà il consumatore. Questa soluzione può essere implementata tramite delle strategie di comunicazione tra processi, tipicamente con dei semafori. Una soluzione errata potrebbe dar luogo ad un deadlock, in cui entrambi i processi aspettano di essere risvegliati.

Il problema può anche essere riscritto considerando più produttori e più consumatori distinti.

Implementazioni

modifica

Esempio di implementazione errata

modifica

La risoluzione del problema presenta una race condition. Un programmatore potrebbe perciò risolvere il problema in maniera incorretta, come mostrato nel codice seguente; in esso vengono usate due diverse procedure, sleep e wakeup. Quando viene invocata sleep da parte di un processo, questo si "addormenta", fino a che non interviene la procedura wakeup. La variabile itemCount è il numero di elementi contenuti nel buffer.

int itemCount
 
procedure producer() {
     while (true) {
         item = produceItem()
 
         if (itemCount == BUFFER_SIZE) {
             sleep()
         }
 
         putItemIntoBuffer(item)
         itemCount = itemCount + 1
         
         if (itemCount == 1) {
             wakeup(consumer)
         }
     }
}
 
 procedure consumer() {
     while (true) {
 
         if (itemCount == 0) {
             sleep()
         }
         
         item = removeItemFromBuffer()
         itemCount = itemCount - 1
         
         if (itemCount == BUFFER_SIZE - 1) {
             wakeup(producer)
         }
         
         consumeItem(item)
     }
}

Il problema con questa soluzione è la race condition che può portare ad un deadlock. Consideriamo il seguente scenario:

  1. Il consumatore ha appena letto la variabile itemCount, verificato che è uguale a zero e sta per entrare nel corpo dell'if.
  2. Prima di invocare sleep, il consumatore viene interrotto dal dispatcher del sistema operativo, che riattiva il produttore.
  3. Il produttore crea un nuovo dato da elaborare, lo mette nel buffer e incrementa itemCount.
  4. Poiché il buffer era vuoto prima di questa operazione, il produttore invoca wakeup per risvegliare i processi consumatori addormentati.
  5. Sfortunatamente, l'interruzione del consumatore è avvenuta prima che il consumatore invocasse sleep: wakeup non ha perciò alcun effetto. Non appena il controllo tornerà al consumatore, la procedura sleep verrà completata, impedendo l'esecuzione del consumatore.
  6. Il produttore andrà avanti fino a che non riempirà il buffer, dopodiché eseguirà anch'esso sleep.

Poiché entrambi i processi rimarranno per sempre addormentati, abbiamo raggiunto una situazione di stallo (deadlock), provando che questa soluzione è inadatta.

Una analisi possibile per questo errore è che ogni qual volta il linguaggio di programmazione non definisce la semantica con cui è necessario accedere alle variabili condivise (in questo caso itemCount), e non usa perciò elementi di sincronizzazione, la soluzione non sarà soddisfacente, anche senza dimostrare esplicitamente l'esistenza di una race condition.

Mediante semafori

modifica

I semafori risolvono il problema della perdita delle notifiche di risveglio dei processi. Nella soluzione seguente utilizzeremo due semafori, fillCount e emptyCount: il primo è incrementato ed il secondo decrementato quando un nuovo dato viene immesso nel buffer. Se il produttore prova a decrementare emptyCount mentre questo è zero, la sua esecuzione viene sospesa; non appena un elemento del buffer viene consumato, emptyCount viene incrementato e il produttore si riattiverà. Il consumatore funzionerà in maniera analoga.

 semaphore fillCount = 0
 semaphore emptyCount = DIMENSIONE_DEL_BUFFER
 
 procedure producer() {
     while (true) {
         item = produceItem()
         down(emptyCount)
         putItemIntoBuffer(item)
         up(fillCount)
     }
}
 
 procedure consumer() {
     while (true) {
         down(fillCount)
         item = removeItemFromBuffer()
         up(emptyCount)
         consumeItem(item)
     }
}

Questa soluzione lavora egregiamente quando esistono solo un produttore ed un consumatore. Al contrario, con più istanze di produttori e di consumatori, essa produce una rilevante race condition che può generare in due o più processi che leggono o scrivono lo stesso elemento contemporaneamente. Per capire come questo potrebbe accadere, immaginiamo come la procedura putItemIntoBuffer(), che mette un risultato nel buffer, potrebbe essere implementata: essa dovrebbe contenere due azioni, una che determini uno spazio disponibile del buffer e l'altra che vi scrive il dato. Se la procedura può essere eseguita concorrentemente da più produttori, allora diviene possibile il seguente scenario:

  1. Due produttori decrementano emptyCount
  2. Uno dei produttori determina il successivo spazio nel buffer, ma prima di passare alla fase di scrittura, viene interrotto dal sistema operativo.
  3. Un secondo produttore, nel determinare lo spazio successivo, individua il medesimo del precedente.
  4. Entrambi i produttori scrivono sul medesimo spazio.

Per superare questa situazione, dobbiamo assicurarci che solo un produttore per volta esegua putItemIntoBuffer(); in altre parole, abbiamo bisogno di una sezione critica con Mutex. Per raggiungere questo scopo, introduciamo un semaforo binario, chiamato Mutex. Poiché il valore di un semaforo binario può essere solamente uno o zero, solo un processo può essere in esecuzione tra i due cambi di valore del semaforo (algoritmo del biglietto). Questa soluzione, adatta anche a più produttori e più consumatori, avrà un codice di questo tipo:

 semaphore mutex = 1
 semaphore fillCount = 0
 semaphore emptyCount = DIMENSIONE_DEL_BUFFER
 
 procedure producer() {
     while (true) {
         item = produceItem()
         down(emptyCount)
         down(mutex)
         putItemIntoBuffer(item)
         up(mutex)
         up(fillCount)
     }
}
 
 procedure consumer() {
     while (true) {
         down(fillCount)
         down(mutex)
         item = removeItemFromBuffer()
         up(mutex)
         up(emptyCount)
         consumeItem(item)
     }
}

Notiamo che l'ordine in cui i semafori vengono incrementati e decrementati è essenziale: cambiando quest'ordine si potrebbe generare un deadlock.

Mediante test and set

modifica
/* Risolvere con un programma in pseudo-codice il problema produttore consumatore, 
con un buffer condiviso di lunghezza finita N ( un array b[M] adoperato come buffer circolare).
Utilizzando per la mutua esclusione l'istruzione test-and-set e l'attesa attiva (busy wait). */

// !! NO SEMAFORI  !!

int i = 0;
int nq = 0;
queue q;

P(){
	while(true){
		x = produce();
		while (nq == bufferSize) { /* non fare niente */ }
    
    	while (!testAndSet(i)) { /* non fare niente */ }

    	/* INIZIA SEZIONE CRITICA */
		q.append();
		nq++;
		i = 0;
	/* FINISCE LA SEZIONE CRITICA */

	//...altre operazioni...
    }
}

C(){
	while (true) {
		while (qn == 0) { /* non fare niente */ }
    
    	while (!testAndSet(i)) { /* non fare niente */ }

    	/* INIZIA SEZIONE CRITICA */
		y = q.take();
		nq--;
		i = 0;
		/* FINISCE SEZIONE CRITICA */
		consume(y);

		//...altre operazioni...
	}
}

Esempi con numero di produttori, scrittori e buffer variabile

modifica
/* Un produttore P produce dati che mette in tre code FIFO, identificate con qa,
qb e qc, in modo circolare fra le tre. Tre consumatori CA, CB e CC, prelevano
un dato per volta, rispettivamente dalle code qa, qb e qc, e li elaborano.

I tempi di ogni singola produzione ed elaborazione sono molto irregolari, al
punto che un consumatore può trovare la propria coda vuota, nel qual caso
può prelevare un dato dalla coda successiva in senso circolare.

Scrivere lo pseudo codice corrispondente ai processi P, CA, CB e CC,
utilizzando i semafori per la sincronizzazione, le code qa, qb e qc sono di
lunghezza uguale e finita. Evitare assolutamente l'attesa attiva dei tre
processi. 
N CONSUMATORI
*/

inizializzazioni() {
	queue qa, qb, qc; //inizializzo le code

	Sem vuoti_a = size;
	Sem vuoti_b = size;
	Sem vuoti_c = size; //servono per verificare la corretta produzione

	Sem dati = 0; //serve per vedere se abbiamo prodotto qualcosa

	Sem mutex_a = 1;
	Sem mutex_b = 1; 
	Sem mutex_c = 1; //servono per la mutua esclusione

	int dati_a = 0;
	int dati_b = 0;
	int dati_c = 0; //controlla quanti dati abbiamo prodotto per quel consumatore
}

void P(){
	while (true){
		v = produce();
		semWait(vuoti_a);
		semWait(mutex_a);
		qa.append(v);
		dati_a++;
		semSignal(mutex_a);
		semSignal(dati);

		w = produce();
		semWait(vuoti_b);
		semWait(mutex_b);
		qb.append(w);
		dati_b++;
		semSignal(mutex_b);
		semSignal(dati);

		z = produce();
		semWait(vuoti_c);
		semWait(mutex_c);
		qc.append();
		dati_c++;
		semSignal(mutex_c);
		semSignal(dati);
	}
}

void CA(){
	while(true){
		semWait(dati);
		if (dati_a==0) {
			if (dati_b > 0) {
			//vado a "consumare" la coda b
			semWait(mutex_b);
			qb.take();
			dati_b--;
			semSignal(mutex_b);
			semSignal(vuoti_b);
		} else {
			//vado a "consumare" la coda c
			semWait(mutex_c);
			qc.take();
			dati_c--;
			semSignal(mutex_c);
			semSignal(vuoti_c);
		}
	} else {
		//vado a "consumare" la coda del consumatore in esame
		semWait(mutex_a);
		qa.take();
		dati_a--;
		semSignal(mutex_a);
		semSignal(vuoti_a);
		}
		consume();
	}
}

void CB(){
	while(true){
	semWait(dati);
		if (dati_b==0) {
			if (dati_a > 0) {
			//vado a "consumare" la coda a
			semWait(mutex_a);
			qa.take();
			dati_a--;
			semSignal(mutex_a);
			semSignal(vuoti_a);
		} else {
			//vado a "consumare" la coda c
			semWait(mutex_c);
			qc.take();
			dati_c--;
			semSignal(mutex_c);
			semSignal(vuoti_c);
		}
	} else {
		//vado a "consumare" la coda del consumatore in esame
		semWait(mutex_b);
		qb.take();
		dati_b--;
		semSignal(mutex_b);
		semSignal(vuoti_b);
		}
		consume();
	}
}

void CC(){
	while(true){
	semWait(dati);
		if (dati_c==0) {
			if (dati_a > 0) {
			//vado a "consumare" la coda a
			semWait(mutex_a);
			qa.take();
			dati_a--;
			semSignal(mutex_a);
			semSignal(vuoti_a);
		} else {
			//vado a "consumare" la coda b
			semWait(mutex_b);
			qb.take();
			dati_b--;
			semSignal(mutex_b);
			semSignal(vuoti_b);
		}
	} else {
		//vado a "consumare" la coda del consumatore in esame
		semWait(mutex_c);
		qc.take();
		dati_c--;
		semSignal(mutex_c);
		semSignal(vuoti_c);
		}
		consume();
	}
}

// #################################################################
// N PRODUTTORI

inizializzazioni() {
	queue qa, qb; //inizializzo le code

	Sem vuoti_a = size;
	Sem vuoti_b = size; //servono per verificare la corretta produzione

	Sem dati = 0; //serve per vedere se abbiamo prodotto qualcosa

	Sem mutex_a = 1;
	Sem mutex_b = 1;  //servono per la mutua esclusione

	int dati_a = 0;
	int dati_b = 0; //controlla quanti dati abbiamo prodotto per quel consumatore
}

void P(){
	while (true){
		v = produce();
		semWait(vuoti_a);
		semWait(mutex_a);
		qa.append(v);
		dati_a++;
		semSignal(mutex_a);
		semSignal(dati);

		w = produce();
		semWait(vuoti_b);
		semWait(mutex_b);
		qb.append(w);
		dati_b++;
		semSignal(mutex_b);
		semSignal(dati);
	}
}

void C(){
	while(true){
		semWait(dati);
		if (dati_a==0) {
			// vado a consumare la coda qb
			semWait(mutex_b);
			qb.take();
			dati_b--;
			semSignal(mutex_b);
			semSignal(vuoti_b);
			
		} else {
			//vado a "consumare" la coda del consumatore in esame
			semWait(mutex_a);
			qa.take();
			dati_a--;
			semSignal(mutex_a);
			semSignal(vuoti_a);
		}
		consume();
	}
}

/* Il processo P1 produce dei dati e li invia al processo P2 tramite un sistema a doppio buffer, 
costituito da due buffer (ciascuno di lunghezza unitaria) condivisi tra P1 e P2. Mentre P2 legge 
da un buffer, P1 può scrivere nell'altro.

Scrivere i codici dei processi P1 e P2, utilizzando i semafori per la sincronizzazione. 
N BUFFER
*/

Sem w1 = 1;
Sem w2 = 1;
Sem r1 = 0;
Sem r2 = 0; 
dati x, y // inizializzazione dati letti

void P1(){
	while (true) {
		k = produce();
		semWait(w1);
		append(buffer1, k);
		semSignal(r1);

		k = produce();
		semWait(w2);
		append(buffer2, k);
		semSignal(r2);
	}
}

void P2(){
	while(true){
		semWait(r1);
		y = take(buffer1);
		semSignal(w1);
		consume(y);

		semWait(r2);
		y = take(buffer2);
		semSignal(w2);
		consume(y);
	}
}

/* Si consideri un sistema costituito da soli due processi P1 e P2. Il processo P1 produce dei dati e li invia al processo P2 
tramite una struttura a triplo buffer, costituita da tre buffer B1,B2 e B3 (ciascuno di lunghezza unitaria) condivisi fra P1 e P2. 
Mentre P2 legge da un buffer, P1 può scrivere in uno degli altri.

Scrivere i codici dei processi P1 e P2, utilizzando i semafori per la sincronizzazione. Evitare l'attesa attiva dei processi P1 e P2. */

Sem w1 = 1;
Sem w2 = 1;
Sem w3 = 1;
Sem r1 = 0;
Sem r2 = 0;
Sem r3 = 0; 
var x, y // inizializzazione dati letti

void P1(){
	while (true) {
		k = produce();
		semWait(w1);
		append(buffer1, k);
		semSignal(r1);

		k = produce();
		semWait(w2);
		append(buffer2, k);
		semSignal(r2);

		k = produce();
		semWait(w3);
		append(buffer3, k);
		semSignal(r3);
	}
}

void P2(){
	while(true){
		semWait(r1);
		y = take(buffer1);
		semSignal(w1);
		consume(y);

		semWait(r2);
		y = take(buffer2);
		semSignal(w2);
		consume(y);

		semWait(r3);
		y = take(buffer3);
		semSignal(w3);
		consume(y);
	}
}

Mostriamo ora una soluzione tramite l'uso di monitor. Dal momento che essi garantiscono la mutua esclusione, non è necessaria nessuna implementazione aggiuntiva per proteggere le sezioni critiche nel caso di più produttori/consumatori. È inoltre degno di nota come questa implementazione renda più semplice prevedere ed evitare il verificarsi di race condition.

 monitor ProducerConsumer {
     
     int itemCount
     condition full
     condition empty
     
     procedure add(item) {
         while (itemCount == BUFFER_SIZE) {
             wait(full)
         }
         
         putItemIntoBuffer(item)
         itemCount = itemCount + 1
         
         if (itemCount == 1) {
             notify(empty)
         }
     }
     
     procedure remove() {
         while (itemCount == 0) {
             wait(empty)
         }
         
         item = removeItemFromBuffer()
         itemCount = itemCount - 1
         
         if (itemCount == BUFFER_SIZE - 1) {
             notify(full)
         }
         
         return item;
     }
}
 
 procedure producer() {
     while (true) {
         item = produceItem()
         ProducerConsumer.add(item)
     }
 }
 
 procedure consumer() {
     while (true) {
         item = ProducerConsumer.remove()
         consumeItem(item)
     }
 }

Bibliografia

modifica

Voci correlate

modifica
  Portale Informatica: accedi alle voci di Wikipedia che trattano di informatica