TCP Socket - C Language
|
00001 00012 #include "pool.h" 00013 #include "errore.h" 00014 00016 void *thWorker(void * arg); 00017 00018 th_pool * poolInit(int dim, int job_max, sigset_t * set){ 00019 /* Dichiarazione della struttura che conterra' questo pool di threads */ 00020 th_pool *pool = NULL; 00021 int n=0; 00022 00023 if (dim<1){ 00024 errno = EINVAL; 00025 sys_err(__FILE__,__LINE__,"PoolInit: dimensione del pool di thread errata"); 00026 return NULL; 00027 } 00028 if (job_max<1){ 00029 errno = EINVAL; 00030 sys_err(__FILE__,__LINE__,"PoolInit: numero massimo di job errato"); 00031 return NULL; 00032 } 00033 00034 /* Allocazione memoria per la struttura del pool */ 00035 pool = (th_pool*) calloc(1, sizeof(th_pool)); 00036 /* Setto i parametri operativi del pool */ 00037 pool->pool_size = dim; 00038 pool->signalmask = set; 00039 pool->job_max = job_max; 00040 /*pool->firstjob = NULL; 00041 pool->lastjob = NULL; 00042 pool->job_size = 0;*/ 00043 /* Creazione array dei tids dei threads */ 00044 pool->tids = (pthread_t*) calloc(pool->pool_size, sizeof(pthread_t)); 00045 00046 /* Inizializzo il mutex e le cond 'notempty' e 'empty' per la lista dei job */ 00047 if (pthread_mutex_init(&(pool->lock), NULL)) 00048 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_mutex_init"); 00049 00050 if (pthread_cond_init(&(pool->notempty),NULL)) 00051 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init notempty"); 00052 00053 if (pthread_cond_init(&(pool->empty),NULL)) 00054 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init empty"); 00055 00056 /* Creazione dei threads */ 00057 for (n = 0; n < pool->pool_size; n++) 00058 if (pthread_create(&(pool->tids[n]), NULL, &thWorker, pool) != 0){ 00059 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_create"); 00060 /* Manca una reale gestione degli errori 00061 * in caso la pthread_create fallisca */ 00062 } 00063 00064 return pool; 00065 } 00066 00067 /* Aggiungo un nuovo job alla lista dei job */ 00068 int poolDispatcher(th_pool *pool, void *(*start_routine)(void*), void *arg){ 00069 joblist * newjob = NULL; 00070 00071 if (!pool){ 00072 errno = EINVAL; 00073 sys_err(__FILE__,__LINE__,"Dispatch: errore pool e' nullo"); 00074 return -1; 00075 } 00076 if (!start_routine){ 00077 errno = EINVAL; 00078 sys_err(__FILE__,__LINE__,"Dispatch: errore start_routine e' nulla"); 00079 return -2; 00080 } 00081 if (!(newjob = (struct joblist *) malloc(sizeof(struct joblist)))){ 00082 sys_err(__FILE__,__LINE__,"Dispatch: errore malloc(sizeof(struct joblist)"); 00083 return -3; 00084 } 00085 newjob->next = NULL; 00086 newjob->arg = arg; 00087 newjob->start_routine = start_routine; 00088 00089 /* Accesso in muta esclusione alla lista dei job */ 00090 pthread_mutex_lock(&(pool->lock)); 00091 00092 /* Se il pool di threads e' in fase di terminazione rifiuto il job */ 00093 if (pool->quitflag){ 00094 errno = ESHUTDOWN; 00095 sys_err(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura"); 00096 free(newjob); 00097 pthread_mutex_unlock(&(pool->lock)); 00098 return -4; 00099 } 00100 00101 /* Se la coda di job e' piena rifiuto il nuovo job*/ 00102 if (pool->job_size==pool->job_max){ 00103 errno = EAGAIN; 00104 sys_err(__FILE__,__LINE__,"Dispatch: errore troppi job in attesa nella coda del pool"); 00105 free(newjob); 00106 pthread_mutex_unlock(&(pool->lock)); 00107 return -5; 00108 } 00109 00110 /* lista dei jobs vuota -> inserimento in testa */ 00111 if (pool->job_size == 0){ 00112 pool->firstjob = newjob; 00113 pool->lastjob = newjob; 00114 /* segnalo che la coda non e' piu' vuota */ 00115 pthread_cond_signal(&(pool->notempty)); 00116 } 00117 else{ /* inserimento in coda */ 00118 pool->lastjob->next = newjob; 00119 pool->lastjob = newjob; 00120 } 00121 pool->job_size++; 00122 00123 pthread_mutex_unlock(&(pool->lock)); 00124 return 0; 00125 } 00126 00127 int poolDestroy(th_pool *pool){ 00128 int n=0; 00129 00130 if (!pool){ 00131 errno = EINVAL; 00132 sys_err(__FILE__,__LINE__,"PoolDestroy: errore pool e' nullo"); 00133 return -1; 00134 } 00135 00136 pthread_mutex_lock(&(pool->lock)); 00137 /* da questo momento in poi il pool rifiutera' nuovi job */ 00138 pool->quitflag = 1; 00139 /* aspetto la terminazione dei job attivi e in coda */ 00140 while (pool->job_size != 0) 00141 pthread_cond_wait(&(pool->empty), &(pool->lock)); 00142 00143 /* termino definitivamente i threads del pool */ 00144 pool->shutdown = 1; 00145 pthread_cond_broadcast(&(pool->notempty)); 00146 pthread_mutex_unlock(&(pool->lock)); 00147 00148 for (n=0; n<pool->pool_size; n++){ 00149 pthread_cond_broadcast(&(pool->notempty)); 00150 if (pthread_join(pool->tids[n], NULL) != 0) 00151 sys_err(__FILE__,__LINE__,"PoolDestroy: errore pthred_join"); 00152 } 00153 00154 /* rilascio le risorse */ 00155 pthread_mutex_destroy(&(pool->lock)); 00156 pthread_cond_destroy(&(pool->empty)); 00157 pthread_cond_destroy(&(pool->notempty)); 00158 free(pool->tids); 00159 free(pool); 00160 pool = NULL; 00161 return 0; 00162 } 00163 00164 void *thWorker(void *arg){ 00165 th_pool *pool = NULL; 00166 joblist * job = NULL; 00167 00168 pool = (th_pool *) arg; 00169 00170 /* Gestine dei segnali, se voluta */ 00171 if (pool->signalmask && (pthread_sigmask(SIG_SETMASK, pool->signalmask, NULL) != 0)) 00172 sys_err(__FILE__,__LINE__,"thWorker: pthread_sigmask error 'SIG_SETMASK'"); 00173 00174 for (;;){ 00175 /* Accesso in muta esclusione alla lista dei jobs */ 00176 pthread_mutex_lock(&(pool->lock)); 00177 00178 while (pool->job_size == 0){ /* coda vuota -> attesa */ 00179 /* rilascio il lock e attendo */ 00180 pthread_mutex_unlock(&(pool->lock)); 00181 00182 /* controllo che il pool non sia in fase di chiusura */ 00183 if (pool->shutdown) 00184 pthread_exit(NULL); 00185 00186 pthread_cond_wait(&(pool->notempty), &(pool->lock)); 00187 00188 /* controllo che il pool non sia in fase di chiusura */ 00189 if (pool->shutdown) { 00190 pthread_mutex_unlock(&(pool->lock)); 00191 pthread_exit(NULL); 00192 } 00193 } 00194 00195 job = pool->firstjob; 00196 pool->firstjob = job->next; 00197 pool->job_size--; 00198 /* se la coda dei job e' vuota */ 00199 if (pool->job_size==0) 00200 pool->lastjob = NULL; 00201 00202 /* segnalo che la coda e' nuovamente vuota (vedi poolDestroy) */ 00203 if (pool->job_size == 0 && pool->quitflag) 00204 pthread_cond_signal(&(pool->empty)); 00205 00206 pthread_mutex_unlock(&(pool->lock)); 00207 00208 /* Eseguo il job */ 00209 job->start_routine(job->arg); 00210 00211 free(job); 00212 job = NULL; 00213 } 00214 pthread_exit(NULL); 00215 return NULL; 00216 }