Threads Pool - C Language
|
#include "thpool.h"
Vai al codice sorgente di questo file.
Funzioni | |
void * | thWorker (void *arg) |
th_pool * | poolInit (int dim, int job_max, sigset_t *set) |
int | poolDispatcher (th_pool *pool, void *(*start_routine)(void *), void *arg) |
int | poolDestroy (th_pool *pool) |
Implementazione della libreria di creazione e gestione di un pool di threads.
Definizione nel file thpool.c.
void * thWorker | ( | void * | arg | ) |
A solo uso interno
Definizione alla linea 196 del file thpool.c.
{ th_pool *pool = NULL; joblist * job = NULL; pool = (th_pool *) arg; /* Gestine dei segnali, se voluta */ if (pool->signalmask) if (pthread_sigmask(SIG_SETMASK, pool->signalmask, NULL) != 0) sys_err(__FILE__,__LINE__,"thWorker: pthread_sigmask error 'SIG_SETMASK'"); for (;;){ /* Accesso in muta esclusione alla lista dei jobs */ pthread_mutex_lock(&(pool->lock)); while (pool->job_size == 0){ /* coda vuota -> attesa */ /* rilascio il lock e attendo */ pthread_mutex_unlock(&(pool->lock)); /* controllo che il pool non sia in fase di chiusura */ if (pool->shutdown){ pthread_mutex_unlock(&(pool->lock)); return NULL; /* pthread_exit(NULL); NO THANKS!! */ } pthread_cond_wait(&(pool->notempty), &(pool->lock)); } job = pool->firstjob; pool->firstjob = job->next; pool->job_size--; /* se la coda dei job e' vuota */ if (pool->job_size==0) pool->lastjob = NULL; /* segnalo che la coda e' nuovamente vuota (vedi poolDestroy) */ if (pool->job_size == 0 && pool->quitflag) pthread_cond_signal(&(pool->empty)); pthread_mutex_unlock(&(pool->lock)); /* Eseguo il job */ job->start_routine(job->arg); free(job); job = NULL; } pthread_exit((void*)0); }
Funzione che inizializza e ritorna un Pool di Threads
dim | Dimensione del pool di threads |
job_max | Dimensione massima della coda dei jobs |
*set | Opzionale. Maschera dei segnali che il pool usera' durante la creazione dei threads. Se NULL viene ignorata. |
Definizione alla linea 27 del file thpool.c.
{ /* Dichiarazione della struttura che conterra' questo pool di threads */ th_pool *pool = NULL; int n=0; if (dim<1){ errno = EINVAL; sys_err(__FILE__,__LINE__,"PoolInit: dimensione del pool di thread errata"); return NULL; } if (job_max<1){ errno = EINVAL; sys_err(__FILE__,__LINE__,"PoolInit: numero massimo di job errato"); return NULL; } /* Allocazione memoria per la struttura del pool */ if (!(pool = (th_pool*) malloc(sizeof(th_pool)))){ sys_err(__FILE__,__LINE__,"PoolInit: errore calloc"); return NULL; } /* Setto i parametri operativi del pool */ pool->pool_size = dim; pool->signalmask = set; pool->job_max = job_max; pool->firstjob = NULL; pool->lastjob = NULL; pool->quitflag = 0; pool->shutdown = 0; pool->job_size = 0; /* Creazione array dei tids dei threads */ if(!(pool->tids = (pthread_t*) calloc(pool->pool_size, sizeof(pthread_t)))){ sys_err(__FILE__,__LINE__,"PoolInit: errore calloc",errno); free(pool); return NULL; } /* Inizializzo il mutex e le cond 'full' e 'empty' per la lista dei job */ if (pthread_mutex_init(&(pool->lock), NULL)){ sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_mutex_init"); free(pool->tids); free(pool); return NULL; } if (pthread_cond_init(&(pool->notempty),NULL)){ sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init notempty"); free(pool->tids); free(pool); return NULL; } if (pthread_cond_init(&(pool->empty),NULL)){ sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init empty"); free(pool->tids); free(pool); return NULL; } /* Creazione dei threads */ for (n = 0; n < pool->pool_size; n++) if (pthread_create(&(pool->tids[n]), NULL, &thWorker, pool) != 0){ sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_create",errno); free(pool->tids); free(pool); return NULL; } return pool; }
int poolDispatcher | ( | th_pool * | pool, |
void *(*)(void *) | start_routine, | ||
void * | arg | ||
) |
Funzione che sottopone un nuovo job al pool di threads
*pool | Il pool di thread |
*(*start_routine)(void*) | Il job da fare eseguire al pool |
*arg | argomento passato al job |
Definizione alla linea 97 del file thpool.c.
{ joblist * newjob = NULL; if (!pool){ errno = EINVAL; sys_err(__FILE__,__LINE__,"Dispatch: errore pool e' nullo"); return -1; } if (!start_routine){ errno = EINVAL; sys_err(__FILE__,__LINE__,"Dispatch: errore start_routine e' nulla"); return -2; } if (!(newjob = (struct joblist *) malloc(sizeof(struct joblist)))){ sys_err(__FILE__,__LINE__,"Dispatch: errore malloc(sizeof(struct joblist)"); return -3; } newjob->next = NULL; newjob->arg = arg; newjob->start_routine = start_routine; /* Accesso in muta esclusione alla lista dei job */ pthread_mutex_lock(&(pool->lock)); /* Se il pool di threads e' in fase di terminazione rifiuto il job */ if (pool->quitflag){ /*errore(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura",ESHUTDOWN);*/ errno = EINVAL; sys_err(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura"); free(newjob); pthread_mutex_unlock(&(pool->lock)); return -4; } if (pool->job_size==pool->job_max){ errno = EAGAIN; sys_err(__FILE__,__LINE__,"Dispatch: errore troppi job in attesa nella coda del pool di threads"); free(newjob); pthread_mutex_unlock(&(pool->lock)); return -5; } switch(pool->job_size){ case 0:/* lista dei jobs vuota -> inserimento in testa */ pool->firstjob = newjob; pool->lastjob = newjob; /* segnalo che la coda non e' piu' vuota */ pthread_cond_signal(&(pool->notempty)); break; default:/* inserimento in coda */ pool->lastjob->next = newjob; pool->lastjob = newjob; break; } pool->job_size++; pthread_mutex_unlock(&(pool->lock)); return 0; }
int poolDestroy | ( | th_pool * | pool | ) |
Distrugge un Pool di Threads: uccide i threads e libera le risorse
In particolare:
*pool | Il pool di threads da distruggere |
Definizione alla linea 158 del file thpool.c.
{ int n=0; if (!pool){ errno = EINVAL; sys_err(__FILE__,__LINE__,"PoolDestroy: errore pool e' nullo"); return -1; } pthread_mutex_lock(&(pool->lock)); /* da questo momento in poi il pool rifiutera' nuovi job */ pool->quitflag = 1; /* aspetto la terminazione dei job attivi e in coda */ while (pool->job_size != 0) pthread_cond_wait(&(pool->empty), &(pool->lock)); /* termino definitivamente i threads del pool */ pool->shutdown = 1; pthread_cond_broadcast(&(pool->notempty)); pthread_mutex_unlock(&(pool->lock)); for (n=0; n<pool->pool_size; n++){ pthread_cond_broadcast(&(pool->notempty)); if (pthread_join(pool->tids[n], NULL) != 0) sys_err(__FILE__,__LINE__,"PoolDestroy: errore pthred_join"); } /* rilascio le risorse */ pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->empty)); pthread_cond_destroy(&(pool->notempty)); free(pool->tids); free(pool); pool = NULL; return 0; }