annotate src/io/tx_packet.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
rev   line source
paulo@0 1 /*
paulo@0 2 * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $
paulo@0 3 *
paulo@0 4 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
paulo@0 5 *
paulo@0 6 * This program is free software; you can redistribute it and/or modify it
paulo@0 7 * under the terms of the GNU General Public License as published by the
paulo@0 8 * Free Software Foundation; either version 2, or (at your option) any
paulo@0 9 * later version.
paulo@0 10 *
paulo@0 11 * This program is distributed in the hope that it will be useful, but
paulo@0 12 * WITHOUT ANY WARRANTY; without even the implied warranty of
paulo@0 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
paulo@0 14 * General Public License for more details.
paulo@0 15 */
paulo@0 16
paulo@0 17 #include "gt_gnutella.h"
paulo@0 18 #include "gt_packet.h" /* packet manipulation macros */
paulo@0 19
paulo@0 20 #include "io/tx_stack.h"
paulo@0 21 #include "io/tx_layer.h"
paulo@0 22 #include "io/io_buf.h"
paulo@0 23
paulo@0 24 /*****************************************************************************/
paulo@0 25
paulo@0 26 #define TX_PACKET_DEBUG 0
paulo@0 27
paulo@0 28 /*****************************************************************************/
paulo@0 29
paulo@0 30 /*
paulo@0 31 * Relative packet priority ratios. These control how many packets of a
paulo@0 32 * certain type are sent prior to looking for other types. For each type we
paulo@0 33 * maintain an independent FIFO queue. Each time a packet can be sent, each
paulo@0 34 * queue is checked.
paulo@0 35 *
paulo@0 36 * Packets in a queue will be sent while the ratio is greater than zero, and
paulo@0 37 * there are no higher priority packets waiting. Once there are no queues
paulo@0 38 * with both waiting packets and a non-zero ratio, the queue priority ratios
paulo@0 39 * are reset so that more packets can be sent. This process continues until
paulo@0 40 * the lower layer becomes saturated.
paulo@0 41 *
paulo@0 42 * Note that it is a bad idea to reset the priority ratios only when all the
paulo@0 43 * ratios are zero, because this could lead to starvation for some packet
paulo@0 44 * types.
paulo@0 45 *
paulo@0 46 * Pushes have the highest priority of normal messages. Also, there is a
paulo@0 47 * special 'urgent' queue that has higher priority, that includes replies to
paulo@0 48 * keepalive pings and other important high-priority messages.
paulo@0 49 */
/* Per-queue packet budget for one scheduling round, highest priority first. */
#define URGENT_RATIO    INT_MAX   /* effectively unlimited */
#define PUSH_RATIO      5
#define QHIT_RATIO      4
#define QUERY_RATIO     3
#define PONG_RATIO      2
#define PING_RATIO      1
#define MISC_RATIO      1

/* Number of independent FIFO queues (one per ratio above). */
#define NR_QUEUES       7
paulo@0 59
paulo@0 60 /*****************************************************************************/
paulo@0 61
paulo@0 62 struct packet_queue
paulo@0 63 {
paulo@0 64 gt_packet_type_t msg_type;
paulo@0 65 size_t ratio; /* how many packets left on this turn? */
paulo@0 66 size_t bytes_queued; /* total bytes queued */
paulo@0 67 List *queue;
paulo@0 68 };
paulo@0 69
paulo@0 70 struct tx_packet
paulo@0 71 {
paulo@0 72 struct packet_queue queues[NR_QUEUES];
paulo@0 73 int total_pkts; /* used to quickly test if empty */
paulo@0 74 };
paulo@0 75
paulo@0 76 /*****************************************************************************/
paulo@0 77 /* DEBUGGING/TRACING */
paulo@0 78
paulo@0 79 #if TX_PACKET_DEBUG
paulo@0 80 /* ripped from gt_packet.c */
paulo@0 81 static const char *packet_command_str (uint8_t cmd)
paulo@0 82 {
paulo@0 83 static char buf[16];
paulo@0 84
paulo@0 85 switch (cmd)
paulo@0 86 {
paulo@0 87 case GT_MSG_PING: return "PING";
paulo@0 88 case GT_MSG_PING_REPLY: return "PONG";
paulo@0 89 case GT_MSG_BYE: return "BYE";
paulo@0 90 case GT_MSG_QUERY_ROUTE: return "QROUTE";
paulo@0 91 case GT_MSG_VENDOR: return "VMSG";
paulo@0 92 case GT_MSG_VENDOR_STD: return "VMSG-S";
paulo@0 93 case GT_MSG_PUSH: return "PUSH";
paulo@0 94 case GT_MSG_QUERY: return "QUERY";
paulo@0 95 case GT_MSG_QUERY_REPLY: return "HITS";
paulo@0 96
paulo@0 97 default:
paulo@0 98 snprintf (buf, sizeof (buf), "[<%02hx>]", cmd);
paulo@0 99 return buf;
paulo@0 100 }
paulo@0 101 }
paulo@0 102
paulo@0 103 static void dump_packet (struct io_buf *buf, String *str)
paulo@0 104 {
paulo@0 105 uint8_t cmd = get_command (buf->data);
paulo@0 106 string_appendf (str, "%s,", packet_command_str (cmd));
paulo@0 107 }
paulo@0 108
paulo@0 109 static void trace_queue_list (List *queue, String *str)
paulo@0 110 {
paulo@0 111 list_foreach (queue, (ListForeachFunc)dump_packet, str);
paulo@0 112 }
paulo@0 113 #endif /* TX_PACKET_DEBUG */
paulo@0 114
/*
 * Log a one-line snapshot of every queue on this layer, tagged with `id'.
 * Compiles to an empty function unless TX_PACKET_DEBUG is non-zero.
 */
