view src/gt_query_route.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
line source
1 /*
2 * $Id: gt_query_route.c,v 1.46 2004/04/05 07:56:54 hipnod Exp $
3 *
4 * Copyright (C) 2001-2003 giFT project (gift.sourceforge.net)
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 */
17 #include "gt_gnutella.h"
19 #include "gt_query_route.h"
20 #include "gt_packet.h"
21 #include "gt_utils.h"
23 #include "gt_node.h"
24 #include "gt_node_list.h"
26 #include <libgift/stopwatch.h>
28 #include <zlib.h>
30 /*****************************************************************************/
32 /*
33 * TODO:
34 * - compact table to bit-level representation
35 * - support incremental updates of table
36 * - cut-off entries when MAX_FILL_RATIO is reached
37 */
38 #define MIN_TABLE_BITS (16) /* log2 of the slot count of the smallest table */
39 #define MAX_TABLE_BITS (21) /* log2 of the slot count of the largest table */
40 #define MIN_TABLE_SIZE (1UL << (MIN_TABLE_BITS - 1)) /* 32k bytes */
41 #define MAX_TABLE_SIZE (1UL << (MAX_TABLE_BITS - 1)) /* 1M bytes */
42 #define MIN_TABLE_SLOTS (MIN_TABLE_SIZE * 2) /* 64k slots (two 4-bit slots per byte) */
43 #define MAX_TABLE_SLOTS (MAX_TABLE_SIZE * 2) /* 2M slots (two 4-bit slots per byte) */
44 #define INC_FILL_RATIO (0.70) /* 0.7%: fill percentage at which a larger table is built */
45 #define MAX_FILL_RATIO (1.00) /* 1% (unused) */
47 /*
48 * magical constant necessary for the query routing hash function
49 */
50 #define MULTIPLIER 0x4F1BBCDC
52 /*
53 * How often to synchronize the QRT with ultrapeers.
54 *
55 * This is very big because we don't support incremental updating
56 * yet.
57 */
58 #define QRT_UPDATE_INTERVAL (20 * MINUTES)
60 /*
61 * How often we check to build a compressed patch of local shares.
62 */
63 #define QRT_BUILD_INTERVAL (3 * SECONDS)
65 /*
66 * How long to wait after the first query_route_table_submit() before
67 * actually submitting the table.
68 */
69 #define QRT_SUBMIT_DELAY (1 * MINUTES)
71 /*
72 * Largest hops value in table. It looks like Limewire hardcodes
73 * this as 7, and won't understand any other value.
 *
 * NOTE(review): C99 <math.h> also defines a macro named INFINITY; this
 * definition would clash if that header were ever included here.
74 */
75 #define INFINITY 7
77 /*
78 * Constants for changing route table. Each is a 4-bit two's-complement
 * value stored in one table slot.
79 */
80 #define QRT_KEYWORD_ADD (0x0a) /* -6 as a signed 4-bit value */
81 #define QRT_KEYWORD_REMOVE (0x06) /* +6 as a signed 4-bit value */
83 /*
84 * The minimum length of a keyword; shorter tokens are not hashed
85 */
86 #define QRP_MIN_LENGTH 3
88 /*
89 * Maximum patch fragment size to send, in bytes of compressed data
90 */
91 #define PATCH_FRAGSIZE 2048
93 /*
94 * Number of bits per table entry in the patches we send
95 */
96 #define PATCH_BITS 4
98 /*
99 * Holds a 32-bit index describing each token this node shares.
100 */
101 struct qrp_route_entry
102 {
103 int ref; /* number of references to this index */
104 uint32_t index; /* the actual position (full 32-bit keyword hash) */
105 };
/*
 * Uncompressed routing table with two 4-bit slots packed into each byte.
 */
