comparison src/io/tx_packet.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:b023a262bbe9
1 /*
2 * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $
3 *
4 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 */
16
17 #include "gt_gnutella.h"
18 #include "gt_packet.h" /* packet manipulation macros */
19
20 #include "io/tx_stack.h"
21 #include "io/tx_layer.h"
22 #include "io/io_buf.h"
23
24 /*****************************************************************************/
25
26 #define TX_PACKET_DEBUG 0
27
28 /*****************************************************************************/
29
30 /*
31 * Relative packet priority ratios. These control how many packets of a
32 * certain type are sent prior to looking for other types. For each type we
33 * maintain an independent FIFO queue. Each time a packet can be sent, each
34 * queue is checked.
35 *
36 * Packets in a queue will be sent while the ratio is greater than zero, and
37 * there are no higher priority packets waiting. Once there are no queues
38 * with both waiting packets and a non-zero ratio, the queue priority ratios
39 * are reset so that more packets can be sent. This process continues until
40 * the lower layer becomes saturated.
41 *
 * Note that it is a bad idea to reset the priority ratios only when all the
43 * ratios are zero, because this could lead to starvation for some packet
44 * types.
45 *
 * Pushes have the highest priority of normal messages. Also, there is a
47 * special 'urgent' queue that has higher priority, that includes replies to
48 * keepalive pings and other important high-priority messages.
49 */
50 #define URGENT_RATIO INT_MAX
51 #define PUSH_RATIO 5
52 #define QHIT_RATIO 4
53 #define QUERY_RATIO 3
54 #define PONG_RATIO 2
55 #define PING_RATIO 1
56 #define MISC_RATIO 1
57
58 #define NR_QUEUES (7)
59
60 /*****************************************************************************/
61
62 struct packet_queue
63 {
64 gt_packet_type_t msg_type;
65 size_t ratio; /* how many packets left on this turn? */
66 size_t bytes_queued; /* total bytes queued */
67 List *queue;
68 };
69
70 struct tx_packet
71 {
72 struct packet_queue queues[NR_QUEUES];
73 int total_pkts; /* used to quickly test if empty */
74 };
75
76 /*****************************************************************************/
77 /* DEBUGGING/TRACING */
78
79 #if TX_PACKET_DEBUG
80 /* ripped from gt_packet.c */
81 static const char *packet_command_str (uint8_t cmd)
82 {
83 static char buf[16];
84
85 switch (cmd)
86 {
87 case GT_MSG_PING: return "PING";
88 case GT_MSG_PING_REPLY: return "PONG";
89 case GT_MSG_BYE: return "BYE";
90 case GT_MSG_QUERY_ROUTE: return "QROUTE";
91 case GT_MSG_VENDOR: return "VMSG";
92 case GT_MSG_VENDOR_STD: return "VMSG-S";
93 case GT_MSG_PUSH: return "PUSH";
94 case GT_MSG_QUERY: return "QUERY";
95 case GT_MSG_QUERY_REPLY: return "HITS";
96
97 default:
98 snprintf (buf, sizeof (buf), "[<%02hx>]", cmd);
99 return buf;
100 }
101 }
102
/* append a short name for the message in 'buf' to the trace string */
static void dump_packet (struct io_buf *buf, String *str)
{
    uint8_t cmd = get_command (buf->data);
    string_appendf (str, "%s,", packet_command_str (cmd));
}
108
/* append a trace of every queued packet in 'queue' to 'str' */
static void trace_queue_list (List *queue, String *str)
{
    /* NOTE(review): dump_packet takes (struct io_buf *, String *); the cast
     * to ListForeachFunc assumes the calling conventions are compatible */
    list_foreach (queue, (ListForeachFunc)dump_packet, str);
}
113 #endif /* TX_PACKET_DEBUG */
114
/*
 * Dump the contents of all queues to the debug log, tagged with the short
 * call-site identifier 'id'.  Compiles to a no-op when TX_PACKET_DEBUG is 0.
 */
