/*
 * $Id: gt_query_route.c,v 1.46 2004/04/05 07:56:54 hipnod Exp $
 *
 * Copyright (C) 2001-2003 giFT project (gift.sourceforge.net)
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 */

#include "gt_gnutella.h"

#include "gt_query_route.h"
#include "gt_packet.h"
#include "gt_utils.h"

#include "gt_node.h"
#include "gt_node_list.h"

/*
 * NOTE(review): the targets of the two system includes were lost in this
 * copy of the file. They are restored from what the file actually uses:
 * the libgift StopWatch API (stopwatch_new, stopwatch_elapsed, ...) and
 * zlib (z_streamp, deflateInit, deflate). <ctype.h> is listed explicitly
 * for isspace()/tolower() in the keyword hash. Confirm against upstream.
 */
#include <libgift/stopwatch.h>

#include <ctype.h>
#include <zlib.h>

/*****************************************************************************/

/*
 * TODO:
 *  - compact table to bit-level representation
 *  - support incremental updates of table
 *  - cut-off entries when MAX_FILL_RATIO is reached
 */
#define MIN_TABLE_BITS     (16)                           /* 16 bits */
#define MAX_TABLE_BITS     (21)                           /* 21 bits */
#define MIN_TABLE_SIZE     (1UL << (MIN_TABLE_BITS - 1))  /* 32k bytes */
#define MAX_TABLE_SIZE     (1UL << (MAX_TABLE_BITS - 1))  /* 1M bytes */
#define MIN_TABLE_SLOTS    (MIN_TABLE_SIZE * 2)           /* 64k slots */
#define MAX_TABLE_SLOTS    (MAX_TABLE_SIZE * 2)           /* 2M slots */

/*
 * Fill ratios are in percent of occupied slots, not a 0..1 fraction
 * (the route-table fill ratio is computed as present * 100 / slots).
 * So 0.70 means 0.70% of the slots are in use.
 */
#define INC_FILL_RATIO     (0.70)                         /* 0.7% */
#define MAX_FILL_RATIO     (1.00)                         /* 1% (unused) */

/*
 * magical constant necessary for the query routing hash function
 */
