diff src/io/tx_packet.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/io/tx_packet.c	Sat Feb 20 21:18:28 2010 -0800
     1.3 @@ -0,0 +1,415 @@
     1.4 +/*
     1.5 + * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $
     1.6 + *
     1.7 + * Copyright (C) 2004 giFT project (gift.sourceforge.net)
     1.8 + *
     1.9 + * This program is free software; you can redistribute it and/or modify it
    1.10 + * under the terms of the GNU General Public License as published by the
    1.11 + * Free Software Foundation; either version 2, or (at your option) any
    1.12 + * later version.
    1.13 + *
    1.14 + * This program is distributed in the hope that it will be useful, but
    1.15 + * WITHOUT ANY WARRANTY; without even the implied warranty of
    1.16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    1.17 + * General Public License for more details.
    1.18 + */
    1.19 +
    1.20 +#include "gt_gnutella.h"
    1.21 +#include "gt_packet.h"       /* packet manipulation macros */
    1.22 +
    1.23 +#include "io/tx_stack.h"
    1.24 +#include "io/tx_layer.h"
    1.25 +#include "io/io_buf.h"
    1.26 +
    1.27 +/*****************************************************************************/
    1.28 +
    1.29 +#define TX_PACKET_DEBUG    0
    1.30 +
    1.31 +/*****************************************************************************/
    1.32 +
    1.33 +/*
    1.34 + * Relative packet priority ratios.  These control how many packets of a
    1.35 + * certain type are sent prior to looking for other types.  For each type we
    1.36 + * maintain an independent FIFO queue.  Each time a packet can be sent, each
    1.37 + * queue is checked.
    1.38 + *
    1.39 + * Packets in a queue will be sent while the ratio is greater than zero, and
    1.40 + * there are no higher priority packets waiting.  Once there are no queues
    1.41 + * with both waiting packets and a non-zero ratio, the queue priority ratios
    1.42 + * are reset so that more packets can be sent.  This process continues until
    1.43 + * the lower layer becomes saturated.
    1.44 + *
    1.45 + * Note that it is a bad idea to reset the priority ratios only when all the
    1.46 + * ratios are zero, because this could lead to starvation for some packet
    1.47 + * types.
    1.48 + *
    1.49 + * Pushes have the highest priority of the normal messages.  There is also
    1.50 + * a special 'urgent' queue with higher priority still, which holds replies
    1.51 + * to keepalive pings and other important high-priority messages.
    1.52 + */
    1.53 +#define URGENT_RATIO  INT_MAX
    1.54 +#define PUSH_RATIO    5
    1.55 +#define QHIT_RATIO    4
    1.56 +#define QUERY_RATIO   3
    1.57 +#define PONG_RATIO    2
    1.58 +#define PING_RATIO    1
    1.59 +#define MISC_RATIO    1
    1.60 +
    1.61 +#define NR_QUEUES    (7)
    1.62 +
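As an illustration of the scheme described above: with the default ratios, one scheduling round sends at most 5 packets from the push queue, 4 query hits, 3 queries, 2 pongs, 1 ping and 1 miscellaneous message (urgent messages are effectively unlimited, since URGENT_RATIO is INT_MAX).  The ratios are then reset and another round begins, until the lower layer reports that it is full.  Below is a minimal standalone sketch of that drain-and-reset cycle; the types and names are illustrative only, not part of the giFT tx stack:

    /* sketch only: sketch_queue and sink_space are hypothetical */
    struct sketch_queue { int ratio; int reset; int pending; };

    static int send_round (struct sketch_queue *q, int nr, int *sink_space)
    {
        int i, sent = 0;

        for (i = 0; i < nr; i++)
        {
            /* send from this queue while its per-round ratio allows */
            while (q[i].ratio > 0 && q[i].pending > 0)
            {
                if (*sink_space == 0)       /* lower layer saturated */
                    return sent;

                q[i].pending--;
                q[i].ratio--;
                (*sink_space)--;
                sent++;
            }
        }

        /* every queue is empty or out of ratio: refill for the next round */
        for (i = 0; i < nr; i++)
            q[i].ratio = q[i].reset;

        return sent;
    }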
    1.63 +/*****************************************************************************/
    1.64 +
    1.65 +struct packet_queue
    1.66 +{
    1.67 +	gt_packet_type_t msg_type;
    1.68 +	size_t           ratio;        /* how many packets left on this turn? */
    1.69 +	size_t           bytes_queued; /* total bytes queued */
    1.70 +	List            *queue;
    1.71 +};
    1.72 +
    1.73 +struct tx_packet
    1.74 +{
    1.75 +	struct packet_queue queues[NR_QUEUES];
    1.76 +	int                 total_pkts;         /* used to quickly test if empty */
    1.77 +};
    1.78 +
    1.79 +/*****************************************************************************/
    1.80 +/* DEBUGGING/TRACING */
    1.81 +
    1.82 +#if TX_PACKET_DEBUG
    1.83 +/* ripped from gt_packet.c */
    1.84 +static const char *packet_command_str (uint8_t cmd)
    1.85 +{
    1.86 +	static char buf[16];
    1.87 +
    1.88 +	switch (cmd)
    1.89 +	{
    1.90 +	 case GT_MSG_PING:        return "PING";
    1.91 +	 case GT_MSG_PING_REPLY:  return "PONG";
    1.92 +	 case GT_MSG_BYE:         return "BYE";
    1.93 +	 case GT_MSG_QUERY_ROUTE: return "QROUTE";
    1.94 +	 case GT_MSG_VENDOR:      return "VMSG";
    1.95 +	 case GT_MSG_VENDOR_STD:  return "VMSG-S";
    1.96 +	 case GT_MSG_PUSH:        return "PUSH";
    1.97 +	 case GT_MSG_QUERY:       return "QUERY";
    1.98 +	 case GT_MSG_QUERY_REPLY: return "HITS";
    1.99 +
   1.100 +	 default:
   1.101 +		snprintf (buf, sizeof (buf), "[<%02hx>]", cmd);
   1.102 +		return buf;
   1.103 +	}
   1.104 +}
   1.105 +
   1.106 +static void dump_packet (struct io_buf *buf, String *str)
   1.107 +{
   1.108 +	uint8_t cmd = get_command (buf->data);
   1.109 +	string_appendf (str, "%s,", packet_command_str (cmd));
   1.110 +}
   1.111 +
   1.112 +static void trace_queue_list (List *queue, String *str)
   1.113 +{
   1.114 +	list_foreach (queue, (ListForeachFunc)dump_packet, str);
   1.115 +}
   1.116 +#endif /* TX_PACKET_DEBUG */
   1.117 +
   1.118 +static void trace_queue (struct tx_layer *tx, const char *id)
   1.119 +{
   1.120 +#if TX_PACKET_DEBUG
   1.121 +	struct tx_packet *tx_packet = tx->udata;
   1.122 +	int i;
   1.123 +	String *s;
   1.124 +	TCPC *c;
   1.125 +
   1.126 +	if (!(s = string_new (NULL, 0, 0, TRUE)))
   1.127 +		return;
   1.128 +
   1.129 +	c = tx->stack->c;
   1.130 +	string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts);
   1.131 +
   1.132 +	for (i = 0; i < NR_QUEUES; i++)
   1.133 +		trace_queue_list (tx_packet->queues[i].queue, s);
   1.134 +
   1.135 +	string_append (s, "}");
   1.136 +
   1.137 +	GT->DBGSOCK (GT, c, "%s", s->str);
   1.138 +	string_free (s);
   1.139 +#endif
   1.140 +}
   1.141 +
   1.142 +/*****************************************************************************/
   1.143 +
   1.144 +/* return the queue on which this message should go */
   1.145 +static size_t get_queue (struct io_buf *msg)
   1.146 +{
   1.147 +	uint8_t cmd;
   1.148 +
   1.149 +	cmd = get_command (msg->data);
   1.150 +
   1.151 +	switch (cmd)
   1.152 +	{
   1.153 +	 default:
   1.154 +		abort ();
   1.155 +
   1.156 +	 case GT_MSG_VENDOR:
   1.157 +	 case GT_MSG_VENDOR_STD:
   1.158 +	 case GT_MSG_QUERY_ROUTE:
   1.159 +		return 6;
   1.160 +
   1.161 +	 case GT_MSG_PING:
   1.162 +		{
   1.163 +			/* queue keep-alive pings in the urgent queue */
   1.164 +			if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
   1.165 +				return 0;
   1.166 +		}
   1.167 +		return 5;
   1.168 +
   1.169 +	 case GT_MSG_PING_REPLY:
   1.170 +		{
   1.171 +			/*
   1.172 +			 * Queue replies to keep-alive pings in the urgent queue.
   1.173 +			 *
   1.174 +			 * Note that this allows the remote end to starve its own
   1.175 +			 * connection with a series of keep-alive pings.  Only
   1.176 +			 * flow-control can handle this.
   1.177 +			 */
   1.178 +			if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
   1.179 +				return 0;
   1.180 +		}
   1.181 +		return 4;
   1.182 +
   1.183 +	 case GT_MSG_QUERY:
   1.184 +		{
   1.185 +			/* make queries from this node more important */
   1.186 +			if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
   1.187 +				return 1;
   1.188 +		}
   1.189 +		return 3;
   1.190 +
   1.191 +	 case GT_MSG_QUERY_REPLY:
   1.192 +		return 2;
   1.193 +
   1.194 +	 case GT_MSG_PUSH:
   1.195 +		return 1;
   1.196 +
   1.197 +	 case GT_MSG_BYE:
   1.198 +		return 0;
   1.199 +	}
   1.200 +
   1.201 +	abort ();
   1.202 +}
   1.203 +
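For reference, the queue indices chosen above line up with the per-round ratios assigned in reset_ratios() further down as follows (a summary derived from those two functions, nothing more):

    queue 0  urgent   BYE, keep-alive PINGs and their replies    URGENT_RATIO
    queue 1  push     PUSH, plus queries from this node          PUSH_RATIO
    queue 2  hits     QUERY_REPLY                                QHIT_RATIO
    queue 3  queries  routed QUERY                               QUERY_RATIO
    queue 4  pongs    PING_REPLY                                 PONG_RATIO
    queue 5  pings    PING                                       PING_RATIO
    queue 6  misc     VENDOR, VENDOR_STD, QUERY_ROUTE            MISC_RATIO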
   1.204 +static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg)
   1.205 +{
   1.206 +	pkt_queue->queue = list_append (pkt_queue->queue, msg);
   1.207 +}
   1.208 +
   1.209 +/*
   1.210 + * Called from the upper layer when it wants to send us a message buffer.
   1.211 + */
   1.212 +static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf)
   1.213 +{
   1.214 +	struct tx_packet *tx_packet = tx->udata;
   1.215 +	size_t queue_nr;
   1.216 +
   1.217 +	queue_nr = get_queue (io_buf);
   1.218 +
   1.219 +	assert (queue_nr < NR_QUEUES);
   1.220 +	enqueue_packet (&tx_packet->queues[queue_nr], io_buf);
   1.221 +
   1.222 +	tx_packet->total_pkts++;
   1.223 +	assert (tx_packet->total_pkts > 0);
   1.224 +
   1.225 +	trace_queue (tx, "*0*");
   1.226 +
   1.227 +	return TX_OK;
   1.228 +}
   1.229 +
   1.230 +/*****************************************************************************/
   1.231 +
   1.232 +static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type,
   1.233 +                       size_t prio)
   1.234 +{
   1.235 +	queue->msg_type = msg_type;
   1.236 +	queue->ratio    = prio;
   1.237 +}
   1.238 +
   1.239 +static void reset_ratios (struct packet_queue *queue, size_t len)
   1.240 +{
   1.241 +	set_queue (&queue[0], 0xff,               URGENT_RATIO);
   1.242 +	set_queue (&queue[1], GT_MSG_PUSH,        PUSH_RATIO);
   1.243 +	set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO);
   1.244 +	set_queue (&queue[3], GT_MSG_QUERY,       QUERY_RATIO);
   1.245 +	set_queue (&queue[4], GT_MSG_PING_REPLY,  PONG_RATIO);
   1.246 +	set_queue (&queue[5], GT_MSG_PING,        PING_RATIO);
   1.247 +	set_queue (&queue[6], 0xff,               MISC_RATIO);
   1.248 +}
   1.249 +
   1.250 +/*****************************************************************************/
   1.251 +
   1.252 +/*
   1.253 + * Try to send a single message buffer from the packet queue to the lower
   1.254 + * layer.  If the lower layer has become saturated, return FALSE.
   1.255 + *
   1.256 + * The lower layer takes responsibility, in their entirety, for the messages
   1.257 + * passed to it via gt_tx_layer_queue(), unless it is full.  In that case it
   1.258 + * returns TX_FULL.
   1.259 + */
   1.260 +static tx_status_t shift_queue (struct tx_layer *tx,
   1.261 +                                struct tx_packet *tx_packet,
   1.262 +                                struct packet_queue *pkt_queue)
   1.263 +{
   1.264 +	List          *msg_l;
   1.265 +	struct io_buf *msg;
   1.266 +	tx_status_t    ret;
   1.267 +
   1.268 +	msg_l = list_nth (pkt_queue->queue, 0);
   1.269 +	msg   = msg_l->data;
   1.270 +
   1.271 +	ret = gt_tx_layer_queue (tx, msg);
   1.272 +
   1.273 +	if (ret != TX_OK)
   1.274 +	{
   1.275 +		assert (ret != TX_EMPTY); /* impossible to be empty */
   1.276 +		return ret;
   1.277 +	}
   1.278 +
   1.279 +	/* shift this packet off the queue */
   1.280 +	pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l);
   1.281 +
   1.282 +	tx_packet->total_pkts--;
   1.283 +	assert (tx_packet->total_pkts >= 0);
   1.284 +
   1.285 +	if (TX_PACKET_DEBUG)
   1.286 +		trace_queue (tx, "*2*");
   1.287 +
   1.288 +	return ret;
   1.289 +}
   1.290 +
   1.291 +static tx_status_t service_queues (struct tx_layer *layer,
   1.292 +                                   struct tx_packet *tx_packet)
   1.293 +{
   1.294 +	int         i;
   1.295 +	tx_status_t ret;
   1.296 +
   1.297 +	for (i = 0; i < NR_QUEUES; i++)
   1.298 +	{
   1.299 +		struct packet_queue *pkt_queue  = &tx_packet->queues[i];
   1.300 +
   1.301 +		/* send from this queue while its ratio allows and packets remain */
   1.302 +		while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL)
   1.303 +		{
   1.304 +			ret = shift_queue (layer, tx_packet, pkt_queue);
   1.305 +			pkt_queue->ratio--;
   1.306 +
   1.307 +			if (ret == TX_FULL)
   1.308 +				return TX_OK;
   1.309 +
   1.310 +			if (ret != TX_OK)
   1.311 +				return ret;
   1.312 +		}
   1.313 +	}
   1.314 +
   1.315 +	/* reset the ratios to write more data */
   1.316 +	reset_ratios (tx_packet->queues, NR_QUEUES);
   1.317 +
   1.318 +	/* everything was sent: return ok (we did send something, so not TX_EMPTY) */
   1.319 +	if (tx_packet->total_pkts == 0)
   1.320 +		return TX_OK;
   1.321 +
   1.322 +	/* tail recurse until lower layer is saturated */
   1.323 +	return service_queues (layer, tx_packet);
   1.324 +}
   1.325 +
   1.326 +/*
   1.327 + * Gets called when the lower layer is writable.
   1.328 + */
   1.329 +static tx_status_t tx_packet_ready (struct tx_layer *tx)
   1.330 +{
   1.331 +	struct tx_packet *tx_packet = tx->udata;
   1.332 +
   1.333 +	if (tx_packet->total_pkts == 0)
   1.334 +		return TX_EMPTY;
   1.335 +
   1.336 +	if (TX_PACKET_DEBUG)
   1.337 +		trace_queue (tx, "*1*");
   1.338 +
   1.339 +	return service_queues (tx, tx_packet);
   1.340 +}
   1.341 +
   1.342 +/*****************************************************************************/
   1.343 +
   1.344 +static BOOL tx_packet_init (struct tx_layer *tx)
   1.345 +{
   1.346 +	struct tx_packet *tx_packet;
   1.347 +	int               i;
   1.348 +
   1.349 +	if (!(tx_packet = malloc (sizeof (struct tx_packet))))
   1.350 +		return FALSE;
   1.351 +
   1.352 +	tx_packet->total_pkts = 0;
   1.353 +
   1.354 +	for (i = 0; i < NR_QUEUES; i++)
   1.355 +	{
   1.356 +		tx_packet->queues[i].queue        = NULL;
   1.357 +		tx_packet->queues[i].bytes_queued = 0;
   1.358 +	}
   1.359 +
   1.360 +	reset_ratios (tx_packet->queues, NR_QUEUES);
   1.361 +
   1.362 +	tx->udata = tx_packet;
   1.363 +
   1.364 +	return TRUE;
   1.365 +}
   1.366 +
   1.367 +static BOOL free_io_buf (struct io_buf *io_buf, void *udata)
   1.368 +{
   1.369 +	io_buf_free (io_buf);
   1.370 +	return TRUE;
   1.371 +}
   1.372 +
   1.373 +static void flush_packets (struct packet_queue *pkt_queue)
   1.374 +{
   1.375 +	list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL);
   1.376 +	pkt_queue->queue = NULL;
   1.377 +}
   1.378 +
   1.379 +static void tx_packet_destroy (struct tx_layer *tx)
   1.380 +{
   1.381 +	struct tx_packet *tx_packet = tx->udata;
   1.382 +	int i;
   1.383 +
   1.384 +	for (i = 0; i < NR_QUEUES; i++)
   1.385 +		flush_packets (&tx_packet->queues[i]);
   1.386 +
   1.387 +	FREE (tx_packet);
   1.388 +}
   1.389 +
   1.390 +static void tx_packet_consume (struct tx_layer *tx, BOOL stop)
   1.391 +{
   1.392 +	/* nothing */
   1.393 +}
   1.394 +
   1.395 +/*****************************************************************************/
   1.396 +
   1.397 +static void tx_packet_enable (struct tx_layer *tx)
   1.398 +{
   1.399 +	/* TODO */
   1.400 +}
   1.401 +
   1.402 +static void tx_packet_disable (struct tx_layer *tx)
   1.403 +{
   1.404 +	/* TODO */
   1.405 +}
   1.406 +
   1.407 +/*****************************************************************************/
   1.408 +
   1.409 +struct tx_layer_ops gt_tx_packet_ops =
   1.410 +{
   1.411 +	tx_packet_init,
   1.412 +	tx_packet_destroy,
   1.413 +	tx_packet_consume,
   1.414 +	tx_packet_queue,
   1.415 +	tx_packet_ready,
   1.416 +	tx_packet_enable,
   1.417 +	tx_packet_disable,
   1.418 +};