107 struct qrp_route_table
108 {
109 uint8_t *table; /* `size' bytes of packed 4-bit slots */
110 size_t bits; /* log2 of the number of slots */
111 size_t size; /* length of `table' in bytes */
112 size_t slots; /* number of 4-bit slots (size * 2) */
113 size_t present; /* slots marked present by qrp_route_table_insert() */
114 size_t shared; /* indices that mapped onto an already-present slot */
115 };
117 /*****************************************************************************/
119 /*
120 * The set of indices that are currently marked "present" in the
121 * routing table. Keyed by the 32-bit token hash; values are
 * struct qrp_route_entry pointers.
122 */
123 static Dataset *indices;
125 /*
126 * This holds the compressed table as a full QRP patch (incremental
127 * updates not supported yet). compressed_size is the patch length in
 * bytes; compressed_slots is the slot count of the table it was built
 * from.
128 */
129 static uint8_t *compressed_table;
130 static size_t compressed_slots;
131 static size_t compressed_size;
133 /*
134 * Keeps track of how many times the compressed table has changed.
135 * Used to avoid sending updates when not necessary. Zero is skipped
 * when the counter is bumped, so a built table never has version 0.
136 */
137 static int compressed_version;
139 /*
140 * Timer that builds the compressed patch for the table submitted
141 * to peers. Zero when no build is scheduled.
142 */
143 static timer_id build_timer;
145 /*
146 * Whether we have to rebuild the table when shares are done
147 * syncing.
148 */
149 static BOOL table_changed;
151 /*
152 * The full query-routing table in binary form. This will get
153 * compressed before transmission.
154 */
155 static struct qrp_route_table *route_table;
157 /*****************************************************************************/
/*
 * Hash function used for query-routing.
 *
 * Hashes the keyword at the start of `str' (up to the first whitespace
 * character or the terminating NUL), ignoring case, and returns the
 * top `bits' bits of the 32-bit hash as a table index.
 *
 * A NULL string or bits == 0 yields 0; bits larger than 32 is treated
 * as 32.  Both cases would otherwise be undefined shifts.
 */
uint32_t gt_query_router_hash_str (char *str, size_t bits)
{
	uint32_t      hash  = 0;
	unsigned int  shift = 0;
	unsigned char c;

	if (!str || bits == 0)
		return 0;

	if (bits > 32)
		bits = 32;

	while ((c = (unsigned char)*str++) != '\0' && !isspace (c))
	{
		/* widen before shifting: an int shifted by 24 overflows for
		 * bytes >= 0x80 */
		hash ^= (uint32_t)tolower (c) << shift;

		/* cycle through byte positions 0, 8, 16, 24 */
		shift = (shift + 8) & 0x18;
	}

	return (uint32_t)(hash * (uint32_t)MULTIPLIER) >> (32 - bits);
}
179 /*****************************************************************************/
181 static struct qrp_route_table *qrp_route_table_new (size_t bits)
182 {
183 struct qrp_route_table *qrt;
185 if (!(qrt = MALLOC (sizeof (struct qrp_route_table))))
186 return NULL;
188 qrt->bits = bits;
189 qrt->size = (1UL << (bits - 1));
190 qrt->slots = qrt->size * 2; /* 4-bit entries only */
192 if (!(qrt->table = MALLOC (qrt->size)))
193 {
194 free (qrt);
195 return NULL;
196 }
198 return qrt;
199 }
201 static void qrp_route_table_free (struct qrp_route_table *qrt)
202 {
203 if (!qrt)
204 return;
206 free (qrt->table);
207 free (qrt);
208 }
210 static void qrp_route_table_insert (struct qrp_route_table *qrt, uint32_t index)
211 {
212 uint8_t old_entry;
213 int set_lower;
214 int entry;
216 if (!qrt)
217 return;
219 assert (index < qrt->size * 2);
221 set_lower = index % 2;
222 entry = index / 2;
224 if (set_lower)
225 {
226 old_entry = qrt->table[entry] & 0x0f;
227 qrt->table[entry] = (qrt->table[entry] & 0xf0) |
228 ((QRT_KEYWORD_ADD) & 0x0f);
229 }
230 else
231 {
232 old_entry = (qrt->table[entry] & 0xf0) >> 4;
233 qrt->table[entry] = (qrt->table[entry] & 0x0f) |
234 ((QRT_KEYWORD_ADD << 4) & 0xf0);
235 }
237 assert (old_entry == 0 || old_entry == QRT_KEYWORD_REMOVE);
238 #if 0
239 GT->dbg (GT, "+%u [%d/%d]", index, entry, set_lower);
240 #endif
242 qrt->present++;
243 }
245 #if 0
246 /* untested and currently compiled out: overwrites the 4-bit slot for
 * `index' with QRT_KEYWORD_REMOVE and decrements the present count */
