Mercurial > hg > index.fcgi > gift-gnutella > gift-gnutella-0.0.11-1pba
diff src/io/tx_packet.c @ 0:d39e1d0d75b6
initial add
author | paulo@hit-nxdomain.opendns.com |
---|---|
date | Sat, 20 Feb 2010 21:18:28 -0800 |
parents | |
children |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/io/tx_packet.c Sat Feb 20 21:18:28 2010 -0800 1.3 @@ -0,0 +1,415 @@ 1.4 +/* 1.5 + * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $ 1.6 + * 1.7 + * Copyright (C) 2004 giFT project (gift.sourceforge.net) 1.8 + * 1.9 + * This program is free software; you can redistribute it and/or modify it 1.10 + * under the terms of the GNU General Public License as published by the 1.11 + * Free Software Foundation; either version 2, or (at your option) any 1.12 + * later version. 1.13 + * 1.14 + * This program is distributed in the hope that it will be useful, but 1.15 + * WITHOUT ANY WARRANTY; without even the implied warranty of 1.16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1.17 + * General Public License for more details. 1.18 + */ 1.19 + 1.20 +#include "gt_gnutella.h" 1.21 +#include "gt_packet.h" /* packet manipulation macros */ 1.22 + 1.23 +#include "io/tx_stack.h" 1.24 +#include "io/tx_layer.h" 1.25 +#include "io/io_buf.h" 1.26 + 1.27 +/*****************************************************************************/ 1.28 + 1.29 +#define TX_PACKET_DEBUG 0 1.30 + 1.31 +/*****************************************************************************/ 1.32 + 1.33 +/* 1.34 + * Relative packet priority ratios. These control how many packets of a 1.35 + * certain type are sent prior to looking for other types. For each type we 1.36 + * maintain an independent FIFO queue. Each time a packet can be sent, each 1.37 + * queue is checked. 1.38 + * 1.39 + * Packets in a queue will be sent while the ratio is greater than zero, and 1.40 + * there are no higher priority packets waiting. Once there are no queues 1.41 + * with both waiting packets and a non-zero ratio, the queue priority ratios 1.42 + * are reset so that more packets can be sent. This process continues until 1.43 + * the lower layer becomes saturated. 1.44 + * 1.45 + * Note that it is bad idea to reset the priority ratios only when all the 1.46 + * ratios are zero, because this could lead to starvation for some packet 1.47 + * types. 1.48 + * 1.49 + * Pushes have the highest priorty of normal messages. Also, there is a 1.50 + * special 'urgent' queue that has higher priority, that includes replies to 1.51 + * keepalive pings and other important high-priority messages. 1.52 + */ 1.53 +#define URGENT_RATIO INT_MAX 1.54 +#define PUSH_RATIO 5 1.55 +#define QHIT_RATIO 4 1.56 +#define QUERY_RATIO 3 1.57 +#define PONG_RATIO 2 1.58 +#define PING_RATIO 1 1.59 +#define MISC_RATIO 1 1.60 + 1.61 +#define NR_QUEUES (7) 1.62 + 1.63 +/*****************************************************************************/ 1.64 + 1.65 +struct packet_queue 1.66 +{ 1.67 + gt_packet_type_t msg_type; 1.68 + size_t ratio; /* how many packets left on this turn? */ 1.69 + size_t bytes_queued; /* total bytes queued */ 1.70 + List *queue; 1.71 +}; 1.72 + 1.73 +struct tx_packet 1.74 +{ 1.75 + struct packet_queue queues[NR_QUEUES]; 1.76 + int total_pkts; /* used to quickly test if empty */ 1.77 +}; 1.78 + 1.79 +/*****************************************************************************/ 1.80 +/* DEBUGGING/TRACING */ 1.81 + 1.82 +#if TX_PACKET_DEBUG 1.83 +/* ripped from gt_packet.c */ 1.84 +static const char *packet_command_str (uint8_t cmd) 1.85 +{ 1.86 + static char buf[16]; 1.87 + 1.88 + switch (cmd) 1.89 + { 1.90 + case GT_MSG_PING: return "PING"; 1.91 + case GT_MSG_PING_REPLY: return "PONG"; 1.92 + case GT_MSG_BYE: return "BYE"; 1.93 + case GT_MSG_QUERY_ROUTE: return "QROUTE"; 1.94 + case GT_MSG_VENDOR: return "VMSG"; 1.95 + case GT_MSG_VENDOR_STD: return "VMSG-S"; 1.96 + case GT_MSG_PUSH: return "PUSH"; 1.97 + case GT_MSG_QUERY: return "QUERY"; 1.98 + case GT_MSG_QUERY_REPLY: return "HITS"; 1.99 + 1.100 + default: 1.101 + snprintf (buf, sizeof (buf), "[<%02hx>]", cmd); 1.102 + return buf; 1.103 + } 1.104 +} 1.105 + 1.106 +static void dump_packet (struct io_buf *buf, String *str) 1.107 +{ 1.108 + uint8_t cmd = get_command (buf->data); 1.109 + string_appendf (str, "%s,", packet_command_str (cmd)); 1.110 +} 1.111 + 1.112 +static void trace_queue_list (List *queue, String *str) 1.113 +{ 1.114 + list_foreach (queue, (ListForeachFunc)dump_packet, str); 1.115 +} 1.116 +#endif /* TX_PACKET_DEBUG */ 1.117 + 1.118 +static void trace_queue (struct tx_layer *tx, const char *id) 1.119 +{ 1.120 +#if TX_PACKET_DEBUG 1.121 + struct tx_packet *tx_packet = tx->udata; 1.122 + int i; 1.123 + String *s; 1.124 + TCPC *c; 1.125 + 1.126 + if (!(s = string_new (NULL, 0, 0, TRUE))) 1.127 + return; 1.128 + 1.129 + c = tx->stack->c; 1.130 + string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts); 1.131 + 1.132 + for (i = 0; i < NR_QUEUES; i++) 1.133 + trace_queue_list (tx_packet->queues[i].queue, s); 1.134 + 1.135 + string_append (s, "}"); 1.136 + 1.137 + GT->DBGSOCK (GT, c, "%s", s->str); 1.138 + string_free (s); 1.139 +#endif 1.140 +} 1.141 + 1.142 +/*****************************************************************************/ 1.143 + 1.144 +/* return the queue on which this message should go */ 1.145 +static size_t get_queue (struct io_buf *msg) 1.146 +{ 1.147 + uint8_t cmd; 1.148 + 1.149 + cmd = get_command (msg->data); 1.150 + 1.151 + switch (cmd) 1.152 + { 1.153 + default: 1.154 + abort (); 1.155 + 1.156 + case GT_MSG_VENDOR: 1.157 + case GT_MSG_VENDOR_STD: 1.158 + case GT_MSG_QUERY_ROUTE: 1.159 + return 6; 1.160 + 1.161 + case GT_MSG_PING: 1.162 + { 1.163 + /* queue keep-alive pings in the urgent queue */ 1.164 + if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) 1.165 + return 0; 1.166 + } 1.167 + return 5; 1.168 + 1.169 + case GT_MSG_PING_REPLY: 1.170 + { 1.171 + /* 1.172 + * Queue replies to keep-alive ping in the urgent queue. 1.173 + * 1.174 + * This allows the remote end to starve it's own connection 1.175 + * with a series of keep-alive pings. Only flow-control 1.176 + * can handle this. 1.177 + */ 1.178 + if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) 1.179 + return 0; 1.180 + } 1.181 + return 4; 1.182 + 1.183 + case GT_MSG_QUERY: 1.184 + { 1.185 + /* make queries from this node more important */ 1.186 + if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) 1.187 + return 1; 1.188 + } 1.189 + return 3; 1.190 + 1.191 + case GT_MSG_QUERY_REPLY: 1.192 + return 2; 1.193 + 1.194 + case GT_MSG_PUSH: 1.195 + return 1; 1.196 + 1.197 + case GT_MSG_BYE: 1.198 + return 0; 1.199 + } 1.200 + 1.201 + abort (); 1.202 +} 1.203 + 1.204 +static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg) 1.205 +{ 1.206 + pkt_queue->queue = list_append (pkt_queue->queue, msg); 1.207 +} 1.208 + 1.209 +/* 1.210 + * Called from upper layer when it wants to send us a message buffer. 1.211 + */ 1.212 +static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf) 1.213 +{ 1.214 + struct tx_packet *tx_packet = tx->udata; 1.215 + size_t queue_nr; 1.216 + 1.217 + queue_nr = get_queue (io_buf); 1.218 + 1.219 + assert (queue_nr < NR_QUEUES); 1.220 + enqueue_packet (&tx_packet->queues[queue_nr], io_buf); 1.221 + 1.222 + tx_packet->total_pkts++; 1.223 + assert (tx_packet->total_pkts > 0); 1.224 + 1.225 + trace_queue (tx, "*0*"); 1.226 + 1.227 + return TX_OK; 1.228 +} 1.229 + 1.230 +/*****************************************************************************/ 1.231 + 1.232 +static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type, 1.233 + size_t prio) 1.234 +{ 1.235 + queue->msg_type = msg_type; 1.236 + queue->ratio = prio; 1.237 +} 1.238 + 1.239 +static void reset_ratios (struct packet_queue *queue, size_t len) 1.240 +{ 1.241 + set_queue (&queue[0], 0xff, URGENT_RATIO); 1.242 + set_queue (&queue[1], GT_MSG_PUSH, PUSH_RATIO); 1.243 + set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO); 1.244 + set_queue (&queue[3], GT_MSG_QUERY, QUERY_RATIO); 1.245 + set_queue (&queue[4], GT_MSG_PING_REPLY, PONG_RATIO); 1.246 + set_queue (&queue[5], GT_MSG_PING, PING_RATIO); 1.247 + set_queue (&queue[6], 0xff, MISC_RATIO); 1.248 +} 1.249 + 1.250 +/*****************************************************************************/ 1.251 + 1.252 +/* 1.253 + * Try to send a single message buffer from the packet queue to the lower 1.254 + * layer. If the lower layer has become saturated, return FALSE. 1.255 + * 1.256 + * The lower layer takes responsibility for the messages sent to it in 1.257 + * entirety in gt_tx_layer_queue() unless it is full. In that case it 1.258 + * returns TX_FULL. 1.259 + */ 1.260 +static tx_status_t shift_queue (struct tx_layer *tx, 1.261 + struct tx_packet *tx_packet, 1.262 + struct packet_queue *pkt_queue) 1.263 +{ 1.264 + List *msg_l; 1.265 + struct io_buf *msg; 1.266 + tx_status_t ret; 1.267 + 1.268 + msg_l = list_nth (pkt_queue->queue, 0); 1.269 + msg = msg_l->data; 1.270 + 1.271 + ret = gt_tx_layer_queue (tx, msg); 1.272 + 1.273 + if (ret != TX_OK) 1.274 + { 1.275 + assert (ret != TX_EMPTY); /* impossible to be empty */ 1.276 + return ret; 1.277 + } 1.278 + 1.279 + /* shift this packet off the queue */ 1.280 + pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l); 1.281 + 1.282 + tx_packet->total_pkts--; 1.283 + assert (tx_packet->total_pkts >= 0); 1.284 + 1.285 + if (TX_PACKET_DEBUG) 1.286 + trace_queue (tx, "*2*"); 1.287 + 1.288 + return ret; 1.289 +} 1.290 + 1.291 +static tx_status_t service_queues (struct tx_layer *layer, 1.292 + struct tx_packet *tx_packet) 1.293 +{ 1.294 + int i; 1.295 + tx_status_t ret; 1.296 + 1.297 + for (i = 0; i < NR_QUEUES; i++) 1.298 + { 1.299 + struct packet_queue *pkt_queue = &tx_packet->queues[i]; 1.300 + 1.301 + /* skip if ratio is small */ 1.302 + while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL) 1.303 + { 1.304 + ret = shift_queue (layer, tx_packet, pkt_queue); 1.305 + pkt_queue->ratio--; 1.306 + 1.307 + if (ret == TX_FULL) 1.308 + return TX_OK; 1.309 + 1.310 + if (ret != TX_OK) 1.311 + return ret; 1.312 + } 1.313 + } 1.314 + 1.315 + /* reset the ratios to write more data */ 1.316 + reset_ratios (tx_packet->queues, NR_QUEUES); 1.317 + 1.318 + /* we wrote something, so return ok */ 1.319 + if (tx_packet->total_pkts == 0) 1.320 + return TX_OK; 1.321 + 1.322 + /* tail recurse until lower layer is saturated */ 1.323 + return service_queues (layer, tx_packet); 1.324 +} 1.325 + 1.326 +/* 1.327 + * Gets called when the lower layer is writable. 1.328 + */ 1.329 +static tx_status_t tx_packet_ready (struct tx_layer *tx) 1.330 +{ 1.331 + struct tx_packet *tx_packet = tx->udata; 1.332 + 1.333 + if (tx_packet->total_pkts == 0) 1.334 + return TX_EMPTY; 1.335 + 1.336 + if (TX_PACKET_DEBUG) 1.337 + trace_queue (tx, "*1*"); 1.338 + 1.339 + return service_queues (tx, tx_packet); 1.340 +} 1.341 + 1.342 +/*****************************************************************************/ 1.343 + 1.344 +static BOOL tx_packet_init (struct tx_layer *tx) 1.345 +{ 1.346 + struct tx_packet *tx_packet; 1.347 + int i; 1.348 + 1.349 + if (!(tx_packet = malloc (sizeof (struct tx_packet)))) 1.350 + return FALSE; 1.351 + 1.352 + tx_packet->total_pkts = 0; 1.353 + 1.354 + for (i = 0; i < NR_QUEUES; i++) 1.355 + { 1.356 + tx_packet->queues[i].queue = NULL; 1.357 + tx_packet->queues[i].bytes_queued = 0; 1.358 + } 1.359 + 1.360 + reset_ratios (tx_packet->queues, NR_QUEUES); 1.361 + 1.362 + tx->udata = tx_packet; 1.363 + 1.364 + return TRUE; 1.365 +} 1.366 + 1.367 +static BOOL free_io_buf (struct io_buf *io_buf, void *udata) 1.368 +{ 1.369 + io_buf_free (io_buf); 1.370 + return TRUE; 1.371 +} 1.372 + 1.373 +static void flush_packets (struct packet_queue *pkt_queue) 1.374 +{ 1.375 + list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL); 1.376 + pkt_queue = NULL; 1.377 +} 1.378 + 1.379 +static void tx_packet_destroy (struct tx_layer *tx) 1.380 +{ 1.381 + struct tx_packet *tx_packet = tx->udata; 1.382 + int i; 1.383 + 1.384 + for (i = 0; i < NR_QUEUES; i++) 1.385 + flush_packets (&tx_packet->queues[i]); 1.386 + 1.387 + FREE (tx_packet); 1.388 +} 1.389 + 1.390 +static void tx_packet_consume (struct tx_layer *tx, BOOL stop) 1.391 +{ 1.392 + /* nothing */ 1.393 +} 1.394 + 1.395 +/*****************************************************************************/ 1.396 + 1.397 +static void tx_packet_enable (struct tx_layer *tx) 1.398 +{ 1.399 + /* TODO */ 1.400 +} 1.401 + 1.402 +static void tx_packet_disable (struct tx_layer *tx) 1.403 +{ 1.404 + /* TODO */ 1.405 +} 1.406 + 1.407 +/*****************************************************************************/ 1.408 + 1.409 +struct tx_layer_ops gt_tx_packet_ops = 1.410 +{ 1.411 + tx_packet_init, 1.412 + tx_packet_destroy, 1.413 + tx_packet_consume, 1.414 + tx_packet_queue, 1.415 + tx_packet_ready, 1.416 + tx_packet_enable, 1.417 + tx_packet_disable, 1.418 +};