static void trace_queue (struct tx_layer *tx, const char *id)
{
#if TX_PACKET_DEBUG
	struct tx_packet *pkts = tx->udata;
	String           *line = string_new (NULL, 0, 0, TRUE);
	int               q;

	/* tracing is best-effort; silently give up if we cannot allocate */
	if (line == NULL)
		return;

	string_appendf (line, "{%s totalpkts=%d ", id, pkts->total_pkts);

	for (q = 0; q < NR_QUEUES; q++)
		trace_queue_list (pkts->queues[q].queue, line);

	string_append (line, "}");

	GT->DBGSOCK (GT, tx->stack->c, "%s", line->str);
	string_free (line);
#endif
}
paulo@0 138
paulo@0 139 /*****************************************************************************/
paulo@0 140
paulo@0 141 /* return the queue on which this message should go */
paulo@0 142 static size_t get_queue (struct io_buf *msg)
paulo@0 143 {
paulo@0 144 uint8_t cmd;
paulo@0 145
paulo@0 146 cmd = get_command (msg->data);
paulo@0 147
paulo@0 148 switch (cmd)
paulo@0 149 {
paulo@0 150 default:
paulo@0 151 abort ();
paulo@0 152
paulo@0 153 case GT_MSG_VENDOR:
paulo@0 154 case GT_MSG_VENDOR_STD:
paulo@0 155 case GT_MSG_QUERY_ROUTE:
paulo@0 156 return 6;
paulo@0 157
paulo@0 158 case GT_MSG_PING:
paulo@0 159 {
paulo@0 160 /* queue keep-alive pings in the urgent queue */
paulo@0 161 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
paulo@0 162 return 0;
paulo@0 163 }
paulo@0 164 return 5;
paulo@0 165
paulo@0 166 case GT_MSG_PING_REPLY:
paulo@0 167 {
paulo@0 168 /*
paulo@0 169 * Queue replies to keep-alive ping in the urgent queue.
paulo@0 170 *
paulo@0 171 * This allows the remote end to starve it's own connection
paulo@0 172 * with a series of keep-alive pings. Only flow-control
paulo@0 173 * can handle this.
paulo@0 174 */
paulo@0 175 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
paulo@0 176 return 0;
paulo@0 177 }
paulo@0 178 return 4;
paulo@0 179
paulo@0 180 case GT_MSG_QUERY:
paulo@0 181 {
paulo@0 182 /* make queries from this node more important */
paulo@0 183 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
paulo@0 184 return 1;
paulo@0 185 }
paulo@0 186 return 3;
paulo@0 187
paulo@0 188 case GT_MSG_QUERY_REPLY:
paulo@0 189 return 2;
paulo@0 190
paulo@0 191 case GT_MSG_PUSH:
paulo@0 192 return 1;
paulo@0 193
paulo@0 194 case GT_MSG_BYE:
paulo@0 195 return 0;
paulo@0 196 }
paulo@0 197
paulo@0 198 abort ();
paulo@0 199 }
paulo@0 200
paulo@0 201 static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg)
paulo@0 202 {
paulo@0 203 pkt_queue->queue = list_append (pkt_queue->queue, msg);
paulo@0 204 }
paulo@0 205
paulo@0 206 /*
paulo@0 207 * Called from upper layer when it wants to send us a message buffer.
paulo@0 208 */
paulo@0 209 static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf)
paulo@0 210 {
paulo@0 211 struct tx_packet *tx_packet = tx->udata;
paulo@0 212 size_t queue_nr;
paulo@0 213
paulo@0 214 queue_nr = get_queue (io_buf);
paulo@0 215
paulo@0 216 assert (queue_nr < NR_QUEUES);
paulo@0 217 enqueue_packet (&tx_packet->queues[queue_nr], io_buf);
paulo@0 218
paulo@0 219 tx_packet->total_pkts++;
paulo@0 220 assert (tx_packet->total_pkts > 0);
paulo@0 221
paulo@0 222 trace_queue (tx, "*0*");
paulo@0 223
paulo@0 224 return TX_OK;
paulo@0 225 }
paulo@0 226
paulo@0 227 /*****************************************************************************/
paulo@0 228
paulo@0 229 static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type,
paulo@0 230 size_t prio)
paulo@0 231 {
paulo@0 232 queue->msg_type = msg_type;
paulo@0 233 queue->ratio = prio;
paulo@0 234 }
paulo@0 235
paulo@0 236 static void reset_ratios (struct packet_queue *queue, size_t len)
paulo@0 237 {
paulo@0 238 set_queue (&queue[0], 0xff, URGENT_RATIO);
paulo@0 239 set_queue (&queue[1], GT_MSG_PUSH, PUSH_RATIO);
paulo@0 240 set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO);
paulo@0 241 set_queue (&queue[3], GT_MSG_QUERY, QUERY_RATIO);
paulo@0 242 set_queue (&queue[4], GT_MSG_PING_REPLY, PONG_RATIO);
paulo@0 243 set_queue (&queue[5], GT_MSG_PING, PING_RATIO);
paulo@0 244 set_queue (&queue[6], 0xff, MISC_RATIO);
paulo@0 245 }
paulo@0 246
paulo@0 247 /*****************************************************************************/
paulo@0 248
paulo@0 249 /*
paulo@0 250 * Try to send a single message buffer from the packet queue to the lower
paulo@0 251 * layer. If the lower layer has become saturated, return FALSE.
paulo@0 252 *
paulo@0 253 * The lower layer takes responsibility for the messages sent to it in
paulo@0 254 * entirety in gt_tx_layer_queue() unless it is full. In that case it
paulo@0 255 * returns TX_FULL.
paulo@0 256 */
paulo@0 257 static tx_status_t shift_queue (struct tx_layer *tx,
paulo@0 258 struct tx_packet *tx_packet,
paulo@0 259 struct packet_queue *pkt_queue)
paulo@0 260 {
paulo@0 261 List *msg_l;
paulo@0 262 struct io_buf *msg;
paulo@0 263 tx_status_t ret;
paulo@0 264
paulo@0 265 msg_l = list_nth (pkt_queue->queue, 0);
paulo@0 266 msg = msg_l->data;
paulo@0 267
paulo@0 268 ret = gt_tx_layer_queue (tx, msg);
paulo@0 269
paulo@0 270 if (ret != TX_OK)
paulo@0 271 {
paulo@0 272 assert (ret != TX_EMPTY); /* impossible to be empty */
paulo@0 273 return ret;
paulo@0 274 }
paulo@0 275
paulo@0 276 /* shift this packet off the queue */
paulo@0 277 pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l);
paulo@0 278
paulo@0 279 tx_packet->total_pkts--;
paulo@0 280 assert (tx_packet->total_pkts >= 0);
paulo@0 281
paulo@0 282 if (TX_PACKET_DEBUG)
paulo@0 283 trace_queue (tx, "*2*");
paulo@0 284
paulo@0 285 return ret;
paulo@0 286 }
paulo@0 287
paulo@0 288 static tx_status_t service_queues (struct tx_layer *layer,
paulo@0 289 struct tx_packet *tx_packet)
paulo@0 290 {
paulo@0 291 int i;
paulo@0 292 tx_status_t ret;
paulo@0 293
paulo@0 294 for (i = 0; i < NR_QUEUES; i++)
paulo@0 295 {
paulo@0 296 struct packet_queue *pkt_queue = &tx_packet->queues[i];
paulo@0 297
paulo@0 298 /* skip if ratio is small */
paulo@0 299 while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL)
paulo@0 300 {
paulo@0 301 ret = shift_queue (layer, tx_packet, pkt_queue);
paulo@0 302 pkt_queue->ratio--;
paulo@0 303
paulo@0 304 if (ret == TX_FULL)
paulo@0 305 return TX_OK;
paulo@0 306
paulo@0 307 if (ret != TX_OK)
paulo@0 308 return ret;
paulo@0 309 }
paulo@0 310 }
paulo@0 311
paulo@0 312 /* reset the ratios to write more data */
paulo@0 313 reset_ratios (tx_packet->queues, NR_QUEUES);
paulo@0 314
paulo@0 315 /* we wrote something, so return ok */
paulo@0 316 if (tx_packet->total_pkts == 0)
paulo@0 317 return TX_OK;
paulo@0 318
paulo@0 319 /* tail recurse until lower layer is saturated */
paulo@0 320 return service_queues (layer, tx_packet);
paulo@0 321 }
paulo@0 322
paulo@0 323 /*
paulo@0 324 * Gets called when the lower layer is writable.
paulo@0 325 */
paulo@0 326 static tx_status_t tx_packet_ready (struct tx_layer *tx)
paulo@0 327 {
paulo@0 328 struct tx_packet *tx_packet = tx->udata;
paulo@0 329
paulo@0 330 if (tx_packet->total_pkts == 0)
paulo@0 331 return TX_EMPTY;
paulo@0 332
paulo@0 333 if (TX_PACKET_DEBUG)
paulo@0 334 trace_queue (tx, "*1*");
paulo@0 335
paulo@0 336 return service_queues (tx, tx_packet);
paulo@0 337 }
paulo@0 338
paulo@0 339 /*****************************************************************************/
paulo@0 340
paulo@0 341 static BOOL tx_packet_init (struct tx_layer *tx)
paulo@0 342 {
paulo@0 343 struct tx_packet *tx_packet;
paulo@0 344 int i;
paulo@0 345
paulo@0 346 if (!(tx_packet = malloc (sizeof (struct tx_packet))))
paulo@0 347 return FALSE;
paulo@0 348
paulo@0 349 tx_packet->total_pkts = 0;
paulo@0 350
paulo@0 351 for (i = 0; i < NR_QUEUES; i++)
paulo@0 352 {
paulo@0 353 tx_packet->queues[i].queue = NULL;
paulo@0 354 tx_packet->queues[i].bytes_queued = 0;
paulo@0 355 }
paulo@0 356
paulo@0 357 reset_ratios (tx_packet->queues, NR_QUEUES);
paulo@0 358
paulo@0 359 tx->udata = tx_packet;
paulo@0 360
paulo@0 361 return TRUE;
paulo@0 362 }
paulo@0 363
paulo@0 364 static BOOL free_io_buf (struct io_buf *io_buf, void *udata)
paulo@0 365 {
paulo@0 366 io_buf_free (io_buf);
paulo@0 367 return TRUE;
paulo@0 368 }
paulo@0 369
paulo@0 370 static void flush_packets (struct packet_queue *pkt_queue)
paulo@0 371 {
paulo@0 372 list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL);
paulo@0 373 pkt_queue = NULL;
paulo@0 374 }
paulo@0 375
paulo@0 376 static void tx_packet_destroy (struct tx_layer *tx)
paulo@0 377 {
paulo@0 378 struct tx_packet *tx_packet = tx->udata;
paulo@0 379 int i;
paulo@0 380
paulo@0 381 for (i = 0; i < NR_QUEUES; i++)
paulo@0 382 flush_packets (&tx_packet->queues[i]);
paulo@0 383
paulo@0 384 FREE (tx_packet);
paulo@0 385 }
paulo@0 386
paulo@0 387 static void tx_packet_consume (struct tx_layer *tx, BOOL stop)
paulo@0 388 {
paulo@0 389 /* nothing */
paulo@0 390 }
paulo@0 391
paulo@0 392 /*****************************************************************************/
paulo@0 393
/*
 * Enable callback for this layer.  TODO: not implemented yet.
 */
static void tx_packet_enable (struct tx_layer *tx)
{
	(void)tx;
}
paulo@0 398
/*
 * Disable callback for this layer.  TODO: not implemented yet.
 */
static void tx_packet_disable (struct tx_layer *tx)
{
	(void)tx;
}
paulo@0 403
paulo@0 404 /*****************************************************************************/
paulo@0 405
paulo@0 406 struct tx_layer_ops gt_tx_packet_ops =
paulo@0 407 {
paulo@0 408 tx_packet_init,
paulo@0 409 tx_packet_destroy,
paulo@0 410 tx_packet_consume,
paulo@0 411 tx_packet_queue,
paulo@0 412 tx_packet_ready,
paulo@0 413 tx_packet_enable,
paulo@0 414 tx_packet_disable,
paulo@0 415 };