static void trace_queue (struct tx_layer *tx, const char *id)
{
#if TX_PACKET_DEBUG
    struct tx_packet *tx_packet = tx->udata;
    int i;
    String *s;
    TCPC *c;

    if (!(s = string_new (NULL, 0, 0, TRUE)))
        return;

    c = tx->stack->c;
    string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts);

    /* one comma-separated run of command names per queue, highest
     * priority first */
    for (i = 0; i < NR_QUEUES; i++)
        trace_queue_list (tx_packet->queues[i].queue, s);

    string_append (s, "}");

    GT->DBGSOCK (GT, c, "%s", s->str);
    string_free (s);
#endif
}
138
139 /*****************************************************************************/
140
141 /* return the queue on which this message should go */
142 static size_t get_queue (struct io_buf *msg)
143 {
144 uint8_t cmd;
145
146 cmd = get_command (msg->data);
147
148 switch (cmd)
149 {
150 default:
151 abort ();
152
153 case GT_MSG_VENDOR:
154 case GT_MSG_VENDOR_STD:
155 case GT_MSG_QUERY_ROUTE:
156 return 6;
157
158 case GT_MSG_PING:
159 {
160 /* queue keep-alive pings in the urgent queue */
161 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
162 return 0;
163 }
164 return 5;
165
166 case GT_MSG_PING_REPLY:
167 {
168 /*
169 * Queue replies to keep-alive ping in the urgent queue.
170 *
171 * This allows the remote end to starve it's own connection
172 * with a series of keep-alive pings. Only flow-control
173 * can handle this.
174 */
175 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
176 return 0;
177 }
178 return 4;
179
180 case GT_MSG_QUERY:
181 {
182 /* make queries from this node more important */
183 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
184 return 1;
185 }
186 return 3;
187
188 case GT_MSG_QUERY_REPLY:
189 return 2;
190
191 case GT_MSG_PUSH:
192 return 1;
193
194 case GT_MSG_BYE:
195 return 0;
196 }
197
198 abort ();
199 }
200
201 static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg)
202 {
203 pkt_queue->queue = list_append (pkt_queue->queue, msg);
204 }
205
206 /*
207 * Called from upper layer when it wants to send us a message buffer.
208 */
209 static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf)
210 {
211 struct tx_packet *tx_packet = tx->udata;
212 size_t queue_nr;
213
214 queue_nr = get_queue (io_buf);
215
216 assert (queue_nr < NR_QUEUES);
217 enqueue_packet (&tx_packet->queues[queue_nr], io_buf);
218
219 tx_packet->total_pkts++;
220 assert (tx_packet->total_pkts > 0);
221
222 trace_queue (tx, "*0*");
223
224 return TX_OK;
225 }
226
227 /*****************************************************************************/
228
229 static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type,
230 size_t prio)
231 {
232 queue->msg_type = msg_type;
233 queue->ratio = prio;
234 }
235
236 static void reset_ratios (struct packet_queue *queue, size_t len)
237 {
238 set_queue (&queue[0], 0xff, URGENT_RATIO);
239 set_queue (&queue[1], GT_MSG_PUSH, PUSH_RATIO);
240 set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO);
241 set_queue (&queue[3], GT_MSG_QUERY, QUERY_RATIO);
242 set_queue (&queue[4], GT_MSG_PING_REPLY, PONG_RATIO);
243 set_queue (&queue[5], GT_MSG_PING, PING_RATIO);
244 set_queue (&queue[6], 0xff, MISC_RATIO);
245 }
246
247 /*****************************************************************************/
248
249 /*
250 * Try to send a single message buffer from the packet queue to the lower
251 * layer. If the lower layer has become saturated, return FALSE.
252 *
253 * The lower layer takes responsibility for the messages sent to it in
254 * entirety in gt_tx_layer_queue() unless it is full. In that case it
255 * returns TX_FULL.
256 */
257 static tx_status_t shift_queue (struct tx_layer *tx,
258 struct tx_packet *tx_packet,
259 struct packet_queue *pkt_queue)
260 {
261 List *msg_l;
262 struct io_buf *msg;
263 tx_status_t ret;
264
265 msg_l = list_nth (pkt_queue->queue, 0);
266 msg = msg_l->data;
267
268 ret = gt_tx_layer_queue (tx, msg);
269
270 if (ret != TX_OK)
271 {
272 assert (ret != TX_EMPTY); /* impossible to be empty */
273 return ret;
274 }
275
276 /* shift this packet off the queue */
277 pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l);
278
279 tx_packet->total_pkts--;
280 assert (tx_packet->total_pkts >= 0);
281
282 if (TX_PACKET_DEBUG)
283 trace_queue (tx, "*2*");
284
285 return ret;
286 }
287
288 static tx_status_t service_queues (struct tx_layer *layer,
289 struct tx_packet *tx_packet)
290 {
291 int i;
292 tx_status_t ret;
293
294 for (i = 0; i < NR_QUEUES; i++)
295 {
296 struct packet_queue *pkt_queue = &tx_packet->queues[i];
297
298 /* skip if ratio is small */
299 while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL)
300 {
301 ret = shift_queue (layer, tx_packet, pkt_queue);
302 pkt_queue->ratio--;
303
304 if (ret == TX_FULL)
305 return TX_OK;
306
307 if (ret != TX_OK)
308 return ret;
309 }
310 }
311
312 /* reset the ratios to write more data */
313 reset_ratios (tx_packet->queues, NR_QUEUES);
314
315 /* we wrote something, so return ok */
316 if (tx_packet->total_pkts == 0)
317 return TX_OK;
318
319 /* tail recurse until lower layer is saturated */
320 return service_queues (layer, tx_packet);
321 }
322
323 /*
324 * Gets called when the lower layer is writable.
325 */
326 static tx_status_t tx_packet_ready (struct tx_layer *tx)
327 {
328 struct tx_packet *tx_packet = tx->udata;
329
330 if (tx_packet->total_pkts == 0)
331 return TX_EMPTY;
332
333 if (TX_PACKET_DEBUG)
334 trace_queue (tx, "*1*");
335
336 return service_queues (tx, tx_packet);
337 }
338
339 /*****************************************************************************/
340
341 static BOOL tx_packet_init (struct tx_layer *tx)
342 {
343 struct tx_packet *tx_packet;
344 int i;
345
346 if (!(tx_packet = malloc (sizeof (struct tx_packet))))
347 return FALSE;
348
349 tx_packet->total_pkts = 0;
350
351 for (i = 0; i < NR_QUEUES; i++)
352 {
353 tx_packet->queues[i].queue = NULL;
354 tx_packet->queues[i].bytes_queued = 0;
355 }
356
357 reset_ratios (tx_packet->queues, NR_QUEUES);
358
359 tx->udata = tx_packet;
360
361 return TRUE;
362 }
363
364 static BOOL free_io_buf (struct io_buf *io_buf, void *udata)
365 {
366 io_buf_free (io_buf);
367 return TRUE;
368 }
369
370 static void flush_packets (struct packet_queue *pkt_queue)
371 {
372 list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL);
373 pkt_queue = NULL;
374 }
375
376 static void tx_packet_destroy (struct tx_layer *tx)
377 {
378 struct tx_packet *tx_packet = tx->udata;
379 int i;
380
381 for (i = 0; i < NR_QUEUES; i++)
382 flush_packets (&tx_packet->queues[i]);
383
384 FREE (tx_packet);
385 }
386
/* intentionally empty -- presumably a consume/discard-mode toggle from the
 * stack; this layer keeps no such state (TODO confirm against tx_layer.h) */
static void tx_packet_consume (struct tx_layer *tx, BOOL stop)
{
    /* nothing */
}
391
392 /*****************************************************************************/
393
/* enable hook from the tx stack; not implemented yet */
static void tx_packet_enable (struct tx_layer *tx)
{
    /* TODO */
}
398
/* disable hook from the tx stack; not implemented yet */
static void tx_packet_disable (struct tx_layer *tx)
{
    /* TODO */
}
403
404 /*****************************************************************************/
405
/* vtable hooking this layer into the tx stack (positional initializer --
 * slot order must match struct tx_layer_ops in tx_layer.h) */
struct tx_layer_ops gt_tx_packet_ops =
{
    tx_packet_init,       /* init */
    tx_packet_destroy,    /* destroy */
    tx_packet_consume,    /* consume */
    tx_packet_queue,      /* queue */
    tx_packet_ready,      /* ready */
    tx_packet_enable,     /* enable */
    tx_packet_disable,    /* disable */
};