paulo@0: /* paulo@0: * $Id: gt_xfer.c,v 1.103 2005/01/05 14:08:40 mkern Exp $ paulo@0: * paulo@0: * Copyright (C) 2001-2003 giFT project (gift.sourceforge.net) paulo@0: * paulo@0: * This program is free software; you can redistribute it and/or modify it paulo@0: * under the terms of the GNU General Public License as published by the paulo@0: * Free Software Foundation; either version 2, or (at your option) any paulo@0: * later version. paulo@0: * paulo@0: * This program is distributed in the hope that it will be useful, but paulo@0: * WITHOUT ANY WARRANTY; without even the implied warranty of paulo@0: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU paulo@0: * General Public License for more details. paulo@0: */ paulo@0: paulo@0: #include "gt_gnutella.h" paulo@0: paulo@0: #include "gt_xfer_obj.h" paulo@0: #include "gt_xfer.h" paulo@0: #include "gt_http_client.h" paulo@0: #include "gt_http_server.h" paulo@0: #include "gt_share.h" paulo@0: #include "gt_share_file.h" paulo@0: #include "gt_packet.h" paulo@0: #include "gt_node.h" paulo@0: #include "gt_node_list.h" paulo@0: #include "gt_netorg.h" paulo@0: #include "gt_connect.h" paulo@0: #include "gt_bind.h" paulo@0: #include "gt_utils.h" paulo@0: paulo@0: #include "encoding/url.h" paulo@0: paulo@0: #include "transfer/source.h" paulo@0: paulo@0: /******************************************************************************/ paulo@0: paulo@0: /* maximum number of push connections in limbo each remote user */ paulo@0: #define PUSH_MAX_LIMBO gt_config_get_int("transfer/push_max_in_limbo=5") paulo@0: paulo@0: /******************************************************************************/ paulo@0: paulo@0: /* an alternative to carrying the push TTL around in the source url */ paulo@0: #define PUSH_MAX_TTL 12 paulo@0: paulo@0: /* maximum time to keep push connections in "limbo" while awaiting giftd paulo@0: * to reissue Chunks */ paulo@0: #define PUSH_LIMBO_TIMEOUT (4 * MINUTES) paulo@0: paulo@0: /* how long to wait for a PUSH reply before timing out in order to free the paulo@0: * Chunk */ paulo@0: #define PUSH_WAIT_INTERVAL (30 * SECONDS) paulo@0: paulo@0: /* minimum interval between pushes */ paulo@0: #define PUSH_MIN_DEFER_TIME (30 * ESECONDS) paulo@0: paulo@0: /* maximum amount of time a push will be forced to wait before being sent */ paulo@0: #define PUSH_MAX_DEFER_TIME (10 * EMINUTES) paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* this stores information about an indirect ("pushed") download */ paulo@0: typedef struct gt_push_source paulo@0: { paulo@0: gt_guid_t *guid; paulo@0: in_addr_t ip; paulo@0: in_addr_t src_ip; /* whether this push was to a local source */ paulo@0: List *xfers; /* xfers for this source */ paulo@0: List *connections; /* connection for this source */ paulo@0: time_t last_sent; /* time of last push sent to this source */ paulo@0: double defer_time; /* min time to wait before sending another push paulo@0: * doubles every push to PUSH_MAX_DEFER_TIME */ paulo@0: } GtPushSource; paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* Maps guid->{list of unique GtPushSources} */ paulo@0: static Dataset *gt_push_requests; paulo@0: paulo@0: /******************************************************************************/ paulo@0: paulo@0: static void push_source_reset_last_sent (GtPushSource *push_src); paulo@0: static void push_source_set_last_sent (gt_guid_t *guid, in_addr_t ip); paulo@0: static BOOL push_source_should_send (gt_guid_t *guid, in_addr_t ip); paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* paulo@0: * The source URL is stored on disk and could be outdated if the format has paulo@0: * changed. This updates it with any changes when we first read it from paulo@0: * the state file on startup. paulo@0: */ paulo@0: static void replace_url (Source *src, GtSource *gt) paulo@0: { paulo@0: char *url; paulo@0: paulo@0: if (!(url = gt_source_serialize (gt))) paulo@0: return; paulo@0: paulo@0: /* swap urls */ paulo@0: FREE (src->url); paulo@0: src->url = url; paulo@0: } paulo@0: paulo@0: /******************************************************************************/ paulo@0: paulo@0: static FileShare *lookup_index (GtTransfer *xfer, char *request) paulo@0: { paulo@0: FileShare *file; paulo@0: char *index_str; paulo@0: char *filename; paulo@0: char *decoded; paulo@0: uint32_t index; paulo@0: paulo@0: filename = request; paulo@0: index_str = string_sep (&filename, "/"); paulo@0: paulo@0: if (!filename || !index_str) paulo@0: return NULL; paulo@0: paulo@0: index = ATOUL (index_str); paulo@0: paulo@0: decoded = gt_url_decode (filename); paulo@0: file = gt_share_local_lookup_by_index (index, decoded); paulo@0: paulo@0: free (decoded); paulo@0: paulo@0: /* the filename may or may not be url encoded */ paulo@0: if (!file) paulo@0: file = gt_share_local_lookup_by_index (index, filename); paulo@0: paulo@0: return file; paulo@0: } paulo@0: paulo@0: static FileShare *lookup_urns (GtTransfer *xfer, char *urns) paulo@0: { paulo@0: FileShare *file = NULL; paulo@0: char *urn; paulo@0: paulo@0: /* paulo@0: * Try to lookup all urns provided in the header, paulo@0: * until one is found. paulo@0: */ paulo@0: while (!file && !string_isempty (urns)) paulo@0: { paulo@0: urn = string_sep_set (&urns, ", "); paulo@0: file = gt_share_local_lookup_by_urn (urn); paulo@0: } paulo@0: paulo@0: return file; paulo@0: } paulo@0: paulo@0: static FileShare *lookup_uri_res (GtTransfer *xfer, char *request) paulo@0: { paulo@0: FileShare *file = NULL; paulo@0: char *resolver = NULL; paulo@0: char *urn; paulo@0: paulo@0: resolver = string_sep (&request, "?"); paulo@0: urn = string_sep (&request, " "); paulo@0: paulo@0: if (resolver && !strcasecmp (resolver, "N2R")) paulo@0: { paulo@0: string_trim (request); paulo@0: file = lookup_urns (xfer, urn); paulo@0: } paulo@0: paulo@0: if (file && HTTP_DEBUG) paulo@0: GT->dbg (GT, "file=%s", share_get_hpath (file)); paulo@0: paulo@0: return file; paulo@0: } paulo@0: paulo@0: static Share *lookup_hpath (char *namespace, GtTransfer *xfer, char *request) paulo@0: { paulo@0: char *hpath; paulo@0: Share *share; paulo@0: paulo@0: /* paulo@0: * Reconstruct the hpath paulo@0: */ paulo@0: if (!(hpath = stringf_dup ("/%s/%s", namespace, request))) paulo@0: return NULL; paulo@0: paulo@0: if (HTTP_DEBUG) paulo@0: GT->dbg (GT, "request by hpath: %s", hpath); paulo@0: paulo@0: share = GT->share_lookup (GT, SHARE_LOOKUP_HPATH, hpath); paulo@0: free (hpath); paulo@0: paulo@0: return share; paulo@0: } paulo@0: paulo@0: /* Take a request for a file, i.e. everything after GET in: paulo@0: * paulo@0: * "GET /get/1279/filename.mp3" paulo@0: * paulo@0: * and convert it to a localized path to a file. paulo@0: * paulo@0: * Path has been "secured" already if necessary. paulo@0: * paulo@0: * The path returned must be static. paulo@0: * TODO: this interface is a bit bizarre */ paulo@0: char *gt_localize_request (GtTransfer *xfer, char *s_path, int *authorized) paulo@0: { paulo@0: static char open_path[PATH_MAX]; paulo@0: char *namespace; paulo@0: char *path, *path0; paulo@0: char *content_urns; paulo@0: FileShare *file; paulo@0: paulo@0: /* TODO: use authorized for Browse Host (BH) requests */ paulo@0: if (!STRCMP (s_path, "/")) paulo@0: { paulo@0: /* *authorized = TRUE; */ paulo@0: if (HTTP_DEBUG) paulo@0: GT->DBGFN (GT, "received unimplemented Browse Host request"); paulo@0: paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: if (authorized) paulo@0: *authorized = FALSE; paulo@0: paulo@0: if (!(path0 = path = STRDUP (s_path))) paulo@0: return NULL; paulo@0: paulo@0: if (HTTP_DEBUG) paulo@0: GT->dbg (GT, "path=%s", path); paulo@0: paulo@0: /* get rid of leading slash */ paulo@0: string_sep (&path, "/"); paulo@0: namespace = string_sep (&path, "/"); paulo@0: paulo@0: if (!namespace || !path) paulo@0: { paulo@0: GT->DBGFN (GT, "null namespace or path: %s %s\n", namespace, path); paulo@0: free (path0); paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: /* paulo@0: * If the client supplied "X-Gnutella-Content-URN: ", lookup paulo@0: * by that instead of the request. paulo@0: */ paulo@0: content_urns = dataset_lookupstr (xfer->header, "x-gnutella-content-urn"); paulo@0: paulo@0: if (content_urns) paulo@0: file = lookup_urns (xfer, content_urns); paulo@0: else if (!strcasecmp (namespace, "get")) paulo@0: file = lookup_index (xfer, path); paulo@0: else if (!strcasecmp (namespace, "uri-res")) paulo@0: file = lookup_uri_res (xfer, path); paulo@0: else paulo@0: file = lookup_hpath (namespace, xfer, path); paulo@0: paulo@0: /* paulo@0: * Set xfer->content_urn [which replies with 'X-Gnutella-Content-URN'] paulo@0: * to a comma-separated list of all URNs for this file. paulo@0: */ paulo@0: xfer->content_urns = gt_share_local_get_urns (file); paulo@0: paulo@0: if (!file) paulo@0: { paulo@0: if (HTTP_DEBUG) paulo@0: GT->DBGFN (GT, "bad request: /%s/%s", namespace, path); paulo@0: paulo@0: free (path0); paulo@0: return NULL; paulo@0: } paulo@0: paulo@0: free (path0); paulo@0: paulo@0: if (!share_complete (file)) paulo@0: return NULL; paulo@0: paulo@0: /* argh, need to return static data */ paulo@0: snprintf (open_path, sizeof (open_path) - 1, "%s", share_get_hpath (file)); paulo@0: paulo@0: /* try to fill in the hash */ paulo@0: xfer->hash = share_dsp_hash (file, "SHA1"); paulo@0: paulo@0: return open_path; paulo@0: } paulo@0: paulo@0: /******************************************************************************/ paulo@0: paulo@0: static char *index_request (char *request, size_t size, paulo@0: uint32_t index, const char *filename) paulo@0: { paulo@0: /* paulo@0: * The filename may not have ever been set. Fail the request in paulo@0: * this case. paulo@0: */ paulo@0: if (!filename || string_isempty (filename)) paulo@0: return NULL; paulo@0: paulo@0: /* paulo@0: * Filename is encoded, we don't support sending unecoded requests paulo@0: * anymore. NOTE: filename doesnt have leading '/' here, that may change paulo@0: */ paulo@0: snprintf (request, size - 1, "/get/%u/%s", index, filename); paulo@0: return request; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Setup a request string. Try request-by-hash (/uri-res/N2R?urn:sha1:..), paulo@0: * but if there are any problems, fallback to a "/get//" paulo@0: * request. paulo@0: */ paulo@0: static char *request_str (Source *source, uint32_t index, char *filename) paulo@0: { paulo@0: static char request[RW_BUFFER]; paulo@0: char *hash = source->hash; paulo@0: GtSource *gt; paulo@0: paulo@0: gt = source->udata; paulo@0: assert (gt != NULL); paulo@0: paulo@0: /* paulo@0: * Do a uri-res request unless one has failed already or paulo@0: * if we have no filename and thus no choice but to use the hash. paulo@0: * (doesn't happen currently but will for download-mesh sources paulo@0: * that only have the hash and no filename) paulo@0: */ paulo@0: if (hash && (!gt->uri_res_failed || string_isempty (filename))) paulo@0: { paulo@0: char *str0, *str; paulo@0: paulo@0: if (!(str0 = STRDUP (hash))) paulo@0: return index_request (request, sizeof (request), index, filename); paulo@0: paulo@0: str = str0; paulo@0: string_sep (&str, ":"); paulo@0: paulo@0: /* hashes are canonically uppercase on the gnet */ paulo@0: string_upper (str); paulo@0: paulo@0: if (str) paulo@0: { paulo@0: snprintf (request, sizeof (request) - 1, paulo@0: "/uri-res/N2R?urn:sha1:%s", str); paulo@0: free (str0); paulo@0: return request; paulo@0: } paulo@0: paulo@0: free (str0); paulo@0: } paulo@0: paulo@0: return index_request (request, sizeof (request), index, filename); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: /* PUSH HANDLING */ paulo@0: paulo@0: /* paulo@0: * This code has to deal with some tricky race conditions involving paulo@0: * chunk timeouts and pushes. paulo@0: */ paulo@0: paulo@0: static GtPushSource *gt_push_source_new (gt_guid_t *guid, in_addr_t ip, paulo@0: in_addr_t src_ip) paulo@0: { paulo@0: GtPushSource *src; paulo@0: paulo@0: if (!(src = MALLOC (sizeof (GtPushSource)))) paulo@0: return NULL; paulo@0: paulo@0: src->guid = gt_guid_dup (guid); paulo@0: src->ip = ip; paulo@0: src->src_ip = src_ip; paulo@0: src->xfers = NULL; paulo@0: src->connections = NULL; paulo@0: paulo@0: push_source_reset_last_sent (src); paulo@0: paulo@0: return src; paulo@0: } paulo@0: paulo@0: static void gt_push_source_free (GtPushSource *src) paulo@0: { paulo@0: if (!src) paulo@0: return; paulo@0: paulo@0: assert (src->xfers == NULL); paulo@0: assert (src->connections == NULL); paulo@0: paulo@0: free (src->guid); paulo@0: free (src); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* TODO: break this into two parts, first part looks in the paulo@0: * list for matching ip. If none is found, look for paulo@0: * a firewalled (local ip) push source. */ paulo@0: static int find_ip (GtPushSource *src, in_addr_t *ip) paulo@0: { paulo@0: /* If the source is a local IP address behind a non-local one, paulo@0: * authorize by just the client guid. Otherwise, use the IP. */ paulo@0: if (gt_is_local_ip (src->ip, src->src_ip) || src->ip == *ip) paulo@0: return 0; paulo@0: paulo@0: return -1; paulo@0: } paulo@0: paulo@0: static List *lookup_source_list (gt_guid_t *guid) paulo@0: { paulo@0: List *src_list; paulo@0: paulo@0: if (!(src_list = dataset_lookup (gt_push_requests, guid, 16))) paulo@0: return NULL; paulo@0: paulo@0: return src_list; paulo@0: } paulo@0: paulo@0: static GtPushSource *push_source_lookup (gt_guid_t *guid, in_addr_t ip) paulo@0: { paulo@0: List *requests; paulo@0: List *list; paulo@0: paulo@0: if (!(requests = lookup_source_list (guid))) paulo@0: return NULL; paulo@0: paulo@0: list = list_find_custom (requests, &ip, (ListForeachFunc)find_ip); paulo@0: return list_nth_data (list, 0); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void insert_source_list (gt_guid_t *guid, List *src_list) paulo@0: { paulo@0: if (!gt_push_requests) paulo@0: gt_push_requests = dataset_new (DATASET_HASH); paulo@0: paulo@0: dataset_insert (>_push_requests, guid, 16, src_list, 0); paulo@0: } paulo@0: paulo@0: static void add_push_source (List *pushes, gt_guid_t *guid, in_addr_t ip, paulo@0: in_addr_t src_ip) paulo@0: { paulo@0: GtPushSource *src; paulo@0: List *old_list; paulo@0: paulo@0: if (!(src = gt_push_source_new (guid, ip, src_ip))) paulo@0: return; paulo@0: paulo@0: if ((old_list = list_find_custom (pushes, &ip, (ListForeachFunc)find_ip))) paulo@0: { paulo@0: /* push source is already there */ paulo@0: gt_push_source_free (src); paulo@0: return; paulo@0: } paulo@0: paulo@0: pushes = list_prepend (pushes, src); paulo@0: insert_source_list (guid, pushes); paulo@0: } paulo@0: paulo@0: void gt_push_source_add (gt_guid_t *guid, in_addr_t ip, in_addr_t src_ip) paulo@0: { paulo@0: List *pushes; paulo@0: paulo@0: pushes = lookup_source_list (guid); paulo@0: add_push_source (pushes, guid, ip, src_ip); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: /* Timing controls for push requests */ paulo@0: paulo@0: static void push_source_reset_last_sent (GtPushSource *push_src) paulo@0: { paulo@0: push_src->last_sent = gt_uptime (); /* wrong */ paulo@0: push_src->defer_time = 0.0; paulo@0: } paulo@0: paulo@0: static void push_source_set_last_sent (gt_guid_t *guid, in_addr_t ip) paulo@0: { paulo@0: GtPushSource *src; paulo@0: paulo@0: if (!(src = push_source_lookup (guid, ip))) paulo@0: return; paulo@0: paulo@0: time (&src->last_sent); paulo@0: } paulo@0: paulo@0: static BOOL push_source_should_send (gt_guid_t *guid, in_addr_t ip) paulo@0: { paulo@0: GtPushSource *src; paulo@0: double deferred; paulo@0: time_t now; paulo@0: paulo@0: time (&now); paulo@0: paulo@0: if (!(src = push_source_lookup (guid, ip))) paulo@0: return FALSE; paulo@0: paulo@0: deferred = difftime (now, src->last_sent); paulo@0: paulo@0: /* randomize the defer time a bit in order to not send pushes for all paulo@0: * downloads at once */ paulo@0: if (deferred < src->defer_time + -10.0 + 20.0 * rand() / (RAND_MAX + 1.0)) paulo@0: return FALSE; paulo@0: paulo@0: /* paulo@0: * Double the defer time interval (the minimum time between sent paulo@0: * pushes for this source). paulo@0: */ paulo@0: src->defer_time *= 2; paulo@0: if (src->defer_time >= PUSH_MAX_DEFER_TIME) paulo@0: src->defer_time = PUSH_MAX_DEFER_TIME; paulo@0: paulo@0: if (src->defer_time == 0) paulo@0: src->defer_time = PUSH_MIN_DEFER_TIME; paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void flush_inputs (TCPC *c) paulo@0: { paulo@0: int ret; paulo@0: paulo@0: assert (c->fd >= 0); paulo@0: paulo@0: /* queued writes arent used by the HTTP system in this plugin, paulo@0: * so this should always be true */ paulo@0: ret = tcp_flush (c, TRUE); paulo@0: assert (ret == 0); paulo@0: paulo@0: input_remove_all (c->fd); paulo@0: } paulo@0: paulo@0: static void continue_download (GtPushSource *push_src, GtTransfer *xfer, paulo@0: TCPC *c) paulo@0: { paulo@0: Chunk *chunk; paulo@0: paulo@0: chunk = gt_transfer_get_chunk (xfer); paulo@0: paulo@0: /* remove all previous inputs */ paulo@0: flush_inputs (c); paulo@0: paulo@0: /* HACK: remove the detach timeout */ paulo@0: timer_remove_zero (&xfer->detach_timer); paulo@0: paulo@0: /* connect the TCPC and GtTransfer */ paulo@0: gt_transfer_set_tcpc (xfer, c); paulo@0: paulo@0: /* update the IP and port for placing in Host: */ paulo@0: peer_addr (c->fd, &xfer->ip, &xfer->port); paulo@0: paulo@0: /* the connection and the chunk have met up */ paulo@0: gt_transfer_status (xfer, SOURCE_WAITING, "Received GIV response"); paulo@0: paulo@0: if (HTTP_DEBUG) paulo@0: GT->DBGSOCK (GT, c, "Continuing download for %s", xfer->request); paulo@0: paulo@0: input_add (c->fd, xfer, INPUT_WRITE, paulo@0: (InputCallback)gt_http_client_start, TIMEOUT_DEF); paulo@0: } paulo@0: paulo@0: static void reset_conn (int fd, input_id id, TCPC *c) paulo@0: { paulo@0: /* paulo@0: * We should only get here if some data was sent, or if it timed out. In paulo@0: * which case we should close this connection, because it shouldn't be paulo@0: * sending anything. paulo@0: */ paulo@0: if (HTTP_DEBUG) paulo@0: { paulo@0: if (fd == -1) paulo@0: GT->DBGSOCK (GT, c, "connection timed out"); paulo@0: else paulo@0: GT->DBGSOCK (GT, c, "connection closed or sent invalid data"); paulo@0: } paulo@0: paulo@0: gt_push_source_remove_conn (c); paulo@0: tcp_close (c); paulo@0: } paulo@0: paulo@0: static void store_conn (GtPushSource *src, TCPC *c) paulo@0: { paulo@0: flush_inputs (c); paulo@0: paulo@0: input_add (c->fd, c, INPUT_READ, paulo@0: (InputCallback)reset_conn, PUSH_LIMBO_TIMEOUT); paulo@0: paulo@0: assert (!list_find (src->connections, c)); paulo@0: src->connections = list_prepend (src->connections, c); paulo@0: paulo@0: if (HTTP_DEBUG) paulo@0: GT->DBGSOCK (GT, c, "storing connection"); paulo@0: } paulo@0: paulo@0: BOOL gt_push_source_add_conn (gt_guid_t *guid, in_addr_t ip, TCPC *c) paulo@0: { paulo@0: GtTransfer *xfer; paulo@0: GtPushSource *push_src; paulo@0: paulo@0: if (!(push_src = push_source_lookup (guid, ip))) paulo@0: { paulo@0: if (HTTP_DEBUG) paulo@0: { paulo@0: GT->err (GT, "couldn't find push source %s:[%s]", paulo@0: gt_guid_str (guid), net_ip_str (ip)); paulo@0: } paulo@0: paulo@0: tcp_close (c); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Don't allow too many connections in flight from the same remote user. paulo@0: */ paulo@0: if (list_length (push_src->connections) >= PUSH_MAX_LIMBO) paulo@0: { paulo@0: if (HTTP_DEBUG) paulo@0: { paulo@0: GT->DBGSOCK (GT, c, "too many push connections from %s, closing", paulo@0: gt_guid_str (guid)); paulo@0: } paulo@0: paulo@0: tcp_close (c); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Since we now know this push source is alive, reset the push send paulo@0: * tracking time: in case the connection is lost, we'll resend the push paulo@0: * right away instead of waiting. paulo@0: */ paulo@0: push_source_reset_last_sent (push_src); paulo@0: paulo@0: /* paulo@0: * Store the connection if there are no GtTransfer requests from paulo@0: * giFT at the moment. paulo@0: */ paulo@0: if (!push_src->xfers) paulo@0: { paulo@0: store_conn (push_src, c); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: xfer = list_nth_data (push_src->xfers, 0); paulo@0: push_src->xfers = list_remove (push_src->xfers, xfer); paulo@0: paulo@0: continue_download (push_src, xfer, c); paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: /* return TRUE if there's a connection residing on this push source */ paulo@0: static BOOL push_source_lookup_conn (gt_guid_t *guid, in_addr_t ip) paulo@0: { paulo@0: GtPushSource *push_src; paulo@0: paulo@0: if (!(push_src = push_source_lookup (guid, ip))) paulo@0: return FALSE; paulo@0: paulo@0: if (push_src->connections) paulo@0: { paulo@0: if (HTTP_DEBUG) paulo@0: GT->DBGFN (GT, "found push connection for %s", net_ip_str (ip)); paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static void store_xfer (GtPushSource *src, GtTransfer *xfer) paulo@0: { paulo@0: assert (!list_find (src->xfers, xfer)); paulo@0: src->xfers = list_prepend (src->xfers, xfer); paulo@0: } paulo@0: paulo@0: BOOL gt_push_source_add_xfer (gt_guid_t *guid, in_addr_t ip, paulo@0: in_addr_t src_ip, GtTransfer *xfer) paulo@0: { paulo@0: TCPC *c; paulo@0: GtPushSource *push_src; paulo@0: paulo@0: assert (xfer != NULL); paulo@0: paulo@0: /* create the source if it doesn't exist already */ paulo@0: gt_push_source_add (guid, ip, src_ip); paulo@0: paulo@0: if (!(push_src = push_source_lookup (guid, ip))) paulo@0: { paulo@0: if (HTTP_DEBUG) paulo@0: { paulo@0: GT->err (GT, "couldn't find push source (%s:[%s]) for chunk %s", paulo@0: gt_guid_str (guid), net_ip_str (ip), xfer->request); paulo@0: } paulo@0: paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Store the GtTransfer if there are no connections to service it paulo@0: * at the moment. paulo@0: */ paulo@0: if (!push_src->connections) paulo@0: { paulo@0: store_xfer (push_src, xfer); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: c = list_nth_data (push_src->connections, 0); paulo@0: push_src->connections = list_remove (push_src->connections, c); paulo@0: paulo@0: continue_download (push_src, xfer, c); paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static BOOL remove_xfer (GtPushSource *src, GtTransfer *xfer) paulo@0: { paulo@0: src->xfers = list_remove (src->xfers, xfer); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static void remove_xfer_list (ds_data_t *key, ds_data_t *value, paulo@0: GtTransfer *xfer) paulo@0: { paulo@0: List *src_list = value->data; paulo@0: paulo@0: list_foreach (src_list, (ListForeachFunc)remove_xfer, xfer); paulo@0: } paulo@0: paulo@0: /* paulo@0: * The chunk is being cancelled, so remove it from being tracked. paulo@0: * paulo@0: * After this, if the push recipient connects, we will have to wait paulo@0: * for another chunk timeout before transmitting. paulo@0: */ paulo@0: void gt_push_source_remove_xfer (GtTransfer *xfer) paulo@0: { paulo@0: if (!xfer) paulo@0: return; paulo@0: paulo@0: dataset_foreach (gt_push_requests, DS_FOREACH(remove_xfer_list), xfer); paulo@0: } paulo@0: paulo@0: static BOOL remove_conn (GtPushSource *src, TCPC *c) paulo@0: { paulo@0: src->connections = list_remove (src->connections, c); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static void remove_conn_list (ds_data_t *key, ds_data_t *value, TCPC *c) paulo@0: { paulo@0: List *src_list = value->data; paulo@0: paulo@0: list_foreach (src_list, (ListForeachFunc)remove_conn, c); paulo@0: } paulo@0: paulo@0: /* paulo@0: * The connection from this push download closed paulo@0: */ paulo@0: void gt_push_source_remove_conn (TCPC *c) paulo@0: { paulo@0: if (!c) paulo@0: return; paulo@0: paulo@0: dataset_foreach (gt_push_requests, DS_FOREACH(remove_conn_list), c); paulo@0: } paulo@0: paulo@0: static BOOL cleanup_xfer (GtTransfer *xfer, void *udata) paulo@0: { paulo@0: gt_push_source_remove_xfer (xfer); paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: static BOOL cleanup_conn (TCPC *c, void *udata) paulo@0: { paulo@0: gt_push_source_remove_conn (c); paulo@0: tcp_close (c); paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: static void remove_push_source (GtPushSource *src) paulo@0: { paulo@0: List *src_list; paulo@0: paulo@0: src_list = lookup_source_list (src->guid); paulo@0: src_list = list_remove (src_list, src); paulo@0: paulo@0: insert_source_list (src->guid, src_list); paulo@0: } paulo@0: paulo@0: void gt_push_source_remove (gt_guid_t *guid, in_addr_t ip, in_addr_t src_ip) paulo@0: { paulo@0: GtPushSource *src; paulo@0: paulo@0: if (!(src = push_source_lookup (guid, ip))) paulo@0: return; paulo@0: paulo@0: /* cleanup all the chunks and connections */ paulo@0: src->xfers = paulo@0: list_foreach_remove (src->xfers, (ListForeachFunc)cleanup_xfer, paulo@0: NULL); paulo@0: src->connections = paulo@0: list_foreach_remove (src->connections, (ListForeachFunc)cleanup_conn, paulo@0: NULL); paulo@0: paulo@0: /* remove this source from the global list */ paulo@0: remove_push_source (src); paulo@0: paulo@0: gt_push_source_free (src); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static BOOL detach_timeout (void *udata) paulo@0: { paulo@0: GtTransfer *xfer = udata; paulo@0: paulo@0: /* Added this on 2004-12-22 to track observed assertion failure in paulo@0: * gt_transfer_get_chunk. -- mkern paulo@0: */ paulo@0: if (!xfer->chunk || xfer->chunk->udata != xfer) paulo@0: { paulo@0: GT->DBGFN (GT, "Detach timeout troubles. status = %d, " paulo@0: "text = %s, xfer->ip = %s, " paulo@0: "xfer = %p, xfer->chunk->udata = %p, " paulo@0: "xfer->detach_timer = 0x%X", paulo@0: xfer->detach_status, xfer->detach_msgtxt, paulo@0: net_ip_str (xfer->ip), xfer, paulo@0: xfer->chunk->udata, xfer->detach_timer); paulo@0: } paulo@0: paulo@0: /* Sometimes gt_transfer_status will trigger an paulo@0: * assert (xfer->chunk->udata == xfer) failure in gt_transfer_get_chunk. paulo@0: * But why? Is xfer already freed? Does it have another chunk and the paulo@0: * timer was not removed? paulo@0: */ paulo@0: gt_transfer_status (xfer, xfer->detach_status, xfer->detach_msgtxt); paulo@0: gt_transfer_close (xfer, TRUE); paulo@0: paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Attach a timer that will "detach" the GtTransfer from the Chunk by paulo@0: * cancelling it, but pretend that the Source is in some other state besides paulo@0: * "cancelled" or "timed out" by changing the status text. paulo@0: * paulo@0: * This is useful to keep a semi-consistent UI in certain situations, such as paulo@0: * sending out push requests, and cancelling requests when the remote side has paulo@0: * queued our request. paulo@0: */ paulo@0: static void detach_transfer_in (GtTransfer *xfer, SourceStatus status, paulo@0: char *status_txt, time_t interval) paulo@0: { paulo@0: char *msg; paulo@0: paulo@0: msg = STRDUP (status_txt); paulo@0: paulo@0: gt_transfer_status (xfer, status, msg); paulo@0: xfer->detach_status = status; paulo@0: paulo@0: free (xfer->detach_msgtxt); paulo@0: xfer->detach_msgtxt = msg; paulo@0: paulo@0: xfer->detach_timer = timer_add (interval, paulo@0: (TimerCallback)detach_timeout, xfer); paulo@0: } paulo@0: paulo@0: /* paulo@0: * Detach an GtTransfer from its Chunk. paulo@0: */ paulo@0: static void detach_transfer (GtTransfer *xfer, SourceStatus status, paulo@0: char *msgtxt) paulo@0: { paulo@0: /* paulo@0: * Cancelling from p->download_start will cause download_pause() to crash. paulo@0: * So, the detach must happen from timer context. paulo@0: */ paulo@0: detach_transfer_in (xfer, status, msgtxt, 2 * SECONDS); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void send_push (GtTransfer *xfer, GtSource *gt, TCPC *server) paulo@0: { paulo@0: GtPacket *packet; paulo@0: paulo@0: if (!(packet = gt_packet_new (GT_MSG_PUSH, PUSH_MAX_TTL, NULL))) paulo@0: return; paulo@0: paulo@0: gt_packet_put_ustr (packet, gt->guid, 16); paulo@0: gt_packet_put_uint32 (packet, gt->index); paulo@0: gt_packet_put_ip (packet, GT_NODE(server)->my_ip); paulo@0: gt_packet_put_port (packet, GT_SELF->gt_port); paulo@0: paulo@0: if (gt_packet_error (packet)) paulo@0: { paulo@0: gt_packet_free (packet); paulo@0: return; paulo@0: } paulo@0: paulo@0: gt_packet_send (server, packet); paulo@0: gt_packet_free (packet); paulo@0: paulo@0: /* paulo@0: * Don't wait for the whole Chunk timeout -- that keeps the Chunk paulo@0: * occupied for too long if there are other active sources (the Chunk paulo@0: * also times out longer and longer each time, so this gets worse paulo@0: * the longer the transfer is inactive). paulo@0: * paulo@0: * This is really an infelicity of the Chunk system. paulo@0: */ paulo@0: detach_transfer_in (xfer, SOURCE_QUEUED_REMOTE, "Sent PUSH request", paulo@0: PUSH_WAIT_INTERVAL); paulo@0: paulo@0: /* set the last time we sent a push to now */ paulo@0: push_source_set_last_sent (gt->guid, gt->user_ip); paulo@0: } paulo@0: paulo@0: static BOOL send_push_to_server (in_addr_t server_ip, in_port_t server_port, paulo@0: GtTransfer *xfer, GtSource *gt) paulo@0: { paulo@0: GtNode *server; paulo@0: paulo@0: if (!(server = gt_node_lookup (server_ip, server_port))) paulo@0: { paulo@0: server = gt_node_register (server_ip, server_port, paulo@0: GT_NODE_ULTRA); paulo@0: } paulo@0: paulo@0: if (!server) paulo@0: { paulo@0: GT->err (GT, "couldn't register server"); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: if (server->state & (GT_NODE_CONNECTED | GT_NODE_CONNECTING_2)) paulo@0: { paulo@0: assert (GT_CONN(server) != NULL); paulo@0: paulo@0: /* Server is in a state for receiving packets -- send the push */ paulo@0: send_push (xfer, gt, GT_CONN(server)); paulo@0: return TRUE; paulo@0: } paulo@0: else if (server->state & GT_NODE_CONNECTING_1) paulo@0: { paulo@0: /* dont try to connect again; wait till we're connected */ paulo@0: return FALSE; paulo@0: } paulo@0: else if (gt_conn_need_connections (GT_NODE_ULTRA) > 0 && paulo@0: !server->tried_connect && paulo@0: gt_connect (server) >= 0) paulo@0: { paulo@0: /* paulo@0: * We've tried to connect to the server so we could deliver the push paulo@0: * request eventually NOTE: this doesnt send a push until the next paulo@0: * chunk timeout. paulo@0: */ paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static void handle_push_download (Chunk *chunk, GtTransfer *xfer, GtSource *gt) paulo@0: { paulo@0: GtNode *server; paulo@0: paulo@0: /* paulo@0: * If this succeeds, we already have a connection to this paulo@0: * user and the transfer will continue by using that connection. paulo@0: * paulo@0: * TODO: the gt_push_source_add() should be used by some paulo@0: * per-source data structure paulo@0: */ paulo@0: if (gt_push_source_add_xfer (gt->guid, gt->user_ip, gt->server_ip, xfer)) paulo@0: return; paulo@0: paulo@0: /* paulo@0: * Dont send pushes too often. Maybe should use a global queue instead. paulo@0: * paulo@0: * NOTE: we can't free the xfer here because we have stored it. paulo@0: */ paulo@0: if (push_source_should_send (gt->guid, gt->user_ip) == FALSE) paulo@0: { paulo@0: /* don't occupy the Chunk forever */ paulo@0: detach_transfer_in (xfer, SOURCE_QUEUED_REMOTE, "Awaiting connection", paulo@0: PUSH_WAIT_INTERVAL); paulo@0: return; paulo@0: } paulo@0: paulo@0: /* paulo@0: * Next, try to find the server that supplied this result, paulo@0: * and send them a push. paulo@0: */ paulo@0: if (send_push_to_server (gt->server_ip, gt->server_port, xfer, gt)) paulo@0: return; paulo@0: paulo@0: /* paulo@0: * Finally, try sending to a random connected server. paulo@0: * paulo@0: * TODO: these should be rate-limited, either globally or paulo@0: * per-source. paulo@0: */ paulo@0: if ((server = gt_conn_random (GT_NODE_ULTRA, GT_NODE_CONNECTED))) paulo@0: { paulo@0: send_push_to_server (server->ip, server->gt_port, xfer, gt); paulo@0: return; paulo@0: } paulo@0: paulo@0: detach_transfer (xfer, SOURCE_QUEUED_REMOTE, "No PUSH route"); paulo@0: } paulo@0: paulo@0: static BOOL set_request (GtTransfer *xfer, Chunk *chunk, Source *source, paulo@0: GtSource *gt_src) paulo@0: { paulo@0: char *request; paulo@0: paulo@0: if (!chunk || !xfer) paulo@0: return FALSE; paulo@0: paulo@0: request = request_str (source, gt_src->index, gt_src->filename); paulo@0: paulo@0: if (!gt_transfer_set_request (xfer, request)) paulo@0: { paulo@0: GT->DBGFN (GT, "UI made an invalid request for '%s'", request); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: /* connect the xfer and the chunk */ paulo@0: gt_transfer_set_chunk (xfer, chunk); paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: static BOOL should_push (GtSource *gt) paulo@0: { paulo@0: TCPC *persistent; paulo@0: paulo@0: /* we cannot push if there is no guid to send the push to */ paulo@0: if (gt_guid_is_empty (gt->guid)) paulo@0: return FALSE; paulo@0: paulo@0: persistent = gt_http_connection_lookup (GT_TRANSFER_DOWNLOAD, paulo@0: gt->user_ip, paulo@0: gt->user_port); paulo@0: paulo@0: /* need to close the connection to re-add it to the list, because paulo@0: * _lookup removes it from the persistent connection list */ paulo@0: gt_http_connection_close (GT_TRANSFER_DOWNLOAD, persistent, FALSE); paulo@0: paulo@0: /* if we already have a connection don't send a push */ paulo@0: if (persistent) paulo@0: return FALSE; paulo@0: paulo@0: /* now check for a persistent "pushed" connection, which would be stored paulo@0: * separately from a directly connected one */ paulo@0: if (push_source_lookup_conn (gt->guid, gt->user_ip)) paulo@0: return TRUE; paulo@0: paulo@0: /* send a push if the IP is local */ paulo@0: if (gt_is_local_ip (gt->user_ip, gt->server_ip)) paulo@0: return TRUE; paulo@0: paulo@0: /* don't send a push if we cannot receive incoming connections */ paulo@0: if (gt_bind_is_firewalled()) paulo@0: return FALSE; paulo@0: paulo@0: /* send a push if they set the firewalled bit */ paulo@0: if (gt->firewalled) paulo@0: return TRUE; paulo@0: paulo@0: /* the last connection attempt failed, so try a push */ paulo@0: if (gt->connect_failed) paulo@0: return TRUE; paulo@0: paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: static void handle_download (Chunk *chunk, GtTransfer *xfer, GtSource *gt) paulo@0: { paulo@0: /* paulo@0: * Send a push, or connect directly. paulo@0: */ paulo@0: if (should_push (gt)) paulo@0: { paulo@0: /* (possibly) retry a connection attempt next time */ paulo@0: gt->connect_failed = FALSE; paulo@0: paulo@0: handle_push_download (chunk, xfer, gt); paulo@0: } paulo@0: else paulo@0: { paulo@0: gt_http_client_get (chunk, xfer); paulo@0: } paulo@0: } paulo@0: paulo@0: static BOOL download_is_queued (GtSource *gt) paulo@0: { paulo@0: /* back out if the request is still too early */ paulo@0: if (time (NULL) < gt->retry_time) paulo@0: return TRUE; paulo@0: paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: int gnutella_download_start (Protocol *p, Transfer *transfer, Chunk *chunk, paulo@0: Source *source) paulo@0: { paulo@0: GtTransfer *xfer; paulo@0: GtSource *gt; paulo@0: off_t start; paulo@0: off_t stop; paulo@0: paulo@0: gt = source->udata; paulo@0: assert (gt != NULL); paulo@0: paulo@0: /* giftd should send us only deactivated Chunks */ paulo@0: assert (chunk->udata == NULL); paulo@0: paulo@0: /* free the Source URL and update it with any format changes */ paulo@0: replace_url (source, gt); paulo@0: paulo@0: /* thank you, pretender :) */ paulo@0: start = chunk->start + chunk->transmit; paulo@0: stop = chunk->stop; paulo@0: paulo@0: if (!(xfer = gt_transfer_new (GT_TRANSFER_DOWNLOAD, source, paulo@0: gt->user_ip, gt->user_port, start, stop))) paulo@0: { paulo@0: GT->DBGFN (GT, "gt_transfer_new failed"); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: if (!set_request (xfer, chunk, source, gt)) paulo@0: { paulo@0: gt_transfer_close (xfer, TRUE); paulo@0: return FALSE; paulo@0: } paulo@0: paulo@0: if (download_is_queued (gt)) paulo@0: { paulo@0: detach_transfer (xfer, SOURCE_QUEUED_REMOTE, gt->status_txt); paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: handle_download (chunk, xfer, gt); paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: void gnutella_download_stop (Protocol *p, Transfer *transfer, Chunk *chunk, paulo@0: Source *source, BOOL complete) paulo@0: { paulo@0: gt_download_cancel (chunk, NULL); paulo@0: } paulo@0: paulo@0: int gnutella_upload_start (Protocol *p, Transfer *transfer, Chunk *chunk, paulo@0: Source *source, unsigned long avail) paulo@0: { paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: void gnutella_upload_stop (Protocol *p, Transfer *transfer, Chunk *chunk, paulo@0: Source *source) paulo@0: { paulo@0: gt_upload_cancel (chunk, NULL); paulo@0: } paulo@0: paulo@0: int gnutella_chunk_suspend (Protocol *p, Transfer *transfer, Chunk *chunk, paulo@0: Source *source) paulo@0: { paulo@0: return gt_chunk_suspend (chunk, transfer, NULL); paulo@0: } paulo@0: paulo@0: int gnutella_chunk_resume (Protocol *p, Transfer *transfer, Chunk *chunk, paulo@0: Source *source) paulo@0: { paulo@0: return gt_chunk_resume (chunk, transfer, NULL); paulo@0: }