247 static void qrp_route_table_remove (struct qrp_route_table *qrt, uint32_t index)
248 {
249 uint8_t old_entry;
250 int clear_lower;
251 int entry;
253 if (!qrt)
254 return;
256 assert (index < qrt->size * 2);
258 clear_lower = index % 2;
259 entry = index / 2;
261 /*
262 * This needs to change when doing incremental updating...
263 */
265 if (clear_lower)
266 {
267 old_entry = qrt->table[entry] & 0x0f;
268 qrt->table[entry] = (qrt->table[entry] & 0xf0) |
269 ((QRT_KEYWORD_REMOVE) & 0x0f);
270 }
271 else
272 {
273 old_entry = (qrt->table[entry] & 0xf0) >> 4;
274 qrt->table[entry] = (qrt->table[entry] & 0x0f) |
275 ((QRT_KEYWORD_REMOVE << 4) & 0xf0);
276 }
/* only a slot that is currently present may be removed */
278 assert (old_entry == (uint8_t) QRT_KEYWORD_ADD);
279 #if 0
280 GT->dbg (GT, "-%u [%d/%d]", index, entry, clear_lower);
281 #endif
283 qrt->present--;
284 }
285 #endif
287 static BOOL qrp_route_table_lookup (struct qrp_route_table *qrt, uint32_t index)
288 {
289 int check_lower;
290 uint32_t entry;
292 if (!qrt)
293 return FALSE;
295 assert (index < qrt->slots);
296 assert (qrt->slots == qrt->size * 2);
298 check_lower = index % 2;
299 entry = index / 2;
301 if (check_lower)
302 {
303 if ((qrt->table[entry] & 0x0f) == QRT_KEYWORD_ADD)
304 return TRUE;
305 }
306 else
307 {
308 if (((qrt->table[entry] & 0xf0) >> 4) == QRT_KEYWORD_ADD)
309 return TRUE;
310 }
312 return FALSE;
313 }
315 static double qrp_route_table_fill_ratio (struct qrp_route_table *qrt)
316 {
317 return (double)qrt->present * 100 / qrt->slots;
318 }
320 /*****************************************************************************/
322 static char *zlib_strerror (int error)
323 {
324 switch (error)
325 {
326 case Z_OK: return "OK";
327 case Z_STREAM_END: return "End of stream";
328 case Z_NEED_DICT: return "Decompressing dictionary needed";
329 case Z_STREAM_ERROR: return "Stream error";
330 case Z_ERRNO: return "Generic zlib error";
331 case Z_DATA_ERROR: return "Data error";
332 case Z_MEM_ERROR: return "Memory error";
333 case Z_BUF_ERROR: return "Buffer error";
334 case Z_VERSION_ERROR: return "Incompatible runtime zlib library";
335 default: break;
336 }
338 return "Invalid zlib error code";
339 }
341 /* TODO: make this use a stream-like interface */
342 static uint8_t *compress_table (uint8_t *table, size_t in_size, size_t *out_size)
343 {
344 z_streamp out;
345 int ret;
346 uint8_t *out_buf;
347 int free_size;
349 *out_size = 0;
351 if (!(out = MALLOC (sizeof(*out))))
352 return NULL;
354 out->zalloc = Z_NULL;
355 out->zfree = Z_NULL;
356 out->opaque = Z_NULL;
358 if ((ret = deflateInit (out, Z_DEFAULT_COMPRESSION)) != Z_OK)
359 {
360 GT->DBGFN (GT, "deflateInit error: %s", zlib_strerror (ret));
361 free (out);
362 return NULL;
363 }
365 /* allocate initial buffer */
366 free_size = in_size + in_size / 100;
368 if (!(out_buf = malloc (free_size)))
369 {
370 free (out_buf);
371 deflateEnd (out);
372 free (out);
373 return NULL;
374 }
376 out->next_in = table;
377 out->avail_in = in_size;
378 out->next_out = out_buf;
379 out->avail_out = free_size;
381 if ((ret = deflate (out, Z_FINISH)) != Z_STREAM_END)
382 {
383 GT->DBGFN (GT, "compression error: %s", zlib_strerror (ret));
384 free (out_buf);
385 deflateEnd (out);
386 free (out);
387 return NULL;
388 }
390 /*
391 * This could theoretically fail I guess. If it does, we shouldn't keep
392 * the table at least.
393 */
394 assert (out->avail_in == 0);
396 *out_size = free_size - out->avail_out;
398 deflateEnd (out);
399 free (out);
401 return out_buf;
402 }
404 #if 0
405 /* send a QRP table to nodes we haven't sent a real table to yet
 * (currently compiled out) */
