00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "node.h"
00036 #include "mutex.h"
00037 #include <http_common.h>
00038 #include <list.h>
00039 #include <str.h>
00040 #include <type.h>
00041
00042 #include <apr_network_io.h>
00043 #include <apr_strings.h>
00044 #include <stdio.h>
00045 #include <assert.h>
00046
00047 extern apr_pool_t *global_mp;
00048
00049 static bs_list *nodes = NULL;
00050
00051
00057 bs_status init_node(apr_pool_t *mp)
00058 {
00059 nodes = new_list(mp);
00060 return BS_OK;
00061 }
00062
00063
00071 static bs_bool node_already_known(const char *address, bs_uint16 port)
00072 {
00073 unsigned i;
00074 assert(nodes && address);
00075
00076 lock_node_mux();
00077 for (i = 0; i < list_size(nodes); i++) {
00078 node_info *ni = list_index(nodes, i);
00079 node_address *na = ni->address;
00080 if (streq(na->address, address) && na->port == port) {
00081 unlock_node_mux();
00082 return TRUE;
00083 }
00084 }
00085 unlock_node_mux();
00086 return FALSE;
00087 }
00088
00089
00090 bs_status add_node(const char *address, bs_uint16 port)
00091 {
00092 node_address *na;
00093 node_info *ni;
00094 unsigned i;
00095 bs_bool added = FALSE;
00096 bs_status rv;
00097 bs_definition *def;
00098
00099
00100 if (node_already_known(address, port)) return BS_ERROR;
00101
00102
00103 lock_pool_mux();
00104 rv = connect_and_get(&def, address, port, global_mp);
00105 if (rv != BS_OK) return rv;
00106
00107
00108 na = (node_address *) apr_palloc(global_mp, sizeof(node_address));
00109 na->address = apr_pstrdup(global_mp, address);
00110 na->port = port;
00111 ni = (node_info *) apr_pcalloc(global_mp, sizeof(node_info));
00112 ni->address = na;
00113 ni->services = def;
00114 ni->last_seen = apr_time_now();
00115 ni->ping = ping_node(address, port, global_mp);
00116 unlock_pool_mux();
00117 if (ni->ping == 0xffffffff) {
00118 fprintf(stderr, "Unable to contact node: %s:%d\n", address,
00119 port);
00120 return BS_ERROR;
00121 }
00122
00123
00124 lock_node_mux();
00125 for (i = 0; i < list_size(nodes); i++) {
00126 node_info *tmp = (node_info *) list_index(nodes, i);
00127 if (tmp->ping > ni->ping) {
00128 list_insert(nodes, i, ni);
00129 added = TRUE;
00130 }
00131 }
00132 if (!added) list_append(nodes, ni);
00133 unlock_node_mux();
00134 return BS_OK;
00135 }
00136
00137
00138 bs_status remove_node(const char *address, bs_uint16 port)
00139 {
00140 unsigned i;
00141
00142 lock_node_mux();
00143 for (i = 0; i < list_size(nodes); i++) {
00144 node_info *ni = list_index(nodes, i);
00145 node_address *na = ni->address;
00146 if (streq(na->address, address) && na->port == port) {
00147 list_remove(nodes, i);
00148 break;
00149 }
00150 }
00151 unlock_node_mux();
00152 return BS_OK;
00153 }
00154
00155
00156 bs_uint32 ping_node(const char *address, bs_uint16 port,
00157 apr_pool_t *mp)
00158 {
00159 apr_time_t start, end;
00160 apr_socket_t *sock;
00161 apr_status_t rv;
00162 apr_sockaddr_t *sa;
00163
00164 rv = apr_sockaddr_info_get(&sa, address, APR_INET, port, 0, mp);
00165 if (rv != APR_SUCCESS) return 0xffffffff;
00166 rv = apr_socket_create(&sock, sa->family, SOCK_STREAM,
00167 APR_PROTO_TCP, mp);
00168 if (rv != APR_SUCCESS) return 0xffffffff;
00169
00170 start = apr_time_now();
00171 rv = apr_socket_connect(sock, sa);
00172 if (rv != APR_SUCCESS) return 0xffffffff;
00173 end = apr_time_now();
00174
00175 apr_socket_close(sock);
00176 return (bs_uint32) (end - start);
00177 }
00178
00179
00180 node_info *get_servicing_node(const bs_service_request *req)
00181 {
00182 unsigned i;
00183
00184
00185 lock_node_mux();
00186 for (i = 0; i < list_size(nodes); i++) {
00187 node_info *ni = (node_info *) list_index(nodes, i);
00188 if (request_corresponds_with_definition(req, ni->services)) {
00189
00190 unlock_node_mux();
00191 return ni;
00192 }
00193 }
00194
00195
00196 unlock_node_mux();
00197 return NULL;
00198 }