Threads Pool - C Language
|
00001 00022 #include "thpool.h" 00023 00025 void *thWorker(void * arg); 00026 00027 th_pool * poolInit(int dim, int job_max, sigset_t * set){ 00028 /* Dichiarazione della struttura che conterra' questo pool di threads */ 00029 th_pool *pool = NULL; 00030 int n=0; 00031 00032 if (dim<1){ 00033 errno = EINVAL; 00034 sys_err(__FILE__,__LINE__,"PoolInit: dimensione del pool di thread errata"); 00035 return NULL; 00036 } 00037 if (job_max<1){ 00038 errno = EINVAL; 00039 sys_err(__FILE__,__LINE__,"PoolInit: numero massimo di job errato"); 00040 return NULL; 00041 } 00042 00043 /* Allocazione memoria per la struttura del pool */ 00044 if (!(pool = (th_pool*) malloc(sizeof(th_pool)))){ 00045 sys_err(__FILE__,__LINE__,"PoolInit: errore calloc"); 00046 return NULL; 00047 } 00048 00049 /* Setto i parametri operativi del pool */ 00050 pool->pool_size = dim; 00051 pool->signalmask = set; 00052 pool->job_max = job_max; 00053 pool->firstjob = NULL; 00054 pool->lastjob = NULL; 00055 pool->quitflag = 0; 00056 pool->shutdown = 0; 00057 pool->job_size = 0; 00058 00059 /* Creazione array dei tids dei threads */ 00060 if(!(pool->tids = (pthread_t*) calloc(pool->pool_size, sizeof(pthread_t)))){ 00061 sys_err(__FILE__,__LINE__,"PoolInit: errore calloc",errno); 00062 free(pool); 00063 return NULL; 00064 } 00065 00066 /* Inizializzo il mutex e le cond 'full' e 'empty' per la lista dei job */ 00067 if (pthread_mutex_init(&(pool->lock), NULL)){ 00068 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_mutex_init"); 00069 free(pool->tids); free(pool); 00070 return NULL; 00071 } 00072 00073 if (pthread_cond_init(&(pool->notempty),NULL)){ 00074 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init notempty"); 00075 free(pool->tids); free(pool); 00076 return NULL; 00077 } 00078 00079 if (pthread_cond_init(&(pool->empty),NULL)){ 00080 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init empty"); 00081 free(pool->tids); free(pool); 00082 return NULL; 00083 } 00084 00085 /* Creazione dei threads */ 00086 for (n = 0; n < pool->pool_size; n++) 00087 if (pthread_create(&(pool->tids[n]), NULL, &thWorker, pool) != 0){ 00088 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_create",errno); 00089 free(pool->tids); free(pool); 00090 return NULL; 00091 } 00092 00093 return pool; 00094 } 00095 00096 /* Aggiungo un nuovo job alla lista dei job */ 00097 int poolDispatcher(th_pool *pool, void *(*start_routine)(void*), void *arg){ 00098 joblist * newjob = NULL; 00099 00100 if (!pool){ 00101 errno = EINVAL; 00102 sys_err(__FILE__,__LINE__,"Dispatch: errore pool e' nullo"); 00103 return -1; 00104 } 00105 if (!start_routine){ 00106 errno = EINVAL; 00107 sys_err(__FILE__,__LINE__,"Dispatch: errore start_routine e' nulla"); 00108 return -2; 00109 } 00110 00111 if (!(newjob = (struct joblist *) malloc(sizeof(struct joblist)))){ 00112 sys_err(__FILE__,__LINE__,"Dispatch: errore malloc(sizeof(struct joblist)"); 00113 return -3; 00114 } 00115 newjob->next = NULL; 00116 newjob->arg = arg; 00117 newjob->start_routine = start_routine; 00118 00119 /* Accesso in muta esclusione alla lista dei job */ 00120 pthread_mutex_lock(&(pool->lock)); 00121 00122 /* Se il pool di threads e' in fase di terminazione rifiuto il job */ 00123 if (pool->quitflag){ 00124 /*errore(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura",ESHUTDOWN);*/ 00125 errno = EINVAL; 00126 sys_err(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura"); 00127 free(newjob); 00128 pthread_mutex_unlock(&(pool->lock)); 00129 return -4; 00130 } 00131 00132 if (pool->job_size==pool->job_max){ 00133 errno = EAGAIN; 00134 sys_err(__FILE__,__LINE__,"Dispatch: errore troppi job in attesa nella coda del pool di threads"); 00135 free(newjob); 00136 pthread_mutex_unlock(&(pool->lock)); 00137 return -5; 00138 } 00139 00140 switch(pool->job_size){ 00141 case 0:/* lista dei jobs vuota -> inserimento in testa */ 00142 pool->firstjob = newjob; 00143 pool->lastjob = newjob; 00144 /* segnalo che la coda non e' piu' vuota */ 00145 pthread_cond_signal(&(pool->notempty)); 00146 break; 00147 default:/* inserimento in coda */ 00148 pool->lastjob->next = newjob; 00149 pool->lastjob = newjob; 00150 break; 00151 } 00152 pool->job_size++; 00153 00154 pthread_mutex_unlock(&(pool->lock)); 00155 return 0; 00156 } 00157 00158 int poolDestroy(th_pool *pool){ 00159 int n=0; 00160 00161 if (!pool){ 00162 errno = EINVAL; 00163 sys_err(__FILE__,__LINE__,"PoolDestroy: errore pool e' nullo"); 00164 return -1; 00165 } 00166 00167 pthread_mutex_lock(&(pool->lock)); 00168 /* da questo momento in poi il pool rifiutera' nuovi job */ 00169 pool->quitflag = 1; 00170 /* aspetto la terminazione dei job attivi e in coda */ 00171 while (pool->job_size != 0) 00172 pthread_cond_wait(&(pool->empty), &(pool->lock)); 00173 00174 /* termino definitivamente i threads del pool */ 00175 pool->shutdown = 1; 00176 pthread_cond_broadcast(&(pool->notempty)); 00177 pthread_mutex_unlock(&(pool->lock)); 00178 00179 for (n=0; n<pool->pool_size; n++){ 00180 pthread_cond_broadcast(&(pool->notempty)); 00181 if (pthread_join(pool->tids[n], NULL) != 0) 00182 sys_err(__FILE__,__LINE__,"PoolDestroy: errore pthred_join"); 00183 } 00184 00185 /* rilascio le risorse */ 00186 pthread_mutex_destroy(&(pool->lock)); 00187 pthread_cond_destroy(&(pool->empty)); 00188 pthread_cond_destroy(&(pool->notempty)); 00189 00190 free(pool->tids); 00191 free(pool); 00192 pool = NULL; 00193 return 0; 00194 } 00195 00196 void *thWorker(void *arg){ 00197 th_pool *pool = NULL; 00198 joblist * job = NULL; 00199 00200 pool = (th_pool *) arg; 00201 00202 /* Gestine dei segnali, se voluta */ 00203 if (pool->signalmask) 00204 if (pthread_sigmask(SIG_SETMASK, pool->signalmask, NULL) != 0) 00205 sys_err(__FILE__,__LINE__,"thWorker: pthread_sigmask error 'SIG_SETMASK'"); 00206 00207 for (;;){ 00208 /* Accesso in muta esclusione alla lista dei jobs */ 00209 pthread_mutex_lock(&(pool->lock)); 00210 00211 while (pool->job_size == 0){ /* coda vuota -> attesa */ 00212 /* rilascio il lock e attendo */ 00213 pthread_mutex_unlock(&(pool->lock)); 00214 00215 /* controllo che il pool non sia in fase di chiusura */ 00216 if (pool->shutdown){ 00217 pthread_mutex_unlock(&(pool->lock)); 00218 return NULL; /* pthread_exit(NULL); NO THANKS!! */ 00219 } 00220 00221 pthread_cond_wait(&(pool->notempty), &(pool->lock)); 00222 } 00223 00224 job = pool->firstjob; 00225 pool->firstjob = job->next; 00226 pool->job_size--; 00227 00228 /* se la coda dei job e' vuota */ 00229 if (pool->job_size==0) 00230 pool->lastjob = NULL; 00231 00232 /* segnalo che la coda e' nuovamente vuota (vedi poolDestroy) */ 00233 if (pool->job_size == 0 && pool->quitflag) 00234 pthread_cond_signal(&(pool->empty)); 00235 00236 pthread_mutex_unlock(&(pool->lock)); 00237 00238 /* Eseguo il job */ 00239 job->start_routine(job->arg); 00240 00241 free(job); 00242 job = NULL; 00243 } 00244 pthread_exit((void*)0); 00245 }