406 static GtNode *update_nodes (TCPC *c, GtNode *node, void *udata)
407 {
408 assert (node->state == GT_NODE_CONNECTED);
409 assert (GT_CONN(node) == c);
411 /*
412 * If the counter is not 0, we sent a table to this node already.
413 * So, wait for the timer to pick that up.
414 */
415 if (node->query_router_counter != 0)
416 return NULL;
418 /* submit the table */
419 query_route_table_submit (c);
421 /* reset the submit timer */
422 if (GT_NODE(c)->query_route_timer != 0)
423 timer_reset (GT_NODE(c)->query_route_timer);
425 return NULL;
426 }
427 #endif
429 static void add_index (ds_data_t *key, ds_data_t *value,
430 struct qrp_route_table *qrt)
431 {
432 struct qrp_route_entry *entry = value->data;
433 uint32_t slot;
435 /* grab only the most significant bits of the entry */
436 slot = entry->index >> (32 - qrt->bits);
438 /*
439 * If the entry already exists in the table, bump shared entries and
440 * forget about this entry.
441 */
442 if (qrp_route_table_lookup (qrt, slot))
443 {
444 qrt->shared++;
445 return;
446 }
448 qrp_route_table_insert (qrt, slot);
449 }
451 static void build_uncompressed (struct qrp_route_table *qrt)
452 {
453 dataset_foreach (indices, DS_FOREACH(add_index), qrt);
454 }
456 static int build_qrp_table (void *udata)
457 {
458 uint8_t *new_table;
459 size_t new_size;
460 StopWatch *sw;
461 double elapsed;
462 double fill_ratio;
464 if (!route_table && !(route_table = qrp_route_table_new (MIN_TABLE_BITS)))
465 {
466 /* try again later */
467 return TRUE;
468 }
470 sw = stopwatch_new (TRUE);
472 /* build a table from the indices */
473 build_uncompressed (route_table);
475 stopwatch_stop (sw);
477 elapsed = stopwatch_elapsed (sw, NULL);
479 fill_ratio = qrp_route_table_fill_ratio (route_table);
481 GT->DBGFN (GT, "%.4lfs elapsed building", elapsed);
482 GT->DBGFN (GT, "present=%u shared=%u size=%u", route_table->present,
483 route_table->shared, route_table->size);
484 GT->DBGFN (GT, "fill ratio=%.4lf%%", fill_ratio);
486 /*
487 * If the fill ratio is greater than an acceptable threshold,
488 * and we haven't reached the maximum table size allowed,
489 * rebuild a larger routing table.
490 */
491 if (fill_ratio >= INC_FILL_RATIO && route_table->bits < MAX_TABLE_BITS)
492 {
493 struct qrp_route_table *new_table;
495 /*
496 * If we don't have enough memory to build the new table, fall
497 * through and compress the existing table. This would only happen
498 * if this node has a very small amount of memory.
499 */
500 if ((new_table = qrp_route_table_new (route_table->bits + 1)))
501 {
502 qrp_route_table_free (route_table);
503 route_table = new_table;
505 /* retry the build later, it's kinda expensive */
506 stopwatch_free (sw);
507 return TRUE;
508 }
509 }
511 stopwatch_start (sw);
513 /* compress a new table */
514 new_table = compress_table (route_table->table,
515 route_table->size,
516 &new_size);
518 elapsed = stopwatch_free_elapsed (sw);
520 GT->DBGFN (GT, "%.4lfs elapsed compressing", elapsed);
521 GT->DBGFN (GT, "compressed size=%lu", new_size);
523 if (!new_table)
524 return TRUE;
526 assert (new_size > 0);
528 /*
529 * Replace the old compressed table
530 */
531 free (compressed_table);
533 compressed_table = new_table;
534 compressed_size = new_size;
535 compressed_slots = route_table->slots;
537 compressed_version++;
539 if (!compressed_version)
540 compressed_version++;
542 /*
543 * An optimization to reduce memory usage: realloc the
544 * compressed table to the smaller size.
545 */
546 if ((new_table = realloc (new_table, new_size)))
547 compressed_table = new_table;
549 #if 0
550 /* update nodes with this table */
551 gt_conn_foreach (GT_CONN_FOREACH(update_nodes), NULL,
552 GT_NODE_ULTRA, GT_NODE_CONNECTED, 0);
553 #endif
555 /*
556 * Temporary optimization: we can free the uncompressed
557 * route table now, as it is unused. This is a dubious optimization
558 * though because the table will probably hang around in
559 * the future when incremental updating works.
560 */
561 qrp_route_table_free (route_table);
562 route_table = NULL;
564 /* remove the timer, as the table is now up to date */
565 build_timer = 0;
566 return FALSE;
567 }
569 static int start_build (void *udata)
570 {
571 build_timer = timer_add (QRT_BUILD_INTERVAL,
572 (TimerCallback)build_qrp_table, NULL);
573 return FALSE;
574 }
576 static void start_build_timer (void)
577 {
578 if (build_timer)
579 return;
581 /*
582 * If we don't have a compressed table, we haven't built
583 * the table before, so build it soon. Otherwise,
584 * we won't submit it for a while anyway, so build it
585 * at half the update interval.
586 */
587 if (compressed_table)
588 {
589 build_timer = timer_add (QRT_UPDATE_INTERVAL / 2,
590 (TimerCallback)start_build, NULL);
591 return;
592 }
594 start_build (NULL);
595 }
597 /*****************************************************************************/
599 /* TODO: this should be moved to GT_SELF */
600 uint8_t *gt_query_router_self (size_t *size, int *version)
601 {
602 if (!compressed_table)
603 return NULL;
605 assert (size != NULL && version != NULL);
607 *size = compressed_size;
608 *version = compressed_version;
610 return compressed_table;
611 }
613 static int free_entries (ds_data_t *key, ds_data_t *value, void *udata)
614 {
615 struct qrp_route_entry *entry = value->data;
617 free (entry);
619 return DS_CONTINUE | DS_REMOVE;
620 }
622 void gt_query_router_self_destroy (void)
623 {
624 timer_remove_zero (&build_timer);
626 qrp_route_table_free (route_table);
627 route_table = NULL;
629 free (compressed_table);
630 compressed_table = NULL;
631 compressed_slots = 0;
632 compressed_size = 0;
633 compressed_version = 0;
635 dataset_foreach_ex (indices, DS_FOREACH_EX(free_entries), NULL);
636 dataset_clear (indices);
637 indices = NULL;
638 }
640 void gt_query_router_self_init (void)
641 {
642 indices = dataset_new (DATASET_HASH);
643 }
645 /*****************************************************************************/
/*
 * Store `tok' at tokens[pos], growing the array by 8 elements first
 * when `pos' is at or past the current capacity *len.  Returns the
 * (possibly moved) array; allocation failure trips an assert.
 */
