rev |
line source |
paulo@0
|
1 /*
|
paulo@0
|
2 * $Id: gt_query_route.c,v 1.46 2004/04/05 07:56:54 hipnod Exp $
|
paulo@0
|
3 *
|
paulo@0
|
4 * Copyright (C) 2001-2003 giFT project (gift.sourceforge.net)
|
paulo@0
|
5 *
|
paulo@0
|
6 * This program is free software; you can redistribute it and/or modify it
|
paulo@0
|
7 * under the terms of the GNU General Public License as published by the
|
paulo@0
|
8 * Free Software Foundation; either version 2, or (at your option) any
|
paulo@0
|
9 * later version.
|
paulo@0
|
10 *
|
paulo@0
|
11 * This program is distributed in the hope that it will be useful, but
|
paulo@0
|
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
|
paulo@0
|
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
paulo@0
|
14 * General Public License for more details.
|
paulo@0
|
15 */
|
paulo@0
|
16
|
paulo@0
|
17 #include "gt_gnutella.h"
|
paulo@0
|
18
|
paulo@0
|
19 #include "gt_query_route.h"
|
paulo@0
|
20 #include "gt_packet.h"
|
paulo@0
|
21 #include "gt_utils.h"
|
paulo@0
|
22
|
paulo@0
|
23 #include "gt_node.h"
|
paulo@0
|
24 #include "gt_node_list.h"
|
paulo@0
|
25
|
paulo@0
|
26 #include <libgift/stopwatch.h>
|
paulo@0
|
27
|
paulo@0
|
28 #include <zlib.h>
|
paulo@0
|
29
|
paulo@0
|
30 /*****************************************************************************/
|
paulo@0
|
31
|
paulo@0
|
32 /*
|
paulo@0
|
33 * TODO:
|
paulo@0
|
34 * - compact table to bit-level representation
|
paulo@0
|
35 * - support incremental updates of table
|
paulo@0
|
36 * - cut-off entries when MAX_FILL_RATIO is reached
|
paulo@0
|
37 */
|
paulo@0
|
38 #define MIN_TABLE_BITS (16) /* 16 bits */
|
paulo@0
|
39 #define MAX_TABLE_BITS (21) /* 21 bits */
|
paulo@0
|
40 #define MIN_TABLE_SIZE (1UL << (MIN_TABLE_BITS - 1)) /* 32k bytes */
|
paulo@0
|
41 #define MAX_TABLE_SIZE (1UL << (MAX_TABLE_BITS - 1)) /* 1M bytes */
|
paulo@0
|
42 #define MIN_TABLE_SLOTS (MIN_TABLE_SIZE * 2) /* 64k slots */
|
paulo@0
|
43 #define MAX_TABLE_SLOTS (MAX_TABLE_SIZE * 2) /* 2M slots */
|
paulo@0
|
44 #define INC_FILL_RATIO (0.70) /* 0.7% */
|
paulo@0
|
45 #define MAX_FILL_RATIO (1.00) /* 1% (unused)*/
|
paulo@0
|
46
|
paulo@0
|
47 /*
|
paulo@0
|
48 * magical constant necessary for the query routing hash function
|
paulo@0
|
49 */
|
paulo@0
|
50 #define MULTIPLIER 0x4F1BBCDC
|
paulo@0
|
51
|
paulo@0
|
52 /*
|
paulo@0
|
53 * How often to synchronize the QRT with ultrapeers.
|
paulo@0
|
54 *
|
paulo@0
|
55 * This is very big because we don't support incremental updating
|
paulo@0
|
56 * yet.
|
paulo@0
|
57 */
|
paulo@0
|
58 #define QRT_UPDATE_INTERVAL (20 * MINUTES)
|
paulo@0
|
59
|
paulo@0
|
60 /*
|
paulo@0
|
61 * How often we check to build a compressed patch of local shares.
|
paulo@0
|
62 */
|
paulo@0
|
63 #define QRT_BUILD_INTERVAL (3 * SECONDS)
|
paulo@0
|
64
|
paulo@0
|
65 /*
|
paulo@0
|
66 * How long to wait after the first query_route_table_submit() before
|
paulo@0
|
67 * actually submitting the table.
|
paulo@0
|
68 */
|
paulo@0
|
69 #define QRT_SUBMIT_DELAY (1 * MINUTES)
|
paulo@0
|
70
|
paulo@0
|
71 /*
|
paulo@0
|
72 * Largest hops value in table. It looks like Limewire hardcodes
|
paulo@0
|
73 * this as 7, and won't understand any other value.
|
paulo@0
|
74 */
|
paulo@0
|
75 #define INFINITY 7
|
paulo@0
|
76
|
paulo@0
|
77 /*
|
paulo@0
|
78 * Constants for changing route table.
|
paulo@0
|
79 */
|
paulo@0
|
80 #define QRT_KEYWORD_ADD (0x0a) /* -6 */
|
paulo@0
|
81 #define QRT_KEYWORD_REMOVE (0x06) /* 6 */
|
paulo@0
|
82
|
paulo@0
|
83 /*
|
paulo@0
|
84 * The minimum length of a keyword
|
paulo@0
|
85 */
|
paulo@0
|
86 #define QRP_MIN_LENGTH 3
|
paulo@0
|
87
|
paulo@0
|
88 /*
|
paulo@0
|
89 * Maximum patch fragment size to send
|
paulo@0
|
90 */
|
paulo@0
|
91 #define PATCH_FRAGSIZE 2048
|
paulo@0
|
92
|
paulo@0
|
93 /*
|
paulo@0
|
94 * Number of bits in the patches we send
|
paulo@0
|
95 */
|
paulo@0
|
96 #define PATCH_BITS 4
|
paulo@0
|
97
|
paulo@0
|
98 /*
|
paulo@0
|
99 * Holds a 32-bit index describing each token this node shares.
|
paulo@0
|
100 */
|
paulo@0
|
101 struct qrp_route_entry
|
paulo@0
|
102 {
|
paulo@0
|
103 int ref; /* number of references to this index */
|
paulo@0
|
104 uint32_t index; /* the actual position */
|
paulo@0
|
105 };
|
paulo@0
|
106
|
paulo@0
|
107 struct qrp_route_table
|
paulo@0
|
108 {
|
paulo@0
|
109 uint8_t *table;
|
paulo@0
|
110 size_t bits;
|
paulo@0
|
111 size_t size;
|
paulo@0
|
112 size_t slots;
|
paulo@0
|
113 size_t present;
|
paulo@0
|
114 size_t shared;
|
paulo@0
|
115 };
|
paulo@0
|
116
|
paulo@0
|
117 /*****************************************************************************/
|
paulo@0
|
118
|
paulo@0
|
119 /*
|
paulo@0
|
120 * The set of indices that are currently marked "present" in the
|
paulo@0
|
121 * routing table.
|
paulo@0
|
122 */
|
paulo@0
|
123 static Dataset *indices;
|
paulo@0
|
124
|
paulo@0
|
125 /*
|
paulo@0
|
126 * This holds the compressed table as a full QRP patch (incremental
|
paulo@0
|
127 * updates not supported yet).
|
paulo@0
|
128 */
|
paulo@0
|
129 static uint8_t *compressed_table;
|
paulo@0
|
130 static size_t compressed_slots;
|
paulo@0
|
131 static size_t compressed_size;
|
paulo@0
|
132
|
paulo@0
|
133 /*
|
paulo@0
|
134 * Keeps track of how many times the compressed table has changed.
|
paulo@0
|
135 * Used to avoid sending updates when not necessary.
|
paulo@0
|
136 */
|
paulo@0
|
137 static int compressed_version;
|
paulo@0
|
138
|
paulo@0
|
139 /*
|
paulo@0
|
140 * Timer that builds the compressed patch for the table submitted
|
paulo@0
|
141 * to peers.
|
paulo@0
|
142 */
|
paulo@0
|
143 static timer_id build_timer;
|
paulo@0
|
144
|
paulo@0
|
145 /*
|
paulo@0
|
146 * Whether we have to rebuild the table when shares are done
|
paulo@0
|
147 * syncing.
|
paulo@0
|
148 */
|
paulo@0
|
149 static BOOL table_changed;
|
paulo@0
|
150
|
paulo@0
|
151 /*
|
paulo@0
|
152 * The full query-routing table in binary form. This will get
|
paulo@0
|
153 * compressed before transmission.
|
paulo@0
|
154 */
|
paulo@0
|
155 static struct qrp_route_table *route_table;
|
paulo@0
|
156
|
paulo@0
|
157 /*****************************************************************************/
|
paulo@0
|
158
|
paulo@0
|
159 /* hash function used for query-routing */
|
paulo@0
|
160 uint32_t gt_query_router_hash_str (char *str, size_t bits)
|
paulo@0
|
161 {
|
paulo@0
|
162 uint32_t hash;
|
paulo@0
|
163 unsigned int i;
|
paulo@0
|
164 unsigned char c;
|
paulo@0
|
165
|
paulo@0
|
166 i = hash = 0;
|
paulo@0
|
167
|
paulo@0
|
168 while ((c = *str++) && !isspace (c))
|
paulo@0
|
169 {
|
paulo@0
|
170 hash ^= tolower (c) << (i * 8);
|
paulo@0
|
171
|
paulo@0
|
172 /* using & instead of %, sorry */
|
paulo@0
|
173 i = (i+1) & 0x03;
|
paulo@0
|
174 }
|
paulo@0
|
175
|
paulo@0
|
176 return (hash * MULTIPLIER) >> (32 - bits);
|
paulo@0
|
177 }
|
paulo@0
|
178
|
paulo@0
|
179 /*****************************************************************************/
|
paulo@0
|
180
|
paulo@0
|
181 static struct qrp_route_table *qrp_route_table_new (size_t bits)
|
paulo@0
|
182 {
|
paulo@0
|
183 struct qrp_route_table *qrt;
|
paulo@0
|
184
|
paulo@0
|
185 if (!(qrt = MALLOC (sizeof (struct qrp_route_table))))
|
paulo@0
|
186 return NULL;
|
paulo@0
|
187
|
paulo@0
|
188 qrt->bits = bits;
|
paulo@0
|
189 qrt->size = (1UL << (bits - 1));
|
paulo@0
|
190 qrt->slots = qrt->size * 2; /* 4-bit entries only */
|
paulo@0
|
191
|
paulo@0
|
192 if (!(qrt->table = MALLOC (qrt->size)))
|
paulo@0
|
193 {
|
paulo@0
|
194 free (qrt);
|
paulo@0
|
195 return NULL;
|
paulo@0
|
196 }
|
paulo@0
|
197
|
paulo@0
|
198 return qrt;
|
paulo@0
|
199 }
|
paulo@0
|
200
|
paulo@0
|
201 static void qrp_route_table_free (struct qrp_route_table *qrt)
|
paulo@0
|
202 {
|
paulo@0
|
203 if (!qrt)
|
paulo@0
|
204 return;
|
paulo@0
|
205
|
paulo@0
|
206 free (qrt->table);
|
paulo@0
|
207 free (qrt);
|
paulo@0
|
208 }
|
paulo@0
|
209
|
paulo@0
|
210 static void qrp_route_table_insert (struct qrp_route_table *qrt, uint32_t index)
|
paulo@0
|
211 {
|
paulo@0
|
212 uint8_t old_entry;
|
paulo@0
|
213 int set_lower;
|
paulo@0
|
214 int entry;
|
paulo@0
|
215
|
paulo@0
|
216 if (!qrt)
|
paulo@0
|
217 return;
|
paulo@0
|
218
|
paulo@0
|
219 assert (index < qrt->size * 2);
|
paulo@0
|
220
|
paulo@0
|
221 set_lower = index % 2;
|
paulo@0
|
222 entry = index / 2;
|
paulo@0
|
223
|
paulo@0
|
224 if (set_lower)
|
paulo@0
|
225 {
|
paulo@0
|
226 old_entry = qrt->table[entry] & 0x0f;
|
paulo@0
|
227 qrt->table[entry] = (qrt->table[entry] & 0xf0) |
|
paulo@0
|
228 ((QRT_KEYWORD_ADD) & 0x0f);
|
paulo@0
|
229 }
|
paulo@0
|
230 else
|
paulo@0
|
231 {
|
paulo@0
|
232 old_entry = (qrt->table[entry] & 0xf0) >> 4;
|
paulo@0
|
233 qrt->table[entry] = (qrt->table[entry] & 0x0f) |
|
paulo@0
|
234 ((QRT_KEYWORD_ADD << 4) & 0xf0);
|
paulo@0
|
235 }
|
paulo@0
|
236
|
paulo@0
|
237 assert (old_entry == 0 || old_entry == QRT_KEYWORD_REMOVE);
|
paulo@0
|
238 #if 0
|
paulo@0
|
239 GT->dbg (GT, "+%u [%d/%d]", index, entry, set_lower);
|
paulo@0
|
240 #endif
|
paulo@0
|
241
|
paulo@0
|
242 qrt->present++;
|
paulo@0
|
243 }
|
paulo@0
|
244
|
paulo@0
|
245 #if 0
|
paulo@0
|
246 /* untested */
|
paulo@0
|
247 static void qrp_route_table_remove (struct qrp_route_table *qrt, uint32_t index)
|
paulo@0
|
248 {
|
paulo@0
|
249 uint8_t old_entry;
|
paulo@0
|
250 int clear_lower;
|
paulo@0
|
251 int entry;
|
paulo@0
|
252
|
paulo@0
|
253 if (!qrt)
|
paulo@0
|
254 return;
|
paulo@0
|
255
|
paulo@0
|
256 assert (index < qrt->size * 2);
|
paulo@0
|
257
|
paulo@0
|
258 clear_lower = index % 2;
|
paulo@0
|
259 entry = index / 2;
|
paulo@0
|
260
|
paulo@0
|
261 /*
|
paulo@0
|
262 * This needs to change when doing incremental updating...
|
paulo@0
|
263 */
|
paulo@0
|
264
|
paulo@0
|
265 if (clear_lower)
|
paulo@0
|
266 {
|
paulo@0
|
267 old_entry = qrt->table[entry] & 0x0f;
|
paulo@0
|
268 qrt->table[entry] = (qrt->table[entry] & 0xf0) |
|
paulo@0
|
269 ((QRT_KEYWORD_REMOVE) & 0x0f);
|
paulo@0
|
270 }
|
paulo@0
|
271 else
|
paulo@0
|
272 {
|
paulo@0
|
273 old_entry = (qrt->table[entry] & 0xf0) >> 4;
|
paulo@0
|
274 qrt->table[entry] = (qrt->table[entry] & 0x0f) |
|
paulo@0
|
275 ((QRT_KEYWORD_REMOVE << 4) & 0xf0);
|
paulo@0
|
276 }
|
paulo@0
|
277
|
paulo@0
|
278 assert (old_entry == (uint8_t) QRT_KEYWORD_ADD);
|
paulo@0
|
279 #if 0
|
paulo@0
|
280 GT->dbg (GT, "-%u [%d/%d]", index, entry, clear_lower);
|
paulo@0
|
281 #endif
|
paulo@0
|
282
|
paulo@0
|
283 qrt->present--;
|
paulo@0
|
284 }
|
paulo@0
|
285 #endif
|
paulo@0
|
286
|
paulo@0
|
287 static BOOL qrp_route_table_lookup (struct qrp_route_table *qrt, uint32_t index)
|
paulo@0
|
288 {
|
paulo@0
|
289 int check_lower;
|
paulo@0
|
290 uint32_t entry;
|
paulo@0
|
291
|
paulo@0
|
292 if (!qrt)
|
paulo@0
|
293 return FALSE;
|
paulo@0
|
294
|
paulo@0
|
295 assert (index < qrt->slots);
|
paulo@0
|
296 assert (qrt->slots == qrt->size * 2);
|
paulo@0
|
297
|
paulo@0
|
298 check_lower = index % 2;
|
paulo@0
|
299 entry = index / 2;
|
paulo@0
|
300
|
paulo@0
|
301 if (check_lower)
|
paulo@0
|
302 {
|
paulo@0
|
303 if ((qrt->table[entry] & 0x0f) == QRT_KEYWORD_ADD)
|
paulo@0
|
304 return TRUE;
|
paulo@0
|
305 }
|
paulo@0
|
306 else
|
paulo@0
|
307 {
|
paulo@0
|
308 if (((qrt->table[entry] & 0xf0) >> 4) == QRT_KEYWORD_ADD)
|
paulo@0
|
309 return TRUE;
|
paulo@0
|
310 }
|
paulo@0
|
311
|
paulo@0
|
312 return FALSE;
|
paulo@0
|
313 }
|
paulo@0
|
314
|
paulo@0
|
315 static double qrp_route_table_fill_ratio (struct qrp_route_table *qrt)
|
paulo@0
|
316 {
|
paulo@0
|
317 return (double)qrt->present * 100 / qrt->slots;
|
paulo@0
|
318 }
|
paulo@0
|
319
|
paulo@0
|
320 /*****************************************************************************/
|
paulo@0
|
321
|
paulo@0
|
322 static char *zlib_strerror (int error)
|
paulo@0
|
323 {
|
paulo@0
|
324 switch (error)
|
paulo@0
|
325 {
|
paulo@0
|
326 case Z_OK: return "OK";
|
paulo@0
|
327 case Z_STREAM_END: return "End of stream";
|
paulo@0
|
328 case Z_NEED_DICT: return "Decompressing dictionary needed";
|
paulo@0
|
329 case Z_STREAM_ERROR: return "Stream error";
|
paulo@0
|
330 case Z_ERRNO: return "Generic zlib error";
|
paulo@0
|
331 case Z_DATA_ERROR: return "Data error";
|
paulo@0
|
332 case Z_MEM_ERROR: return "Memory error";
|
paulo@0
|
333 case Z_BUF_ERROR: return "Buffer error";
|
paulo@0
|
334 case Z_VERSION_ERROR: return "Incompatible runtime zlib library";
|
paulo@0
|
335 default: break;
|
paulo@0
|
336 }
|
paulo@0
|
337
|
paulo@0
|
338 return "Invalid zlib error code";
|
paulo@0
|
339 }
|
paulo@0
|
340
|
paulo@0
|
341 /* TODO: make this use a stream-like interface */
|
paulo@0
|
342 static uint8_t *compress_table (uint8_t *table, size_t in_size, size_t *out_size)
|
paulo@0
|
343 {
|
paulo@0
|
344 z_streamp out;
|
paulo@0
|
345 int ret;
|
paulo@0
|
346 uint8_t *out_buf;
|
paulo@0
|
347 int free_size;
|
paulo@0
|
348
|
paulo@0
|
349 *out_size = 0;
|
paulo@0
|
350
|
paulo@0
|
351 if (!(out = MALLOC (sizeof(*out))))
|
paulo@0
|
352 return NULL;
|
paulo@0
|
353
|
paulo@0
|
354 out->zalloc = Z_NULL;
|
paulo@0
|
355 out->zfree = Z_NULL;
|
paulo@0
|
356 out->opaque = Z_NULL;
|
paulo@0
|
357
|
paulo@0
|
358 if ((ret = deflateInit (out, Z_DEFAULT_COMPRESSION)) != Z_OK)
|
paulo@0
|
359 {
|
paulo@0
|
360 GT->DBGFN (GT, "deflateInit error: %s", zlib_strerror (ret));
|
paulo@0
|
361 free (out);
|
paulo@0
|
362 return NULL;
|
paulo@0
|
363 }
|
paulo@0
|
364
|
paulo@0
|
365 /* allocate initial buffer */
|
paulo@0
|
366 free_size = in_size + in_size / 100;
|
paulo@0
|
367
|
paulo@0
|
368 if (!(out_buf = malloc (free_size)))
|
paulo@0
|
369 {
|
paulo@0
|
370 free (out_buf);
|
paulo@0
|
371 deflateEnd (out);
|
paulo@0
|
372 free (out);
|
paulo@0
|
373 return NULL;
|
paulo@0
|
374 }
|
paulo@0
|
375
|
paulo@0
|
376 out->next_in = table;
|
paulo@0
|
377 out->avail_in = in_size;
|
paulo@0
|
378 out->next_out = out_buf;
|
paulo@0
|
379 out->avail_out = free_size;
|
paulo@0
|
380
|
paulo@0
|
381 if ((ret = deflate (out, Z_FINISH)) != Z_STREAM_END)
|
paulo@0
|
382 {
|
paulo@0
|
383 GT->DBGFN (GT, "compression error: %s", zlib_strerror (ret));
|
paulo@0
|
384 free (out_buf);
|
paulo@0
|
385 deflateEnd (out);
|
paulo@0
|
386 free (out);
|
paulo@0
|
387 return NULL;
|
paulo@0
|
388 }
|
paulo@0
|
389
|
paulo@0
|
390 /*
|
paulo@0
|
391 * This could theoretically fail I guess. If it does, we shouldn't keep
|
paulo@0
|
392 * the table at least.
|
paulo@0
|
393 */
|
paulo@0
|
394 assert (out->avail_in == 0);
|
paulo@0
|
395
|
paulo@0
|
396 *out_size = free_size - out->avail_out;
|
paulo@0
|
397
|
paulo@0
|
398 deflateEnd (out);
|
paulo@0
|
399 free (out);
|
paulo@0
|
400
|
paulo@0
|
401 return out_buf;
|
paulo@0
|
402 }
|
paulo@0
|
403
|
paulo@0
|
404 #if 0
|
paulo@0
|
405 /* send the a QRP table to nodes we haven't sent a real table yet */
|
paulo@0
|
406 static GtNode *update_nodes (TCPC *c, GtNode *node, void *udata)
|
paulo@0
|
407 {
|
paulo@0
|
408 assert (node->state == GT_NODE_CONNECTED);
|
paulo@0
|
409 assert (GT_CONN(node) == c);
|
paulo@0
|
410
|
paulo@0
|
411 /*
|
paulo@0
|
412 * If the counter is not 0, we sent a table to this node already.
|
paulo@0
|
413 * So, wait for the timer to pick that up.
|
paulo@0
|
414 */
|
paulo@0
|
415 if (node->query_router_counter != 0)
|
paulo@0
|
416 return NULL;
|
paulo@0
|
417
|
paulo@0
|
418 /* submit the table */
|
paulo@0
|
419 query_route_table_submit (c);
|
paulo@0
|
420
|
paulo@0
|
421 /* reset the submit timer */
|
paulo@0
|
422 if (GT_NODE(c)->query_route_timer != 0)
|
paulo@0
|
423 timer_reset (GT_NODE(c)->query_route_timer);
|
paulo@0
|
424
|
paulo@0
|
425 return NULL;
|
paulo@0
|
426 }
|
paulo@0
|
427 #endif
|
paulo@0
|
428
|
paulo@0
|
429 static void add_index (ds_data_t *key, ds_data_t *value,
|
paulo@0
|
430 struct qrp_route_table *qrt)
|
paulo@0
|
431 {
|
paulo@0
|
432 struct qrp_route_entry *entry = value->data;
|
paulo@0
|
433 uint32_t slot;
|
paulo@0
|
434
|
paulo@0
|
435 /* grab only the most significant bits of the entry */
|
paulo@0
|
436 slot = entry->index >> (32 - qrt->bits);
|
paulo@0
|
437
|
paulo@0
|
438 /*
|
paulo@0
|
439 * If the entry already exists in the table, bump shared entries and
|
paulo@0
|
440 * forget about this entry.
|
paulo@0
|
441 */
|
paulo@0
|
442 if (qrp_route_table_lookup (qrt, slot))
|
paulo@0
|
443 {
|
paulo@0
|
444 qrt->shared++;
|
paulo@0
|
445 return;
|
paulo@0
|
446 }
|
paulo@0
|
447
|
paulo@0
|
448 qrp_route_table_insert (qrt, slot);
|
paulo@0
|
449 }
|
paulo@0
|
450
|
paulo@0
|
451 static void build_uncompressed (struct qrp_route_table *qrt)
|
paulo@0
|
452 {
|
paulo@0
|
453 dataset_foreach (indices, DS_FOREACH(add_index), qrt);
|
paulo@0
|
454 }
|
paulo@0
|
455
|
paulo@0
|
456 static int build_qrp_table (void *udata)
|
paulo@0
|
457 {
|
paulo@0
|
458 uint8_t *new_table;
|
paulo@0
|
459 size_t new_size;
|
paulo@0
|
460 StopWatch *sw;
|
paulo@0
|
461 double elapsed;
|
paulo@0
|
462 double fill_ratio;
|
paulo@0
|
463
|
paulo@0
|
464 if (!route_table && !(route_table = qrp_route_table_new (MIN_TABLE_BITS)))
|
paulo@0
|
465 {
|
paulo@0
|
466 /* try again later */
|
paulo@0
|
467 return TRUE;
|
paulo@0
|
468 }
|
paulo@0
|
469
|
paulo@0
|
470 sw = stopwatch_new (TRUE);
|
paulo@0
|
471
|
paulo@0
|
472 /* build a table from the indices */
|
paulo@0
|
473 build_uncompressed (route_table);
|
paulo@0
|
474
|
paulo@0
|
475 stopwatch_stop (sw);
|
paulo@0
|
476
|
paulo@0
|
477 elapsed = stopwatch_elapsed (sw, NULL);
|
paulo@0
|
478
|
paulo@0
|
479 fill_ratio = qrp_route_table_fill_ratio (route_table);
|
paulo@0
|
480
|
paulo@0
|
481 GT->DBGFN (GT, "%.4lfs elapsed building", elapsed);
|
paulo@0
|
482 GT->DBGFN (GT, "present=%u shared=%u size=%u", route_table->present,
|
paulo@0
|
483 route_table->shared, route_table->size);
|
paulo@0
|
484 GT->DBGFN (GT, "fill ratio=%.4lf%%", fill_ratio);
|
paulo@0
|
485
|
paulo@0
|
486 /*
|
paulo@0
|
487 * If the fill ratio is greater than an acceptable threshold,
|
paulo@0
|
488 * and we haven't reached the maximum table size allowed,
|
paulo@0
|
489 * rebuild a larger routing table.
|
paulo@0
|
490 */
|
paulo@0
|
491 if (fill_ratio >= INC_FILL_RATIO && route_table->bits < MAX_TABLE_BITS)
|
paulo@0
|
492 {
|
paulo@0
|
493 struct qrp_route_table *new_table;
|
paulo@0
|
494
|
paulo@0
|
495 /*
|
paulo@0
|
496 * If we don't have enough memory to build the new table, fall
|
paulo@0
|
497 * through and compress the existing table. This would only happen
|
paulo@0
|
498 * if this node has a very small amount of memory.
|
paulo@0
|
499 */
|
paulo@0
|
500 if ((new_table = qrp_route_table_new (route_table->bits + 1)))
|
paulo@0
|
501 {
|
paulo@0
|
502 qrp_route_table_free (route_table);
|
paulo@0
|
503 route_table = new_table;
|
paulo@0
|
504
|
paulo@0
|
505 /* retry the build later, it's kinda expensive */
|
paulo@0
|
506 stopwatch_free (sw);
|
paulo@0
|
507 return TRUE;
|
paulo@0
|
508 }
|
paulo@0
|
509 }
|
paulo@0
|
510
|
paulo@0
|
511 stopwatch_start (sw);
|
paulo@0
|
512
|
paulo@0
|
513 /* compress a new table */
|
paulo@0
|
514 new_table = compress_table (route_table->table,
|
paulo@0
|
515 route_table->size,
|
paulo@0
|
516 &new_size);
|
paulo@0
|
517
|
paulo@0
|
518 elapsed = stopwatch_free_elapsed (sw);
|
paulo@0
|
519
|
paulo@0
|
520 GT->DBGFN (GT, "%.4lfs elapsed compressing", elapsed);
|
paulo@0
|
521 GT->DBGFN (GT, "compressed size=%lu", new_size);
|
paulo@0
|
522
|
paulo@0
|
523 if (!new_table)
|
paulo@0
|
524 return TRUE;
|
paulo@0
|
525
|
paulo@0
|
526 assert (new_size > 0);
|
paulo@0
|
527
|
paulo@0
|
528 /*
|
paulo@0
|
529 * Replace the old compressed table
|
paulo@0
|
530 */
|
paulo@0
|
531 free (compressed_table);
|
paulo@0
|
532
|
paulo@0
|
533 compressed_table = new_table;
|
paulo@0
|
534 compressed_size = new_size;
|
paulo@0
|
535 compressed_slots = route_table->slots;
|
paulo@0
|
536
|
paulo@0
|
537 compressed_version++;
|
paulo@0
|
538
|
paulo@0
|
539 if (!compressed_version)
|
paulo@0
|
540 compressed_version++;
|
paulo@0
|
541
|
paulo@0
|
542 /*
|
paulo@0
|
543 * An optimization to reduce memory usage: realloc the
|
paulo@0
|
544 * compressed table to the smaller size.
|
paulo@0
|
545 */
|
paulo@0
|
546 if ((new_table = realloc (new_table, new_size)))
|
paulo@0
|
547 compressed_table = new_table;
|
paulo@0
|
548
|
paulo@0
|
549 #if 0
|
paulo@0
|
550 /* update nodes with this table */
|
paulo@0
|
551 gt_conn_foreach (GT_CONN_FOREACH(update_nodes), NULL,
|
paulo@0
|
552 GT_NODE_ULTRA, GT_NODE_CONNECTED, 0);
|
paulo@0
|
553 #endif
|
paulo@0
|
554
|
paulo@0
|
555 /*
|
paulo@0
|
556 * Temporary optimization: we can free the uncompressed
|
paulo@0
|
557 * route table now, as it is unused. This is a dubious optimization
|
paulo@0
|
558 * though because the table will probably hang around in
|
paulo@0
|
559 * the future when incremental updating works.
|
paulo@0
|
560 */
|
paulo@0
|
561 qrp_route_table_free (route_table);
|
paulo@0
|
562 route_table = NULL;
|
paulo@0
|
563
|
paulo@0
|
564 /* remove the timer, as the table is now up to date */
|
paulo@0
|
565 build_timer = 0;
|
paulo@0
|
566 return FALSE;
|
paulo@0
|
567 }
|
paulo@0
|
568
|
paulo@0
|
569 static int start_build (void *udata)
|
paulo@0
|
570 {
|
paulo@0
|
571 build_timer = timer_add (QRT_BUILD_INTERVAL,
|
paulo@0
|
572 (TimerCallback)build_qrp_table, NULL);
|
paulo@0
|
573 return FALSE;
|
paulo@0
|
574 }
|
paulo@0
|
575
|
paulo@0
|
576 static void start_build_timer (void)
|
paulo@0
|
577 {
|
paulo@0
|
578 if (build_timer)
|
paulo@0
|
579 return;
|
paulo@0
|
580
|
paulo@0
|
581 /*
|
paulo@0
|
582 * If we don't have a compressed table, we haven't built
|
paulo@0
|
583 * the table before, so build it soon. Otherwise,
|
paulo@0
|
584 * we won't submit it for a while anyway, so build it
|
paulo@0
|
585 * at half the update interval.
|
paulo@0
|
586 */
|
paulo@0
|
587 if (compressed_table)
|
paulo@0
|
588 {
|
paulo@0
|
589 build_timer = timer_add (QRT_UPDATE_INTERVAL / 2,
|
paulo@0
|
590 (TimerCallback)start_build, NULL);
|
paulo@0
|
591 return;
|
paulo@0
|
592 }
|
paulo@0
|
593
|
paulo@0
|
594 start_build (NULL);
|
paulo@0
|
595 }
|
paulo@0
|
596
|
paulo@0
|
597 /*****************************************************************************/
|
paulo@0
|
598
|
paulo@0
|
599 /* TODO: this should be moved to GT_SELF */
|
paulo@0
|
600 uint8_t *gt_query_router_self (size_t *size, int *version)
|
paulo@0
|
601 {
|
paulo@0
|
602 if (!compressed_table)
|
paulo@0
|
603 return NULL;
|
paulo@0
|
604
|
paulo@0
|
605 assert (size != NULL && version != NULL);
|
paulo@0
|
606
|
paulo@0
|
607 *size = compressed_size;
|
paulo@0
|
608 *version = compressed_version;
|
paulo@0
|
609
|
paulo@0
|
610 return compressed_table;
|
paulo@0
|
611 }
|
paulo@0
|
612
|
paulo@0
|
613 static int free_entries (ds_data_t *key, ds_data_t *value, void *udata)
|
paulo@0
|
614 {
|
paulo@0
|
615 struct qrp_route_entry *entry = value->data;
|
paulo@0
|
616
|
paulo@0
|
617 free (entry);
|
paulo@0
|
618
|
paulo@0
|
619 return DS_CONTINUE | DS_REMOVE;
|
paulo@0
|
620 }
|
paulo@0
|
621
|
paulo@0
|
622 void gt_query_router_self_destroy (void)
|
paulo@0
|
623 {
|
paulo@0
|
624 timer_remove_zero (&build_timer);
|
paulo@0
|
625
|
paulo@0
|
626 qrp_route_table_free (route_table);
|
paulo@0
|
627 route_table = NULL;
|
paulo@0
|
628
|
paulo@0
|
629 free (compressed_table);
|
paulo@0
|
630 compressed_table = NULL;
|
paulo@0
|
631 compressed_slots = 0;
|
paulo@0
|
632 compressed_size = 0;
|
paulo@0
|
633 compressed_version = 0;
|
paulo@0
|
634
|
paulo@0
|
635 dataset_foreach_ex (indices, DS_FOREACH_EX(free_entries), NULL);
|
paulo@0
|
636 dataset_clear (indices);
|
paulo@0
|
637 indices = NULL;
|
paulo@0
|
638 }
|
paulo@0
|
639
|
paulo@0
|
640 void gt_query_router_self_init (void)
|
paulo@0
|
641 {
|
paulo@0
|
642 indices = dataset_new (DATASET_HASH);
|
paulo@0
|
643 }
|
paulo@0
|
644
|
paulo@0
|
645 /*****************************************************************************/
|
paulo@0
|
646
|
paulo@0
|
647 static uint32_t *append_token (uint32_t *tokens, size_t *len,
|
paulo@0
|
648 size_t pos, uint32_t tok)
|
paulo@0
|
649 {
|
paulo@0
|
650 if (pos >= *len)
|
paulo@0
|
651 {
|
paulo@0
|
652 uint32_t *new_tokens;
|
paulo@0
|
653
|
paulo@0
|
654 *(len) += 8;
|
paulo@0
|
655 new_tokens = realloc (tokens, *len * sizeof (uint32_t));
|
paulo@0
|
656
|
paulo@0
|
657 assert (new_tokens != NULL);
|
paulo@0
|
658 tokens = new_tokens;
|
paulo@0
|
659 }
|
paulo@0
|
660
|
paulo@0
|
661 tokens[pos] = tok;
|
paulo@0
|
662 return tokens;
|
paulo@0
|
663 }
|
paulo@0
|
664
|
paulo@0
|
665 static uint32_t *tokenize (char *hpath, size_t *r_len)
|
paulo@0
|
666 {
|
paulo@0
|
667 uint32_t *tokens;
|
paulo@0
|
668 int count;
|
paulo@0
|
669 size_t len;
|
paulo@0
|
670 char *str, *str0;
|
paulo@0
|
671 char *next;
|
paulo@0
|
672
|
paulo@0
|
673 if (!(str0 = str = STRDUP (hpath)))
|
paulo@0
|
674 return NULL;
|
paulo@0
|
675
|
paulo@0
|
676 tokens = NULL;
|
paulo@0
|
677 len = 0;
|
paulo@0
|
678 count = 0;
|
paulo@0
|
679
|
paulo@0
|
680 while ((next = string_sep_set (&str, QRP_DELIMITERS)) != NULL)
|
paulo@0
|
681 {
|
paulo@0
|
682 uint32_t tok;
|
paulo@0
|
683
|
paulo@0
|
684 if (string_isempty (next))
|
paulo@0
|
685 continue;
|
paulo@0
|
686
|
paulo@0
|
687 /* don't add keywords that are too small */
|
paulo@0
|
688 if (strlen (next) < QRP_MIN_LENGTH)
|
paulo@0
|
689 continue;
|
paulo@0
|
690
|
paulo@0
|
691 tok = gt_query_router_hash_str (next, 32);
|
paulo@0
|
692 tokens = append_token (tokens, &len, count++, tok);
|
paulo@0
|
693 }
|
paulo@0
|
694
|
paulo@0
|
695 *r_len = count;
|
paulo@0
|
696
|
paulo@0
|
697 free (str0);
|
paulo@0
|
698
|
paulo@0
|
699 return tokens;
|
paulo@0
|
700 }
|
paulo@0
|
701
|
paulo@0
|
702 void gt_query_router_self_add (FileShare *file)
|
paulo@0
|
703 {
|
paulo@0
|
704 uint32_t *tokens, *tokens0;
|
paulo@0
|
705 uint32_t tok;
|
paulo@0
|
706 size_t len;
|
paulo@0
|
707 int i;
|
paulo@0
|
708 struct qrp_route_entry *entry;
|
paulo@0
|
709
|
paulo@0
|
710 tokens0 = tokens = tokenize (share_get_hpath (file), &len);
|
paulo@0
|
711
|
paulo@0
|
712 assert (tokens != NULL);
|
paulo@0
|
713 assert (len > 0);
|
paulo@0
|
714
|
paulo@0
|
715 for (i = 0; i < len; i++)
|
paulo@0
|
716 {
|
paulo@0
|
717 tok = tokens[i];
|
paulo@0
|
718
|
paulo@0
|
719 if ((entry = dataset_lookup (indices, &tok, sizeof (tok))))
|
paulo@0
|
720 {
|
paulo@0
|
721 entry->ref++;
|
paulo@0
|
722 continue;
|
paulo@0
|
723 }
|
paulo@0
|
724
|
paulo@0
|
725 /*
|
paulo@0
|
726 * Create a new index and add it to the table.
|
paulo@0
|
727 */
|
paulo@0
|
728 if (!(entry = malloc (sizeof (struct qrp_route_entry))))
|
paulo@0
|
729 continue;
|
paulo@0
|
730
|
paulo@0
|
731 entry->ref = 1;
|
paulo@0
|
732 entry->index = tok;
|
paulo@0
|
733
|
paulo@0
|
734 dataset_insert (&indices, &tok, sizeof (tok), entry, 0);
|
paulo@0
|
735
|
paulo@0
|
736 table_changed = TRUE;
|
paulo@0
|
737 }
|
paulo@0
|
738
|
paulo@0
|
739 free (tokens0);
|
paulo@0
|
740 }
|
paulo@0
|
741
|
paulo@0
|
742 void gt_query_router_self_remove (FileShare *file)
|
paulo@0
|
743 {
|
paulo@0
|
744 uint32_t *tokens, *tokens0;
|
paulo@0
|
745 uint32_t tok;
|
paulo@0
|
746 size_t len;
|
paulo@0
|
747 int i;
|
paulo@0
|
748 struct qrp_route_entry *entry;
|
paulo@0
|
749
|
paulo@0
|
750 tokens0 = tokens = tokenize (share_get_hpath (file), &len);
|
paulo@0
|
751
|
paulo@0
|
752 assert (tokens != NULL);
|
paulo@0
|
753 assert (len > 0);
|
paulo@0
|
754
|
paulo@0
|
755 for (i = 0; i < len; i++)
|
paulo@0
|
756 {
|
paulo@0
|
757 tok = tokens[i];
|
paulo@0
|
758
|
paulo@0
|
759 entry = dataset_lookup (indices, &tok, sizeof (tok));
|
paulo@0
|
760 assert (entry != NULL);
|
paulo@0
|
761
|
paulo@0
|
762 if (--entry->ref > 0)
|
paulo@0
|
763 continue;
|
paulo@0
|
764
|
paulo@0
|
765 dataset_remove (indices, &tok, sizeof (tok));
|
paulo@0
|
766
|
paulo@0
|
767 table_changed = TRUE;
|
paulo@0
|
768 }
|
paulo@0
|
769
|
paulo@0
|
770 free (tokens0);
|
paulo@0
|
771 }
|
paulo@0
|
772
|
paulo@0
|
773 void gt_query_router_self_sync (BOOL begin)
|
paulo@0
|
774 {
|
paulo@0
|
775 if (!begin && table_changed)
|
paulo@0
|
776 {
|
paulo@0
|
777 start_build_timer();
|
paulo@0
|
778 table_changed = FALSE;
|
paulo@0
|
779 }
|
paulo@0
|
780 }
|
paulo@0
|
781
|
paulo@0
|
782 /*****************************************************************************/
|
paulo@0
|
783
|
paulo@0
|
784 int query_patch_open (GtQueryRouter *router, int seq_size, int compressed,
|
paulo@0
|
785 size_t max_size)
|
paulo@0
|
786 {
|
paulo@0
|
787 GtQueryPatch *new_patch;
|
paulo@0
|
788
|
paulo@0
|
789 if (!(new_patch = malloc (sizeof (GtQueryPatch))))
|
paulo@0
|
790 return FALSE;
|
paulo@0
|
791
|
paulo@0
|
792 memset (new_patch, 0, sizeof (GtQueryPatch));
|
paulo@0
|
793
|
paulo@0
|
794 if (!(new_patch->stream = zlib_stream_open (max_size)))
|
paulo@0
|
795 {
|
paulo@0
|
796 free (new_patch);
|
paulo@0
|
797 return FALSE;
|
paulo@0
|
798 }
|
paulo@0
|
799
|
paulo@0
|
800 new_patch->seq_size = seq_size;
|
paulo@0
|
801 new_patch->compressed = compressed;
|
paulo@0
|
802
|
paulo@0
|
803 /* NOTE: sequence is 1-based, not 0-based */
|
paulo@0
|
804 new_patch->seq_num = 1;
|
paulo@0
|
805
|
paulo@0
|
806 router->patch = new_patch;
|
paulo@0
|
807
|
paulo@0
|
808 return TRUE;
|
paulo@0
|
809 }
|
paulo@0
|
810
|
paulo@0
|
811 void query_patch_close (GtQueryRouter *router)
|
paulo@0
|
812 {
|
paulo@0
|
813 GtQueryPatch *patch;
|
paulo@0
|
814
|
paulo@0
|
815 GT->DBGFN (GT, "entered");
|
paulo@0
|
816
|
paulo@0
|
817 if (!router)
|
paulo@0
|
818 return;
|
paulo@0
|
819
|
paulo@0
|
820 patch = router->patch;
|
paulo@0
|
821
|
paulo@0
|
822 if (!patch)
|
paulo@0
|
823 return;
|
paulo@0
|
824
|
paulo@0
|
825 zlib_stream_close (patch->stream);
|
paulo@0
|
826
|
paulo@0
|
827 router->patch = NULL;
|
paulo@0
|
828 free (patch);
|
paulo@0
|
829 }
|
paulo@0
|
830
|
paulo@0
|
831 /* TODO: compact router tables to bit-level */
|
paulo@0
|
832 static void query_patch_apply (GtQueryRouter *router, int bits, char *data,
|
paulo@0
|
833 size_t data_size)
|
paulo@0
|
834 {
|
paulo@0
|
835 GtQueryPatch *patch;
|
paulo@0
|
836 char *table; /* NOTE: this must be signed */
|
paulo@0
|
837 int i;
|
paulo@0
|
838
|
paulo@0
|
839 patch = router->patch;
|
paulo@0
|
840 assert (patch != NULL);
|
paulo@0
|
841
|
paulo@0
|
842 /* check for overflow: this may look wrong but its not */
|
paulo@0
|
843 if (patch->table_pos + (data_size - 1) >= router->size)
|
paulo@0
|
844 {
|
paulo@0
|
845 GT->DBGFN (GT, "patch overflow: %u (max of %u)",
|
paulo@0
|
846 patch->table_pos+data_size, router->size);
|
paulo@0
|
847 query_patch_close (router);
|
paulo@0
|
848 return;
|
paulo@0
|
849 }
|
paulo@0
|
850
|
paulo@0
|
851 table = router->table;
|
paulo@0
|
852
|
paulo@0
|
853 /* hrm */
|
paulo@0
|
854 if (bits == 4)
|
paulo@0
|
855 {
|
paulo@0
|
856 int j;
|
paulo@0
|
857
|
paulo@0
|
858 for (i = 0; i < data_size; i++)
|
paulo@0
|
859 {
|
paulo@0
|
860 int pos;
|
paulo@0
|
861 char change;
|
paulo@0
|
862
|
paulo@0
|
863 pos = i + patch->table_pos;
|
paulo@0
|
864
|
paulo@0
|
865 /* avoid % */
|
paulo@0
|
866 j = (i+1) & 0x1;
|
paulo@0
|
867
|
paulo@0
|
868 /* grab the correct half of the byte and sign-extend it
|
paulo@0
|
869 * NOTE: this starts off with the most significant bits! */
|
paulo@0
|
870 change = data[i] & (0x0f << (4 * j));
|
paulo@0
|
871
|
paulo@0
|
872 /* move to least significant bits
|
paulo@0
|
873 * TODO: does this do sign-extension correctly? */
|
paulo@0
|
874 change >>= 4;
|
paulo@0
|
875
|
paulo@0
|
876 table[pos] += change;
|
paulo@0
|
877 }
|
paulo@0
|
878 }
|
paulo@0
|
879 else if (bits == 8)
|
paulo@0
|
880 {
|
paulo@0
|
881 /* untested */
|
paulo@0
|
882 for (i = 0; i < data_size; i++)
|
paulo@0
|
883 {
|
paulo@0
|
884 table[i + patch->table_pos] += data[i];
|
paulo@0
|
885 }
|
paulo@0
|
886 }
|
paulo@0
|
887 else
|
paulo@0
|
888 {
|
paulo@0
|
889 GT->DBGFN (GT, "undefined bits value in query patch: %u", bits);
|
paulo@0
|
890 query_patch_close (router);
|
paulo@0
|
891 return;
|
paulo@0
|
892 }
|
paulo@0
|
893
|
paulo@0
|
894 /* store the table position for the next patch */
|
paulo@0
|
895 patch->table_pos += i;
|
paulo@0
|
896
|
paulo@0
|
897 /* cleanup the data if the patch is done */
|
paulo@0
|
898 patch->seq_num++;
|
paulo@0
|
899
|
paulo@0
|
900 if (patch->seq_num > patch->seq_size)
|
paulo@0
|
901 query_patch_close (router);
|
paulo@0
|
902 }
|
paulo@0
|
903
|
paulo@0
|
904 /*****************************************************************************/
|
paulo@0
|
905
|
paulo@0
|
906 /* TODO: compact router tables to bit-level */
|
paulo@0
|
907 GtQueryRouter *gt_query_router_new (size_t size, int infinity)
|
paulo@0
|
908 {
|
paulo@0
|
909 GtQueryRouter *router;
|
paulo@0
|
910
|
paulo@0
|
911 if (size > MAX_TABLE_SIZE)
|
paulo@0
|
912 return NULL;
|
paulo@0
|
913
|
paulo@0
|
914 if (!(router = malloc (sizeof (GtQueryRouter))))
|
paulo@0
|
915 return NULL;
|
paulo@0
|
916
|
paulo@0
|
917 memset (router, 0, sizeof (GtQueryRouter));
|
paulo@0
|
918
|
paulo@0
|
919 if (!(router->table = malloc (size)))
|
paulo@0
|
920 {
|
paulo@0
|
921 free (router->table);
|
paulo@0
|
922 return NULL;
|
paulo@0
|
923 }
|
paulo@0
|
924
|
paulo@0
|
925 memset (router->table, infinity, size);
|
paulo@0
|
926
|
paulo@0
|
927 router->size = size;
|
paulo@0
|
928
|
paulo@0
|
929 return router;
|
paulo@0
|
930 }
|
paulo@0
|
931
|
paulo@0
|
932 void gt_query_router_free (GtQueryRouter *router)
|
paulo@0
|
933 {
|
paulo@0
|
934 if (!router)
|
paulo@0
|
935 return;
|
paulo@0
|
936
|
paulo@0
|
937 query_patch_close (router);
|
paulo@0
|
938
|
paulo@0
|
939 free (router->table);
|
paulo@0
|
940 free (router);
|
paulo@0
|
941 }
|
paulo@0
|
942
|
paulo@0
|
943 /*****************************************************************************/
|
paulo@0
|
944
|
paulo@0
|
945 static void print_hex (unsigned char *data, size_t size)
|
paulo@0
|
946 {
|
paulo@0
|
947 fprint_hex (stdout, data, size);
|
paulo@0
|
948 }
|
paulo@0
|
949
|
paulo@0
|
950 void gt_query_router_update (GtQueryRouter *router, size_t seq_num,
|
paulo@0
|
951 size_t seq_size, int compressed, int bits,
|
paulo@0
|
952 unsigned char *zdata, size_t size)
|
paulo@0
|
953 {
|
paulo@0
|
954 GtQueryPatch *patch;
|
paulo@0
|
955 char *data;
|
paulo@0
|
956
|
paulo@0
|
957 if (!router)
|
paulo@0
|
958 {
|
paulo@0
|
959 GT->DBGFN (GT, "null router");
|
paulo@0
|
960 return;
|
paulo@0
|
961 }
|
paulo@0
|
962
|
paulo@0
|
963 if (!router->patch)
|
paulo@0
|
964 {
|
paulo@0
|
965 if (!query_patch_open (router, seq_size, compressed, router->size))
|
paulo@0
|
966 return;
|
paulo@0
|
967 }
|
paulo@0
|
968
|
paulo@0
|
969 patch = router->patch;
|
paulo@0
|
970
|
paulo@0
|
971 /* check for an invalid sequence number or size */
|
paulo@0
|
972 if (patch->seq_size != seq_size || patch->seq_num != seq_num)
|
paulo@0
|
973 {
|
paulo@0
|
974 GT->DBGFN (GT, "bad patch: seq_size %u vs %u, seq_num %u vs %u",
|
paulo@0
|
975 patch->seq_size, seq_size, patch->seq_num, seq_num);
|
paulo@0
|
976 query_patch_close (router);
|
paulo@0
|
977 return;
|
paulo@0
|
978 }
|
paulo@0
|
979
|
paulo@0
|
980 if (compressed != patch->compressed)
|
paulo@0
|
981 {
|
paulo@0
|
982 GT->DBGFN (GT, "tried to change encodings in patch");
|
paulo@0
|
983 query_patch_close (router);
|
paulo@0
|
984 return;
|
paulo@0
|
985 }
|
paulo@0
|
986
|
paulo@0
|
987 switch (compressed)
|
paulo@0
|
988 {
|
paulo@0
|
989 case 0x00: /* no compression */
|
paulo@0
|
990 if (!zlib_stream_write (patch->stream, zdata, size))
|
paulo@0
|
991 {
|
paulo@0
|
992 GT->DBGFN (GT, "error copying data");
|
paulo@0
|
993 query_patch_close (router);
|
paulo@0
|
994 return;
|
paulo@0
|
995 }
|
paulo@0
|
996
|
paulo@0
|
997 break;
|
paulo@0
|
998
|
paulo@0
|
999 case 0x01: /* deflate */
|
paulo@0
|
1000 printf ("zlib compressed data:\n");
|
paulo@0
|
1001 print_hex (zdata, size);
|
paulo@0
|
1002
|
paulo@0
|
1003 if (!zlib_stream_inflate (patch->stream, zdata, size))
|
paulo@0
|
1004 {
|
paulo@0
|
1005 GT->DBGFN (GT, "error inflating data");
|
paulo@0
|
1006 query_patch_close (router);
|
paulo@0
|
1007 return;
|
paulo@0
|
1008 }
|
paulo@0
|
1009
|
paulo@0
|
1010 break;
|
paulo@0
|
1011
|
paulo@0
|
1012 default:
|
paulo@0
|
1013 GT->DBGFN (GT, "unknown compression algorithm in query route patch");
|
paulo@0
|
1014 return;
|
paulo@0
|
1015 }
|
paulo@0
|
1016
|
paulo@0
|
1017 /* read the data in the stream */
|
paulo@0
|
1018 if (!(size = zlib_stream_read (patch->stream, &data)))
|
paulo@0
|
1019 {
|
paulo@0
|
1020 GT->DBGFN (GT, "error calling zlib_stream_read");
|
paulo@0
|
1021 query_patch_close (router);
|
paulo@0
|
1022 return;
|
paulo@0
|
1023 }
|
paulo@0
|
1024
|
paulo@0
|
1025 printf ("after uncompressing:\n");
|
paulo@0
|
1026 print_hex (data, size);
|
paulo@0
|
1027
|
paulo@0
|
1028 /* apply the patch -- this will cleanup any data if necessary */
|
paulo@0
|
1029 query_patch_apply (router, bits, data, size);
|
paulo@0
|
1030
|
paulo@0
|
1031 print_hex (router->table, router->size);
|
paulo@0
|
1032 }
|
paulo@0
|
1033
|
paulo@0
|
1034 /*****************************************************************************/
|
paulo@0
|
1035
|
paulo@0
|
1036 static void submit_empty_table (TCPC *c)
|
paulo@0
|
1037 {
|
paulo@0
|
1038 static char table[8] = { 0 };
|
paulo@0
|
1039 int len;
|
paulo@0
|
1040
|
paulo@0
|
1041 #if 0
|
paulo@0
|
1042 size_t size;
|
paulo@0
|
1043 #endif
|
paulo@0
|
1044
|
paulo@0
|
1045 GT->DBGFN (GT, "reseting route table for %s", net_ip_str (GT_NODE(c)->ip));
|
paulo@0
|
1046
|
paulo@0
|
1047 /* all slots in the table should be initialized to infinity, so it
|
paulo@0
|
1048 * should be "empty" on the remote node */
|
paulo@0
|
1049 memset (table, 0, sizeof (table));
|
paulo@0
|
1050
|
paulo@0
|
1051
|
paulo@0
|
1052 #if 0
|
paulo@0
|
1053 /* TEST: set part of the table to -infinity to get queries */
|
paulo@0
|
1054 size = sizeof (table);
|
paulo@0
|
1055 memset (table + (size + 1) / 2 - 1, -infinity, (size + 1) / 4);
|
paulo@0
|
1056 #endif
|
paulo@0
|
1057
|
paulo@0
|
1058 /* format: <query-route-msg-type> <length of table> <infinity> */
|
paulo@0
|
1059 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
|
paulo@0
|
1060 "%c%lu%c", 0, (unsigned long) sizeof (table),
|
paulo@0
|
1061 INFINITY) < 0)
|
paulo@0
|
1062 {
|
paulo@0
|
1063 GT->DBGFN (GT, "error reseting table");
|
paulo@0
|
1064 return;
|
paulo@0
|
1065 }
|
paulo@0
|
1066
|
paulo@0
|
1067 len = sizeof (table);
|
paulo@0
|
1068
|
paulo@0
|
1069 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
|
paulo@0
|
1070 "%c%c%c%c%c%*p",
|
paulo@0
|
1071 1, 1, 1, 0, 8, len, table) < 0)
|
paulo@0
|
1072 {
|
paulo@0
|
1073 GT->DBGFN (GT, "error sending empty patch");
|
paulo@0
|
1074 return;
|
paulo@0
|
1075 }
|
paulo@0
|
1076 }
|
paulo@0
|
1077
|
paulo@0
|
1078 static void submit_table (TCPC *c, uint8_t *table, size_t size, size_t slots)
|
paulo@0
|
1079 {
|
paulo@0
|
1080 int infinity = INFINITY;
|
paulo@0
|
1081 int seq_size;
|
paulo@0
|
1082 int compressed;
|
paulo@0
|
1083 int seq_num;
|
paulo@0
|
1084 uint8_t *p;
|
paulo@0
|
1085 size_t send_size;
|
paulo@0
|
1086
|
paulo@0
|
1087 /* XXX make table size settable at runtime */
|
paulo@0
|
1088
|
paulo@0
|
1089 /* send a reset table first */
|
paulo@0
|
1090 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
|
paulo@0
|
1091 "%c%lu%c", 0, (long)slots, infinity) < 0)
|
paulo@0
|
1092 {
|
paulo@0
|
1093 GT->DBGFN (GT, "error reseting table");
|
paulo@0
|
1094 return;
|
paulo@0
|
1095 }
|
paulo@0
|
1096
|
paulo@0
|
1097 /* Break the table into PATCH_FRAGSIZE-sized chunks,
|
paulo@0
|
1098 * and include any leftover portions. */
|
paulo@0
|
1099 seq_size = size / PATCH_FRAGSIZE +
|
paulo@0
|
1100 (size % PATCH_FRAGSIZE == 0 ? 0 : 1);
|
paulo@0
|
1101
|
paulo@0
|
1102 assert (seq_size < 256);
|
paulo@0
|
1103 #if 0
|
paulo@0
|
1104 GT->dbg (GT, "sequence size: %u", seq_size);
|
paulo@0
|
1105 #endif
|
paulo@0
|
1106
|
paulo@0
|
1107 p = table;
|
paulo@0
|
1108 compressed = TRUE;
|
paulo@0
|
1109
|
paulo@0
|
1110 /* NOTE: patch sequence numbers start at 1 */
|
paulo@0
|
1111 for (seq_num = 1; seq_num <= seq_size; seq_num++)
|
paulo@0
|
1112 {
|
paulo@0
|
1113 send_size = MIN (PATCH_FRAGSIZE, size);
|
paulo@0
|
1114
|
paulo@0
|
1115 if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0,
|
paulo@0
|
1116 "%c%c%c%c%c%*p",
|
paulo@0
|
1117 /* QRP PATCH */ 1,
|
paulo@0
|
1118 seq_num, seq_size, compressed,
|
paulo@0
|
1119 /* bits */ PATCH_BITS,
|
paulo@0
|
1120 send_size, p) < 0)
|
paulo@0
|
1121 {
|
paulo@0
|
1122 GT->DBGFN (GT, "error sending QRT patch");
|
paulo@0
|
1123 return;
|
paulo@0
|
1124 }
|
paulo@0
|
1125
|
paulo@0
|
1126 size -= send_size;
|
paulo@0
|
1127 p += send_size;
|
paulo@0
|
1128 }
|
paulo@0
|
1129 }
|
paulo@0
|
1130
|
paulo@0
|
1131 static BOOL update_qr_table (TCPC *c)
|
paulo@0
|
1132 {
|
paulo@0
|
1133 size_t size;
|
paulo@0
|
1134 int version;
|
paulo@0
|
1135 uint8_t *table;
|
paulo@0
|
1136 GtNode *node = GT_NODE(c);
|
paulo@0
|
1137
|
paulo@0
|
1138 assert (node->state & GT_NODE_CONNECTED);
|
paulo@0
|
1139
|
paulo@0
|
1140 table = gt_query_router_self (&size, &version);
|
paulo@0
|
1141
|
paulo@0
|
1142 /* we may not have finished building a table yet */
|
paulo@0
|
1143 if (!table)
|
paulo@0
|
1144 return TRUE;
|
paulo@0
|
1145
|
paulo@0
|
1146 /* dont submit a table if this node is already up to date */
|
paulo@0
|
1147 if (node->query_router_counter == version)
|
paulo@0
|
1148 return TRUE;
|
paulo@0
|
1149
|
paulo@0
|
1150 /* HACK: this shouldn't be using compressed_slots */
|
paulo@0
|
1151 submit_table (c, table, size, compressed_slots);
|
paulo@0
|
1152
|
paulo@0
|
1153 /* store the version number of this table so
|
paulo@0
|
1154 * we dont resubmit unecessarily */
|
paulo@0
|
1155 node->query_router_counter = version;
|
paulo@0
|
1156
|
paulo@0
|
1157 return TRUE;
|
paulo@0
|
1158 }
|
paulo@0
|
1159
|
paulo@0
|
1160 static BOOL submit_first_table (TCPC *c)
|
paulo@0
|
1161 {
|
paulo@0
|
1162 GtNode *node = GT_NODE(c);
|
paulo@0
|
1163
|
paulo@0
|
1164 assert (node->state & GT_NODE_CONNECTED);
|
paulo@0
|
1165
|
paulo@0
|
1166 update_qr_table (c);
|
paulo@0
|
1167
|
paulo@0
|
1168 /* remove the first timer */
|
paulo@0
|
1169 timer_remove (node->query_route_timer);
|
paulo@0
|
1170
|
paulo@0
|
1171 /* set the timer for updating the table repeatedly */
|
paulo@0
|
1172 node->query_route_timer = timer_add (QRT_UPDATE_INTERVAL,
|
paulo@0
|
1173 (TimerCallback)update_qr_table, c);
|
paulo@0
|
1174
|
paulo@0
|
1175 /* we removed the timer, and must return FALSE */
|
paulo@0
|
1176 return FALSE;
|
paulo@0
|
1177 }
|
paulo@0
|
1178
|
paulo@0
|
1179 /*
|
paulo@0
|
1180 * Submit the query routing table for this node to another.
|
paulo@0
|
1181 *
|
paulo@0
|
1182 * This delays sending the table for while. This helps preserve our precious
|
paulo@0
|
1183 * upstream when we're looking for nodes to connect to, as this often
|
paulo@0
|
1184 * happens when we're in the middle of looking for more nodes.
|
paulo@0
|
1185 */
|
paulo@0
|
1186 void query_route_table_submit (TCPC *c)
|
paulo@0
|
1187 {
|
paulo@0
|
1188 GtNode *node = GT_NODE(c);
|
paulo@0
|
1189
|
paulo@0
|
1190 assert (node->query_route_timer == 0);
|
paulo@0
|
1191
|
paulo@0
|
1192 /* save bandwidth with an empty table */
|
paulo@0
|
1193 submit_empty_table (c);
|
paulo@0
|
1194
|
paulo@0
|
1195 /* submit a real table later */
|
paulo@0
|
1196 node->query_route_timer = timer_add (QRT_SUBMIT_DELAY,
|
paulo@0
|
1197 (TimerCallback)submit_first_table, c);
|
paulo@0
|
1198 }
|
paulo@0
|
1199
|
paulo@0
|
1200 /*****************************************************************************/
|
paulo@0
|
1201 /* TESTING */
|
paulo@0
|
1202
|
paulo@0
|
1203 #if 0
|
paulo@0
|
1204 #define CHECK(x) do { \
|
paulo@0
|
1205 if (!(x)) printf("FAILED: %s\n", #x); \
|
paulo@0
|
1206 else printf("OK: %s\n", #x); \
|
paulo@0
|
1207 } while (0)
|
paulo@0
|
1208
|
paulo@0
|
1209 #define HASH(str, bits) \
|
paulo@0
|
1210 printf ("hash " str ": %u\n", gt_query_router_hash_str (str, bits))
|
paulo@0
|
1211
|
paulo@0
|
1212 int main (int argc, char **argv)
|
paulo@0
|
1213 {
|
paulo@0
|
1214 #define hash gt_query_router_hash_str
|
paulo@0
|
1215
|
paulo@0
|
1216 CHECK(hash("", 13)==0);
|
paulo@0
|
1217 CHECK(hash("eb", 13)==6791);
|
paulo@0
|
1218 CHECK(hash("ebc", 13)==7082);
|
paulo@0
|
1219 CHECK(hash("ebck", 13)==6698);
|
paulo@0
|
1220 CHECK(hash("ebckl", 13)==3179);
|
paulo@0
|
1221 CHECK(hash("ebcklm", 13)==3235);
|
paulo@0
|
1222 CHECK(hash("ebcklme", 13)==6438);
|
paulo@0
|
1223 CHECK(hash("ebcklmen", 13)==1062);
|
paulo@0
|
1224 CHECK(hash("ebcklmenq", 13)==3527);
|
paulo@0
|
1225 CHECK(hash("", 16)==0);
|
paulo@0
|
1226 CHECK(hash("n", 16)==65003);
|
paulo@0
|
1227 CHECK(hash("nd", 16)==54193);
|
paulo@0
|
1228 CHECK(hash("ndf", 16)==4953);
|
paulo@0
|
1229 CHECK(hash("ndfl", 16)==58201);
|
paulo@0
|
1230 CHECK(hash("ndfla", 16)==34830);
|
paulo@0
|
1231 CHECK(hash("ndflal", 16)==36910);
|
paulo@0
|
1232 CHECK(hash("ndflale", 16)==34586);
|
paulo@0
|
1233 CHECK(hash("ndflalem", 16)==37658);
|
paulo@0
|
1234 CHECK(hash("FAIL", 16)==37458); // WILL FAIL
|
paulo@0
|
1235 CHECK(hash("ndflaleme", 16)==45559);
|
paulo@0
|
1236 CHECK(hash("ol2j34lj", 10)==318);
|
paulo@0
|
1237 CHECK(hash("asdfas23", 10)==503);
|
paulo@0
|
1238 CHECK(hash("9um3o34fd", 10)==758);
|
paulo@0
|
1239 CHECK(hash("a234d", 10)==281);
|
paulo@0
|
1240 CHECK(hash("a3f", 10)==767);
|
paulo@0
|
1241 CHECK(hash("3nja9", 10)==581);
|
paulo@0
|
1242 CHECK(hash("2459345938032343", 10)==146);
|
paulo@0
|
1243 CHECK(hash("7777a88a8a8a8", 10)==342);
|
paulo@0
|
1244 CHECK(hash("asdfjklkj3k", 10)==861);
|
paulo@0
|
1245 CHECK(hash("adfk32l", 10)==1011);
|
paulo@0
|
1246 CHECK(hash("zzzzzzzzzzz", 10)==944);
|
paulo@0
|
1247
|
paulo@0
|
1248 CHECK(hash("3nja9", 10)==581);
|
paulo@0
|
1249 CHECK(hash("3NJA9", 10)==581);
|
paulo@0
|
1250 CHECK(hash("3nJa9", 10)==581);
|
paulo@0
|
1251
|
paulo@0
|
1252 printf ("hash(FAIL, 16) = %u\n", hash("FAIL", 16));
|
paulo@0
|
1253 return 0;
|
paulo@0
|
1254 }
|
paulo@0
|
1255 #endif
|