Threads Pool - C Language
thpool.c
Vai alla documentazione di questo file.
00001 
00022 #include "thpool.h"
00023 
00025 void *thWorker(void * arg);
00026 
00027 th_pool * poolInit(int dim, int job_max, sigset_t * set){
00028         /* Dichiarazione della struttura che conterra' questo pool di threads */
00029         th_pool *pool = NULL;
00030         int n=0;
00031 
00032         if (dim<1){
00033                 errno = EINVAL;
00034                 sys_err(__FILE__,__LINE__,"PoolInit: dimensione del pool di thread errata");
00035                 return NULL;
00036         }
00037         if (job_max<1){
00038                 errno = EINVAL;
00039                 sys_err(__FILE__,__LINE__,"PoolInit: numero massimo di job errato");
00040                 return NULL;
00041         }
00042 
00043         /* Allocazione memoria per la struttura del pool */
00044         if (!(pool = (th_pool*) malloc(sizeof(th_pool)))){
00045                 sys_err(__FILE__,__LINE__,"PoolInit: errore calloc");
00046                 return NULL;
00047         }
00048 
00049         /* Setto i parametri operativi del pool */
00050         pool->pool_size = dim;
00051         pool->signalmask = set;
00052         pool->job_max = job_max;
00053         pool->firstjob = NULL;
00054         pool->lastjob = NULL;
00055         pool->quitflag = 0;
00056         pool->shutdown = 0;
00057         pool->job_size = 0;
00058 
00059         /* Creazione array dei tids dei threads */
00060         if(!(pool->tids = (pthread_t*) calloc(pool->pool_size, sizeof(pthread_t)))){
00061                 sys_err(__FILE__,__LINE__,"PoolInit: errore calloc",errno);
00062                 free(pool);
00063                 return NULL;
00064         }
00065 
00066         /* Inizializzo il mutex e le cond 'full' e 'empty' per la lista dei job */
00067         if (pthread_mutex_init(&(pool->lock), NULL)){
00068                 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_mutex_init");
00069                 free(pool->tids); free(pool);
00070                 return NULL;
00071         }
00072 
00073         if (pthread_cond_init(&(pool->notempty),NULL)){
00074                 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init notempty");
00075                 free(pool->tids); free(pool);
00076                 return NULL;
00077         }
00078 
00079         if (pthread_cond_init(&(pool->empty),NULL)){
00080                 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init empty");
00081                 free(pool->tids); free(pool);
00082                 return NULL;
00083         }
00084 
00085         /* Creazione dei threads */
00086         for (n = 0; n < pool->pool_size; n++)
00087                 if (pthread_create(&(pool->tids[n]), NULL, &thWorker, pool) != 0){
00088                         sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_create",errno);
00089                         free(pool->tids); free(pool);
00090                         return NULL;
00091                 }
00092 
00093         return pool;
00094 }
00095 
00096 /* Aggiungo un nuovo job alla lista dei job */
00097 int poolDispatcher(th_pool *pool, void *(*start_routine)(void*), void *arg){
00098         joblist * newjob = NULL;
00099 
00100         if (!pool){
00101                 errno = EINVAL;
00102                 sys_err(__FILE__,__LINE__,"Dispatch: errore pool e' nullo");
00103                 return -1;
00104         }
00105         if (!start_routine){
00106                 errno = EINVAL;
00107                 sys_err(__FILE__,__LINE__,"Dispatch: errore start_routine e' nulla");
00108                 return -2;
00109         }
00110 
00111         if (!(newjob = (struct joblist *) malloc(sizeof(struct joblist)))){
00112                 sys_err(__FILE__,__LINE__,"Dispatch: errore malloc(sizeof(struct joblist)");
00113                 return -3;
00114         }
00115         newjob->next = NULL;
00116         newjob->arg = arg;
00117         newjob->start_routine = start_routine;
00118 
00119         /* Accesso in muta esclusione alla lista dei job */
00120         pthread_mutex_lock(&(pool->lock));
00121 
00122         /* Se il pool di threads e' in fase di terminazione rifiuto il job */
00123         if (pool->quitflag){
00124                 /*errore(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura",ESHUTDOWN);*/
00125                 errno = EINVAL;
00126                 sys_err(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura");
00127                 free(newjob);
00128                 pthread_mutex_unlock(&(pool->lock));
00129                 return -4;
00130         }
00131 
00132         if (pool->job_size==pool->job_max){
00133                 errno = EAGAIN;
00134                 sys_err(__FILE__,__LINE__,"Dispatch: errore troppi job in attesa nella coda del pool di threads");
00135                 free(newjob);
00136                 pthread_mutex_unlock(&(pool->lock));
00137                 return -5;
00138         }
00139 
00140         switch(pool->job_size){
00141                 case 0:/* lista dei jobs vuota -> inserimento in testa */
00142                         pool->firstjob = newjob;
00143                         pool->lastjob = newjob;
00144                         /* segnalo che la coda non e' piu' vuota */
00145                         pthread_cond_signal(&(pool->notempty));
00146                 break;
00147                 default:/* inserimento in coda */
00148                         pool->lastjob->next = newjob;
00149                         pool->lastjob = newjob;
00150                 break;
00151         }
00152         pool->job_size++;
00153 
00154         pthread_mutex_unlock(&(pool->lock));
00155         return 0;
00156 }
00157 
00158 int poolDestroy(th_pool *pool){
00159         int n=0;
00160 
00161         if (!pool){
00162                 errno = EINVAL;
00163                 sys_err(__FILE__,__LINE__,"PoolDestroy: errore pool e' nullo");
00164                 return -1;
00165         }
00166 
00167         pthread_mutex_lock(&(pool->lock));
00168         /* da questo momento in poi il pool rifiutera' nuovi job */
00169         pool->quitflag = 1;
00170         /* aspetto la terminazione dei job attivi e in coda */
00171         while (pool->job_size != 0)
00172                 pthread_cond_wait(&(pool->empty), &(pool->lock));
00173 
00174         /* termino definitivamente i threads del pool */
00175         pool->shutdown = 1;
00176         pthread_cond_broadcast(&(pool->notempty));
00177         pthread_mutex_unlock(&(pool->lock));
00178 
00179         for (n=0; n<pool->pool_size; n++){
00180                 pthread_cond_broadcast(&(pool->notempty));
00181                 if (pthread_join(pool->tids[n], NULL) != 0)
00182                         sys_err(__FILE__,__LINE__,"PoolDestroy: errore pthred_join");
00183         }
00184 
00185         /* rilascio le risorse */
00186         pthread_mutex_destroy(&(pool->lock));
00187         pthread_cond_destroy(&(pool->empty));
00188         pthread_cond_destroy(&(pool->notempty));
00189 
00190         free(pool->tids);
00191         free(pool);
00192         pool = NULL;
00193         return 0;
00194 }
00195 
00196 void *thWorker(void *arg){
00197         th_pool *pool = NULL;
00198         joblist * job = NULL;
00199 
00200         pool = (th_pool *) arg;
00201 
00202         /* Gestine dei segnali, se voluta */
00203         if (pool->signalmask)
00204                 if (pthread_sigmask(SIG_SETMASK, pool->signalmask, NULL) != 0)
00205                         sys_err(__FILE__,__LINE__,"thWorker: pthread_sigmask error 'SIG_SETMASK'");
00206 
00207         for (;;){
00208                 /* Accesso in muta esclusione alla lista dei jobs */
00209                 pthread_mutex_lock(&(pool->lock));
00210 
00211                 while (pool->job_size == 0){    /* coda vuota -> attesa */
00212                         /* rilascio il lock e attendo */
00213                         pthread_mutex_unlock(&(pool->lock));
00214 
00215                         /* controllo che il pool non sia in fase di chiusura */
00216                         if (pool->shutdown){
00217                                 pthread_mutex_unlock(&(pool->lock));
00218                                 return NULL; /* pthread_exit(NULL); NO THANKS!! */
00219                         }
00220 
00221                         pthread_cond_wait(&(pool->notempty), &(pool->lock));
00222                 }
00223 
00224                 job = pool->firstjob;
00225                 pool->firstjob = job->next;
00226                 pool->job_size--;
00227 
00228                 /* se la coda dei job e' vuota */
00229                 if (pool->job_size==0)
00230                         pool->lastjob = NULL;
00231 
00232                 /* segnalo che la coda e' nuovamente vuota (vedi poolDestroy) */
00233                 if (pool->job_size == 0 && pool->quitflag)
00234                         pthread_cond_signal(&(pool->empty));
00235 
00236                 pthread_mutex_unlock(&(pool->lock));
00237 
00238                 /* Eseguo il job */
00239                 job->start_routine(job->arg);
00240 
00241                 free(job);
00242                 job = NULL;
00243         }
00244         pthread_exit((void*)0);
00245 }