static uint32_t *append_token (uint32_t *tokens, size_t *len,
                               size_t pos, uint32_t tok)
{
	uint32_t *grown;

	if (pos < *len)
	{
		tokens[pos] = tok;
		return tokens;
	}

	*len += 8;
	grown = realloc (tokens, *len * sizeof (uint32_t));

	assert (grown != NULL);

	grown[pos] = tok;
	return grown;
}
665 static uint32_t *tokenize (char *hpath, size_t *r_len)
666 {
667 uint32_t *tokens;
668 int count;
669 size_t len;
670 char *str, *str0;
671 char *next;
673 if (!(str0 = str = STRDUP (hpath)))
674 return NULL;
676 tokens = NULL;
677 len = 0;
678 count = 0;
680 while ((next = string_sep_set (&str, QRP_DELIMITERS)) != NULL)
681 {
682 uint32_t tok;
684 if (string_isempty (next))
685 continue;
687 /* don't add keywords that are too small */
688 if (strlen (next) < QRP_MIN_LENGTH)
689 continue;
691 tok = gt_query_router_hash_str (next, 32);
692 tokens = append_token (tokens, &len, count++, tok);
693 }
695 *r_len = count;
697 free (str0);
699 return tokens;
700 }
702 void gt_query_router_self_add (FileShare *file)
703 {
704 uint32_t *tokens, *tokens0;
705 uint32_t tok;
706 size_t len;
707 int i;
708 struct qrp_route_entry *entry;
710 tokens0 = tokens = tokenize (share_get_hpath (file), &len);
712 assert (tokens != NULL);
713 assert (len > 0);
715 for (i = 0; i < len; i++)
716 {
717 tok = tokens[i];
719 if ((entry = dataset_lookup (indices, &tok, sizeof (tok))))
720 {
721 entry->ref++;
722 continue;
723 }
725 /*
726 * Create a new index and add it to the table.
727 */
728 if (!(entry = malloc (sizeof (struct qrp_route_entry))))
729 continue;
731 entry->ref = 1;
732 entry->index = tok;
734 dataset_insert (&indices, &tok, sizeof (tok), entry, 0);
736 table_changed = TRUE;
737 }
739 free (tokens0);
740 }
742 void gt_query_router_self_remove (FileShare *file)
743 {
744 uint32_t *tokens, *tokens0;
745 uint32_t tok;
746 size_t len;
747 int i;
748 struct qrp_route_entry *entry;
750 tokens0 = tokens = tokenize (share_get_hpath (file), &len);
752 assert (tokens != NULL);
753 assert (len > 0);
755 for (i = 0; i < len; i++)
756 {
757 tok = tokens[i];
759 entry = dataset_lookup (indices, &tok, sizeof (tok));
760 assert (entry != NULL);
762 if (--entry->ref > 0)
763 continue;
765 dataset_remove (indices, &tok, sizeof (tok));
767 table_changed = TRUE;
768 }
770 free (tokens0);
771 }
773 void gt_query_router_self_sync (BOOL begin)
774 {
775 if (!begin && table_changed)
776 {
777 start_build_timer();
778 table_changed = FALSE;
779 }
780 }
782 /*****************************************************************************/
784 int query_patch_open (GtQueryRouter *router, int seq_size, int compressed,
785 size_t max_size)
786 {
787 GtQueryPatch *new_patch;
789 if (!(new_patch = malloc (sizeof (GtQueryPatch))))
790 return FALSE;
792 memset (new_patch, 0, sizeof (GtQueryPatch));
794 if (!(new_patch->stream = zlib_stream_open (max_size)))
795 {
796 free (new_patch);
797 return FALSE;
798 }
800 new_patch->seq_size = seq_size;
801 new_patch->compressed = compressed;
803 /* NOTE: sequence is 1-based, not 0-based */
804 new_patch->seq_num = 1;
806 router->patch = new_patch;
808 return TRUE;
809 }
811 void query_patch_close (GtQueryRouter *router)
812 {
813 GtQueryPatch *patch;
815 GT->DBGFN (GT, "entered");
817 if (!router)
818 return;
820 patch = router->patch;
822 if (!patch)
823 return;
825 zlib_stream_close (patch->stream);
827 router->patch = NULL;
828 free (patch);
829 }
831 /* TODO: compact router tables to bit-level */
832 static void query_patch_apply (GtQueryRouter *router, int bits, char *data,
833 size_t data_size)
834 {
835 GtQueryPatch *patch;
836 char *table; /* NOTE: this must be signed */
837 int i;
839 patch = router->patch;
840 assert (patch != NULL);
842 /* check for overflow: this may look wrong but its not */
843 if (patch->table_pos + (data_size - 1) >= router->size)
844 {
845 GT->DBGFN (GT, "patch overflow: %u (max of %u)",
846 patch->table_pos+data_size, router->size);
847 query_patch_close (router);
848 return;
849 }
851 table = router->table;
853 /* hrm */
854 if (bits == 4)
855 {
856 int j;
858 for (i = 0; i < data_size; i++)
859 {
860 int pos;
861 char change;
863 pos = i + patch->table_pos;
865 /* avoid % */
866 j = (i+1) & 0x1;
868 /* grab the correct half of the byte and sign-extend it
869 * NOTE: this starts off with the most significant bits! */
870 change = data[i] & (0x0f << (4 * j));
872 /* move to least significant bits
873 * TODO: does this do sign-extension correctly? */
874 change >>= 4;
876 table[pos] += change;
877 }
878 }
879 else if (bits == 8)
880 {
881 /* untested */
882 for (i = 0; i < data_size; i++)
883 {
884 table[i + patch->table_pos] += data[i];
885 }
886 }
887 else
888 {
889 GT->DBGFN (GT, "undefined bits value in query patch: %u", bits);
890 query_patch_close (router);
891 return;
892 }
894 /* store the table position for the next patch */
895 patch->table_pos += i;
897 /* cleanup the data if the patch is done */
898 patch->seq_num++;
900 if (patch->seq_num > patch->seq_size)
901 query_patch_close (router);
902 }
904 /*****************************************************************************/
906 /* TODO: compact router tables to bit-level */
907 GtQueryRouter *gt_query_router_new (size_t size, int infinity)
908 {
909 GtQueryRouter *router;
911 if (size > MAX_TABLE_SIZE)
912 return NULL;
914 if (!(router = malloc (sizeof (GtQueryRouter))))
915 return NULL;
917 memset (router, 0, sizeof (GtQueryRouter));
919 if (!(router->table = malloc (size)))
920 {
921 free (router->table);
922 return NULL;
923 }
925 memset (router->table, infinity, size);
927 router->size = size;
929 return router;
930 }
932 void gt_query_router_free (GtQueryRouter *router)
933 {
934 if (!router)
935 return;
937 query_patch_close (router);
939 free (router->table);
940 free (router);
941 }
943 /*****************************************************************************/
/*
 * Debug helper: hex-dump `size' bytes of `data' to stdout.
 */
