00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "thread.h"
00036 #include "http.h"
00037 #include "module.h"
00038 #include "mutex.h"
00039 #include "options.h"
00040 #include "settings.h"
00041 #include <biosphere.h>
00042 #include <copy.h>
00043 #include <list.h>
00044
00045 #include <apr_file_io.h>
00046 #include <apr_strings.h>
00047 #include <apr_network_io.h>
00048 #include <apr_thread_proc.h>
00049 #include <apr_time.h>
00050 #include <assert.h>
00051 #include <string.h>
00052 #include <stdlib.h>
00053 #include <stdio.h>
00054 #include <sys/time.h>
00055
00056
00060 typedef struct worker_info {
00061 apr_socket_t *sock;
00062 apr_time_t ts;
00063 apr_pool_t *pool;
00064 } worker_info;
00065
00069 typedef struct worker_thread_args {
00070 apr_socket_t *sock;
00071 apr_pool_t *pool;
00072 } worker_thread_args;
00073
00078 #define MAX_HEADER_CHECK_ATTEMPTS 4
00079
00080 extern apr_pool_t *global_mp;
00081
00083 static apr_socket_t *listener_socket;
00084
00088 static bs_list *sockets;
00089
00090
00097 static bs_status init_listen_socket(apr_socket_t **sock,
00098 apr_pool_t *mp)
00099 {
00100 apr_status_t rv;
00101 apr_socket_t *s;
00102 apr_sockaddr_t *sa;
00103
00104
00105 rv = apr_sockaddr_info_get(&sa,
00106 NULL,
00107 APR_INET,
00108 global_listen_port,
00109 0,
00110 mp);
00111 if (rv != APR_SUCCESS) {
00112 char err[256];
00113 apr_strerror(rv, err, sizeof(err));
00114 fprintf(stderr, "Unable to create socket address: %s\n", err);
00115 return BS_ERROR;
00116 }
00117
00118
00119 rv = apr_socket_create(&s, sa->family, SOCK_STREAM, APR_PROTO_TCP, mp);
00120 if (rv != APR_SUCCESS) {
00121 char err[256];
00122 apr_strerror(rv, err, sizeof(err));
00123 fprintf(stderr, "Unable to create socket: %s\n", err);
00124 return BS_ERROR;
00125 }
00126
00127
00128
00129
00130 apr_socket_opt_set(s, APR_SO_NONBLOCK, 0);
00131 apr_socket_timeout_set(s, -1);
00132
00133
00134
00135 apr_socket_opt_set(s, APR_SO_REUSEADDR, 1);
00136
00137
00138 rv = apr_socket_bind(s, sa);
00139 if (rv != APR_SUCCESS) {
00140 char err[256];
00141 apr_strerror(rv, err, sizeof(err));
00142 fprintf(stderr, "Unable to bind to address: %s\n", err);
00143 apr_socket_close(s);
00144 return BS_ERROR;
00145 }
00146 rv = apr_socket_listen(s, SOMAXCONN);
00147 if (rv != APR_SUCCESS) {
00148 char err[256];
00149 apr_strerror(rv, err, sizeof(err));
00150 fprintf(stderr, "Unable to start listening on socket: %s\n", err);
00151 apr_socket_close(s);
00152 return BS_ERROR;
00153 }
00154
00155 *sock = s;
00156 return BS_OK;
00157 }
00158
00159
00167 static void remove_from_socket_list(apr_socket_t *sock)
00168 {
00169 unsigned i;
00170 lock_thread_mux();
00171 for (i = 0; i < list_size(sockets); i++) {
00172 worker_info *wi = list_index(sockets, i);
00173 if (wi->sock == sock) {
00174 list_remove(sockets, i);
00175 break;
00176 }
00177 }
00178 unlock_thread_mux();
00179 }
00180
00181
00185 static void add_to_socket_list(apr_socket_t *sock, apr_pool_t *mp)
00186 {
00187 worker_info *wi = (worker_info *) apr_palloc(mp, sizeof(worker_info));
00188 wi->sock = sock;
00189 wi->ts = apr_time_now();
00190 wi->pool = mp;
00191
00192 lock_thread_mux();
00193 list_append(sockets, wi);
00194 unlock_thread_mux();
00195 }
00196
00197
00207 static void* APR_THREAD_FUNC worker_thread_main(apr_thread_t *thd,
00208 void *data)
00209 {
00210 apr_status_t rv;
00211 worker_thread_args *args;
00212 apr_socket_t *sock;
00213 apr_pool_t *mp;
00214 struct timeval t;
00215
00216 char *headerbuf = NULL;
00217 apr_size_t headerlen = 0;
00218 int attempts = 0;
00219
00220 args = data;
00221 sock = args->sock;
00222 mp = args->pool;
00223 add_to_socket_list(sock, mp);
00224
00225
00226
00227 while (!received_full_headers(headerbuf)) {
00228 char tmpbuf[HTTP_BUFFER_SIZE];
00229 apr_size_t tmpbuflen = sizeof(tmpbuf) - 1;
00230 memset(tmpbuf, '\0', tmpbuflen + 1);
00231
00232 rv = apr_socket_recv(sock, tmpbuf, &tmpbuflen);
00233 if (rv == APR_EOF || tmpbuflen == 0) {
00234 apr_socket_close(sock);
00235 remove_from_socket_list(sock);
00236 apr_pool_destroy(mp);
00237 return NULL;
00238 }
00239
00240 if (headerbuf == NULL)
00241 headerbuf = apr_pstrndup(mp, tmpbuf, tmpbuflen);
00242 else
00243 headerbuf = apr_pstrcat(mp, headerbuf, tmpbuf, NULL);
00244 headerlen = strlen(headerbuf);
00245
00246
00247 attempts++;
00248 if (attempts > MAX_HEADER_CHECK_ATTEMPTS) {
00249 send_http_error(sock, "HTTP/1.0 406 Not Acceptable", mp);
00250 fprintf(stderr, "Too many connections\n");
00251 apr_socket_close(sock);
00252 remove_from_socket_list(sock);
00253 apr_pool_destroy(mp);
00254 return NULL;
00255 }
00256 }
00257
00258
00259 if (headerbuf[0] == 'P')
00260 handle_http_post(sock, headerbuf, strlen(headerbuf), mp);
00261 else if (headerbuf[0] == 'G')
00262 handle_http_get(sock, mp);
00263 else send_http_error(sock, "HTTP/1.0 404 Not Found", mp);
00264
00265 gettimeofday(&t, 0);
00266 printf("%ld.%06ld\n", t.tv_sec, t.tv_usec);
00267
00268
00269 apr_socket_close(sock);
00270 remove_from_socket_list(sock);
00271 apr_pool_destroy(mp);
00272 return NULL;
00273 }
00274
00275
00282 static bs_status setup_thread(apr_socket_t *sock)
00283 {
00284 apr_status_t rv;
00285 apr_thread_t *thd;
00286 apr_threadattr_t *thd_attr;
00287 apr_pool_t *thread_pool;
00288
00289
00290 rv = apr_pool_create(&thread_pool, NULL);
00291 if (rv != APR_SUCCESS) return BS_ERROR;
00292 apr_threadattr_create(&thd_attr, thread_pool);
00293 if (rv != APR_SUCCESS) return BS_ERROR;
00294
00295
00296 worker_thread_args *args =
00297 (worker_thread_args *) apr_palloc(thread_pool,
00298 sizeof(worker_thread_args));
00299 args->pool = thread_pool;
00300 args->sock = sock;
00301
00302 rv = apr_thread_create(&thd,
00303 thd_attr,
00304 worker_thread_main,
00305 args,
00306 thread_pool);
00307 if (rv != APR_SUCCESS) return BS_ERROR;
00308
00309 return BS_OK;
00310 }
00311
00312
00313 bs_status init_threadsystem(apr_pool_t *mp)
00314 {
00315 sockets = new_list(mp);
00316 return init_listen_socket(&listener_socket, mp);
00317 }
00318
00319
00323 static void remove_oldest_thread(void)
00324 {
00325 unsigned i;
00326 apr_time_t oldest = apr_time_now();
00327 apr_socket_t *socket_to_kill = NULL;
00328 apr_pool_t *pool_to_destroy = NULL;
00329
00330 apr_sleep(10000);
00331
00332
00333 for (i = 0; i < list_size(sockets); i++) {
00334 worker_info *wi = (worker_info *) list_index(sockets, i);
00335 if (wi->ts < oldest) {
00336 oldest = wi->ts;
00337 socket_to_kill = wi->sock;
00338 pool_to_destroy = wi->pool;
00339 }
00340 }
00341
00342 assert(socket_to_kill && pool_to_destroy);
00343 fprintf(stderr, "Had to forcefully kill worker thread.\n");
00344 remove_from_socket_list(socket_to_kill);
00345 apr_pool_destroy(pool_to_destroy);
00346 }
00347
00348
00358 void lets_go_threaded(void)
00359 {
00360 apr_pool_t *mp;
00361 apr_status_t rv;
00362 struct timeval t;
00363
00364 rv = apr_pool_create(&mp, global_mp);
00365 if (rv != APR_SUCCESS) return;
00366
00367
00368 while (1) {
00369 bs_status rv2;
00370 apr_socket_t *ns;
00371
00372
00373 lock_thread_mux();
00374 if (list_size(sockets) >= BIOSPHERED_MAX_CONNECTIONS)
00375 remove_oldest_thread();
00376 unlock_thread_mux();
00377
00378
00379
00380
00381 rv = apr_socket_accept(&ns, listener_socket, mp);
00382 if (rv != APR_SUCCESS) break;
00383 apr_socket_opt_set(ns, APR_SO_NONBLOCK, 0);
00384 apr_socket_timeout_set(ns, -1);
00385
00386 gettimeofday(&t, 0);
00387 printf("%ld.%06ld,", t.tv_sec, t.tv_usec);
00388
00389
00390 rv2 = setup_thread(ns);
00391 if (rv2 != BS_OK) return;
00392 }
00393 }
00394
00395
00399 typedef struct shutdown_info {
00400 bs_uint32 wait_time;
00401 apr_pool_t *pool;
00402 } shutdown_info;
00403
00404
00414 static void* APR_THREAD_FUNC shutdown_thread_main(apr_thread_t *thd,
00415 void *sinfo)
00416 {
00417 unsigned i;
00418 int exit_code = BS_OK;
00419 shutdown_info *si = sinfo;
00420
00421
00422 printf("Shutdown process started, waiting %d microseconds\n", si->wait_time);
00423 apr_sleep(si->wait_time);
00424 apr_socket_close(listener_socket);
00425
00426
00427 printf("Allowing remaining threads 10 seconds to finish\n");
00428 apr_sleep(10*1000*1000);
00429 printf("Killing remaining worker threads\n");
00430
00431 lock_thread_mux();
00432 for (i = 0; i < list_size(sockets); i++) {
00433 apr_socket_t *sock = list_index(sockets, i);
00434 apr_socket_close(sock);
00435 exit_code += BS_ERROR;
00436 }
00437 unlock_thread_mux();
00438
00439 exit_code += unload_all_modules();
00440
00441 if (i > 0)
00442 fprintf(stderr, "There were %d remaining zombie worker threads to kill.\n", i);
00443
00444 apr_pool_destroy(si->pool);
00445 exit(exit_code);
00446 }
00447
00448
00449 bs_status stop_daemon(bs_uint32 num_usecs)
00450 {
00451 apr_status_t rv;
00452 apr_thread_t *thd;
00453 apr_threadattr_t *thd_attr;
00454 apr_pool_t *thread_pool;
00455 shutdown_info *sinfo;
00456
00457
00458 rv = apr_pool_create(&thread_pool, NULL);
00459 if (rv != APR_SUCCESS) return BS_ERROR;
00460 apr_threadattr_create(&thd_attr, thread_pool);
00461 if (rv != APR_SUCCESS) return BS_ERROR;
00462
00463 sinfo = (shutdown_info *) apr_palloc(thread_pool, sizeof(shutdown_info));
00464 sinfo->wait_time = num_usecs;
00465 sinfo->pool = thread_pool;
00466
00467 rv = apr_thread_create(&thd,
00468 thd_attr,
00469 shutdown_thread_main,
00470 sinfo,
00471 thread_pool);
00472 if (rv != APR_SUCCESS) return BS_ERROR;
00473 return BS_OK;
00474 }
00475