annotate src/message/query.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
rev   line source
paulo@0 1 /*
paulo@0 2 * $Id: query.c,v 1.10 2004/06/04 15:44:59 hipnod Exp $
paulo@0 3 *
paulo@0 4 * Copyright (C) 2001-2003 giFT project (gift.sourceforge.net)
paulo@0 5 *
paulo@0 6 * This program is free software; you can redistribute it and/or modify it
paulo@0 7 * under the terms of the GNU General Public License as published by the
paulo@0 8 * Free Software Foundation; either version 2, or (at your option) any
paulo@0 9 * later version.
paulo@0 10 *
paulo@0 11 * This program is distributed in the hope that it will be useful, but
paulo@0 12 * WITHOUT ANY WARRANTY; without even the implied warranty of
paulo@0 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
paulo@0 14 * General Public License for more details.
paulo@0 15 */
paulo@0 16
paulo@0 17 #include "gt_gnutella.h"
paulo@0 18 #include "message/msg_handler.h"
paulo@0 19
paulo@0 20 #include "sha1.h"
paulo@0 21 #include "xml.h"
paulo@0 22
paulo@0 23 #include "gt_share.h"
paulo@0 24 #include "gt_share_file.h"
paulo@0 25 #include "gt_share_state.h"
paulo@0 26
paulo@0 27 #include "gt_search.h"
paulo@0 28 #include "gt_search_exec.h"
paulo@0 29 #include "gt_urn.h"
paulo@0 30
paulo@0 31 #include "transfer/push_proxy.h"
paulo@0 32
paulo@0 33 /*****************************************************************************/
paulo@0 34
paulo@0 35 #define LOG_RESULT_PACKETS gt_config_get_int("search/log_result_packets=0")
paulo@0 36
paulo@0 37 /*****************************************************************************/
paulo@0 38
paulo@0 39 typedef struct gt_search_reply
paulo@0 40 {
paulo@0 41 uint8_t ttl;
paulo@0 42 uint8_t results; /* number of results on the current packet */
paulo@0 43 GtPacket *packet; /* the current packet to stack results on */
paulo@0 44 gt_guid_t *guid;
paulo@0 45 } gt_search_reply_t;
paulo@0 46
paulo@0 47 /*****************************************************************************/
paulo@0 48
paulo@0 49 /* cache of recent queries TODO: flush this on plugin unload */
paulo@0 50 static Dataset *query_cache = NULL;
paulo@0 51
paulo@0 52 /* flushes the old cache entries */
paulo@0 53 static timer_id flush_timer = 0;
paulo@0 54
paulo@0 55 /*****************************************************************************/
paulo@0 56
paulo@0 57 static BOOL is_printable (const char *s)
paulo@0 58 {
paulo@0 59 while (*s)
paulo@0 60 {
paulo@0 61 if (!isprint (*s))
paulo@0 62 return FALSE;
paulo@0 63
paulo@0 64 s++;
paulo@0 65 }
paulo@0 66
paulo@0 67 return TRUE;
paulo@0 68 }
paulo@0 69
paulo@0 70 static void parse_text_meta (const char *data, Dataset **meta)
paulo@0 71 {
paulo@0 72 int rate, freq, min, sec;
paulo@0 73 int n;
paulo@0 74 char *lower;
paulo@0 75
paulo@0 76 if (!data)
paulo@0 77 return;
paulo@0 78
paulo@0 79 /* only ASCII strings are plaintext metadata */
paulo@0 80 if (!is_printable (data))
paulo@0 81 return;
paulo@0 82
paulo@0 83 /* skip strings that start with "urn:", we know what those are */
paulo@0 84 if (!strncasecmp (data, "urn:", 4))
paulo@0 85 return;
paulo@0 86
paulo@0 87 if (!(lower = STRDUP (data)))
paulo@0 88 return;
paulo@0 89
paulo@0 90 string_lower (lower);
paulo@0 91 n = sscanf (lower, "%d kbps %d khz %d:%d", &rate, &freq, &min, &sec);
paulo@0 92
paulo@0 93 /* try again with a slightly different format if it failed */
paulo@0 94 if (n != 4)
paulo@0 95 n = sscanf (lower, "%d kbps(vbr) %d khz %d:%d", &rate, &freq, &min, &sec);
paulo@0 96
paulo@0 97 free (lower);
paulo@0 98
paulo@0 99 if (n != 4)
paulo@0 100 {
paulo@0 101 #if 0
paulo@0 102 static int warned = 0;
paulo@0 103
paulo@0 104 if (warned++ < 4)
paulo@0 105 GT->DBGFN (GT, "unknown plaintext metadata?: %s", data);
paulo@0 106 #endif
paulo@0 107
paulo@0 108 return;
paulo@0 109 }
paulo@0 110
paulo@0 111 /* XXX: actually this should be META_DEBUG */
paulo@0 112 if (XML_DEBUG)
paulo@0 113 GT->DBGFN (GT, "parsed %d kbps %d khz %d:%d", rate, freq, min, sec);
paulo@0 114
paulo@0 115 dataset_insertstr (meta, "bitrate", stringf ("%li", rate * 1000));
paulo@0 116 dataset_insertstr (meta, "frequency", stringf ("%u", freq * 1000));
paulo@0 117 dataset_insertstr (meta, "duration", stringf ("%i", min * 60 + sec));
paulo@0 118 }
paulo@0 119
paulo@0 120 void gt_parse_extended_data (char *ext_block, gt_urn_t **r_urn,
paulo@0 121 Dataset **r_meta)
paulo@0 122 {
paulo@0 123 gt_urn_t *urn = NULL;
paulo@0 124 char *ext;
paulo@0 125
paulo@0 126 if (r_urn)
paulo@0 127 *r_urn = NULL;
paulo@0 128 if (r_meta)
paulo@0 129 *r_meta = NULL;
paulo@0 130
paulo@0 131 if (!ext_block)
paulo@0 132 return;
paulo@0 133
paulo@0 134 /*
paulo@0 135 * 0x1c is the separator character for so-called "GEM"
paulo@0 136 * (Gnutella-Extension Mechanism) extensions.
paulo@0 137 */
paulo@0 138 while ((ext = string_sep (&ext_block, "\x1c")))
paulo@0 139 {
paulo@0 140 if (string_isempty (ext))
paulo@0 141 break;
paulo@0 142
paulo@0 143 if (r_urn && (urn = gt_urn_parse (ext)))
paulo@0 144 {
paulo@0 145 free (*r_urn);
paulo@0 146 *r_urn = urn;
paulo@0 147 }
paulo@0 148
paulo@0 149 if (r_meta)
paulo@0 150 {
paulo@0 151 parse_text_meta (ext, r_meta);
paulo@0 152 gt_xml_parse (ext, r_meta);
paulo@0 153 }
paulo@0 154 }
paulo@0 155 }
paulo@0 156
paulo@0 157 static BOOL append_result (GtPacket *packet, FileShare *file)
paulo@0 158 {
paulo@0 159 GtShare *share;
paulo@0 160 Hash *hash;
paulo@0 161
paulo@0 162 if (!(share = share_get_udata (file, GT->name)))
paulo@0 163 return FALSE;
paulo@0 164
paulo@0 165 /* search results
paulo@0 166 * format: <index#> <file size> <file name> <extra data(include hash)> */
paulo@0 167 gt_packet_put_uint32 (packet, share->index);
paulo@0 168 gt_packet_put_uint32 (packet, file->size);
paulo@0 169 gt_packet_put_str (packet, share->filename);
paulo@0 170
paulo@0 171 /*
paulo@0 172 * This is the information that goes "between the nulls" in a
paulo@0 173 * query hit. The first null comes after the filename.
paulo@0 174 *
paulo@0 175 * This is a bit specific and icky. It should be abstracted away.
paulo@0 176 */
paulo@0 177 if ((hash = share_get_hash (file, "SHA1")))
paulo@0 178 {
paulo@0 179 char *sha1;
paulo@0 180
paulo@0 181 assert (hash->len == SHA1_BINSIZE);
paulo@0 182
paulo@0 183 if ((sha1 = sha1_string (hash->data)))
paulo@0 184 {
paulo@0 185 char buf[128];
paulo@0 186 int len;
paulo@0 187
paulo@0 188 /* make the hash be uppercase */
paulo@0 189 string_upper (sha1);
paulo@0 190
paulo@0 191 len = strlen (sha1);
paulo@0 192 assert (len == SHA1_STRLEN);
paulo@0 193
paulo@0 194 snprintf (buf, sizeof (buf) - 1, "urn:sha1:%s", sha1);
paulo@0 195 len += strlen ("urn:sha1:");
paulo@0 196
paulo@0 197 gt_packet_put_ustr (packet, buf, len);
paulo@0 198 free (sha1);
paulo@0 199 }
paulo@0 200 }
paulo@0 201
paulo@0 202 /* put the second null there */
paulo@0 203 gt_packet_put_uint8 (packet, 0);
paulo@0 204
paulo@0 205 if (gt_packet_error (packet))
paulo@0 206 {
paulo@0 207 gt_packet_free (packet);
paulo@0 208 return FALSE;
paulo@0 209 }
paulo@0 210
paulo@0 211 return TRUE;
paulo@0 212 }
paulo@0 213
paulo@0 214 /* add a trailer to the packets */
paulo@0 215 static void transmit_results (TCPC *c, GtPacket *packet, uint8_t hits)
paulo@0 216 {
paulo@0 217 gt_eqhd1_t eqhd1 = EQHD1_EMPTY;
paulo@0 218 gt_eqhd2_t eqhd2 = EQHD2_EMPTY;
paulo@0 219 uint8_t *ggep;
paulo@0 220 size_t ggep_len;
paulo@0 221
paulo@0 222 /* set the push bit as significant */
paulo@0 223 eqhd2 |= EQHD2_HAS_PUSH;
paulo@0 224 /* set the busy bit as significant */
paulo@0 225 eqhd1 |= EQHD1_HAS_BUSY;
paulo@0 226
paulo@0 227 /*
paulo@0 228 * We shouldnt mark ourselves firewalled if the destination is
paulo@0 229 * a local ip address and ttl == 1. However, for greater TTLs,
paulo@0 230 * there's no knowing if we should mark it or not...
paulo@0 231 */
paulo@0 232 if (GT_SELF->firewalled)
paulo@0 233 eqhd1 |= EQHD1_PUSH_FLAG;
paulo@0 234
paulo@0 235 if (upload_availability () == 0)
paulo@0 236 eqhd2 |= EQHD2_BUSY_FLAG;
paulo@0 237
paulo@0 238 /* add the query hit descriptor
paulo@0 239 * <vendor id> <length> <qhd_data1> <qhd_data2> <private_data> */
paulo@0 240 gt_packet_put_ustr (packet, (const unsigned char *)"GIFT", 4);
paulo@0 241 gt_packet_put_uint8 (packet, 2);
paulo@0 242 gt_packet_put_uint8 (packet, eqhd1);
paulo@0 243 gt_packet_put_uint8 (packet, eqhd2);
paulo@0 244
paulo@0 245 /* append GGEP block (only contains PUSH proxies for now) */
paulo@0 246 if (gt_push_proxy_get_ggep_block (&ggep, &ggep_len))
paulo@0 247 gt_packet_put_ustr (packet, ggep, ggep_len);
paulo@0 248
paulo@0 249 /* client identifier */
paulo@0 250 gt_packet_put_ustr (packet, GT_SELF_GUID, 16);
paulo@0 251
paulo@0 252 if (gt_packet_error (packet))
paulo@0 253 {
paulo@0 254 gt_packet_free (packet);
paulo@0 255 return;
paulo@0 256 }
paulo@0 257
paulo@0 258 #if 0
paulo@0 259 GT->DBGFN (GT, "packet before twiddling result number: (will twiddle %i)", hits);
paulo@0 260 TRACE_MEM (packet->data, packet->len);
paulo@0 261 #endif
paulo@0 262
paulo@0 263 /* rewind the packet to the search hit count and replace the hitcount
paulo@0 264 * it is the first byte after the header
paulo@0 265 * XXX: this should use a facility of gt_packet */
paulo@0 266 packet->data[GNUTELLA_HDR_LEN] = hits;
paulo@0 267
paulo@0 268 #if 0
paulo@0 269 GT->DBGFN (GT, "packet after twiddling:");
paulo@0 270 TRACE_MEM (packet->data, packet->len);
paulo@0 271 #endif
paulo@0 272
paulo@0 273 if (LOG_RESULT_PACKETS)
paulo@0 274 GT->dbg (GT, "transmitting %i", hits);
paulo@0 275
paulo@0 276 /* send the reply along the path to the node that queried us */
paulo@0 277 gt_packet_send (c, packet);
paulo@0 278 gt_packet_free (packet);
paulo@0 279 }
paulo@0 280
paulo@0 281 static BOOL query_request_result (TCPC *c, FileShare *file,
paulo@0 282 gt_search_reply_t *reply)
paulo@0 283 {
paulo@0 284 GtPacket *packet;
paulo@0 285
paulo@0 286 if (!file)
paulo@0 287 {
paulo@0 288 /* send any remaining data */
paulo@0 289 if (reply->packet)
paulo@0 290 transmit_results (c, reply->packet, reply->results);
paulo@0 291
paulo@0 292 return FALSE;
paulo@0 293 }
paulo@0 294
paulo@0 295 packet = reply->packet;
paulo@0 296
paulo@0 297 if (packet)
paulo@0 298 {
paulo@0 299 /* send the packet if the max results per packet is reached
paulo@0 300 * or the size of the packet is large */
paulo@0 301 if (reply->results == 255 || gt_packet_payload_len (packet) > 2000)
paulo@0 302 {
paulo@0 303 transmit_results (c, packet, reply->results);
paulo@0 304
paulo@0 305 reply->packet = NULL;
paulo@0 306 reply->results = 0;
paulo@0 307
paulo@0 308 /* handle this item again */
paulo@0 309 return TRUE;
paulo@0 310 }
paulo@0 311
paulo@0 312 if (append_result (packet, file))
paulo@0 313 reply->results++;
paulo@0 314
paulo@0 315 return FALSE;
paulo@0 316 }
paulo@0 317
paulo@0 318 /* allocate a packet */
paulo@0 319 if (!(packet = gt_packet_new (GT_MSG_QUERY_REPLY, reply->ttl, reply->guid)))
paulo@0 320 {
paulo@0 321 GIFT_ERROR (("mem failure?"));
paulo@0 322 return FALSE;
paulo@0 323 }
paulo@0 324
paulo@0 325 /* setup the search header */
paulo@0 326 gt_packet_put_uint8 (packet, 0);
paulo@0 327 gt_packet_put_port (packet, GT_SELF->gt_port);
paulo@0 328 gt_packet_put_ip (packet, GT_NODE(c)->my_ip);
paulo@0 329 gt_packet_put_uint32 (packet, 0); /* speed (kbits) */
paulo@0 330
paulo@0 331 if (gt_packet_error (packet))
paulo@0 332 {
paulo@0 333 GIFT_ERROR (("failed seting up search result packet"));
paulo@0 334 gt_packet_free (packet);
paulo@0 335 return FALSE;
paulo@0 336 }
paulo@0 337
paulo@0 338 reply->packet = packet;
paulo@0 339
paulo@0 340 /* handle this item again */
paulo@0 341 return TRUE;
paulo@0 342 }
paulo@0 343
paulo@0 344 static BOOL query_request_result_free (TCPC *c, FileShare *file,
paulo@0 345 gt_search_reply_t *reply)
paulo@0 346 {
paulo@0 347 GtShare *share;
paulo@0 348
paulo@0 349 if (!file)
paulo@0 350 {
paulo@0 351 free (reply->guid);
paulo@0 352 free (reply);
paulo@0 353 return FALSE;
paulo@0 354 }
paulo@0 355
paulo@0 356 /* just a sanity check */
paulo@0 357 if (file && !(share = share_get_udata (file, GT->name)))
paulo@0 358 return FALSE;
paulo@0 359
paulo@0 360 return FALSE;
paulo@0 361 }
paulo@0 362
paulo@0 363 /* This emulates the old queue interface */
paulo@0 364 static BOOL send_result (FileShare *file, void **args)
paulo@0 365 {
paulo@0 366 TCPC *c = args[0];
paulo@0 367 gt_search_reply_t *reply = args[1];
paulo@0 368
paulo@0 369 while (query_request_result (c, file, reply))
paulo@0 370 ;
paulo@0 371
paulo@0 372 query_request_result_free (c, file, reply);
paulo@0 373 return TRUE;
paulo@0 374 }
paulo@0 375
paulo@0 376 static void send_results (TCPC *c, List *results, gt_search_reply_t *reply)
paulo@0 377 {
paulo@0 378 void *args[2];
paulo@0 379
paulo@0 380 args[0] = c;
paulo@0 381 args[1] = reply;
paulo@0 382
paulo@0 383 results = list_foreach_remove (results, (ListForeachFunc)send_result, args);
paulo@0 384 assert (results == NULL);
paulo@0 385
paulo@0 386 query_request_result (c, NULL, reply);
paulo@0 387 query_request_result_free (c, NULL, reply);
paulo@0 388 }
paulo@0 389
paulo@0 390 static int flush_old (ds_data_t *key, ds_data_t *value, time_t *now)
paulo@0 391 {
paulo@0 392 time_t *timestamp = value->data;
paulo@0 393
paulo@0 394 if (*now - *timestamp >= 10 * EMINUTES)
paulo@0 395 return DS_CONTINUE | DS_REMOVE;
paulo@0 396
paulo@0 397 return DS_CONTINUE;
paulo@0 398 }
paulo@0 399
paulo@0 400 static BOOL flush_qcache (Dataset *cache)
paulo@0 401 {
paulo@0 402 time_t now = time (NULL);
paulo@0 403
paulo@0 404 assert (query_cache != NULL);
paulo@0 405 dataset_foreach_ex (query_cache, DS_FOREACH_EX(flush_old), &now);
paulo@0 406
paulo@0 407 return TRUE;
paulo@0 408 }
paulo@0 409
paulo@0 410 /* TODO: need to break up this file soon to isolate these things */
paulo@0 411 static BOOL query_cache_lookup (gt_guid_t *guid)
paulo@0 412 {
paulo@0 413 time_t now;
paulo@0 414
paulo@0 415 if (dataset_lookup (query_cache, guid, GT_GUID_LEN))
paulo@0 416 return TRUE;
paulo@0 417
paulo@0 418 /* limit the maximum length the query cache can grow */
paulo@0 419 if (dataset_length (query_cache) >= 2000)
paulo@0 420 return FALSE;
paulo@0 421
paulo@0 422 /* add the guid for catching duplicates next time */
paulo@0 423 now = time (NULL);
paulo@0 424 dataset_insert (&query_cache, guid, GT_GUID_LEN, &now, sizeof (now));
paulo@0 425
paulo@0 426 if (!flush_timer)
paulo@0 427 {
paulo@0 428 flush_timer = timer_add (5 * MINUTES, (TimerCallback)flush_qcache,
paulo@0 429 NULL);
paulo@0 430 }
paulo@0 431
paulo@0 432 return FALSE;
paulo@0 433 }
paulo@0 434
paulo@0 435 GT_MSG_HANDLER(gt_msg_query)
paulo@0 436 {
paulo@0 437 char *query;
paulo@0 438 char *extended;
paulo@0 439 gt_guid_t *guid;
paulo@0 440 gt_urn_t *urn;
paulo@0 441 List *list;
paulo@0 442 uint8_t ttl;
paulo@0 443 uint8_t hops;
paulo@0 444 unsigned char *hash;
paulo@0 445 gt_query_flags_t flags;
paulo@0 446 gt_search_type_t type;
paulo@0 447 gt_search_reply_t *reply;
paulo@0 448
paulo@0 449 flags = gt_packet_get_uint16 (packet);
paulo@0 450 query = gt_packet_get_str (packet);
paulo@0 451 extended = gt_packet_get_str (packet);
paulo@0 452
paulo@0 453 guid = gt_packet_guid (packet);
paulo@0 454
paulo@0 455 /*
paulo@0 456 * TODO: node->share_state can be null here, if the node hasn't
paulo@0 457 * successfully handshaked yet. Should fix this by storing messages until
paulo@0 458 * handshake is complete.
paulo@0 459 */
paulo@0 460 if (node->share_state && node->share_state->hidden)
paulo@0 461 return;
paulo@0 462
paulo@0 463 /* don't reply if the host is firewalled and we are too */
paulo@0 464 if ((flags & QF_HAS_FLAGS) && (flags & QF_ONLY_NON_FW) &&
paulo@0 465 GT_SELF->firewalled)
paulo@0 466 {
paulo@0 467 return;
paulo@0 468 }
paulo@0 469
paulo@0 470 /* don't reply if this is our own search -- TODO: substitute a
paulo@0 471 * full-fledged routing table */
paulo@0 472 if (gt_search_find (guid))
paulo@0 473 {
paulo@0 474 if (MSG_DEBUG)
paulo@0 475 {
paulo@0 476 GT->dbg (GT, "not searching, own search (guid %s)",
paulo@0 477 gt_guid_str (guid));
paulo@0 478 }
paulo@0 479
paulo@0 480 return;
paulo@0 481 }
paulo@0 482
paulo@0 483 /* check if we've handled this search already */
paulo@0 484 if (query_cache_lookup (guid))
paulo@0 485 {
paulo@0 486 if (MSG_DEBUG)
paulo@0 487 GT->DBGSOCK (GT, c, "duplicate search (%s)", gt_guid_str (guid));
paulo@0 488
paulo@0 489 return;
paulo@0 490 }
paulo@0 491
paulo@0 492 gt_parse_extended_data (extended, &urn, NULL);
paulo@0 493
paulo@0 494 /* WARNING: this assumes sha1 */
paulo@0 495 hash = gt_urn_data (urn);
paulo@0 496
paulo@0 497 if (hash)
paulo@0 498 type = GT_SEARCH_HASH;
paulo@0 499 else
paulo@0 500 type = GT_SEARCH_KEYWORD;
paulo@0 501
paulo@0 502 #if 0
paulo@0 503 GT->DBGFN (GT, "min_speed = %hu, query = '%s', extended data = '%s'",
paulo@0 504 min_speed, query, extended);
paulo@0 505 #endif
paulo@0 506
paulo@0 507 ttl = gt_packet_ttl (packet);
paulo@0 508 hops = gt_packet_hops (packet);
paulo@0 509
paulo@0 510 list = gt_search_exec (query, type, urn, ttl, hops);
paulo@0 511 free (urn);
paulo@0 512
paulo@0 513 if (!list)
paulo@0 514 return;
paulo@0 515
paulo@0 516 if (!(reply = MALLOC (sizeof (gt_search_reply_t))))
paulo@0 517 {
paulo@0 518 list_free (list);
paulo@0 519 return;
paulo@0 520 }
paulo@0 521
paulo@0 522 /* set the ttl of the reply to be +1 the hops the request travelled */
paulo@0 523 reply->ttl = gt_packet_hops (packet) + 1;
paulo@0 524
paulo@0 525 /* use the guid of the packet in replying to results */
paulo@0 526 reply->guid = gt_guid_dup (guid);
paulo@0 527
paulo@0 528 send_results (c, list, reply);
paulo@0 529 }