static void print_hex (unsigned char *data, size_t size)
{
	FILE *sink = stdout;

	fprint_hex (sink, data, size);
}
950 void gt_query_router_update (GtQueryRouter *router, size_t seq_num,
951 size_t seq_size, int compressed, int bits,
952 unsigned char *zdata, size_t size)
953 {
954 GtQueryPatch *patch;
955 char *data;
957 if (!router)
958 {
959 GT->DBGFN (GT, "null router");
960 return;
961 }
963 if (!router->patch)
964 {
965 if (!query_patch_open (router, seq_size, compressed, router->size))
966 return;
967 }
969 patch = router->patch;
971 /* check for an invalid sequence number or size */
972 if (patch->seq_size != seq_size || patch->seq_num != seq_num)
973 {
974 GT->DBGFN (GT, "bad patch: seq_size %u vs %u, seq_num %u vs %u",
975 patch->seq_size, seq_size, patch->seq_num, seq_num);
976 query_patch_close (router);
977 return;
978 }
980 if (compressed != patch->compressed)
981 {
982 GT->DBGFN (GT, "tried to change encodings in patch");
983 query_patch_close (router);
984 return;
985 }
987 switch (compressed)
988 {
989 case 0x00: /* no compression */
990 if (!zlib_stream_write (patch->stream, zdata, size))
991 {
992 GT->DBGFN (GT, "error copying data");
993 query_patch_close (router);
994 return;
995 }
997 break;
999 case 0x01: /* deflate */
1000 printf ("zlib compressed data:\n");
1001 print_hex (zdata, size);
1003 if (!zlib_stream_inflate (patch->stream, zdata, size))
1005 GT->DBGFN (GT, "error inflating data");
1006 query_patch_close (router);
1007 return;
1010 break;
1012 default:
1013 GT->DBGFN (GT, "unknown compression algorithm in query route patch");
1014 return;
1017 /* read the data in the stream */
1018 if (!(size = zlib_stream_read (patch->stream, &data)))
1020 GT->DBGFN (GT, "error calling zlib_stream_read");
1021 query_patch_close (router);
1022 return;
1025 printf ("after uncompressing:\n");
1026 print_hex (data, size);
1028 /* apply the patch -- this will cleanup any data if necessary */
1029 query_patch_apply (router, bits, data, size);
1031 print_hex (router->table, router->size);
1034 /*****************************************************************************/
1036 static void submit_empty_table (TCPC *c)
1038 static char table[8] = { 0 };
1039 int len;
1041 #if 0
1042 size_t size;
1043 #endif
1045 GT->DBGFN (GT, "reseting route table for %s", net_ip_str (GT_NODE(c)->ip));
1047 /* all slots in the table should be initialized to infinity, so it
1048 * should be "empty" on the remote node */
1049 memset (table, 0, sizeof (table));
1052 #if 0
1053 /* TEST: set part of the table to -infinity to get queries */
1054 size = sizeof (table);
1055 memset (table + (size + 1) / 2 - 1, -infinity, (size + 1) / 4);
1056 #endif
1058 /* format: <query-route-msg-type> <length of table> <infinity> */
1059 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
1060 "%c%lu%c", 0, (unsigned long) sizeof (table),
1061 INFINITY) < 0)
1063 GT->DBGFN (GT, "error reseting table");
1064 return;
1067 len = sizeof (table);
1069 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
1070 "%c%c%c%c%c%*p",
1071 1, 1, 1, 0, 8, len, table) < 0)
1073 GT->DBGFN (GT, "error sending empty patch");
1074 return;
1078 static void submit_table (TCPC *c, uint8_t *table, size_t size, size_t slots)
1080 int infinity = INFINITY;
1081 int seq_size;
1082 int compressed;
1083 int seq_num;
1084 uint8_t *p;
1085 size_t send_size;
1087 /* XXX make table size settable at runtime */
1089 /* send a reset table first */
1090 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
1091 "%c%lu%c", 0, (long)slots, infinity) < 0)
1093 GT->DBGFN (GT, "error reseting table");
1094 return;
1097 /* Break the table into PATCH_FRAGSIZE-sized chunks,
1098 * and include any leftover portions. */
1099 seq_size = size / PATCH_FRAGSIZE +
1100 (size % PATCH_FRAGSIZE == 0 ? 0 : 1);
1102 assert (seq_size < 256);
1103 #if 0
1104 GT->dbg (GT, "sequence size: %u", seq_size);
1105 #endif
1107 p = table;
1108 compressed = TRUE;
1110 /* NOTE: patch sequence numbers start at 1 */
1111 for (seq_num = 1; seq_num <= seq_size; seq_num++)
1113 send_size = MIN (PATCH_FRAGSIZE, size);
1115 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
1116 "%c%c%c%c%c%*p",
1117 /* QRP PATCH */ 1,
1118 seq_num, seq_size, compressed,
1119 /* bits */ PATCH_BITS,
1120 send_size, p) < 0)
1122 GT->DBGFN (GT, "error sending QRT patch");
1123 return;
1126 size -= send_size;
1127 p += send_size;
1131 static BOOL update_qr_table (TCPC *c)
1133 size_t size;
1134 int version;
1135 uint8_t *table;
1136 GtNode *node = GT_NODE(c);
1138 assert (node->state & GT_NODE_CONNECTED);
1140 table = gt_query_router_self (&size, &version);
1142 /* we may not have finished building a table yet */
1143 if (!table)
1144 return TRUE;
1146 /* dont submit a table if this node is already up to date */
1147 if (node->query_router_counter == version)
1148 return TRUE;
1150 /* HACK: this shouldn't be using compressed_slots */
1151 submit_table (c, table, size, compressed_slots);
1153 /* store the version number of this table so
1154 * we dont resubmit unecessarily */
1155 node->query_router_counter = version;
1157 return TRUE;
1160 static BOOL submit_first_table (TCPC *c)
1162 GtNode *node = GT_NODE(c);
1164 assert (node->state & GT_NODE_CONNECTED);
1166 update_qr_table (c);
1168 /* remove the first timer */
1169 timer_remove (node->query_route_timer);
1171 /* set the timer for updating the table repeatedly */
1172 node->query_route_timer = timer_add (QRT_UPDATE_INTERVAL,
1173 (TimerCallback)update_qr_table, c);
1175 /* we removed the timer, and must return FALSE */
1176 return FALSE;
1179 /*
1180 * Submit the query routing table for this node to another.
1182 * This delays sending the table for while. This helps preserve our precious
1183 * upstream when we're looking for nodes to connect to, as this often
1184 * happens when we're in the middle of looking for more nodes.
1185 */
1186 void query_route_table_submit (TCPC *c)
1188 GtNode *node = GT_NODE(c);
1190 assert (node->query_route_timer == 0);
1192 /* save bandwidth with an empty table */
1193 submit_empty_table (c);
1195 /* submit a real table later */
1196 node->query_route_timer = timer_add (QRT_SUBMIT_DELAY,
1197 (TimerCallback)submit_first_table, c);
1200 /*****************************************************************************/
/*
 * TESTING
 *
 * Disabled self-test for gt_query_router_hash_str().  Each CHECK prints
 * OK or FAILED for one known hash value; the "FAIL" vector is expected
 * to fail.
 */

