Mercurial > hg > index.fcgi > gift-gnutella > gift-gnutella-0.0.11-1pba
comparison src/gt_query_route.c @ 0:d39e1d0d75b6
initial add
author | paulo@hit-nxdomain.opendns.com |
---|---|
date | Sat, 20 Feb 2010 21:18:28 -0800 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:b75034ae26ad |
---|---|
1 /* | |
2 * $Id: gt_query_route.c,v 1.46 2004/04/05 07:56:54 hipnod Exp $ | |
3 * | |
4 * Copyright (C) 2001-2003 giFT project (gift.sourceforge.net) | |
5 * | |
6 * This program is free software; you can redistribute it and/or modify it | |
7 * under the terms of the GNU General Public License as published by the | |
8 * Free Software Foundation; either version 2, or (at your option) any | |
9 * later version. | |
10 * | |
11 * This program is distributed in the hope that it will be useful, but | |
12 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
14 * General Public License for more details. | |
15 */ | |
16 | |
17 #include "gt_gnutella.h" | |
18 | |
19 #include "gt_query_route.h" | |
20 #include "gt_packet.h" | |
21 #include "gt_utils.h" | |
22 | |
23 #include "gt_node.h" | |
24 #include "gt_node_list.h" | |
25 | |
26 #include <libgift/stopwatch.h> | |
27 | |
28 #include <zlib.h> | |
29 | |
30 /*****************************************************************************/ | |
31 | |
32 /* | |
33 * TODO: | |
34 * - compact table to bit-level representation | |
35 * - support incremental updates of table | |
36 * - cut-off entries when MAX_FILL_RATIO is reached | |
37 */ | |
38 #define MIN_TABLE_BITS (16) /* 16 bits */ | |
39 #define MAX_TABLE_BITS (21) /* 21 bits */ | |
40 #define MIN_TABLE_SIZE (1UL << (MIN_TABLE_BITS - 1)) /* 32k bytes */ | |
41 #define MAX_TABLE_SIZE (1UL << (MAX_TABLE_BITS - 1)) /* 1M bytes */ | |
42 #define MIN_TABLE_SLOTS (MIN_TABLE_SIZE * 2) /* 64k slots */ | |
43 #define MAX_TABLE_SLOTS (MAX_TABLE_SIZE * 2) /* 2M slots */ | |
44 #define INC_FILL_RATIO (0.70) /* 0.7% */ | |
45 #define MAX_FILL_RATIO (1.00) /* 1% (unused)*/ | |
46 | |
47 /* | |
48 * magical constant necessary for the query routing hash function | |
49 */ | |
50 #define MULTIPLIER 0x4F1BBCDC | |
51 | |
52 /* | |
53 * How often to synchronize the QRT with ultrapeers. | |
54 * | |
55 * This is very big because we don't support incremental updating | |
56 * yet. | |
57 */ | |
58 #define QRT_UPDATE_INTERVAL (20 * MINUTES) | |
59 | |
60 /* | |
61 * How often we check to build a compressed patch of local shares. | |
62 */ | |
63 #define QRT_BUILD_INTERVAL (3 * SECONDS) | |
64 | |
65 /* | |
66 * How long to wait after the first query_route_table_submit() before | |
67 * actually submitting the table. | |
68 */ | |
69 #define QRT_SUBMIT_DELAY (1 * MINUTES) | |
70 | |
71 /* | |
72 * Largest hops value in table. It looks like Limewire hardcodes | |
73 * this as 7, and won't understand any other value. | |
74 */ | |
75 #define INFINITY 7 | |
76 | |
77 /* | |
78 * Constants for changing route table. | |
79 */ | |
80 #define QRT_KEYWORD_ADD (0x0a) /* -6 */ | |
81 #define QRT_KEYWORD_REMOVE (0x06) /* 6 */ | |
82 | |
83 /* | |
84 * The minimum length of a keyword | |
85 */ | |
86 #define QRP_MIN_LENGTH 3 | |
87 | |
88 /* | |
89 * Maximum patch fragment size to send | |
90 */ | |
91 #define PATCH_FRAGSIZE 2048 | |
92 | |
93 /* | |
94 * Number of bits in the patches we send | |
95 */ | |
96 #define PATCH_BITS 4 | |
97 | |
98 /* | |
99 * Holds a 32-bit index describing each token this node shares. | |
100 */ | |
101 struct qrp_route_entry | |
102 { | |
103 int ref; /* number of references to this index */ | |
104 uint32_t index; /* the actual position */ | |
105 }; | |
106 | |
107 struct qrp_route_table | |
108 { | |
109 uint8_t *table; | |
110 size_t bits; | |
111 size_t size; | |
112 size_t slots; | |
113 size_t present; | |
114 size_t shared; | |
115 }; | |
116 | |
117 /*****************************************************************************/ | |
118 | |
119 /* | |
120 * The set of indices that are currently marked "present" in the | |
121 * routing table. | |
122 */ | |
123 static Dataset *indices; | |
124 | |
125 /* | |
126 * This holds the compressed table as a full QRP patch (incremental | |
127 * updates not supported yet). | |
128 */ | |
129 static uint8_t *compressed_table; | |
130 static size_t compressed_slots; | |
131 static size_t compressed_size; | |
132 | |
133 /* | |
134 * Keeps track of how many times the compressed table has changed. | |
135 * Used to avoid sending updates when not necessary. | |
136 */ | |
137 static int compressed_version; | |
138 | |
139 /* | |
140 * Timer that builds the compressed patch for the table submitted | |
141 * to peers. | |
142 */ | |
143 static timer_id build_timer; | |
144 | |
145 /* | |
146 * Whether we have to rebuild the table when shares are done | |
147 * syncing. | |
148 */ | |
149 static BOOL table_changed; | |
150 | |
151 /* | |
152 * The full query-routing table in binary form. This will get | |
153 * compressed before transmission. | |
154 */ | |
155 static struct qrp_route_table *route_table; | |
156 | |
157 /*****************************************************************************/ | |
158 | |
159 /* hash function used for query-routing */ | |
160 uint32_t gt_query_router_hash_str (char *str, size_t bits) | |
161 { | |
162 uint32_t hash; | |
163 unsigned int i; | |
164 unsigned char c; | |
165 | |
166 i = hash = 0; | |
167 | |
168 while ((c = *str++) && !isspace (c)) | |
169 { | |
170 hash ^= tolower (c) << (i * 8); | |
171 | |
172 /* using & instead of %, sorry */ | |
173 i = (i+1) & 0x03; | |
174 } | |
175 | |
176 return (hash * MULTIPLIER) >> (32 - bits); | |
177 } | |
178 | |
179 /*****************************************************************************/ | |
180 | |
181 static struct qrp_route_table *qrp_route_table_new (size_t bits) | |
182 { | |
183 struct qrp_route_table *qrt; | |
184 | |
185 if (!(qrt = MALLOC (sizeof (struct qrp_route_table)))) | |
186 return NULL; | |
187 | |
188 qrt->bits = bits; | |
189 qrt->size = (1UL << (bits - 1)); | |
190 qrt->slots = qrt->size * 2; /* 4-bit entries only */ | |
191 | |
192 if (!(qrt->table = MALLOC (qrt->size))) | |
193 { | |
194 free (qrt); | |
195 return NULL; | |
196 } | |
197 | |
198 return qrt; | |
199 } | |
200 | |
201 static void qrp_route_table_free (struct qrp_route_table *qrt) | |
202 { | |
203 if (!qrt) | |
204 return; | |
205 | |
206 free (qrt->table); | |
207 free (qrt); | |
208 } | |
209 | |
210 static void qrp_route_table_insert (struct qrp_route_table *qrt, uint32_t index) | |
211 { | |
212 uint8_t old_entry; | |
213 int set_lower; | |
214 int entry; | |
215 | |
216 if (!qrt) | |
217 return; | |
218 | |
219 assert (index < qrt->size * 2); | |
220 | |
221 set_lower = index % 2; | |
222 entry = index / 2; | |
223 | |
224 if (set_lower) | |
225 { | |
226 old_entry = qrt->table[entry] & 0x0f; | |
227 qrt->table[entry] = (qrt->table[entry] & 0xf0) | | |
228 ((QRT_KEYWORD_ADD) & 0x0f); | |
229 } | |
230 else | |
231 { | |
232 old_entry = (qrt->table[entry] & 0xf0) >> 4; | |
233 qrt->table[entry] = (qrt->table[entry] & 0x0f) | | |
234 ((QRT_KEYWORD_ADD << 4) & 0xf0); | |
235 } | |
236 | |
237 assert (old_entry == 0 || old_entry == QRT_KEYWORD_REMOVE); | |
238 #if 0 | |
239 GT->dbg (GT, "+%u [%d/%d]", index, entry, set_lower); | |
240 #endif | |
241 | |
242 qrt->present++; | |
243 } | |
244 | |
245 #if 0 | |
246 /* untested */ | |
247 static void qrp_route_table_remove (struct qrp_route_table *qrt, uint32_t index) | |
248 { | |
249 uint8_t old_entry; | |
250 int clear_lower; | |
251 int entry; | |
252 | |
253 if (!qrt) | |
254 return; | |
255 | |
256 assert (index < qrt->size * 2); | |
257 | |
258 clear_lower = index % 2; | |
259 entry = index / 2; | |
260 | |
261 /* | |
262 * This needs to change when doing incremental updating... | |
263 */ | |
264 | |
265 if (clear_lower) | |
266 { | |
267 old_entry = qrt->table[entry] & 0x0f; | |
268 qrt->table[entry] = (qrt->table[entry] & 0xf0) | | |
269 ((QRT_KEYWORD_REMOVE) & 0x0f); | |
270 } | |
271 else | |
272 { | |
273 old_entry = (qrt->table[entry] & 0xf0) >> 4; | |
274 qrt->table[entry] = (qrt->table[entry] & 0x0f) | | |
275 ((QRT_KEYWORD_REMOVE << 4) & 0xf0); | |
276 } | |
277 | |
278 assert (old_entry == (uint8_t) QRT_KEYWORD_ADD); | |
279 #if 0 | |
280 GT->dbg (GT, "-%u [%d/%d]", index, entry, clear_lower); | |
281 #endif | |
282 | |
283 qrt->present--; | |
284 } | |
285 #endif | |
286 | |
287 static BOOL qrp_route_table_lookup (struct qrp_route_table *qrt, uint32_t index) | |
288 { | |
289 int check_lower; | |
290 uint32_t entry; | |
291 | |
292 if (!qrt) | |
293 return FALSE; | |
294 | |
295 assert (index < qrt->slots); | |
296 assert (qrt->slots == qrt->size * 2); | |
297 | |
298 check_lower = index % 2; | |
299 entry = index / 2; | |
300 | |
301 if (check_lower) | |
302 { | |
303 if ((qrt->table[entry] & 0x0f) == QRT_KEYWORD_ADD) | |
304 return TRUE; | |
305 } | |
306 else | |
307 { | |
308 if (((qrt->table[entry] & 0xf0) >> 4) == QRT_KEYWORD_ADD) | |
309 return TRUE; | |
310 } | |
311 | |
312 return FALSE; | |
313 } | |
314 | |
315 static double qrp_route_table_fill_ratio (struct qrp_route_table *qrt) | |
316 { | |
317 return (double)qrt->present * 100 / qrt->slots; | |
318 } | |
319 | |
320 /*****************************************************************************/ | |
321 | |
322 static char *zlib_strerror (int error) | |
323 { | |
324 switch (error) | |
325 { | |
326 case Z_OK: return "OK"; | |
327 case Z_STREAM_END: return "End of stream"; | |
328 case Z_NEED_DICT: return "Decompressing dictionary needed"; | |
329 case Z_STREAM_ERROR: return "Stream error"; | |
330 case Z_ERRNO: return "Generic zlib error"; | |
331 case Z_DATA_ERROR: return "Data error"; | |
332 case Z_MEM_ERROR: return "Memory error"; | |
333 case Z_BUF_ERROR: return "Buffer error"; | |
334 case Z_VERSION_ERROR: return "Incompatible runtime zlib library"; | |
335 default: break; | |
336 } | |
337 | |
338 return "Invalid zlib error code"; | |
339 } | |
340 | |
341 /* TODO: make this use a stream-like interface */ | |
342 static uint8_t *compress_table (uint8_t *table, size_t in_size, size_t *out_size) | |
343 { | |
344 z_streamp out; | |
345 int ret; | |
346 uint8_t *out_buf; | |
347 int free_size; | |
348 | |
349 *out_size = 0; | |
350 | |
351 if (!(out = MALLOC (sizeof(*out)))) | |
352 return NULL; | |
353 | |
354 out->zalloc = Z_NULL; | |
355 out->zfree = Z_NULL; | |
356 out->opaque = Z_NULL; | |
357 | |
358 if ((ret = deflateInit (out, Z_DEFAULT_COMPRESSION)) != Z_OK) | |
359 { | |
360 GT->DBGFN (GT, "deflateInit error: %s", zlib_strerror (ret)); | |
361 free (out); | |
362 return NULL; | |
363 } | |
364 | |
365 /* allocate initial buffer */ | |
366 free_size = in_size + in_size / 100; | |
367 | |
368 if (!(out_buf = malloc (free_size))) | |
369 { | |
370 free (out_buf); | |
371 deflateEnd (out); | |
372 free (out); | |
373 return NULL; | |
374 } | |
375 | |
376 out->next_in = table; | |
377 out->avail_in = in_size; | |
378 out->next_out = out_buf; | |
379 out->avail_out = free_size; | |
380 | |
381 if ((ret = deflate (out, Z_FINISH)) != Z_STREAM_END) | |
382 { | |
383 GT->DBGFN (GT, "compression error: %s", zlib_strerror (ret)); | |
384 free (out_buf); | |
385 deflateEnd (out); | |
386 free (out); | |
387 return NULL; | |
388 } | |
389 | |
390 /* | |
391 * This could theoretically fail I guess. If it does, we shouldn't keep | |
392 * the table at least. | |
393 */ | |
394 assert (out->avail_in == 0); | |
395 | |
396 *out_size = free_size - out->avail_out; | |
397 | |
398 deflateEnd (out); | |
399 free (out); | |
400 | |
401 return out_buf; | |
402 } | |
403 | |
404 #if 0 | |
405 /* send the a QRP table to nodes we haven't sent a real table yet */ | |
406 static GtNode *update_nodes (TCPC *c, GtNode *node, void *udata) | |
407 { | |
408 assert (node->state == GT_NODE_CONNECTED); | |
409 assert (GT_CONN(node) == c); | |
410 | |
411 /* | |
412 * If the counter is not 0, we sent a table to this node already. | |
413 * So, wait for the timer to pick that up. | |
414 */ | |
415 if (node->query_router_counter != 0) | |
416 return NULL; | |
417 | |
418 /* submit the table */ | |
419 query_route_table_submit (c); | |
420 | |
421 /* reset the submit timer */ | |
422 if (GT_NODE(c)->query_route_timer != 0) | |
423 timer_reset (GT_NODE(c)->query_route_timer); | |
424 | |
425 return NULL; | |
426 } | |
427 #endif | |
428 | |
429 static void add_index (ds_data_t *key, ds_data_t *value, | |
430 struct qrp_route_table *qrt) | |
431 { | |
432 struct qrp_route_entry *entry = value->data; | |
433 uint32_t slot; | |
434 | |
435 /* grab only the most significant bits of the entry */ | |
436 slot = entry->index >> (32 - qrt->bits); | |
437 | |
438 /* | |
439 * If the entry already exists in the table, bump shared entries and | |
440 * forget about this entry. | |
441 */ | |
442 if (qrp_route_table_lookup (qrt, slot)) | |
443 { | |
444 qrt->shared++; | |
445 return; | |
446 } | |
447 | |
448 qrp_route_table_insert (qrt, slot); | |
449 } | |
450 | |
451 static void build_uncompressed (struct qrp_route_table *qrt) | |
452 { | |
453 dataset_foreach (indices, DS_FOREACH(add_index), qrt); | |
454 } | |
455 | |
456 static int build_qrp_table (void *udata) | |
457 { | |
458 uint8_t *new_table; | |
459 size_t new_size; | |
460 StopWatch *sw; | |
461 double elapsed; | |
462 double fill_ratio; | |
463 | |
464 if (!route_table && !(route_table = qrp_route_table_new (MIN_TABLE_BITS))) | |
465 { | |
466 /* try again later */ | |
467 return TRUE; | |
468 } | |
469 | |
470 sw = stopwatch_new (TRUE); | |
471 | |
472 /* build a table from the indices */ | |
473 build_uncompressed (route_table); | |
474 | |
475 stopwatch_stop (sw); | |
476 | |
477 elapsed = stopwatch_elapsed (sw, NULL); | |
478 | |
479 fill_ratio = qrp_route_table_fill_ratio (route_table); | |
480 | |
481 GT->DBGFN (GT, "%.4lfs elapsed building", elapsed); | |
482 GT->DBGFN (GT, "present=%u shared=%u size=%u", route_table->present, | |
483 route_table->shared, route_table->size); | |
484 GT->DBGFN (GT, "fill ratio=%.4lf%%", fill_ratio); | |
485 | |
486 /* | |
487 * If the fill ratio is greater than an acceptable threshold, | |
488 * and we haven't reached the maximum table size allowed, | |
489 * rebuild a larger routing table. | |
490 */ | |
491 if (fill_ratio >= INC_FILL_RATIO && route_table->bits < MAX_TABLE_BITS) | |
492 { | |
493 struct qrp_route_table *new_table; | |
494 | |
495 /* | |
496 * If we don't have enough memory to build the new table, fall | |
497 * through and compress the existing table. This would only happen | |
498 * if this node has a very small amount of memory. | |
499 */ | |
500 if ((new_table = qrp_route_table_new (route_table->bits + 1))) | |
501 { | |
502 qrp_route_table_free (route_table); | |
503 route_table = new_table; | |
504 | |
505 /* retry the build later, it's kinda expensive */ | |
506 stopwatch_free (sw); | |
507 return TRUE; | |
508 } | |
509 } | |
510 | |
511 stopwatch_start (sw); | |
512 | |
513 /* compress a new table */ | |
514 new_table = compress_table (route_table->table, | |
515 route_table->size, | |
516 &new_size); | |
517 | |
518 elapsed = stopwatch_free_elapsed (sw); | |
519 | |
520 GT->DBGFN (GT, "%.4lfs elapsed compressing", elapsed); | |
521 GT->DBGFN (GT, "compressed size=%lu", new_size); | |
522 | |
523 if (!new_table) | |
524 return TRUE; | |
525 | |
526 assert (new_size > 0); | |
527 | |
528 /* | |
529 * Replace the old compressed table | |
530 */ | |
531 free (compressed_table); | |
532 | |
533 compressed_table = new_table; | |
534 compressed_size = new_size; | |
535 compressed_slots = route_table->slots; | |
536 | |
537 compressed_version++; | |
538 | |
539 if (!compressed_version) | |
540 compressed_version++; | |
541 | |
542 /* | |
543 * An optimization to reduce memory usage: realloc the | |
544 * compressed table to the smaller size. | |
545 */ | |
546 if ((new_table = realloc (new_table, new_size))) | |
547 compressed_table = new_table; | |
548 | |
549 #if 0 | |
550 /* update nodes with this table */ | |
551 gt_conn_foreach (GT_CONN_FOREACH(update_nodes), NULL, | |
552 GT_NODE_ULTRA, GT_NODE_CONNECTED, 0); | |
553 #endif | |
554 | |
555 /* | |
556 * Temporary optimization: we can free the uncompressed | |
557 * route table now, as it is unused. This is a dubious optimization | |
558 * though because the table will probably hang around in | |
559 * the future when incremental updating works. | |
560 */ | |
561 qrp_route_table_free (route_table); | |
562 route_table = NULL; | |
563 | |
564 /* remove the timer, as the table is now up to date */ | |
565 build_timer = 0; | |
566 return FALSE; | |
567 } | |
568 | |
569 static int start_build (void *udata) | |
570 { | |
571 build_timer = timer_add (QRT_BUILD_INTERVAL, | |
572 (TimerCallback)build_qrp_table, NULL); | |
573 return FALSE; | |
574 } | |
575 | |
576 static void start_build_timer (void) | |
577 { | |
578 if (build_timer) | |
579 return; | |
580 | |
581 /* | |
582 * If we don't have a compressed table, we haven't built | |
583 * the table before, so build it soon. Otherwise, | |
584 * we won't submit it for a while anyway, so build it | |
585 * at half the update interval. | |
586 */ | |
587 if (compressed_table) | |
588 { | |
589 build_timer = timer_add (QRT_UPDATE_INTERVAL / 2, | |
590 (TimerCallback)start_build, NULL); | |
591 return; | |
592 } | |
593 | |
594 start_build (NULL); | |
595 } | |
596 | |
597 /*****************************************************************************/ | |
598 | |
599 /* TODO: this should be moved to GT_SELF */ | |
600 uint8_t *gt_query_router_self (size_t *size, int *version) | |
601 { | |
602 if (!compressed_table) | |
603 return NULL; | |
604 | |
605 assert (size != NULL && version != NULL); | |
606 | |
607 *size = compressed_size; | |
608 *version = compressed_version; | |
609 | |
610 return compressed_table; | |
611 } | |
612 | |
613 static int free_entries (ds_data_t *key, ds_data_t *value, void *udata) | |
614 { | |
615 struct qrp_route_entry *entry = value->data; | |
616 | |
617 free (entry); | |
618 | |
619 return DS_CONTINUE | DS_REMOVE; | |
620 } | |
621 | |
622 void gt_query_router_self_destroy (void) | |
623 { | |
624 timer_remove_zero (&build_timer); | |
625 | |
626 qrp_route_table_free (route_table); | |
627 route_table = NULL; | |
628 | |
629 free (compressed_table); | |
630 compressed_table = NULL; | |
631 compressed_slots = 0; | |
632 compressed_size = 0; | |
633 compressed_version = 0; | |
634 | |
635 dataset_foreach_ex (indices, DS_FOREACH_EX(free_entries), NULL); | |
636 dataset_clear (indices); | |
637 indices = NULL; | |
638 } | |
639 | |
640 void gt_query_router_self_init (void) | |
641 { | |
642 indices = dataset_new (DATASET_HASH); | |
643 } | |
644 | |
645 /*****************************************************************************/ | |
646 | |
647 static uint32_t *append_token (uint32_t *tokens, size_t *len, | |
648 size_t pos, uint32_t tok) | |
649 { | |
650 if (pos >= *len) | |
651 { | |
652 uint32_t *new_tokens; | |
653 | |
654 *(len) += 8; | |
655 new_tokens = realloc (tokens, *len * sizeof (uint32_t)); | |
656 | |
657 assert (new_tokens != NULL); | |
658 tokens = new_tokens; | |
659 } | |
660 | |
661 tokens[pos] = tok; | |
662 return tokens; | |
663 } | |
664 | |
665 static uint32_t *tokenize (char *hpath, size_t *r_len) | |
666 { | |
667 uint32_t *tokens; | |
668 int count; | |
669 size_t len; | |
670 char *str, *str0; | |
671 char *next; | |
672 | |
673 if (!(str0 = str = STRDUP (hpath))) | |
674 return NULL; | |
675 | |
676 tokens = NULL; | |
677 len = 0; | |
678 count = 0; | |
679 | |
680 while ((next = string_sep_set (&str, QRP_DELIMITERS)) != NULL) | |
681 { | |
682 uint32_t tok; | |
683 | |
684 if (string_isempty (next)) | |
685 continue; | |
686 | |
687 /* don't add keywords that are too small */ | |
688 if (strlen (next) < QRP_MIN_LENGTH) | |
689 continue; | |
690 | |
691 tok = gt_query_router_hash_str (next, 32); | |
692 tokens = append_token (tokens, &len, count++, tok); | |
693 } | |
694 | |
695 *r_len = count; | |
696 | |
697 free (str0); | |
698 | |
699 return tokens; | |
700 } | |
701 | |
702 void gt_query_router_self_add (FileShare *file) | |
703 { | |
704 uint32_t *tokens, *tokens0; | |
705 uint32_t tok; | |
706 size_t len; | |
707 int i; | |
708 struct qrp_route_entry *entry; | |
709 | |
710 tokens0 = tokens = tokenize (share_get_hpath (file), &len); | |
711 | |
712 assert (tokens != NULL); | |
713 assert (len > 0); | |
714 | |
715 for (i = 0; i < len; i++) | |
716 { | |
717 tok = tokens[i]; | |
718 | |
719 if ((entry = dataset_lookup (indices, &tok, sizeof (tok)))) | |
720 { | |
721 entry->ref++; | |
722 continue; | |
723 } | |
724 | |
725 /* | |
726 * Create a new index and add it to the table. | |
727 */ | |
728 if (!(entry = malloc (sizeof (struct qrp_route_entry)))) | |
729 continue; | |
730 | |
731 entry->ref = 1; | |
732 entry->index = tok; | |
733 | |
734 dataset_insert (&indices, &tok, sizeof (tok), entry, 0); | |
735 | |
736 table_changed = TRUE; | |
737 } | |
738 | |
739 free (tokens0); | |
740 } | |
741 | |
742 void gt_query_router_self_remove (FileShare *file) | |
743 { | |
744 uint32_t *tokens, *tokens0; | |
745 uint32_t tok; | |
746 size_t len; | |
747 int i; | |
748 struct qrp_route_entry *entry; | |
749 | |
750 tokens0 = tokens = tokenize (share_get_hpath (file), &len); | |
751 | |
752 assert (tokens != NULL); | |
753 assert (len > 0); | |
754 | |
755 for (i = 0; i < len; i++) | |
756 { | |
757 tok = tokens[i]; | |
758 | |
759 entry = dataset_lookup (indices, &tok, sizeof (tok)); | |
760 assert (entry != NULL); | |
761 | |
762 if (--entry->ref > 0) | |
763 continue; | |
764 | |
765 dataset_remove (indices, &tok, sizeof (tok)); | |
766 | |
767 table_changed = TRUE; | |
768 } | |
769 | |
770 free (tokens0); | |
771 } | |
772 | |
773 void gt_query_router_self_sync (BOOL begin) | |
774 { | |
775 if (!begin && table_changed) | |
776 { | |
777 start_build_timer(); | |
778 table_changed = FALSE; | |
779 } | |
780 } | |
781 | |
782 /*****************************************************************************/ | |
783 | |
784 int query_patch_open (GtQueryRouter *router, int seq_size, int compressed, | |
785 size_t max_size) | |
786 { | |
787 GtQueryPatch *new_patch; | |
788 | |
789 if (!(new_patch = malloc (sizeof (GtQueryPatch)))) | |
790 return FALSE; | |
791 | |
792 memset (new_patch, 0, sizeof (GtQueryPatch)); | |
793 | |
794 if (!(new_patch->stream = zlib_stream_open (max_size))) | |
795 { | |
796 free (new_patch); | |
797 return FALSE; | |
798 } | |
799 | |
800 new_patch->seq_size = seq_size; | |
801 new_patch->compressed = compressed; | |
802 | |
803 /* NOTE: sequence is 1-based, not 0-based */ | |
804 new_patch->seq_num = 1; | |
805 | |
806 router->patch = new_patch; | |
807 | |
808 return TRUE; | |
809 } | |
810 | |
811 void query_patch_close (GtQueryRouter *router) | |
812 { | |
813 GtQueryPatch *patch; | |
814 | |
815 GT->DBGFN (GT, "entered"); | |
816 | |
817 if (!router) | |
818 return; | |
819 | |
820 patch = router->patch; | |
821 | |
822 if (!patch) | |
823 return; | |
824 | |
825 zlib_stream_close (patch->stream); | |
826 | |
827 router->patch = NULL; | |
828 free (patch); | |
829 } | |
830 | |
831 /* TODO: compact router tables to bit-level */ | |
832 static void query_patch_apply (GtQueryRouter *router, int bits, char *data, | |
833 size_t data_size) | |
834 { | |
835 GtQueryPatch *patch; | |
836 char *table; /* NOTE: this must be signed */ | |
837 int i; | |
838 | |
839 patch = router->patch; | |
840 assert (patch != NULL); | |
841 | |
842 /* check for overflow: this may look wrong but its not */ | |
843 if (patch->table_pos + (data_size - 1) >= router->size) | |
844 { | |
845 GT->DBGFN (GT, "patch overflow: %u (max of %u)", | |
846 patch->table_pos+data_size, router->size); | |
847 query_patch_close (router); | |
848 return; | |
849 } | |
850 | |
851 table = router->table; | |
852 | |
853 /* hrm */ | |
854 if (bits == 4) | |
855 { | |
856 int j; | |
857 | |
858 for (i = 0; i < data_size; i++) | |
859 { | |
860 int pos; | |
861 char change; | |
862 | |
863 pos = i + patch->table_pos; | |
864 | |
865 /* avoid % */ | |
866 j = (i+1) & 0x1; | |
867 | |
868 /* grab the correct half of the byte and sign-extend it | |
869 * NOTE: this starts off with the most significant bits! */ | |
870 change = data[i] & (0x0f << (4 * j)); | |
871 | |
872 /* move to least significant bits | |
873 * TODO: does this do sign-extension correctly? */ | |
874 change >>= 4; | |
875 | |
876 table[pos] += change; | |
877 } | |
878 } | |
879 else if (bits == 8) | |
880 { | |
881 /* untested */ | |
882 for (i = 0; i < data_size; i++) | |
883 { | |
884 table[i + patch->table_pos] += data[i]; | |
885 } | |
886 } | |
887 else | |
888 { | |
889 GT->DBGFN (GT, "undefined bits value in query patch: %u", bits); | |
890 query_patch_close (router); | |
891 return; | |
892 } | |
893 | |
894 /* store the table position for the next patch */ | |
895 patch->table_pos += i; | |
896 | |
897 /* cleanup the data if the patch is done */ | |
898 patch->seq_num++; | |
899 | |
900 if (patch->seq_num > patch->seq_size) | |
901 query_patch_close (router); | |
902 } | |
903 | |
904 /*****************************************************************************/ | |
905 | |
906 /* TODO: compact router tables to bit-level */ | |
907 GtQueryRouter *gt_query_router_new (size_t size, int infinity) | |
908 { | |
909 GtQueryRouter *router; | |
910 | |
911 if (size > MAX_TABLE_SIZE) | |
912 return NULL; | |
913 | |
914 if (!(router = malloc (sizeof (GtQueryRouter)))) | |
915 return NULL; | |
916 | |
917 memset (router, 0, sizeof (GtQueryRouter)); | |
918 | |
919 if (!(router->table = malloc (size))) | |
920 { | |
921 free (router->table); | |
922 return NULL; | |
923 } | |
924 | |
925 memset (router->table, infinity, size); | |
926 | |
927 router->size = size; | |
928 | |
929 return router; | |
930 } | |
931 | |
932 void gt_query_router_free (GtQueryRouter *router) | |
933 { | |
934 if (!router) | |
935 return; | |
936 | |
937 query_patch_close (router); | |
938 | |
939 free (router->table); | |
940 free (router); | |
941 } | |
942 | |
943 /*****************************************************************************/ | |
944 | |
945 static void print_hex (unsigned char *data, size_t size) | |
946 { | |
947 fprint_hex (stdout, data, size); | |
948 } | |
949 | |
950 void gt_query_router_update (GtQueryRouter *router, size_t seq_num, | |
951 size_t seq_size, int compressed, int bits, | |
952 unsigned char *zdata, size_t size) | |
953 { | |
954 GtQueryPatch *patch; | |
955 char *data; | |
956 | |
957 if (!router) | |
958 { | |
959 GT->DBGFN (GT, "null router"); | |
960 return; | |
961 } | |
962 | |
963 if (!router->patch) | |
964 { | |
965 if (!query_patch_open (router, seq_size, compressed, router->size)) | |
966 return; | |
967 } | |
968 | |
969 patch = router->patch; | |
970 | |
971 /* check for an invalid sequence number or size */ | |
972 if (patch->seq_size != seq_size || patch->seq_num != seq_num) | |
973 { | |
974 GT->DBGFN (GT, "bad patch: seq_size %u vs %u, seq_num %u vs %u", | |
975 patch->seq_size, seq_size, patch->seq_num, seq_num); | |
976 query_patch_close (router); | |
977 return; | |
978 } | |
979 | |
980 if (compressed != patch->compressed) | |
981 { | |
982 GT->DBGFN (GT, "tried to change encodings in patch"); | |
983 query_patch_close (router); | |
984 return; | |
985 } | |
986 | |
987 switch (compressed) | |
988 { | |
989 case 0x00: /* no compression */ | |
990 if (!zlib_stream_write (patch->stream, zdata, size)) | |
991 { | |
992 GT->DBGFN (GT, "error copying data"); | |
993 query_patch_close (router); | |
994 return; | |
995 } | |
996 | |
997 break; | |
998 | |
999 case 0x01: /* deflate */ | |
1000 printf ("zlib compressed data:\n"); | |
1001 print_hex (zdata, size); | |
1002 | |
1003 if (!zlib_stream_inflate (patch->stream, zdata, size)) | |
1004 { | |
1005 GT->DBGFN (GT, "error inflating data"); | |
1006 query_patch_close (router); | |
1007 return; | |
1008 } | |
1009 | |
1010 break; | |
1011 | |
1012 default: | |
1013 GT->DBGFN (GT, "unknown compression algorithm in query route patch"); | |
1014 return; | |
1015 } | |
1016 | |
1017 /* read the data in the stream */ | |
1018 if (!(size = zlib_stream_read (patch->stream, &data))) | |
1019 { | |
1020 GT->DBGFN (GT, "error calling zlib_stream_read"); | |
1021 query_patch_close (router); | |
1022 return; | |
1023 } | |
1024 | |
1025 printf ("after uncompressing:\n"); | |
1026 print_hex (data, size); | |
1027 | |
1028 /* apply the patch -- this will cleanup any data if necessary */ | |
1029 query_patch_apply (router, bits, data, size); | |
1030 | |
1031 print_hex (router->table, router->size); | |
1032 } | |
1033 | |
1034 /*****************************************************************************/ | |
1035 | |
1036 static void submit_empty_table (TCPC *c) | |
1037 { | |
1038 static char table[8] = { 0 }; | |
1039 int len; | |
1040 | |
1041 #if 0 | |
1042 size_t size; | |
1043 #endif | |
1044 | |
1045 GT->DBGFN (GT, "reseting route table for %s", net_ip_str (GT_NODE(c)->ip)); | |
1046 | |
1047 /* all slots in the table should be initialized to infinity, so it | |
1048 * should be "empty" on the remote node */ | |
1049 memset (table, 0, sizeof (table)); | |
1050 | |
1051 | |
1052 #if 0 | |
1053 /* TEST: set part of the table to -infinity to get queries */ | |
1054 size = sizeof (table); | |
1055 memset (table + (size + 1) / 2 - 1, -infinity, (size + 1) / 4); | |
1056 #endif | |
1057 | |
1058 /* format: <query-route-msg-type> <length of table> <infinity> */ | |
1059 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, | |
1060 "%c%lu%c", 0, (unsigned long) sizeof (table), | |
1061 INFINITY) < 0) | |
1062 { | |
1063 GT->DBGFN (GT, "error reseting table"); | |
1064 return; | |
1065 } | |
1066 | |
1067 len = sizeof (table); | |
1068 | |
1069 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, | |
1070 "%c%c%c%c%c%*p", | |
1071 1, 1, 1, 0, 8, len, table) < 0) | |
1072 { | |
1073 GT->DBGFN (GT, "error sending empty patch"); | |
1074 return; | |
1075 } | |
1076 } | |
1077 | |
1078 static void submit_table (TCPC *c, uint8_t *table, size_t size, size_t slots) | |
1079 { | |
1080 int infinity = INFINITY; | |
1081 int seq_size; | |
1082 int compressed; | |
1083 int seq_num; | |
1084 uint8_t *p; | |
1085 size_t send_size; | |
1086 | |
1087 /* XXX make table size settable at runtime */ | |
1088 | |
1089 /* send a reset table first */ | |
1090 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, | |
1091 "%c%lu%c", 0, (long)slots, infinity) < 0) | |
1092 { | |
1093 GT->DBGFN (GT, "error reseting table"); | |
1094 return; | |
1095 } | |
1096 | |
1097 /* Break the table into PATCH_FRAGSIZE-sized chunks, | |
1098 * and include any leftover portions. */ | |
1099 seq_size = size / PATCH_FRAGSIZE + | |
1100 (size % PATCH_FRAGSIZE == 0 ? 0 : 1); | |
1101 | |
1102 assert (seq_size < 256); | |
1103 #if 0 | |
1104 GT->dbg (GT, "sequence size: %u", seq_size); | |
1105 #endif | |
1106 | |
1107 p = table; | |
1108 compressed = TRUE; | |
1109 | |
1110 /* NOTE: patch sequence numbers start at 1 */ | |
1111 for (seq_num = 1; seq_num <= seq_size; seq_num++) | |
1112 { | |
1113 send_size = MIN (PATCH_FRAGSIZE, size); | |
1114 | |
1115 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, | |
1116 "%c%c%c%c%c%*p", | |
1117 /* QRP PATCH */ 1, | |
1118 seq_num, seq_size, compressed, | |
1119 /* bits */ PATCH_BITS, | |
1120 send_size, p) < 0) | |
1121 { | |
1122 GT->DBGFN (GT, "error sending QRT patch"); | |
1123 return; | |
1124 } | |
1125 | |
1126 size -= send_size; | |
1127 p += send_size; | |
1128 } | |
1129 } | |
1130 | |
1131 static BOOL update_qr_table (TCPC *c) | |
1132 { | |
1133 size_t size; | |
1134 int version; | |
1135 uint8_t *table; | |
1136 GtNode *node = GT_NODE(c); | |
1137 | |
1138 assert (node->state & GT_NODE_CONNECTED); | |
1139 | |
1140 table = gt_query_router_self (&size, &version); | |
1141 | |
1142 /* we may not have finished building a table yet */ | |
1143 if (!table) | |
1144 return TRUE; | |
1145 | |
1146 /* dont submit a table if this node is already up to date */ | |
1147 if (node->query_router_counter == version) | |
1148 return TRUE; | |
1149 | |
1150 /* HACK: this shouldn't be using compressed_slots */ | |
1151 submit_table (c, table, size, compressed_slots); | |
1152 | |
1153 /* store the version number of this table so | |
1154 * we dont resubmit unecessarily */ | |
1155 node->query_router_counter = version; | |
1156 | |
1157 return TRUE; | |
1158 } | |
1159 | |
1160 static BOOL submit_first_table (TCPC *c) | |
1161 { | |
1162 GtNode *node = GT_NODE(c); | |
1163 | |
1164 assert (node->state & GT_NODE_CONNECTED); | |
1165 | |
1166 update_qr_table (c); | |
1167 | |
1168 /* remove the first timer */ | |
1169 timer_remove (node->query_route_timer); | |
1170 | |
1171 /* set the timer for updating the table repeatedly */ | |
1172 node->query_route_timer = timer_add (QRT_UPDATE_INTERVAL, | |
1173 (TimerCallback)update_qr_table, c); | |
1174 | |
1175 /* we removed the timer, and must return FALSE */ | |
1176 return FALSE; | |
1177 } | |
1178 | |
1179 /* | |
1180 * Submit the query routing table for this node to another. | |
1181 * | |
1182 * This delays sending the table for while. This helps preserve our precious | |
1183 * upstream when we're looking for nodes to connect to, as this often | |
1184 * happens when we're in the middle of looking for more nodes. | |
1185 */ | |
1186 void query_route_table_submit (TCPC *c) | |
1187 { | |
1188 GtNode *node = GT_NODE(c); | |
1189 | |
1190 assert (node->query_route_timer == 0); | |
1191 | |
1192 /* save bandwidth with an empty table */ | |
1193 submit_empty_table (c); | |
1194 | |
1195 /* submit a real table later */ | |
1196 node->query_route_timer = timer_add (QRT_SUBMIT_DELAY, | |
1197 (TimerCallback)submit_first_table, c); | |
1198 } | |
1199 | |
1200 /*****************************************************************************/ | |
1201 /* TESTING */ | |
1202 | |
1203 #if 0 | |
1204 #define CHECK(x) do { \ | |
1205 if (!(x)) printf("FAILED: %s\n", #x); \ | |
1206 else printf("OK: %s\n", #x); \ | |
1207 } while (0) | |
1208 | |
1209 #define HASH(str, bits) \ | |
1210 printf ("hash " str ": %u\n", gt_query_router_hash_str (str, bits)) | |
1211 | |
1212 int main (int argc, char **argv) | |
1213 { | |
1214 #define hash gt_query_router_hash_str | |
1215 | |
1216 CHECK(hash("", 13)==0); | |
1217 CHECK(hash("eb", 13)==6791); | |
1218 CHECK(hash("ebc", 13)==7082); | |
1219 CHECK(hash("ebck", 13)==6698); | |
1220 CHECK(hash("ebckl", 13)==3179); | |
1221 CHECK(hash("ebcklm", 13)==3235); | |
1222 CHECK(hash("ebcklme", 13)==6438); | |
1223 CHECK(hash("ebcklmen", 13)==1062); | |
1224 CHECK(hash("ebcklmenq", 13)==3527); | |
1225 CHECK(hash("", 16)==0); | |
1226 CHECK(hash("n", 16)==65003); | |
1227 CHECK(hash("nd", 16)==54193); | |
1228 CHECK(hash("ndf", 16)==4953); | |
1229 CHECK(hash("ndfl", 16)==58201); | |
1230 CHECK(hash("ndfla", 16)==34830); | |
1231 CHECK(hash("ndflal", 16)==36910); | |
1232 CHECK(hash("ndflale", 16)==34586); | |
1233 CHECK(hash("ndflalem", 16)==37658); | |
1234 CHECK(hash("FAIL", 16)==37458); // WILL FAIL | |
1235 CHECK(hash("ndflaleme", 16)==45559); | |
1236 CHECK(hash("ol2j34lj", 10)==318); | |
1237 CHECK(hash("asdfas23", 10)==503); | |
1238 CHECK(hash("9um3o34fd", 10)==758); | |
1239 CHECK(hash("a234d", 10)==281); | |
1240 CHECK(hash("a3f", 10)==767); | |
1241 CHECK(hash("3nja9", 10)==581); | |
1242 CHECK(hash("2459345938032343", 10)==146); | |
1243 CHECK(hash("7777a88a8a8a8", 10)==342); | |
1244 CHECK(hash("asdfjklkj3k", 10)==861); | |
1245 CHECK(hash("adfk32l", 10)==1011); | |
1246 CHECK(hash("zzzzzzzzzzz", 10)==944); | |
1247 | |
1248 CHECK(hash("3nja9", 10)==581); | |
1249 CHECK(hash("3NJA9", 10)==581); | |
1250 CHECK(hash("3nJa9", 10)==581); | |
1251 | |
1252 printf ("hash(FAIL, 16) = %u\n", hash("FAIL", 16)); | |
1253 return 0; | |
1254 } | |
1255 #endif |