#define MULTIPLIER         0x4F1BBCDC
paulo@0: /* paulo@0: * How often to synchronize the QRT with ultrapeers. paulo@0: * paulo@0: * This is very big because we don't support incremental updating paulo@0: * yet. paulo@0: */ paulo@0: #define QRT_UPDATE_INTERVAL (20 * MINUTES) paulo@0: paulo@0: /* paulo@0: * How often we check to build a compressed patch of local shares. paulo@0: */ paulo@0: #define QRT_BUILD_INTERVAL (3 * SECONDS) paulo@0: paulo@0: /* paulo@0: * How long to wait after the first query_route_table_submit() before paulo@0: * actually submitting the table. paulo@0: */ paulo@0: #define QRT_SUBMIT_DELAY (1 * MINUTES) paulo@0: paulo@0: /* paulo@0: * Largest hops value in table. It looks like Limewire hardcodes paulo@0: * this as 7, and won't understand any other value. paulo@0: */ paulo@0: #define INFINITY 7 paulo@0: paulo@0: /* paulo@0: * Constants for changing route table. paulo@0: */ paulo@0: #define QRT_KEYWORD_ADD (0x0a) /* -6 */ paulo@0: #define QRT_KEYWORD_REMOVE (0x06) /* 6 */ paulo@0: paulo@0: /* paulo@0: * The minimum length of a keyword paulo@0: */ paulo@0: #define QRP_MIN_LENGTH 3 paulo@0: paulo@0: /* paulo@0: * Maximum patch fragment size to send paulo@0: */ paulo@0: #define PATCH_FRAGSIZE 2048 paulo@0: paulo@0: /* paulo@0: * Number of bits in the patches we send paulo@0: */ paulo@0: #define PATCH_BITS 4 paulo@0: paulo@0: /* paulo@0: * Holds a 32-bit index describing each token this node shares. 
paulo@0: */ paulo@0: struct qrp_route_entry paulo@0: { paulo@0: int ref; /* number of references to this index */ paulo@0: uint32_t index; /* the actual position */ paulo@0: }; paulo@0: paulo@0: struct qrp_route_table paulo@0: { paulo@0: uint8_t *table; paulo@0: size_t bits; paulo@0: size_t size; paulo@0: size_t slots; paulo@0: size_t present; paulo@0: size_t shared; paulo@0: }; paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* paulo@0: * The set of indices that are currently marked "present" in the paulo@0: * routing table. paulo@0: */ paulo@0: static Dataset *indices; paulo@0: paulo@0: /* paulo@0: * This holds the compressed table as a full QRP patch (incremental paulo@0: * updates not supported yet). paulo@0: */ paulo@0: static uint8_t *compressed_table; paulo@0: static size_t compressed_slots; paulo@0: static size_t compressed_size; paulo@0: paulo@0: /* paulo@0: * Keeps track of how many times the compressed table has changed. paulo@0: * Used to avoid sending updates when not necessary. paulo@0: */ paulo@0: static int compressed_version; paulo@0: paulo@0: /* paulo@0: * Timer that builds the compressed patch for the table submitted paulo@0: * to peers. paulo@0: */ paulo@0: static timer_id build_timer; paulo@0: paulo@0: /* paulo@0: * Whether we have to rebuild the table when shares are done paulo@0: * syncing. paulo@0: */ paulo@0: static BOOL table_changed; paulo@0: paulo@0: /* paulo@0: * The full query-routing table in binary form. This will get paulo@0: * compressed before transmission. 
paulo@0: */ paulo@0: static struct qrp_route_table *route_table; paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* hash function used for query-routing */ paulo@0: uint32_t gt_query_router_hash_str (char *str, size_t bits) paulo@0: { paulo@0: uint32_t hash; paulo@0: unsigned int i; paulo@0: unsigned char c; paulo@0: paulo@0: i = hash = 0; paulo@0: paulo@0: while ((c = *str++) && !isspace (c)) paulo@0: { paulo@0: hash ^= tolower (c) << (i * 8); paulo@0: paulo@0: /* using & instead of %, sorry */ paulo@0: i = (i+1) & 0x03; paulo@0: } paulo@0: paulo@0: return (hash * MULTIPLIER) >> (32 - bits); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static struct qrp_route_table *qrp_route_table_new (size_t bits) paulo@0: { paulo@0: struct qrp_route_table *qrt; paulo@0: paulo@0: if (!(qrt = MALLOC (sizeof (struct qrp_route_table)))) paulo@0: return NULL; paulo@0: paulo@0: qrt->bits = bits; paulo@0: qrt->size = (1UL << (bits - 1)); paulo@0: qrt->slots = qrt->size * 2; /* 4-bit entries only */ paulo@0: paulo@0: if (!(qrt->table = MALLOC (qrt->size))) paulo@0: { paulo@0: free (qrt); paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: return qrt; paulo@0: } paulo@0: paulo@0: static void qrp_route_table_free (struct qrp_route_table *qrt) paulo@0: { paulo@0: if (!qrt) paulo@0: return; paulo@0: paulo@0: free (qrt->table); paulo@0: free (qrt); paulo@0: } paulo@0: paulo@0: static void qrp_route_table_insert (struct qrp_route_table *qrt, uint32_t index) paulo@0: { paulo@0: uint8_t old_entry; paulo@0: int set_lower; paulo@0: int entry; paulo@0: paulo@0: if (!qrt) paulo@0: return; paulo@0: paulo@0: assert (index < qrt->size * 2); paulo@0: paulo@0: set_lower = index % 2; paulo@0: entry = index / 2; paulo@0: paulo@0: if (set_lower) paulo@0: { paulo@0: old_entry = qrt->table[entry] & 0x0f; paulo@0: qrt->table[entry] = (qrt->table[entry] & 0xf0) | 
paulo@0: ((QRT_KEYWORD_ADD) & 0x0f); paulo@0: } paulo@0: else paulo@0: { paulo@0: old_entry = (qrt->table[entry] & 0xf0) >> 4; paulo@0: qrt->table[entry] = (qrt->table[entry] & 0x0f) | paulo@0: ((QRT_KEYWORD_ADD << 4) & 0xf0); paulo@0: } paulo@0: paulo@0: assert (old_entry == 0 || old_entry == QRT_KEYWORD_REMOVE); paulo@0: #if 0 paulo@0: GT->dbg (GT, "+%u [%d/%d]", index, entry, set_lower); paulo@0: #endif paulo@0: paulo@0: qrt->present++; paulo@0: } paulo@0: paulo@0: #if 0 paulo@0: /* untested */ paulo@0: static void qrp_route_table_remove (struct qrp_route_table *qrt, uint32_t index) paulo@0: { paulo@0: uint8_t old_entry; paulo@0: int clear_lower; paulo@0: int entry; paulo@0: paulo@0: if (!qrt) paulo@0: return; paulo@0: paulo@0: assert (index < qrt->size * 2); paulo@0: paulo@0: clear_lower = index % 2; paulo@0: entry = index / 2; paulo@0: paulo@0: /* paulo@0: * This needs to change when doing incremental updating... paulo@0: */ paulo@0: paulo@0: if (clear_lower) paulo@0: { paulo@0: old_entry = qrt->table[entry] & 0x0f; paulo@0: qrt->table[entry] = (qrt->table[entry] & 0xf0) | paulo@0: ((QRT_KEYWORD_REMOVE) & 0x0f); paulo@0: } paulo@0: else paulo@0: { paulo@0: old_entry = (qrt->table[entry] & 0xf0) >> 4; paulo@0: qrt->table[entry] = (qrt->table[entry] & 0x0f) | paulo@0: ((QRT_KEYWORD_REMOVE << 4) & 0xf0); paulo@0: } paulo@0: paulo@0: assert (old_entry == (uint8_t) QRT_KEYWORD_ADD); paulo@0: #if 0 paulo@0: GT->dbg (GT, "-%u [%d/%d]", index, entry, clear_lower); paulo@0: #endif paulo@0: paulo@0: qrt->present--; paulo@0: } paulo@0: #endif paulo@0: paulo@0: static BOOL qrp_route_table_lookup (struct qrp_route_table *qrt, uint32_t index) paulo@0: { paulo@0: int check_lower; paulo@0: uint32_t entry; paulo@0: paulo@0: if (!qrt) paulo@0: return FALSE; paulo@0: paulo@0: assert (index < qrt->slots); paulo@0: assert (qrt->slots == qrt->size * 2); paulo@0: paulo@0: check_lower = index % 2; paulo@0: entry = index / 2; paulo@0: paulo@0: if (check_lower) paulo@0: { paulo@0: if 
((qrt->table[entry] & 0x0f) == QRT_KEYWORD_ADD) paulo@0: return TRUE; paulo@0: } paulo@0: else paulo@0: { paulo@0: if (((qrt->table[entry] & 0xf0) >> 4) == QRT_KEYWORD_ADD) paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static double qrp_route_table_fill_ratio (struct qrp_route_table *qrt) paulo@0: { paulo@0: return (double)qrt->present * 100 / qrt->slots; paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static char *zlib_strerror (int error) paulo@0: { paulo@0: switch (error) paulo@0: { paulo@0: case Z_OK: return "OK"; paulo@0: case Z_STREAM_END: return "End of stream"; paulo@0: case Z_NEED_DICT: return "Decompressing dictionary needed"; paulo@0: case Z_STREAM_ERROR: return "Stream error"; paulo@0: case Z_ERRNO: return "Generic zlib error"; paulo@0: case Z_DATA_ERROR: return "Data error"; paulo@0: case Z_MEM_ERROR: return "Memory error"; paulo@0: case Z_BUF_ERROR: return "Buffer error"; paulo@0: case Z_VERSION_ERROR: return "Incompatible runtime zlib library"; paulo@0: default: break; paulo@0: } paulo@0: paulo@0: return "Invalid zlib error code"; paulo@0: } paulo@0: paulo@0: /* TODO: make this use a stream-like interface */ paulo@0: static uint8_t *compress_table (uint8_t *table, size_t in_size, size_t *out_size) paulo@0: { paulo@0: z_streamp out; paulo@0: int ret; paulo@0: uint8_t *out_buf; paulo@0: int free_size; paulo@0: paulo@0: *out_size = 0; paulo@0: paulo@0: if (!(out = MALLOC (sizeof(*out)))) paulo@0: return NULL; paulo@0: paulo@0: out->zalloc = Z_NULL; paulo@0: out->zfree = Z_NULL; paulo@0: out->opaque = Z_NULL; paulo@0: paulo@0: if ((ret = deflateInit (out, Z_DEFAULT_COMPRESSION)) != Z_OK) paulo@0: { paulo@0: GT->DBGFN (GT, "deflateInit error: %s", zlib_strerror (ret)); paulo@0: free (out); paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: /* allocate initial buffer */ paulo@0: free_size = in_size + in_size / 100; paulo@0: 
paulo@0: if (!(out_buf = malloc (free_size))) paulo@0: { paulo@0: free (out_buf); paulo@0: deflateEnd (out); paulo@0: free (out); paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: out->next_in = table; paulo@0: out->avail_in = in_size; paulo@0: out->next_out = out_buf; paulo@0: out->avail_out = free_size; paulo@0: paulo@0: if ((ret = deflate (out, Z_FINISH)) != Z_STREAM_END) paulo@0: { paulo@0: GT->DBGFN (GT, "compression error: %s", zlib_strerror (ret)); paulo@0: free (out_buf); paulo@0: deflateEnd (out); paulo@0: free (out); paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: /* paulo@0: * This could theoretically fail I guess. If it does, we shouldn't keep paulo@0: * the table at least. paulo@0: */ paulo@0: assert (out->avail_in == 0); paulo@0: paulo@0: *out_size = free_size - out->avail_out; paulo@0: paulo@0: deflateEnd (out); paulo@0: free (out); paulo@0: paulo@0: return out_buf; paulo@0: } paulo@0: paulo@0: #if 0 paulo@0: /* send the a QRP table to nodes we haven't sent a real table yet */ paulo@0: static GtNode *update_nodes (TCPC *c, GtNode *node, void *udata) paulo@0: { paulo@0: assert (node->state == GT_NODE_CONNECTED); paulo@0: assert (GT_CONN(node) == c); paulo@0: paulo@0: /* paulo@0: * If the counter is not 0, we sent a table to this node already. paulo@0: * So, wait for the timer to pick that up. 
paulo@0: */ paulo@0: if (node->query_router_counter != 0) paulo@0: return NULL; paulo@0: paulo@0: /* submit the table */ paulo@0: query_route_table_submit (c); paulo@0: paulo@0: /* reset the submit timer */ paulo@0: if (GT_NODE(c)->query_route_timer != 0) paulo@0: timer_reset (GT_NODE(c)->query_route_timer); paulo@0: paulo@0: return NULL; paulo@0: } paulo@0: #endif paulo@0: paulo@0: static void add_index (ds_data_t *key, ds_data_t *value, paulo@0: struct qrp_route_table *qrt) paulo@0: { paulo@0: struct qrp_route_entry *entry = value->data; paulo@0: uint32_t slot; paulo@0: paulo@0: /* grab only the most significant bits of the entry */ paulo@0: slot = entry->index >> (32 - qrt->bits); paulo@0: paulo@0: /* paulo@0: * If the entry already exists in the table, bump shared entries and paulo@0: * forget about this entry. paulo@0: */ paulo@0: if (qrp_route_table_lookup (qrt, slot)) paulo@0: { paulo@0: qrt->shared++; paulo@0: return; paulo@0: } paulo@0: paulo@0: qrp_route_table_insert (qrt, slot); paulo@0: } paulo@0: paulo@0: static void build_uncompressed (struct qrp_route_table *qrt) paulo@0: { paulo@0: dataset_foreach (indices, DS_FOREACH(add_index), qrt); paulo@0: } paulo@0: paulo@0: static int build_qrp_table (void *udata) paulo@0: { paulo@0: uint8_t *new_table; paulo@0: size_t new_size; paulo@0: StopWatch *sw; paulo@0: double elapsed; paulo@0: double fill_ratio; paulo@0: paulo@0: if (!route_table && !(route_table = qrp_route_table_new (MIN_TABLE_BITS))) paulo@0: { paulo@0: /* try again later */ paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: sw = stopwatch_new (TRUE); paulo@0: paulo@0: /* build a table from the indices */ paulo@0: build_uncompressed (route_table); paulo@0: paulo@0: stopwatch_stop (sw); paulo@0: paulo@0: elapsed = stopwatch_elapsed (sw, NULL); paulo@0: paulo@0: fill_ratio = qrp_route_table_fill_ratio (route_table); paulo@0: paulo@0: GT->DBGFN (GT, "%.4lfs elapsed building", elapsed); paulo@0: GT->DBGFN (GT, "present=%u shared=%u size=%u", 
route_table->present, paulo@0: route_table->shared, route_table->size); paulo@0: GT->DBGFN (GT, "fill ratio=%.4lf%%", fill_ratio); paulo@0: paulo@0: /* paulo@0: * If the fill ratio is greater than an acceptable threshold, paulo@0: * and we haven't reached the maximum table size allowed, paulo@0: * rebuild a larger routing table. paulo@0: */ paulo@0: if (fill_ratio >= INC_FILL_RATIO && route_table->bits < MAX_TABLE_BITS) paulo@0: { paulo@0: struct qrp_route_table *new_table; paulo@0: paulo@0: /* paulo@0: * If we don't have enough memory to build the new table, fall paulo@0: * through and compress the existing table. This would only happen paulo@0: * if this node has a very small amount of memory. paulo@0: */ paulo@0: if ((new_table = qrp_route_table_new (route_table->bits + 1))) paulo@0: { paulo@0: qrp_route_table_free (route_table); paulo@0: route_table = new_table; paulo@0: paulo@0: /* retry the build later, it's kinda expensive */ paulo@0: stopwatch_free (sw); paulo@0: return TRUE; paulo@0: } paulo@0: } paulo@0: paulo@0: stopwatch_start (sw); paulo@0: paulo@0: /* compress a new table */ paulo@0: new_table = compress_table (route_table->table, paulo@0: route_table->size, paulo@0: &new_size); paulo@0: paulo@0: elapsed = stopwatch_free_elapsed (sw); paulo@0: paulo@0: GT->DBGFN (GT, "%.4lfs elapsed compressing", elapsed); paulo@0: GT->DBGFN (GT, "compressed size=%lu", new_size); paulo@0: paulo@0: if (!new_table) paulo@0: return TRUE; paulo@0: paulo@0: assert (new_size > 0); paulo@0: paulo@0: /* paulo@0: * Replace the old compressed table paulo@0: */ paulo@0: free (compressed_table); paulo@0: paulo@0: compressed_table = new_table; paulo@0: compressed_size = new_size; paulo@0: compressed_slots = route_table->slots; paulo@0: paulo@0: compressed_version++; paulo@0: paulo@0: if (!compressed_version) paulo@0: compressed_version++; paulo@0: paulo@0: /* paulo@0: * An optimization to reduce memory usage: realloc the paulo@0: * compressed table to the smaller size. 
paulo@0: */ paulo@0: if ((new_table = realloc (new_table, new_size))) paulo@0: compressed_table = new_table; paulo@0: paulo@0: #if 0 paulo@0: /* update nodes with this table */ paulo@0: gt_conn_foreach (GT_CONN_FOREACH(update_nodes), NULL, paulo@0: GT_NODE_ULTRA, GT_NODE_CONNECTED, 0); paulo@0: #endif paulo@0: paulo@0: /* paulo@0: * Temporary optimization: we can free the uncompressed paulo@0: * route table now, as it is unused. This is a dubious optimization paulo@0: * though because the table will probably hang around in paulo@0: * the future when incremental updating works. paulo@0: */ paulo@0: qrp_route_table_free (route_table); paulo@0: route_table = NULL; paulo@0: paulo@0: /* remove the timer, as the table is now up to date */ paulo@0: build_timer = 0; paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static int start_build (void *udata) paulo@0: { paulo@0: build_timer = timer_add (QRT_BUILD_INTERVAL, paulo@0: (TimerCallback)build_qrp_table, NULL); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static void start_build_timer (void) paulo@0: { paulo@0: if (build_timer) paulo@0: return; paulo@0: paulo@0: /* paulo@0: * If we don't have a compressed table, we haven't built paulo@0: * the table before, so build it soon. Otherwise, paulo@0: * we won't submit it for a while anyway, so build it paulo@0: * at half the update interval. 
paulo@0: */ paulo@0: if (compressed_table) paulo@0: { paulo@0: build_timer = timer_add (QRT_UPDATE_INTERVAL / 2, paulo@0: (TimerCallback)start_build, NULL); paulo@0: return; paulo@0: } paulo@0: paulo@0: start_build (NULL); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* TODO: this should be moved to GT_SELF */ paulo@0: uint8_t *gt_query_router_self (size_t *size, int *version) paulo@0: { paulo@0: if (!compressed_table) paulo@0: return NULL; paulo@0: paulo@0: assert (size != NULL && version != NULL); paulo@0: paulo@0: *size = compressed_size; paulo@0: *version = compressed_version; paulo@0: paulo@0: return compressed_table; paulo@0: } paulo@0: paulo@0: static int free_entries (ds_data_t *key, ds_data_t *value, void *udata) paulo@0: { paulo@0: struct qrp_route_entry *entry = value->data; paulo@0: paulo@0: free (entry); paulo@0: paulo@0: return DS_CONTINUE | DS_REMOVE; paulo@0: } paulo@0: paulo@0: void gt_query_router_self_destroy (void) paulo@0: { paulo@0: timer_remove_zero (&build_timer); paulo@0: paulo@0: qrp_route_table_free (route_table); paulo@0: route_table = NULL; paulo@0: paulo@0: free (compressed_table); paulo@0: compressed_table = NULL; paulo@0: compressed_slots = 0; paulo@0: compressed_size = 0; paulo@0: compressed_version = 0; paulo@0: paulo@0: dataset_foreach_ex (indices, DS_FOREACH_EX(free_entries), NULL); paulo@0: dataset_clear (indices); paulo@0: indices = NULL; paulo@0: } paulo@0: paulo@0: void gt_query_router_self_init (void) paulo@0: { paulo@0: indices = dataset_new (DATASET_HASH); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static uint32_t *append_token (uint32_t *tokens, size_t *len, paulo@0: size_t pos, uint32_t tok) paulo@0: { paulo@0: if (pos >= *len) paulo@0: { paulo@0: uint32_t *new_tokens; paulo@0: paulo@0: *(len) += 8; paulo@0: new_tokens = realloc (tokens, *len * sizeof 
(uint32_t)); paulo@0: paulo@0: assert (new_tokens != NULL); paulo@0: tokens = new_tokens; paulo@0: } paulo@0: paulo@0: tokens[pos] = tok; paulo@0: return tokens; paulo@0: } paulo@0: paulo@0: static uint32_t *tokenize (char *hpath, size_t *r_len) paulo@0: { paulo@0: uint32_t *tokens; paulo@0: int count; paulo@0: size_t len; paulo@0: char *str, *str0; paulo@0: char *next; paulo@0: paulo@0: if (!(str0 = str = STRDUP (hpath))) paulo@0: return NULL; paulo@0: paulo@0: tokens = NULL; paulo@0: len = 0; paulo@0: count = 0; paulo@0: paulo@0: while ((next = string_sep_set (&str, QRP_DELIMITERS)) != NULL) paulo@0: { paulo@0: uint32_t tok; paulo@0: paulo@0: if (string_isempty (next)) paulo@0: continue; paulo@0: paulo@0: /* don't add keywords that are too small */ paulo@0: if (strlen (next) < QRP_MIN_LENGTH) paulo@0: continue; paulo@0: paulo@0: tok = gt_query_router_hash_str (next, 32); paulo@0: tokens = append_token (tokens, &len, count++, tok); paulo@0: } paulo@0: paulo@0: *r_len = count; paulo@0: paulo@0: free (str0); paulo@0: paulo@0: return tokens; paulo@0: } paulo@0: paulo@0: void gt_query_router_self_add (FileShare *file) paulo@0: { paulo@0: uint32_t *tokens, *tokens0; paulo@0: uint32_t tok; paulo@0: size_t len; paulo@0: int i; paulo@0: struct qrp_route_entry *entry; paulo@0: paulo@0: tokens0 = tokens = tokenize (share_get_hpath (file), &len); paulo@0: paulo@0: assert (tokens != NULL); paulo@0: assert (len > 0); paulo@0: paulo@0: for (i = 0; i < len; i++) paulo@0: { paulo@0: tok = tokens[i]; paulo@0: paulo@0: if ((entry = dataset_lookup (indices, &tok, sizeof (tok)))) paulo@0: { paulo@0: entry->ref++; paulo@0: continue; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Create a new index and add it to the table. 
paulo@0: */ paulo@0: if (!(entry = malloc (sizeof (struct qrp_route_entry)))) paulo@0: continue; paulo@0: paulo@0: entry->ref = 1; paulo@0: entry->index = tok; paulo@0: paulo@0: dataset_insert (&indices, &tok, sizeof (tok), entry, 0); paulo@0: paulo@0: table_changed = TRUE; paulo@0: } paulo@0: paulo@0: free (tokens0); paulo@0: } paulo@0: paulo@0: void gt_query_router_self_remove (FileShare *file) paulo@0: { paulo@0: uint32_t *tokens, *tokens0; paulo@0: uint32_t tok; paulo@0: size_t len; paulo@0: int i; paulo@0: struct qrp_route_entry *entry; paulo@0: paulo@0: tokens0 = tokens = tokenize (share_get_hpath (file), &len); paulo@0: paulo@0: assert (tokens != NULL); paulo@0: assert (len > 0); paulo@0: paulo@0: for (i = 0; i < len; i++) paulo@0: { paulo@0: tok = tokens[i]; paulo@0: paulo@0: entry = dataset_lookup (indices, &tok, sizeof (tok)); paulo@0: assert (entry != NULL); paulo@0: paulo@0: if (--entry->ref > 0) paulo@0: continue; paulo@0: paulo@0: dataset_remove (indices, &tok, sizeof (tok)); paulo@0: paulo@0: table_changed = TRUE; paulo@0: } paulo@0: paulo@0: free (tokens0); paulo@0: } paulo@0: paulo@0: void gt_query_router_self_sync (BOOL begin) paulo@0: { paulo@0: if (!begin && table_changed) paulo@0: { paulo@0: start_build_timer(); paulo@0: table_changed = FALSE; paulo@0: } paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: int query_patch_open (GtQueryRouter *router, int seq_size, int compressed, paulo@0: size_t max_size) paulo@0: { paulo@0: GtQueryPatch *new_patch; paulo@0: paulo@0: if (!(new_patch = malloc (sizeof (GtQueryPatch)))) paulo@0: return FALSE; paulo@0: paulo@0: memset (new_patch, 0, sizeof (GtQueryPatch)); paulo@0: paulo@0: if (!(new_patch->stream = zlib_stream_open (max_size))) paulo@0: { paulo@0: free (new_patch); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: new_patch->seq_size = seq_size; paulo@0: new_patch->compressed = compressed; paulo@0: paulo@0: /* NOTE: 
sequence is 1-based, not 0-based */ paulo@0: new_patch->seq_num = 1; paulo@0: paulo@0: router->patch = new_patch; paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: void query_patch_close (GtQueryRouter *router) paulo@0: { paulo@0: GtQueryPatch *patch; paulo@0: paulo@0: GT->DBGFN (GT, "entered"); paulo@0: paulo@0: if (!router) paulo@0: return; paulo@0: paulo@0: patch = router->patch; paulo@0: paulo@0: if (!patch) paulo@0: return; paulo@0: paulo@0: zlib_stream_close (patch->stream); paulo@0: paulo@0: router->patch = NULL; paulo@0: free (patch); paulo@0: } paulo@0: paulo@0: /* TODO: compact router tables to bit-level */ paulo@0: static void query_patch_apply (GtQueryRouter *router, int bits, char *data, paulo@0: size_t data_size) paulo@0: { paulo@0: GtQueryPatch *patch; paulo@0: char *table; /* NOTE: this must be signed */ paulo@0: int i; paulo@0: paulo@0: patch = router->patch; paulo@0: assert (patch != NULL); paulo@0: paulo@0: /* check for overflow: this may look wrong but its not */ paulo@0: if (patch->table_pos + (data_size - 1) >= router->size) paulo@0: { paulo@0: GT->DBGFN (GT, "patch overflow: %u (max of %u)", paulo@0: patch->table_pos+data_size, router->size); paulo@0: query_patch_close (router); paulo@0: return; paulo@0: } paulo@0: paulo@0: table = router->table; paulo@0: paulo@0: /* hrm */ paulo@0: if (bits == 4) paulo@0: { paulo@0: int j; paulo@0: paulo@0: for (i = 0; i < data_size; i++) paulo@0: { paulo@0: int pos; paulo@0: char change; paulo@0: paulo@0: pos = i + patch->table_pos; paulo@0: paulo@0: /* avoid % */ paulo@0: j = (i+1) & 0x1; paulo@0: paulo@0: /* grab the correct half of the byte and sign-extend it paulo@0: * NOTE: this starts off with the most significant bits! */ paulo@0: change = data[i] & (0x0f << (4 * j)); paulo@0: paulo@0: /* move to least significant bits paulo@0: * TODO: does this do sign-extension correctly? 
*/ paulo@0: change >>= 4; paulo@0: paulo@0: table[pos] += change; paulo@0: } paulo@0: } paulo@0: else if (bits == 8) paulo@0: { paulo@0: /* untested */ paulo@0: for (i = 0; i < data_size; i++) paulo@0: { paulo@0: table[i + patch->table_pos] += data[i]; paulo@0: } paulo@0: } paulo@0: else paulo@0: { paulo@0: GT->DBGFN (GT, "undefined bits value in query patch: %u", bits); paulo@0: query_patch_close (router); paulo@0: return; paulo@0: } paulo@0: paulo@0: /* store the table position for the next patch */ paulo@0: patch->table_pos += i; paulo@0: paulo@0: /* cleanup the data if the patch is done */ paulo@0: patch->seq_num++; paulo@0: paulo@0: if (patch->seq_num > patch->seq_size) paulo@0: query_patch_close (router); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* TODO: compact router tables to bit-level */ paulo@0: GtQueryRouter *gt_query_router_new (size_t size, int infinity) paulo@0: { paulo@0: GtQueryRouter *router; paulo@0: paulo@0: if (size > MAX_TABLE_SIZE) paulo@0: return NULL; paulo@0: paulo@0: if (!(router = malloc (sizeof (GtQueryRouter)))) paulo@0: return NULL; paulo@0: paulo@0: memset (router, 0, sizeof (GtQueryRouter)); paulo@0: paulo@0: if (!(router->table = malloc (size))) paulo@0: { paulo@0: free (router->table); paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: memset (router->table, infinity, size); paulo@0: paulo@0: router->size = size; paulo@0: paulo@0: return router; paulo@0: } paulo@0: paulo@0: void gt_query_router_free (GtQueryRouter *router) paulo@0: { paulo@0: if (!router) paulo@0: return; paulo@0: paulo@0: query_patch_close (router); paulo@0: paulo@0: free (router->table); paulo@0: free (router); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void print_hex (unsigned char *data, size_t size) paulo@0: { paulo@0: fprint_hex (stdout, data, size); paulo@0: } paulo@0: paulo@0: void 
gt_query_router_update (GtQueryRouter *router, size_t seq_num, paulo@0: size_t seq_size, int compressed, int bits, paulo@0: unsigned char *zdata, size_t size) paulo@0: { paulo@0: GtQueryPatch *patch; paulo@0: char *data; paulo@0: paulo@0: if (!router) paulo@0: { paulo@0: GT->DBGFN (GT, "null router"); paulo@0: return; paulo@0: } paulo@0: paulo@0: if (!router->patch) paulo@0: { paulo@0: if (!query_patch_open (router, seq_size, compressed, router->size)) paulo@0: return; paulo@0: } paulo@0: paulo@0: patch = router->patch; paulo@0: paulo@0: /* check for an invalid sequence number or size */ paulo@0: if (patch->seq_size != seq_size || patch->seq_num != seq_num) paulo@0: { paulo@0: GT->DBGFN (GT, "bad patch: seq_size %u vs %u, seq_num %u vs %u", paulo@0: patch->seq_size, seq_size, patch->seq_num, seq_num); paulo@0: query_patch_close (router); paulo@0: return; paulo@0: } paulo@0: paulo@0: if (compressed != patch->compressed) paulo@0: { paulo@0: GT->DBGFN (GT, "tried to change encodings in patch"); paulo@0: query_patch_close (router); paulo@0: return; paulo@0: } paulo@0: paulo@0: switch (compressed) paulo@0: { paulo@0: case 0x00: /* no compression */ paulo@0: if (!zlib_stream_write (patch->stream, zdata, size)) paulo@0: { paulo@0: GT->DBGFN (GT, "error copying data"); paulo@0: query_patch_close (router); paulo@0: return; paulo@0: } paulo@0: paulo@0: break; paulo@0: paulo@0: case 0x01: /* deflate */ paulo@0: printf ("zlib compressed data:\n"); paulo@0: print_hex (zdata, size); paulo@0: paulo@0: if (!zlib_stream_inflate (patch->stream, zdata, size)) paulo@0: { paulo@0: GT->DBGFN (GT, "error inflating data"); paulo@0: query_patch_close (router); paulo@0: return; paulo@0: } paulo@0: paulo@0: break; paulo@0: paulo@0: default: paulo@0: GT->DBGFN (GT, "unknown compression algorithm in query route patch"); paulo@0: return; paulo@0: } paulo@0: paulo@0: /* read the data in the stream */ paulo@0: if (!(size = zlib_stream_read (patch->stream, &data))) paulo@0: { paulo@0: GT->DBGFN 
(GT, "error calling zlib_stream_read"); paulo@0: query_patch_close (router); paulo@0: return; paulo@0: } paulo@0: paulo@0: printf ("after uncompressing:\n"); paulo@0: print_hex (data, size); paulo@0: paulo@0: /* apply the patch -- this will cleanup any data if necessary */ paulo@0: query_patch_apply (router, bits, data, size); paulo@0: paulo@0: print_hex (router->table, router->size); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void submit_empty_table (TCPC *c) paulo@0: { paulo@0: static char table[8] = { 0 }; paulo@0: int len; paulo@0: paulo@0: #if 0 paulo@0: size_t size; paulo@0: #endif paulo@0: paulo@0: GT->DBGFN (GT, "reseting route table for %s", net_ip_str (GT_NODE(c)->ip)); paulo@0: paulo@0: /* all slots in the table should be initialized to infinity, so it paulo@0: * should be "empty" on the remote node */ paulo@0: memset (table, 0, sizeof (table)); paulo@0: paulo@0: paulo@0: #if 0 paulo@0: /* TEST: set part of the table to -infinity to get queries */ paulo@0: size = sizeof (table); paulo@0: memset (table + (size + 1) / 2 - 1, -infinity, (size + 1) / 4); paulo@0: #endif paulo@0: paulo@0: /* format: */ paulo@0: if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, paulo@0: "%c%lu%c", 0, (unsigned long) sizeof (table), paulo@0: INFINITY) < 0) paulo@0: { paulo@0: GT->DBGFN (GT, "error reseting table"); paulo@0: return; paulo@0: } paulo@0: paulo@0: len = sizeof (table); paulo@0: paulo@0: if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, paulo@0: "%c%c%c%c%c%*p", paulo@0: 1, 1, 1, 0, 8, len, table) < 0) paulo@0: { paulo@0: GT->DBGFN (GT, "error sending empty patch"); paulo@0: return; paulo@0: } paulo@0: } paulo@0: paulo@0: static void submit_table (TCPC *c, uint8_t *table, size_t size, size_t slots) paulo@0: { paulo@0: int infinity = INFINITY; paulo@0: int seq_size; paulo@0: int compressed; paulo@0: int seq_num; paulo@0: uint8_t *p; paulo@0: size_t send_size; 
paulo@0: paulo@0: /* XXX make table size settable at runtime */ paulo@0: paulo@0: /* send a reset table first */ paulo@0: if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, paulo@0: "%c%lu%c", 0, (long)slots, infinity) < 0) paulo@0: { paulo@0: GT->DBGFN (GT, "error reseting table"); paulo@0: return; paulo@0: } paulo@0: paulo@0: /* Break the table into PATCH_FRAGSIZE-sized chunks, paulo@0: * and include any leftover portions. */ paulo@0: seq_size = size / PATCH_FRAGSIZE + paulo@0: (size % PATCH_FRAGSIZE == 0 ? 0 : 1); paulo@0: paulo@0: assert (seq_size < 256); paulo@0: #if 0 paulo@0: GT->dbg (GT, "sequence size: %u", seq_size); paulo@0: #endif paulo@0: paulo@0: p = table; paulo@0: compressed = TRUE; paulo@0: paulo@0: /* NOTE: patch sequence numbers start at 1 */ paulo@0: for (seq_num = 1; seq_num <= seq_size; seq_num++) paulo@0: { paulo@0: send_size = MIN (PATCH_FRAGSIZE, size); paulo@0: paulo@0: if (gt_packet_send_fmt (c, GT_MSG_QUERY_ROUTE, NULL, 1, 0, paulo@0: "%c%c%c%c%c%*p", paulo@0: /* QRP PATCH */ 1, paulo@0: seq_num, seq_size, compressed, paulo@0: /* bits */ PATCH_BITS, paulo@0: send_size, p) < 0) paulo@0: { paulo@0: GT->DBGFN (GT, "error sending QRT patch"); paulo@0: return; paulo@0: } paulo@0: paulo@0: size -= send_size; paulo@0: p += send_size; paulo@0: } paulo@0: } paulo@0: paulo@0: static BOOL update_qr_table (TCPC *c) paulo@0: { paulo@0: size_t size; paulo@0: int version; paulo@0: uint8_t *table; paulo@0: GtNode *node = GT_NODE(c); paulo@0: paulo@0: assert (node->state & GT_NODE_CONNECTED); paulo@0: paulo@0: table = gt_query_router_self (&size, &version); paulo@0: paulo@0: /* we may not have finished building a table yet */ paulo@0: if (!table) paulo@0: return TRUE; paulo@0: paulo@0: /* dont submit a table if this node is already up to date */ paulo@0: if (node->query_router_counter == version) paulo@0: return TRUE; paulo@0: paulo@0: /* HACK: this shouldn't be using compressed_slots */ paulo@0: submit_table (c, table, size, compressed_slots); 
paulo@0: paulo@0: /* store the version number of this table so paulo@0: * we dont resubmit unecessarily */ paulo@0: node->query_router_counter = version; paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: static BOOL submit_first_table (TCPC *c) paulo@0: { paulo@0: GtNode *node = GT_NODE(c); paulo@0: paulo@0: assert (node->state & GT_NODE_CONNECTED); paulo@0: paulo@0: update_qr_table (c); paulo@0: paulo@0: /* remove the first timer */ paulo@0: timer_remove (node->query_route_timer); paulo@0: paulo@0: /* set the timer for updating the table repeatedly */ paulo@0: node->query_route_timer = timer_add (QRT_UPDATE_INTERVAL, paulo@0: (TimerCallback)update_qr_table, c); paulo@0: paulo@0: /* we removed the timer, and must return FALSE */ paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Submit the query routing table for this node to another. paulo@0: * paulo@0: * This delays sending the table for while. This helps preserve our precious paulo@0: * upstream when we're looking for nodes to connect to, as this often paulo@0: * happens when we're in the middle of looking for more nodes. 
paulo@0: */ paulo@0: void query_route_table_submit (TCPC *c) paulo@0: { paulo@0: GtNode *node = GT_NODE(c); paulo@0: paulo@0: assert (node->query_route_timer == 0); paulo@0: paulo@0: /* save bandwidth with an empty table */ paulo@0: submit_empty_table (c); paulo@0: paulo@0: /* submit a real table later */ paulo@0: node->query_route_timer = timer_add (QRT_SUBMIT_DELAY, paulo@0: (TimerCallback)submit_first_table, c); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: /* TESTING */ paulo@0: paulo@0: #if 0 paulo@0: #define CHECK(x) do { \ paulo@0: if (!(x)) printf("FAILED: %s\n", #x); \ paulo@0: else printf("OK: %s\n", #x); \ paulo@0: } while (0) paulo@0: paulo@0: #define HASH(str, bits) \ paulo@0: printf ("hash " str ": %u\n", gt_query_router_hash_str (str, bits)) paulo@0: paulo@0: int main (int argc, char **argv) paulo@0: { paulo@0: #define hash gt_query_router_hash_str paulo@0: paulo@0: CHECK(hash("", 13)==0); paulo@0: CHECK(hash("eb", 13)==6791); paulo@0: CHECK(hash("ebc", 13)==7082); paulo@0: CHECK(hash("ebck", 13)==6698); paulo@0: CHECK(hash("ebckl", 13)==3179); paulo@0: CHECK(hash("ebcklm", 13)==3235); paulo@0: CHECK(hash("ebcklme", 13)==6438); paulo@0: CHECK(hash("ebcklmen", 13)==1062); paulo@0: CHECK(hash("ebcklmenq", 13)==3527); paulo@0: CHECK(hash("", 16)==0); paulo@0: CHECK(hash("n", 16)==65003); paulo@0: CHECK(hash("nd", 16)==54193); paulo@0: CHECK(hash("ndf", 16)==4953); paulo@0: CHECK(hash("ndfl", 16)==58201); paulo@0: CHECK(hash("ndfla", 16)==34830); paulo@0: CHECK(hash("ndflal", 16)==36910); paulo@0: CHECK(hash("ndflale", 16)==34586); paulo@0: CHECK(hash("ndflalem", 16)==37658); paulo@0: CHECK(hash("FAIL", 16)==37458); // WILL FAIL paulo@0: CHECK(hash("ndflaleme", 16)==45559); paulo@0: CHECK(hash("ol2j34lj", 10)==318); paulo@0: CHECK(hash("asdfas23", 10)==503); paulo@0: CHECK(hash("9um3o34fd", 10)==758); paulo@0: CHECK(hash("a234d", 10)==281); paulo@0: CHECK(hash("a3f", 10)==767); 
paulo@0: CHECK(hash("3nja9", 10)==581); paulo@0: CHECK(hash("2459345938032343", 10)==146); paulo@0: CHECK(hash("7777a88a8a8a8", 10)==342); paulo@0: CHECK(hash("asdfjklkj3k", 10)==861); paulo@0: CHECK(hash("adfk32l", 10)==1011); paulo@0: CHECK(hash("zzzzzzzzzzz", 10)==944); paulo@0: paulo@0: CHECK(hash("3nja9", 10)==581); paulo@0: CHECK(hash("3NJA9", 10)==581); paulo@0: CHECK(hash("3nJa9", 10)==581); paulo@0: paulo@0: printf ("hash(FAIL, 16) = %u\n", hash("FAIL", 16)); paulo@0: return 0; paulo@0: } paulo@0: #endif