#if 0
#define CHECK(x) do { \
	if (!(x)) printf("FAILED: %s\n", #x); \
	else printf("OK: %s\n", #x); \
} while (0)

#define HASH(str, bits) \
	printf ("hash " str ": %u\n", gt_query_router_hash_str (str, bits))

int main (int argc, char **argv)
{
#define hash gt_query_router_hash_str

	CHECK(hash("", 13)==0);
	CHECK(hash("eb", 13)==6791);
	CHECK(hash("ebc", 13)==7082);
	CHECK(hash("ebck", 13)==6698);
	CHECK(hash("ebckl", 13)==3179);
	CHECK(hash("ebcklm", 13)==3235);
	CHECK(hash("ebcklme", 13)==6438);
	CHECK(hash("ebcklmen", 13)==1062);
	CHECK(hash("ebcklmenq", 13)==3527);
	CHECK(hash("", 16)==0);
	CHECK(hash("n", 16)==65003);
	CHECK(hash("nd", 16)==54193);
	CHECK(hash("ndf", 16)==4953);
	CHECK(hash("ndfl", 16)==58201);
	CHECK(hash("ndfla", 16)==34830);
	CHECK(hash("ndflal", 16)==36910);
	CHECK(hash("ndflale", 16)==34586);
	CHECK(hash("ndflalem", 16)==37658);
	CHECK(hash("FAIL", 16)==37458); // WILL FAIL
	CHECK(hash("ndflaleme", 16)==45559);
	CHECK(hash("ol2j34lj", 10)==318);
	CHECK(hash("asdfas23", 10)==503);
	CHECK(hash("9um3o34fd", 10)==758);
	CHECK(hash("a234d", 10)==281);
	CHECK(hash("a3f", 10)==767);
	CHECK(hash("3nja9", 10)==581);
	CHECK(hash("2459345938032343", 10)==146);
	CHECK(hash("7777a88a8a8a8", 10)==342);
	CHECK(hash("asdfjklkj3k", 10)==861);
	CHECK(hash("adfk32l", 10)==1011);
	CHECK(hash("zzzzzzzzzzz", 10)==944);

	/* the hash must be case-insensitive */
	CHECK(hash("3nja9", 10)==581);
	CHECK(hash("3NJA9", 10)==581);
	CHECK(hash("3nJa9", 10)==581);

	printf ("hash(FAIL, 16) = %u\n", hash("FAIL", 16));
	return 0;
}
#endif