Mercurial > hg > index.fcgi > gift-gnutella > gift-gnutella-0.0.11-1pba
diff src/gt_query_route.c @ 0:d39e1d0d75b6
initial add
author | paulo@hit-nxdomain.opendns.com |
---|---|
date | Sat, 20 Feb 2010 21:18:28 -0800 |
parents | |
children |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/gt_query_route.c Sat Feb 20 21:18:28 2010 -0800 1.3 @@ -0,0 +1,1255 @@ 1.4 +/* 1.5 + * $Id: gt_query_route.c,v 1.46 2004/04/05 07:56:54 hipnod Exp $ 1.6 + * 1.7 + * Copyright (C) 2001-2003 giFT project (gift.sourceforge.net) 1.8 + * 1.9 + * This program is free software; you can redistribute it and/or modify it 1.10 + * under the terms of the GNU General Public License as published by the 1.11 + * Free Software Foundation; either version 2, or (at your option) any 1.12 + * later version. 1.13 + * 1.14 + * This program is distributed in the hope that it will be useful, but 1.15 + * WITHOUT ANY WARRANTY; without even the implied warranty of 1.16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1.17 + * General Public License for more details. 1.18 + */ 1.19 + 1.20 +#include "gt_gnutella.h" 1.21 + 1.22 +#include "gt_query_route.h" 1.23 +#include "gt_packet.h" 1.24 +#include "gt_utils.h" 1.25 + 1.26 +#include "gt_node.h" 1.27 +#include "gt_node_list.h" 1.28 + 1.29 +#include <libgift/stopwatch.h> 1.30 + 1.31 +#include <zlib.h> 1.32 + 1.33 +/*****************************************************************************/ 1.34 + 1.35 +/* 1.36 + * TODO: 1.37 + * - compact table to bit-level representation 1.38 + * - support incremental updates of table 1.39 + * - cut-off entries when MAX_FILL_RATIO is reached 1.40 + */ 1.41 +#define MIN_TABLE_BITS (16) /* 16 bits */ 1.42 +#define MAX_TABLE_BITS (21) /* 21 bits */ 1.43 +#define MIN_TABLE_SIZE (1UL << (MIN_TABLE_BITS - 1)) /* 32k bytes */ 1.44 +#define MAX_TABLE_SIZE (1UL << (MAX_TABLE_BITS - 1)) /* 1M bytes */ 1.45 +#define MIN_TABLE_SLOTS (MIN_TABLE_SIZE * 2) /* 64k slots */ 1.46 +#define MAX_TABLE_SLOTS (MAX_TABLE_SIZE * 2) /* 2M slots */ 1.47 +#define INC_FILL_RATIO (0.70) /* 0.7% */ 1.48 +#define MAX_FILL_RATIO (1.00) /* 1% (unused)*/ 1.49 + 1.50 +/* 1.51 + * magical constant necessary for the query routing hash function 1.52 + */ 1.53 +#define MULTIPLIER 0x4F1BBCDC 1.54 + 1.55 +/* 1.56 + * How often to synchronize the QRT with ultrapeers. 1.57 + * 1.58 + * This is very big because we don't support incremental updating 1.59 + * yet. 1.60 + */ 1.61 +#define QRT_UPDATE_INTERVAL (20 * MINUTES) 1.62 + 1.63 +/* 1.64 + * How often we check to build a compressed patch of local shares. 1.65 + */ 1.66 +#define QRT_BUILD_INTERVAL (3 * SECONDS) 1.67 + 1.68 +/* 1.69 + * How long to wait after the first query_route_table_submit() before 1.70 + * actually submitting the table. 1.71 + */ 1.72 +#define QRT_SUBMIT_DELAY (1 * MINUTES) 1.73 + 1.74 +/* 1.75 + * Largest hops value in table. It looks like Limewire hardcodes 1.76 + * this as 7, and won't understand any other value. 1.77 + */ 1.78 +#define INFINITY 7 1.79 + 1.80 +/* 1.81 + * Constants for changing route table. 1.82 + */ 1.83 +#define QRT_KEYWORD_ADD (0x0a) /* -6 */ 1.84 +#define QRT_KEYWORD_REMOVE (0x06) /* 6 */ 1.85 + 1.86 +/* 1.87 + * The minimum length of a keyword 1.88 + */ 1.89 +#define QRP_MIN_LENGTH 3 1.90 + 1.91 +/* 1.92 + * Maximum patch fragment size to send 1.93 + */ 1.94 +#define PATCH_FRAGSIZE 2048 1.95 + 1.96 +/* 1.97 + * Number of bits in the patches we send 1.98 + */ 1.99 +#define PATCH_BITS 4 1.100 + 1.101 +/* 1.102 + * Holds a 32-bit index describing each token this node shares. 1.103 + */ 1.104 +struct qrp_route_entry 1.105 +{ 1.106 + int ref; /* number of references to this index */ 1.107 + uint32_t index; /* the actual position */ 1.108 +}; 1.109 + 1.110 +struct qrp_route_table 1.111 +{ 1.112 + uint8_t *table; 1.113 + size_t bits; 1.114 + size_t size; 1.115 + size_t slots; 1.116 + size_t present; 1.117 + size_t shared; 1.118 +}; 1.119 + 1.120 +/*****************************************************************************/ 1.121 + 1.122 +/* 1.123 + * The set of indices that are currently marked "present" in the 1.124 + * routing table. 1.125 + */ 1.126 +static Dataset *indices; 1.127 + 1.128 +/* 1.129 + * This holds the compressed table as a full QRP patch (incremental 1.130 + * updates not supported yet). 1.131 + */ 1.132 +static uint8_t *compressed_table; 1.133 +static size_t compressed_slots; 1.134 +static size_t compressed_size; 1.135 + 1.136 +/* 1.137 + * Keeps track of how many times the compressed table has changed. 1.138 + * Used to avoid sending updates when not necessary. 1.139 + */ 1.140 +static int compressed_version; 1.141 + 1.142 +/* 1.143 + * Timer that builds the compressed patch for the table submitted 1.144 + * to peers. 1.145 + */ 1.146 +static timer_id build_timer; 1.147 + 1.148 +/* 1.149 + * Whether we have to rebuild the table when shares are done 1.150 + * syncing. 1.151 + */ 1.152 +static BOOL table_changed; 1.153 + 1.154 +/* 1.155 + * The full query-routing table in binary form. This will get 1.156 + * compressed before transmission. 1.157 + */ 1.158 +static struct qrp_route_table *route_table; 1.159 + 1.160 +/*****************************************************************************/ 1.161 + 1.162 +/* hash function used for query-routing */ 1.163 +uint32_t gt_query_router_hash_str (char *str, size_t bits) 1.164 +{ 1.165 + uint32_t hash; 1.166 + unsigned int i; 1.167 + unsigned char c; 1.168 + 1.169 + i = hash = 0; 1.170 + 1.171 + while ((c = *str++) && !isspace (c)) 1.172 + { 1.173 + hash ^= tolower (c) << (i * 8); 1.174 + 1.175 + /* using & instead of %, sorry */ 1.176 + i = (i+1) & 0x03; 1.177 + } 1.178 + 1.179 + return (hash * MULTIPLIER) >> (32 - bits); 1.180 +} 1.181 + 1.182 +/*****************************************************************************/ 1.183 + 1.184 +static struct qrp_route_table *qrp_route_table_new (size_t bits) 1.185 +{ 1.186 + struct qrp_route_table *qrt; 1.187 + 1.188 + if (!(qrt = MALLOC (sizeof (struct qrp_route_table)))) 1.189 + return NULL; 1.190 + 1.191 + qrt->bits = bits; 1.192 + qrt->size = (1UL << (bits - 1)); 1.193 + qrt->slots = qrt->size * 2; /* 4-bit entries only */ 1.194 + 1.195 + if (!(qrt->table = MALLOC (qrt->size))) 1.196 + { 1.197 + free (qrt); 1.198 + return NULL; 1.199 + } 1.200 + 1.201 + return qrt; 1.202 +} 1.203 + 1.204 +static void qrp_route_table_free (struct qrp_route_table *qrt) 1.205 +{ 1.206 + if (!qrt) 1.207 + return; 1.208 + 1.209 + free (qrt->table); 1.210 + free (qrt); 1.211 +} 1.212 + 1.213 +static void qrp_route_table_insert (struct qrp_route_table *qrt, uint32_t index) 1.214 +{ 1.215 + uint8_t old_entry; 1.216 + int set_lower; 1.217 + int entry; 1.218 + 1.219 + if (!qrt) 1.220 + return; 1.221 + 1.222 + assert (index < qrt->size * 2); 1.223 + 1.224 + set_lower = index % 2; 1.225 + entry = index / 2; 1.226 + 1.227 + if (set_lower) 1.228 + { 1.229 + old_entry = qrt->table[entry] & 0x0f; 1.230 + qrt->table[entry] = (qrt->table[entry] & 0xf0) | 1.231 + ((QRT_KEYWORD_ADD) & 0x0f); 1.232 + } 1.233 + else 1.234 + { 1.235 + old_entry = (qrt->table[entry] & 0xf0) >> 4; 1.236 + qrt->table[entry] = (qrt->table[entry] & 0x0f) | 1.237 + ((QRT_KEYWORD_ADD << 4) & 0xf0); 1.238 + } 1.239 + 1.240 + assert (old_entry == 0 || old_entry == QRT_KEYWORD_REMOVE); 1.241 +#if 0 1.242 + GT->dbg (GT, "+%u [%d/%d]", index, entry, set_lower); 1.243 +#endif 1.244 + 1.245 + qrt->present++; 1.246 +} 1.247 + 1.248 +#if 0 1.249 +/* untested */ 1.250 +static void qrp_route_table_remove (struct qrp_route_table *qrt, uint32_t index) 1.251 +{ 1.252 + uint8_t old_entry; 1.253 + int clear_lower; 1.254 + int entry; 1.255 + 1.256 + if (!qrt) 1.257 + return; 1.258 + 1.259 + assert (index < qrt->size * 2); 1.260 + 1.261 + clear_lower = index % 2; 1.262 + entry = index / 2; 1.263 + 1.264 + /* 1.265 + * This needs to change when doing incremental updating... 1.266 + */ 1.267 + 1.268 + if (clear_lower) 1.269 + { 1.270 + old_entry = qrt->table[entry] & 0x0f; 1.271 + qrt->table[entry] = (qrt->table[entry] & 0xf0) | 1.272 + ((QRT_KEYWORD_REMOVE) & 0x0f); 1.273 + } 1.274 + else 1.275 + { 1.276 + old_entry = (qrt->table[entry] & 0xf0) >> 4; 1.277 + qrt->table[entry] = (qrt->table[entry] & 0x0f) | 1.278 + ((QRT_KEYWORD_REMOVE << 4) & 0xf0); 1.279 + } 1.280 + 1.281 + assert (old_entry == (uint8_t) QRT_KEYWORD_ADD); 1.282 +#if 0 1.283 + GT->dbg (GT, "-%u [%d/%d]", index, entry, clear_lower); 1.284 +#endif 1.285 + 1.286 + qrt->present--; 1.287 +} 1.288 +#endif 1.289 + 1.290 +static BOOL qrp_route_table_lookup (struct qrp_route_table *qrt, uint32_t index) 1.291 +{ 1.292 + int check_lower; 1.293 + uint32_t entry; 1.294 + 1.295 + if (!qrt) 1.296 + return FALSE; 1.297 + 1.298 + assert (index < qrt->slots); 1.299 + assert (qrt->slots == qrt->size * 2); 1.300 + 1.301 + check_lower = index % 2; 1.302 + entry = index / 2; 1.303 + 1.304 + if (check_lower) 1.305 + { 1.306 + if ((qrt->table[entry] & 0x0f) == QRT_KEYWORD_ADD) 1.307 + return TRUE; 1.308 + } 1.309 + else 1.310 + { 1.311 + if (((qrt->table[entry] & 0xf0) >> 4) == QRT_KEYWORD_ADD) 1.312 + return TRUE; 1.313 + } 1.314 + 1.315 + return FALSE; 1.316 +} 1.317 + 1.318 +static double qrp_route_table_fill_ratio (struct qrp_route_table *qrt) 1.319 +{ 1.320 + return (double)qrt->present * 100 / qrt->slots; 1.321 +} 1.322 + 1.323 +/*****************************************************************************/ 1.324 + 1.325 +static char *zlib_strerror (int error) 1.326 +{ 1.327 + switch (error) 1.328 + { 1.329 + case Z_OK: return "OK"; 1.330 + case Z_STREAM_END: return "End of stream"; 1.331 + case Z_NEED_DICT: return "Decompressing dictionary needed"; 1.332 + case Z_STREAM_ERROR: return "Stream error"; 1.333 + case Z_ERRNO: return "Generic zlib error"; 1.334 + case Z_DATA_ERROR: return "Data error"; 1.335 + case Z_MEM_ERROR: return "Memory error"; 1.336 + case Z_BUF_ERROR: return "Buffer error"; 1.337 + case Z_VERSION_ERROR: return "Incompatible runtime zlib library"; 1.338 + default: break; 1.339 + } 1.340 + 1.341 + return "Invalid zlib error code"; 1.342 +} 1.343 + 1.344 +/* TODO: make this use a stream-like interface */ 1.345 +static uint8_t *compress_table (uint8_t *table, size_t in_size, size_t *out_size) 1.346 +{ 1.347 + z_streamp out; 1.348 + int ret; 1.349 + uint8_t *out_buf; 1.350 + int free_size; 1.351 + 1.352 + *out_size = 0; 1.353 + 1.354 + if (!(out = MALLOC (sizeof(*out)))) 1.355 + return NULL; 1.356 + 1.357 + out->zalloc = Z_NULL; 1.358 + out->zfree = Z_NULL; 1.359 + out->opaque = Z_NULL; 1.360 + 1.361 + if ((ret = deflateInit (out, Z_DEFAULT_COMPRESSION)) != Z_OK) 1.362 + { 1.363 + GT->DBGFN (GT, "deflateInit error: %s", zlib_strerror (ret)); 1.364 + free (out); 1.365 + return NULL; 1.366 + } 1.367 + 1.368 + /* allocate initial buffer */ 1.369 + free_size = in_size + in_size / 100; 1.370 + 1.371 + if (!(out_buf = malloc (free_size))) 1.372 + { 1.373 + free (out_buf); 1.374 + deflateEnd (out); 1.375 + free (out); 1.376 + return NULL; 1.377 + } 1.378 + 1.379 + out->next_in = table; 1.380 + out->avail_in = in_size; 1.381 + out->next_out = out_buf; 1.382 + out->avail_out = free_size; 1.383 + 1.384 + if ((ret = deflate (out, Z_FINISH)) != Z_STREAM_END) 1.385 + { 1.386 + GT->DBGFN (GT, "compression error: %s", zlib_strerror (ret)); 1.387 + free (out_buf); 1.388 + deflateEnd (out); 1.389 + free (out); 1.390 + return NULL; 1.391 + } 1.392 + 1.393 + /* 1.394 + * This could theoretically fail I guess. If it does, we shouldn't keep 1.395 + * the table at least. 1.396 + */ 1.397 + assert (out->avail_in == 0); 1.398 + 1.399 + *out_size = free_size - out->avail_out; 1.400 + 1.401 + deflateEnd (out); 1.402 + free (out); 1.403 + 1.404 + return out_buf; 1.405 +} 1.406 + 1.407 +#if 0 1.408 +/* send the a QRP table to nodes we haven't sent a real table yet */ 1.409 +static GtNode *update_nodes (TCPC *c, GtNode *node, void *udata) 1.410 +{ 1.411 + assert (node->state == GT_NODE_CONNECTED); 1.412 + assert (GT_CONN(node) == c); 1.413 + 1.414 + /* 1.415 + * If the counter is not 0, we sent a table to this node already. 1.416 + * So, wait for the timer to pick that up. 1.417 + */ 1.418 + if (node->query_router_counter != 0) 1.419 + return NULL; 1.420 + 1.421 + /* submit the table */ 1.422 + query_route_table_submit (c); 1.423 + 1.424 + /* reset the submit timer */ 1.425 + if (GT_NODE(c)->query_route_timer != 0) 1.426 + timer_reset (GT_NODE(c)->query_route_timer); 1.427 + 1.428 + return NULL; 1.429 +} 1.430 +#endif 1.431 + 1.432 +static void add_index (ds_data_t *key, ds_data_t *value, 1.433 + struct qrp_route_table *qrt) 1.434 +{ 1.435 + struct qrp_route_entry *entry = value->data; 1.436 + uint32_t slot; 1.437 + 1.438 + /* grab only the most significant bits of the entry */ 1.439 + slot = entry->index >> (32 - qrt->bits); 1.440 + 1.441 + /* 1.442 + * If the entry already exists in the table, bump shared entries and 1.443 + * forget about this entry. 1.444 + */ 1.445 + if (qrp_route_table_lookup (qrt, slot)) 1.446 + { 1.447 + qrt->shared++; 1.448 + return; 1.449 + } 1.450 + 1.451 + qrp_route_table_insert (qrt, slot); 1.452 +} 1.453 + 1.454 +static void build_uncompressed (struct qrp_route_table *qrt) 1.455 +{ 1.456 + dataset_foreach (indices, DS_FOREACH(add_index), qrt); 1.457 +} 1.458 + 1.459 +static int build_qrp_table (void *udata) 1.460 +{ 1.461 + uint8_t *new_table; 1.462 + size_t new_size; 1.463 + StopWatch *sw; 1.464 + double elapsed; 1.465 + double fill_ratio; 1.466 + 1.467 + if (!route_table && !(route_table = qrp_route_table_new (MIN_TABLE_BITS))) 1.468 + { 1.469 + /* try again later */ 1.470 + return TRUE; 1.471 + } 1.472 + 1.473 + sw = stopwatch_new (TRUE); 1.474 + 1.475 + /* build a table from the indices */ 1.476 + build_uncompressed (route_table); 1.477 + 1.478 + stopwatch_stop (sw); 1.479 + 1.480 + elapsed = stopwatch_elapsed (sw, NULL); 1.481 + 1.482 + fill_ratio = qrp_route_table_fill_ratio (route_table); 1.483 + 1.484 + GT->DBGFN (GT, "%.4lfs elapsed building", elapsed); 1.485 + GT->DBGFN (GT, "present=%u shared=%u size=%u", route_table->present, 1.486 + route_table->shared, route_table->size); 1.487 + GT->DBGFN (GT, "fill ratio=%.4lf%%", fill_ratio); 1.488 + 1.489 + /* 1.490 + * If the fill ratio is greater than an acceptable threshold, 1.491 + * and we haven't reached the maximum table size allowed, 1.492 + * rebuild a larger routing table. 1.493 + */ 1.494 + if (fill_ratio >= INC_FILL_RATIO && route_table->bits < MAX_TABLE_BITS) 1.495 + { 1.496 + struct qrp_route_table *new_table; 1.497 + 1.498 + /* 1.499 + * If we don't have enough memory to build the new table, fall 1.500 + * through and compress the existing table. This would only happen 1.501 + * if this node has a very small amount of memory. 1.502 + */ 1.503 + if ((new_table = qrp_route_table_new (route_table->bits + 1))) 1.504 + { 1.505 + qrp_route_table_free (route_table); 1.506 + route_table = new_table; 1.507 + 1.508 + /* retry the build later, it's kinda expensive */ 1.509 + stopwatch_free (sw); 1.510 + return TRUE; 1.511 + } 1.512 + } 1.513 + 1.514 + stopwatch_start (sw); 1.515 + 1.516 + /* compress a new table */ 1.517 + new_table = compress_table (route_table->table, 1.518 + route_table->size, 1.519 + &new_size); 1.520 + 1.521 + elapsed = stopwatch_free_elapsed (sw); 1.522 + 1.523 + GT->DBGFN (GT, "%.4lfs elapsed compressing", elapsed); 1.524 + GT->DBGFN (GT, "compressed size=%lu", new_size); 1.525 + 1.526 + if (!new_table) 1.527 + return TRUE; 1.528 + 1.529 + assert (new_size > 0); 1.530 + 1.531 + /* 1.532 + * Replace the old compressed table 1.533 + */ 1.534 + free (compressed_table); 1.535 + 1.536 + compressed_table = new_table; 1.537 + compressed_size = new_size; 1.538 + compressed_slots = route_table->slots; 1.539 + 1.540 + compressed_version++; 1.541 + 1.542 + if (!compressed_version) 1.543 + compressed_version++; 1.544 + 1.545 + /* 1.546 + * An optimization to reduce memory usage: realloc the 1.547 + * compressed table to the smaller size. 1.548 + */ 1.549 + if ((new_table = realloc (new_table, new_size))) 1.550 + compressed_table = new_table; 1.551 + 1.552 +#if 0 1.553 + /* update nodes with this table */ 1.554 + gt_conn_foreach (GT_CONN_FOREACH(update_nodes), NULL, 1.555 + GT_NODE_ULTRA, GT_NODE_CONNECTED, 0); 1.556 +#endif 1.557 + 1.558 + /* 1.559 + * Temporary optimization: we can free the uncompressed 1.560 + * route table now, as it is unused. This is a dubious optimization 1.561 + * though because the table will probably hang around in 1.562 + * the future when incremental updating works. 1.563 + */ 1.564 + qrp_route_table_free (route_table); 1.565 + route_table = NULL; 1.566 + 1.567 + /* remove the timer, as the table is now up to date */ 1.568 + build_timer = 0; 1.569 + return FALSE; 1.570 +} 1.571 + 1.572 +static int start_build (void *udata) 1.573 +{ 1.574 + build_timer = timer_add (QRT_BUILD_INTERVAL, 1.575 + (TimerCallback)build_qrp_table, NULL); 1.576 + return FALSE; 1.577 +} 1.578 + 1.579 +static void start_build_timer (void) 1.580 +{ 1.581 + if (build_timer) 1.582 + return; 1.583 + 1.584 + /* 1.585 + * If we don't have a compressed table, we haven't built 1.586 + * the table before, so build it soon. Otherwise, 1.587 + * we won't submit it for a while anyway, so build it 1.588 + * at half the update interval. 1.589 + */ 1.590 + if (compressed_table) 1.591 + { 1.592 + build_timer = timer_add (QRT_UPDATE_INTERVAL / 2, 1.593 + (TimerCallback)start_build, NULL); 1.594 + return; 1.595 + } 1.596 + 1.597 + start_build (NULL); 1.598 +} 1.599 + 1.600 +/*****************************************************************************/ 1.601 + 1.602 +/* TODO: this should be moved to GT_SELF */ 1.603 +uint8_t *gt_query_router_self (size_t *size, int *version) 1.604 +{ 1.605 + if (!compressed_table) 1.606 + return NULL; 1.607 + 1.608 + assert (size != NULL && version != NULL); 1.609 + 1.610 + *size = compressed_size; 1.611 + *version = compressed_version; 1.612 + 1.613 + return compressed_table; 1.614 +} 1.615 + 1.616 +static int free_entries (ds_data_t *key, ds_data_t *value, void *udata) 1.617 +{ 1.618 + struct qrp_route_entry *entry = value->data; 1.619 + 1.620 + free (entry); 1.621 + 1.622 + return DS_CONTINUE | DS_REMOVE; 1.623 +} 1.624 + 1.625 +void gt_query_router_self_destroy (void) 1.626 +{ 1.627 + timer_remove_zero (&build_timer); 1.628 + 1.629 + qrp_route_table_free (route_table); 1.630 + route_table = NULL; 1.631 + 1.632 + free (compressed_table); 1.633 + compressed_table = NULL; 1.634 + compressed_slots = 0; 1.635 + compressed_size = 0; 1.636 + compressed_version = 0; 1.637 + 1.638 + dataset_foreach_ex (indices, DS_FOREACH_EX(free_entries), NULL); 1.639 + dataset_clear (indices); 1.640 + indices = NULL; 1.641 +} 1.642 + 1.643 +void gt_query_router_self_init (void) 1.644 +{ 1.645 + indices = dataset_new (DATASET_HASH); 1.646 +} 1.647 + 1.648 +/*****************************************************************************/ 1.649 + 1.650 +static uint32_t *append_token (uint32_t *tokens, size_t *len, 1.651 + size_t pos, uint32_t tok) 1.652 +{ 1.653 + if (pos >= *len) 1.654 + { 1.655 + uint32_t *new_tokens; 1.656 + 1.657 + *(len) += 8; 1.658 + new_tokens = realloc (tokens, *len * sizeof (uint32_t)); 1.659 + 1.660 + assert (new_tokens != NULL); 1.661 + tokens = new_tokens; 1.662 + } 1.663 + 1.664 + tokens[pos] = tok; 1.665 + return tokens; 1.666 +} 1.667 + 1.668 +static uint32_t *tokenize (char *hpath, size_t *r_len) 1.669 +{ 1.670 + uint32_t *tokens; 1.671 + int count; 1.672 + size_t len; 1.673 + char *str, *str0; 1.674 + char *next; 1.675 + 1.676 + if (!(str0 = str = STRDUP (hpath))) 1.677 + return NULL; 1.678 + 1.679 + tokens = NULL; 1.680 + len = 0; 1.681 + count = 0; 1.682 + 1.683 + while ((next = string_sep_set (&str, QRP_DELIMITERS)) != NULL) 1.684 + { 1.685 + uint32_t tok; 1.686 + 1.687 + if (string_isempty (next)) 1.688 + continue; 1.689 + 1.690 + /* don't add keywords that are too small */ 1.691 + if (strlen (next) < QRP_MIN_LENGTH) 1.692 + continue; 1.693 + 1.694 + tok = gt_query_router_hash_str (next, 32); 1.695 + tokens = append_token (tokens, &len, count++, tok); 1.696 + } 1.697 + 1.698 + *r_len = count; 1.699 + 1.700 + free (str0); 1.701 + 1.702 + return tokens; 1.703 +} 1.704 + 1.705 +void gt_query_router_self_add (FileShare *file) 1.706 +{ 1.707 + uint32_t *tokens, *tokens0; 1.708 + uint32_t tok; 1.709 + size_t len; 1.710 + int i; 1.711 + struct qrp_route_entry *entry; 1.712 + 1.713 + tokens0 = tokens = tokenize (share_get_hpath (file), &len); 1.714 + 1.715 + assert (tokens != NULL); 1.716 + assert (len > 0); 1.717 + 1.718 + for (i = 0; i < len; i++) 1.719 + { 1.720 + tok = tokens[i]; 1.721 + 1.722 + if ((entry = dataset_lookup (indices, &tok, sizeof (tok)))) 1.723 + { 1.724 + entry->ref++; 1.725 + continue; 1.726 + } 1.727 + 1.728 + /* 1.729 + * Create a new index and add it to the table. 1.730 + */ 1.731 + if (!(entry = malloc (sizeof (struct qrp_route_entry)))) 1.732 + continue; 1.733 + 1.734 + entry->ref = 1; 1.735 + entry->index = tok; 1.736 + 1.737 + dataset_insert (&indices, &tok, sizeof (tok), entry, 0); 1.738 + 1.739 + table_changed = TRUE; 1.740 + } 1.741 + 1.742 + free (tokens0); 1.743 +} 1.744 + 1.745 +void gt_query_router_self_remove (FileShare *file) 1.746 +{ 1.747 + uint32_t *tokens, *tokens0; 1.748 + uint32_t tok; 1.749 + size_t len; 1.750 + int i; 1.751 + struct qrp_route_entry *entry; 1.752 + 1.753 + tokens0 = tokens = tokenize (share_get_hpath (file), &len); 1.754 + 1.755 + assert (tokens != NULL); 1.756 + assert (len > 0); 1.757 + 1.758 + for (i = 0; i < len; i++) 1.759 + { 1.760 + tok = tokens[i]; 1.761 + 1.762 + entry = dataset_lookup (indices, &tok, sizeof (tok)); 1.763 + assert (entry != NULL); 1.764 + 1.765 + if (--entry->ref > 0) 1.766 + continue; 1.767 + 1.768 + dataset_remove (indices, &tok, sizeof (tok)); 1.769 + 1.770 + table_changed = TRUE; 1.771 + } 1.772 + 1.773 + free (tokens0); 1.774 +} 1.775 + 1.776 +void gt_query_router_self_sync (BOOL begin) 1.777 +{ 1.778 + if (!begin && table_changed) 1.779 + { 1.780 + start_build_timer(); 1.781 + table_changed = FALSE; 1.782 + } 1.783 +} 1.784 + 1.785 +/*****************************************************************************/ 1.786 + 1.787 +int query_patch_open (GtQueryRouter *router, int seq_size, int compressed, 1.788 + size_t max_size) 1.789 +{ 1.790 + GtQueryPatch *new_patch; 1.791 + 1.792 + if (!(new_patch = malloc (sizeof (GtQueryPatch)))) 1.793 + return FALSE; 1.794 + 1.795 + memset (new_patch, 0, sizeof (GtQueryPatch)); 1.796 + 1.797 + if (!(new_patch->stream = zlib_stream_open (max_size))) 1.798 + { 1.799 + free (new_patch); 1.800 + return FALSE; 1.801 + } 1.802 + 1.803 + new_patch->seq_size = seq_size; 1.804 + new_patch->compressed = compressed; 1.805 + 1.806 + /* NOTE: sequence is 1-based, not 0-based */ 1.807 + new_patch->seq_num = 1; 1.808 + 1.809 + router->patch = new_patch; 1.810 + 1.811 + return TRUE; 1.812 +} 1.813 + 1.814 +void query_patch_close (GtQueryRouter *router) 1.815 +{ 1.816 + GtQueryPatch *patch; 1.817 + 1.818 + GT->DBGFN (GT, "entered"); 1.819 + 1.820 + if (!router) 1.821 + return; 1.822 + 1.823 + patch = router->patch; 1.824 + 1.825 + if (!patch) 1.826 + return; 1.827 + 1.828 + zlib_stream_close (patch->stream); 1.829 + 1.830 + router->patch = NULL; 1.831 + free (patch); 1.832 +} 1.833 + 1.834 +/* TODO: compact router tables to bit-level */ 1.835 +static void query_patch_apply (GtQueryRouter *router, int bits, char *data, 1.836 + size_t data_size) 1.837 +{ 1.838 + GtQueryPatch *patch; 1.839 + char *table; /* NOTE: this must be signed */ 1.840 + int i; 1.841 + 1.842 + patch = router->patch; 1.843 + assert (patch != NULL); 1.844 + 1.845 + /* check for overflow: this may look wrong but its not */ 1.846 + if (patch->table_pos + (data_size - 1) >= router->size) 1.847 + { 1.848 + GT->DBGFN (GT, "patch overflow: %u (max of %u)", 1.849 + patch->table_pos+data_size, router->size); 1.850 + query_patch_close (router); 1.851 + return; 1.852 + } 1.853 + 1.854 + table = router->table; 1.855 + 1.856 + /* hrm */ 1.857 + if (bits == 4) 1.858 + { 1.859 + int j; 1.860 + 1.861 + for (i = 0; i < data_size; i++) 1.862 + { 1.863 + int pos; 1.864 + char change; 1.865 + 1.866 + pos = i + patch->table_pos; 1.867 + 1.868 + /* avoid % */ 1.869 + j = (i+1) & 0x1; 1.870 + 1.871 + /* grab the correct half of the byte and sign-extend it 1.872 + * NOTE: this starts off with the most significant bits! */ 1.873 + change = data[i] & (0x0f << (4 * j)); 1.874 + 1.875 + /* move to least significant bits 1.876 + * TODO: does this do sign-extension correctly? */ 1.877 + change >>= 4; 1.878 + 1.879 + table[pos] += change; 1.880 + } 1.881 + } 1.882 + else if (bits == 8) 1.883 + { 1.884 + /* untested */ 1.885 + for (i = 0; i < data_size; i++) 1.886 + { 1.887 + table[i + patch->table_pos] += data[i]; 1.888 + } 1.889 + } 1.890 + else 1.891 + { 1.892 + GT->DBGFN (GT, "undefined bits value in query patch: %u", bits); 1.893 + query_patch_close (router); 1.894 + return; 1.895 + } 1.896 + 1.897 + /* store the table position for the next patch */ 1.898 + patch->table_pos += i; 1.899 + 1.900 + /* cleanup the data if the patch is done */ 1.901 + patch->seq_num++; 1.902 + 1.903 + if (patch->seq_num > patch->seq_size) 1.904 + query_patch_close (router); 1.905 +} 1.906 + 1.907 +/*****************************************************************************/ 1.908 + 1.909 +/* TODO: compact router tables to bit-level */ 1.910 +GtQueryRouter *gt_query_router_new (size_t size, int infinity) 1.911 +{ 1.912 + GtQueryRouter *router; 1.913 + 1.914 + if (size > MAX_TABLE_SIZE) 1.915 + return NULL; 1.916 + 1.917 + if (!(router = malloc (sizeof (GtQueryRouter)))) 1.918 + return NULL; 1.919 + 1.920 + memset (router, 0, sizeof (GtQueryRouter)); 1.921 + 1.922 + if (!(router->table = malloc (size))) 1.923 + { 1.924 + free (router->table); 1.925 + return NULL; 1.926 + } 1.927 + 1.928 + memset (router->table, infinity, size); 1.929 + 1.930 + router->size = size; 1.931 + 1.932 + return router; 1.933 +} 1.934 + 1.935 +void gt_query_router_free (GtQueryRouter *router) 1.936 +{ 1.937 + if (!router) 1.938 + return; 1.939 + 1.940 + query_patch_close (router); 1.941 + 1.942 + free (router->table); 1.943 + free (router); 1.944 +} 1.945 + 1.946 +/*****************************************************************************/ 1.947 + 1.948 +static void print_hex (unsigned char *data, size_t size) 1.949 +{ 1.950 + fprint_hex (stdout, data, size); 1.951 +} 1.952 + 1.953 +void gt_query_router_update (GtQueryRouter *router, size_t seq_num, 1.954 + size_t seq_size, int compressed, int bits, 1.955 + unsigned char *zdata, size_t size) 1.956 +{ 1.957 + GtQueryPatch *patch; 1.958 + char *data; 1.959 + 1.960 + if (!router) 1.961 + { 1.962 + GT->DBGFN (GT, "null router"); 1.963 + return; 1.964 + } 1.965 + 1.966 + if (!router->patch) 1.967 + { 1.968 + if (!query_patch_open (router, seq_size, compressed, router->size)) 1.969 + return; 1.970 + } 1.971 + 1.972 + patch = router->patch; 1.973 + 1.974 + /* check for an invalid sequence number or size */ 1.975 + if (patch->seq_size != seq_size || patch->seq_num != seq_num) 1.976 + { 1.977 + GT->DBGFN (GT, "bad patch: seq_size %u vs %u, seq_num %u vs %u", 1.978 + patch->seq_size, seq_size, patch->seq_num, seq_num); 1.979 + query_patch_close (router); 1.980 + return; 1.981 + } 1.982 + 1.983 + if (compressed != patch->compressed) 1.984 + { 1.985 + GT->DBGFN (GT, "tried to change encodings in patch"); 1.986 + query_patch_close (router); 1.987 + return; 1.988 + } 1.989 + 1.990 + switch (compressed) 1.991 + { 1.992 + case 0x00: /* no compression */ 1.993 + if (!zlib_stream_write (patch->stream, zdata, size)) 1.994 + { 1.995 + GT->DBGFN (GT, "error copying data"); 1.996 + query_patch_close (router); 1.997 + return; 1.998 + } 1.999 + 1.1000 + break; 1.1001 + 1.1002 + case 0x01: /* deflate */ 1.1003 + printf ("zlib compressed data:\n"); 1.1004 + print_hex (zdata, size); 1.1005 + 1.1006 + if (!zlib_stream_inflate (patch->stream, zdata, size)) 1.1007 + { 1.1008 + GT->DBGFN (GT, "error inflating data"); 1.1009 + query_patch_close (router); 1.1010 + return; 1.1011 + } 1.1012 + 1.1013 + break; 1.1014 + 1.1015 + default: 1.1016 + GT->DBGFN (GT, "unknown compression algorithm in query route patch"); 1.1017 + return; 1.1018 + } 1.1019 + 1.1020 + /* read the data in the stream */ 1.1021 + if (!(size = zlib_stream_read (patch->stream, &data))) 1.1022 + { 1.1023 + GT->DBGFN (GT, "error calling zlib_stream_read"); 1.1024 + query_patch_close (router); 1.1025 + return; 1.1026 + } 1.1027 + 1.1028 + printf ("after uncompressing:\n"); 1.1029 + print_hex (data, size); 1.1030 + 1.1031 + /* apply the patch -- this will cleanup any data if necessary */ 1.1032 + query_patch_apply (router, bits, data, size); 1.1033 + 1.1034 + print_hex (router->table, router->size); 1.1035 +} 1.1036 + 1.1037 +/*****************************************************************************/ 1.1038 + 1.1039 +static void submit_empty_table (TCPC *c) 1.1040 +{ 1.1041 + static char table[8] = { 0 }; 1.1042 + int len; 1.1043 + 1.1044 +#if 0 1.1045 + size_t size; 1.1046 +#endif 1.1047 + 1.1048 + GT->DBGFN (GT, "reseting route table for %s", net_ip_str (GT_NODE(c)->ip)); 1.1049 + 1.1050 + /* all slots in the table should be initialized to infinity, so it 1.1051 + * should be "empty" on the remote node */ 1.1052 + memset (table, 0, sizeof (table)); 1.1053 + 1.1054 + 1.1055 +#if 0 1.1056 + /* TEST: set part of the table to -infinity to get queries */ 1.1057 + size = sizeof (table); 1.1058 + memset (table + (size + 1) / 2 - 1, -infinity, (size + 1) / 4); 1.1059 +#endif 1.1060 + 1.1061 + /* format: <query-route-msg-type> <length of table> <infinity> */ 1.1062 + if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, 1.1063 + "%c%lu%c", 0, (unsigned long) sizeof (table), 1.1064 + INFINITY) < 0) 1.1065 + { 1.1066 + GT->DBGFN (GT, "error reseting table"); 1.1067 + return; 1.1068 + } 1.1069 + 1.1070 + len = sizeof (table); 1.1071 + 1.1072 + if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, 1.1073 + "%c%c%c%c%c%*p", 1.1074 + 1, 1, 1, 0, 8, len, table) < 0) 1.1075 + { 1.1076 + GT->DBGFN (GT, "error sending empty patch"); 1.1077 + return; 1.1078 + } 1.1079 +} 1.1080 + 1.1081 +static void submit_table (TCPC *c, uint8_t *table, size_t size, size_t slots) 1.1082 +{ 1.1083 + int infinity = INFINITY; 1.1084 + int seq_size; 1.1085 + int compressed; 1.1086 + int seq_num; 1.1087 + uint8_t *p; 1.1088 + size_t send_size; 1.1089 + 1.1090 + /* XXX make table size settable at runtime */ 1.1091 + 1.1092 + /* send a reset table first */ 1.1093 + if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, 1.1094 + "%c%lu%c", 0, (long)slots, infinity) < 0) 1.1095 + { 1.1096 + GT->DBGFN (GT, "error reseting table"); 1.1097 + return; 1.1098 + } 1.1099 + 1.1100 + /* Break the table into PATCH_FRAGSIZE-sized chunks, 1.1101 + * and include any leftover portions. */ 1.1102 + seq_size = size / PATCH_FRAGSIZE + 1.1103 + (size % PATCH_FRAGSIZE == 0 ? 0 : 1); 1.1104 + 1.1105 + assert (seq_size < 256); 1.1106 +#if 0 1.1107 + GT->dbg (GT, "sequence size: %u", seq_size); 1.1108 +#endif 1.1109 + 1.1110 + p = table; 1.1111 + compressed = TRUE; 1.1112 + 1.1113 + /* NOTE: patch sequence numbers start at 1 */ 1.1114 + for (seq_num = 1; seq_num <= seq_size; seq_num++) 1.1115 + { 1.1116 + send_size = MIN (PATCH_FRAGSIZE, size); 1.1117 + 1.1118 + if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, 1.1119 + "%c%c%c%c%c%*p", 1.1120 + /* QRP PATCH */ 1, 1.1121 + seq_num, seq_size, compressed, 1.1122 + /* bits */ PATCH_BITS, 1.1123 + send_size, p) < 0) 1.1124 + { 1.1125 + GT->DBGFN (GT, "error sending QRT patch"); 1.1126 + return; 1.1127 + } 1.1128 + 1.1129 + size -= send_size; 1.1130 + p += send_size; 1.1131 + } 1.1132 +} 1.1133 + 1.1134 +static BOOL update_qr_table (TCPC *c) 1.1135 +{ 1.1136 + size_t size; 1.1137 + int version; 1.1138 + uint8_t *table; 1.1139 + GtNode *node = GT_NODE(c); 1.1140 + 1.1141 + assert (node->state & GT_NODE_CONNECTED); 1.1142 + 1.1143 + table = gt_query_router_self (&size, &version); 1.1144 + 1.1145 + /* we may not have finished building a table yet */ 1.1146 + if (!table) 1.1147 + return TRUE; 1.1148 + 1.1149 + /* dont submit a table if this node is already up to date */ 1.1150 + if (node->query_router_counter == version) 1.1151 + return TRUE; 1.1152 + 1.1153 + /* HACK: this shouldn't be using compressed_slots */ 1.1154 + submit_table (c, table, size, compressed_slots); 1.1155 + 1.1156 + /* store the version number of this table so 1.1157 + * we dont resubmit unecessarily */ 1.1158 + node->query_router_counter = version; 1.1159 + 1.1160 + return TRUE; 1.1161 +} 1.1162 + 1.1163 +static BOOL submit_first_table (TCPC *c) 1.1164 +{ 1.1165 + GtNode *node = GT_NODE(c); 1.1166 + 1.1167 + assert (node->state & GT_NODE_CONNECTED); 1.1168 + 1.1169 + update_qr_table (c); 1.1170 + 1.1171 + /* remove the first timer */ 1.1172 + timer_remove (node->query_route_timer); 1.1173 + 1.1174 + /* set the timer for updating the table repeatedly */ 1.1175 + node->query_route_timer = timer_add (QRT_UPDATE_INTERVAL, 1.1176 + (TimerCallback)update_qr_table, c); 1.1177 + 1.1178 + /* we removed the timer, and must return FALSE */ 1.1179 + return FALSE; 1.1180 +} 1.1181 + 1.1182 +/* 1.1183 + * Submit the query routing table for this node to another. 1.1184 + * 1.1185 + * This delays sending the table for while. This helps preserve our precious 1.1186 + * upstream when we're looking for nodes to connect to, as this often 1.1187 + * happens when we're in the middle of looking for more nodes. 1.1188 + */ 1.1189 +void query_route_table_submit (TCPC *c) 1.1190 +{ 1.1191 + GtNode *node = GT_NODE(c); 1.1192 + 1.1193 + assert (node->query_route_timer == 0); 1.1194 + 1.1195 + /* save bandwidth with an empty table */ 1.1196 + submit_empty_table (c); 1.1197 + 1.1198 + /* submit a real table later */ 1.1199 + node->query_route_timer = timer_add (QRT_SUBMIT_DELAY, 1.1200 + (TimerCallback)submit_first_table, c); 1.1201 +} 1.1202 + 1.1203 +/*****************************************************************************/ 1.1204 +/* TESTING */ 1.1205 + 1.1206 +#if 0 1.1207 +#define CHECK(x) do { \ 1.1208 + if (!(x)) printf("FAILED: %s\n", #x); \ 1.1209 + else printf("OK: %s\n", #x); \ 1.1210 +} while (0) 1.1211 + 1.1212 +#define HASH(str, bits) \ 1.1213 + printf ("hash " str ": %u\n", gt_query_router_hash_str (str, bits)) 1.1214 + 1.1215 +int main (int argc, char **argv) 1.1216 +{ 1.1217 +#define hash gt_query_router_hash_str 1.1218 + 1.1219 + CHECK(hash("", 13)==0); 1.1220 + CHECK(hash("eb", 13)==6791); 1.1221 + CHECK(hash("ebc", 13)==7082); 1.1222 + CHECK(hash("ebck", 13)==6698); 1.1223 + CHECK(hash("ebckl", 13)==3179); 1.1224 + CHECK(hash("ebcklm", 13)==3235); 1.1225 + CHECK(hash("ebcklme", 13)==6438); 1.1226 + CHECK(hash("ebcklmen", 13)==1062); 1.1227 + CHECK(hash("ebcklmenq", 13)==3527); 1.1228 + CHECK(hash("", 16)==0); 1.1229 + CHECK(hash("n", 16)==65003); 1.1230 + CHECK(hash("nd", 16)==54193); 1.1231 + CHECK(hash("ndf", 16)==4953); 1.1232 + CHECK(hash("ndfl", 16)==58201); 1.1233 + CHECK(hash("ndfla", 16)==34830); 1.1234 + CHECK(hash("ndflal", 16)==36910); 1.1235 + CHECK(hash("ndflale", 16)==34586); 1.1236 + CHECK(hash("ndflalem", 16)==37658); 1.1237 + CHECK(hash("FAIL", 16)==37458); // WILL FAIL 1.1238 + CHECK(hash("ndflaleme", 16)==45559); 1.1239 + CHECK(hash("ol2j34lj", 10)==318); 1.1240 + CHECK(hash("asdfas23", 10)==503); 1.1241 + CHECK(hash("9um3o34fd", 10)==758); 1.1242 + CHECK(hash("a234d", 10)==281); 1.1243 + CHECK(hash("a3f", 10)==767); 1.1244 + CHECK(hash("3nja9", 10)==581); 1.1245 + CHECK(hash("2459345938032343", 10)==146); 1.1246 + CHECK(hash("7777a88a8a8a8", 10)==342); 1.1247 + CHECK(hash("asdfjklkj3k", 10)==861); 1.1248 + CHECK(hash("adfk32l", 10)==1011); 1.1249 + CHECK(hash("zzzzzzzzzzz", 10)==944); 1.1250 + 1.1251 + CHECK(hash("3nja9", 10)==581); 1.1252 + CHECK(hash("3NJA9", 10)==581); 1.1253 + CHECK(hash("3nJa9", 10)==581); 1.1254 + 1.1255 + printf ("hash(FAIL, 16) = %u\n", hash("FAIL", 16)); 1.1256 + return 0; 1.1257 +} 1.1258 +#endif