/Users/maurits/Documents/studie/afstuderen/biosphere/daemon/thread.c

Go to the documentation of this file.
00001 /*
00002  * Author: MA Hartman
00003  * Date: feb 15, 2007
00004  * 
00005  * Function:
00006  * Creation and cleanup of threads.
00007  * 
00008  * License information:
00009  * 
00010  * Copyright (c) 2006 Maurits Hartman
00011  *
00012  * Permission is hereby granted, free of charge, to any person
00013  * obtaining a copy of this software and associated documentation
00014  * files (the "Software"), to deal in the Software without
00015  * restriction, including without limitation the rights to use,
00016  * copy, modify, merge, publish, distribute, sublicense, and/or sell
00017  * copies of the Software, and to permit persons to whom the
00018  * Software is furnished to do so, subject to the following
00019  * conditions:
00020  * 
00021  * The above copyright notice and this permission notice shall be
00022  * included in all copies or substantial portions of the Software.
00023  * 
00024  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
00025  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
00026  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
00027  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
00028  * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
00029  * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
00030  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
00031  * OTHER DEALINGS IN THE SOFTWARE.
00032  * 
00033  */
00034 
00035 #include "thread.h"
00036 #include "http.h"
00037 #include "module.h"
00038 #include "mutex.h"
00039 #include "options.h"
00040 #include "settings.h"
00041 #include <biosphere.h>
00042 #include <copy.h>
00043 #include <list.h>
00044 
00045 #include <apr_file_io.h>
00046 #include <apr_strings.h>
00047 #include <apr_network_io.h>
00048 #include <apr_thread_proc.h>
00049 #include <apr_time.h>
00050 #include <assert.h>
00051 #include <string.h>
00052 #include <stdlib.h>
00053 #include <stdio.h>
00054 #include <sys/time.h>
00055 
00056 
00060 typedef struct worker_info {
00061         apr_socket_t *sock;     
00062         apr_time_t ts;          
00063         apr_pool_t *pool;       
00064 } worker_info;
00065 
00069 typedef struct worker_thread_args {
00070         apr_socket_t *sock;     
00071         apr_pool_t *pool;       
00072 } worker_thread_args;
00073 
00078 #define MAX_HEADER_CHECK_ATTEMPTS 4
00079 
00080 extern apr_pool_t *global_mp;
00081 
00083 static apr_socket_t *listener_socket;
00084 
00088 static bs_list *sockets;
00089 
00090 
00097 static bs_status init_listen_socket(apr_socket_t **sock,
00098                 apr_pool_t *mp)
00099 {
00100     apr_status_t rv;
00101     apr_socket_t *s;
00102     apr_sockaddr_t *sa;
00103     
00104     /* Set the sockaddr info: */
00105     rv = apr_sockaddr_info_get(&sa,
00106                 NULL, /* hostname */
00107                 APR_INET,
00108                 global_listen_port,
00109                 0, /* flags */
00110                 mp);
00111     if (rv != APR_SUCCESS) {
00112         char err[256];
00113         apr_strerror(rv, err, sizeof(err));
00114         fprintf(stderr, "Unable to create socket address: %s\n", err);
00115         return BS_ERROR;
00116     }
00117     
00118     /* Create the socket: */
00119     rv = apr_socket_create(&s, sa->family, SOCK_STREAM, APR_PROTO_TCP, mp);
00120     if (rv != APR_SUCCESS) {
00121         char err[256];
00122         apr_strerror(rv, err, sizeof(err));
00123         fprintf(stderr, "Unable to create socket: %s\n", err);
00124         return BS_ERROR;
00125     }
00126 
00127     /* It is a good idea to specify socket options explicitly.
00128      * in this case, we make a blocking-forever socket as the
00129      * listening socket: */
00130     apr_socket_opt_set(s, APR_SO_NONBLOCK, 0);
00131     apr_socket_timeout_set(s, -1);
00132     /* this is useful for a server(socket listening) process.
00133      * When the server crashes and quickly restarted, it can use
00134      * the same socket again: */
00135     apr_socket_opt_set(s, APR_SO_REUSEADDR, 1);
00136 
00137         /* Bind the socket to the address and start listening: */
00138     rv = apr_socket_bind(s, sa);
00139     if (rv != APR_SUCCESS) {
00140         char err[256];
00141         apr_strerror(rv, err, sizeof(err));
00142         fprintf(stderr, "Unable to bind to address: %s\n", err);
00143                 apr_socket_close(s);
00144         return BS_ERROR;
00145     }
00146     rv = apr_socket_listen(s, SOMAXCONN);
00147     if (rv != APR_SUCCESS) {
00148         char err[256];
00149         apr_strerror(rv, err, sizeof(err));
00150         fprintf(stderr, "Unable to start listening on socket: %s\n", err);
00151         apr_socket_close(s);
00152         return BS_ERROR;
00153     }
00154 
00155     *sock = s;
00156     return BS_OK;
00157 }
00158 
00159 
00167 static void remove_from_socket_list(apr_socket_t *sock)
00168 {
00169         unsigned i;
00170         lock_thread_mux();
00171         for (i = 0; i < list_size(sockets); i++) {
00172                 worker_info *wi = list_index(sockets, i);
00173                 if (wi->sock == sock) {
00174                         list_remove(sockets, i);
00175                         break;  
00176                 }       
00177         }
00178         unlock_thread_mux();
00179 }
00180 
00181 
00185 static void add_to_socket_list(apr_socket_t *sock, apr_pool_t *mp)
00186 {
00187         worker_info *wi = (worker_info *) apr_palloc(mp, sizeof(worker_info));
00188         wi->sock = sock;
00189         wi->ts = apr_time_now();
00190         wi->pool = mp;
00191         
00192         lock_thread_mux();
00193         list_append(sockets, wi);
00194         unlock_thread_mux();
00195 }
00196 
00197 
00207 static void* APR_THREAD_FUNC worker_thread_main(apr_thread_t *thd,
00208                 void *data)
00209 {
00210         apr_status_t rv;
00211         worker_thread_args *args;
00212         apr_socket_t *sock;
00213         apr_pool_t *mp; /* Thread's private, standalone pool */
00214         struct timeval t;
00215         
00216         char *headerbuf = NULL; /* Buffer for the HTTP header data */
00217         apr_size_t headerlen = 0;
00218         int attempts = 0;
00219         
00220         args = data;
00221         sock = args->sock;
00222         mp = args->pool;
00223         add_to_socket_list(sock, mp);
00224         
00225         /* Start reading header info from the socket until full headers are
00226          * received: */
00227         while (!received_full_headers(headerbuf)) {
00228                 char tmpbuf[HTTP_BUFFER_SIZE];
00229                 apr_size_t tmpbuflen = sizeof(tmpbuf) - 1;
00230                 memset(tmpbuf, '\0', tmpbuflen + 1); /* Null-terminated */
00231                 
00232                 rv = apr_socket_recv(sock, tmpbuf, &tmpbuflen);
00233                 if (rv == APR_EOF || tmpbuflen == 0) {
00234                         apr_socket_close(sock);
00235                         remove_from_socket_list(sock);
00236                         apr_pool_destroy(mp);
00237                         return NULL;
00238                 }
00239                 
00240                 if (headerbuf == NULL)
00241                         headerbuf = apr_pstrndup(mp, tmpbuf, tmpbuflen);
00242                 else
00243                         headerbuf = apr_pstrcat(mp, headerbuf, tmpbuf, NULL);
00244                 headerlen = strlen(headerbuf);
00245                 
00246                 /* If http header collection takes too long, exit: */
00247                 attempts++;
00248                 if (attempts > MAX_HEADER_CHECK_ATTEMPTS) {
00249                         send_http_error(sock, "HTTP/1.0 406 Not Acceptable", mp);
00250                         fprintf(stderr, "Too many connections\n");
00251                         apr_socket_close(sock);
00252                         remove_from_socket_list(sock);
00253                         apr_pool_destroy(mp);
00254                         return NULL;    
00255                 }
00256         }
00257         
00258         /* Determine the kind of request: */
00259         if (headerbuf[0] == 'P')
00260                 handle_http_post(sock, headerbuf, strlen(headerbuf), mp);
00261         else if (headerbuf[0] == 'G')
00262                 handle_http_get(sock, mp);
00263         else send_http_error(sock, "HTTP/1.0 404 Not Found", mp);
00264         
00265         gettimeofday(&t, 0);
00266         printf("%ld.%06ld\n", t.tv_sec, t.tv_usec);
00267         
00268         /* Wrapup and exit worker thread: */
00269         apr_socket_close(sock);
00270         remove_from_socket_list(sock);
00271         apr_pool_destroy(mp);
00272         return NULL;    
00273 }
00274 
00275 
00282 static bs_status setup_thread(apr_socket_t *sock)
00283 {
00284         apr_status_t rv;
00285         apr_thread_t *thd;
00286         apr_threadattr_t *thd_attr;
00287         apr_pool_t *thread_pool;
00288         
00289         /* Set the attributes to create a default detachable thread: */
00290         rv = apr_pool_create(&thread_pool, NULL);
00291         if (rv != APR_SUCCESS) return BS_ERROR;
00292         apr_threadattr_create(&thd_attr, thread_pool);
00293         if (rv != APR_SUCCESS) return BS_ERROR;
00294 
00295         /* Create the arguments for the thread: */
00296         worker_thread_args *args =
00297                         (worker_thread_args *) apr_palloc(thread_pool,
00298                         sizeof(worker_thread_args));
00299         args->pool = thread_pool;
00300         args->sock = sock;
00301 
00302         rv = apr_thread_create(&thd,
00303                         thd_attr,
00304                         worker_thread_main,
00305                         args,
00306                         thread_pool);
00307         if (rv != APR_SUCCESS) return BS_ERROR;
00308         
00309         return BS_OK;
00310 }
00311 
00312 
00313 bs_status init_threadsystem(apr_pool_t *mp)
00314 {
00315         sockets = new_list(mp);
00316         return init_listen_socket(&listener_socket, mp);
00317 }
00318 
00319 
00323 static void remove_oldest_thread(void)
00324 {
00325         unsigned i;
00326         apr_time_t oldest = apr_time_now();
00327         apr_socket_t *socket_to_kill = NULL;
00328         apr_pool_t *pool_to_destroy = NULL;
00329         
00330         apr_sleep(10000); /* Wait for thread to finish */
00331         
00332         /* Forcefully remove the oldest */
00333         for (i = 0; i < list_size(sockets); i++) {
00334                 worker_info *wi = (worker_info *) list_index(sockets, i);
00335                 if (wi->ts < oldest) {
00336                         oldest = wi->ts;
00337                         socket_to_kill = wi->sock;
00338                         pool_to_destroy = wi->pool;
00339                 }
00340         }
00341         
00342         assert(socket_to_kill && pool_to_destroy);
00343         fprintf(stderr, "Had to forcefully kill worker thread.\n");
00344         remove_from_socket_list(socket_to_kill);
00345         apr_pool_destroy(pool_to_destroy);      
00346 }
00347 
00348 
00358 void lets_go_threaded(void)
00359 {
00360         apr_pool_t *mp;
00361         apr_status_t rv;
00362         struct timeval t;
00363         
00364         rv = apr_pool_create(&mp, global_mp);
00365         if (rv != APR_SUCCESS) return;
00366         
00367         /* The main loop waiting for incoming connections: */
00368         while (1) {
00369                 bs_status rv2;
00370                 apr_socket_t *ns;
00371                 
00372                 /* If the list is full, wait for the oldest thread: */
00373                 lock_thread_mux();
00374         if (list_size(sockets) >= BIOSPHERED_MAX_CONNECTIONS)
00375                 remove_oldest_thread();
00376         unlock_thread_mux();
00377                 
00378                 /* Wait for an incoming connection and create a thread for it:
00379                  * specify socket options explicitly: non-blocking, no time-out:
00380                  */
00381         rv = apr_socket_accept(&ns, listener_socket, mp);
00382         if (rv != APR_SUCCESS) break;
00383         apr_socket_opt_set(ns, APR_SO_NONBLOCK, 0);
00384         apr_socket_timeout_set(ns, -1);
00385         
00386         gettimeofday(&t, 0);
00387                 printf("%ld.%06ld,", t.tv_sec, t.tv_usec);
00388         
00389         /* Setup the worker thread: */
00390         rv2 = setup_thread(ns);
00391         if (rv2 != BS_OK) return;
00392         }
00393 }
00394 
00395 
00399 typedef struct shutdown_info {
00400         bs_uint32 wait_time;            
00401         apr_pool_t *pool;       
00402 } shutdown_info;
00403 
00404 
00414 static void* APR_THREAD_FUNC shutdown_thread_main(apr_thread_t *thd,
00415                 void *sinfo)
00416 {
00417         unsigned i;
00418         int exit_code = BS_OK;
00419         shutdown_info *si = sinfo;
00420         
00421         /* Wait the specified amount of seconds; */
00422         printf("Shutdown process started, waiting %d microseconds\n", si->wait_time);
00423         apr_sleep(si->wait_time);
00424         apr_socket_close(listener_socket);
00425         
00426         /* Wait 10 seconds before cleaning all the worker threads: */
00427         printf("Allowing remaining threads 10 seconds to finish\n");
00428         apr_sleep(10*1000*1000);
00429         printf("Killing remaining worker threads\n");
00430         
00431         lock_thread_mux();
00432         for (i = 0; i < list_size(sockets); i++) {
00433                 apr_socket_t *sock = list_index(sockets, i);
00434                 apr_socket_close(sock);
00435                 exit_code += BS_ERROR;
00436         }
00437         unlock_thread_mux();
00438         
00439         exit_code += unload_all_modules();
00440         
00441         if (i > 0)
00442                 fprintf(stderr, "There were %d remaining zombie worker threads to kill.\n", i);
00443         
00444         apr_pool_destroy(si->pool);
00445         exit(exit_code);
00446 }
00447 
00448 
00449 bs_status stop_daemon(bs_uint32 num_usecs)
00450 {
00451         apr_status_t rv;
00452         apr_thread_t *thd;
00453         apr_threadattr_t *thd_attr;
00454         apr_pool_t *thread_pool;
00455         shutdown_info *sinfo;
00456         
00457         /* Set the attributes to create a default detachable thread: */
00458         rv = apr_pool_create(&thread_pool, NULL);
00459         if (rv != APR_SUCCESS) return BS_ERROR;
00460         apr_threadattr_create(&thd_attr, thread_pool);
00461         if (rv != APR_SUCCESS) return BS_ERROR;
00462         
00463         sinfo = (shutdown_info *) apr_palloc(thread_pool, sizeof(shutdown_info));
00464         sinfo->wait_time = num_usecs;
00465         sinfo->pool = thread_pool;
00466 
00467         rv = apr_thread_create(&thd,
00468                         thd_attr,
00469                         shutdown_thread_main,
00470                         sinfo,
00471                         thread_pool);
00472         if (rv != APR_SUCCESS) return BS_ERROR;
00473         return BS_OK;   
00474 }
00475 

Generated on Tue Jul 17 09:50:52 2007 for Bio-SPHERE by  doxygen 1.5.1