rev |
line source |
paulo@0
|
1 /*
|
paulo@0
|
2 * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $
|
paulo@0
|
3 *
|
paulo@0
|
4 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
|
paulo@0
|
5 *
|
paulo@0
|
6 * This program is free software; you can redistribute it and/or modify it
|
paulo@0
|
7 * under the terms of the GNU General Public License as published by the
|
paulo@0
|
8 * Free Software Foundation; either version 2, or (at your option) any
|
paulo@0
|
9 * later version.
|
paulo@0
|
10 *
|
paulo@0
|
11 * This program is distributed in the hope that it will be useful, but
|
paulo@0
|
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
|
paulo@0
|
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
paulo@0
|
14 * General Public License for more details.
|
paulo@0
|
15 */
|
paulo@0
|
16
|
paulo@0
|
17 #include "gt_gnutella.h"
|
paulo@0
|
18 #include "gt_packet.h" /* packet manipulation macros */
|
paulo@0
|
19
|
paulo@0
|
20 #include "io/tx_stack.h"
|
paulo@0
|
21 #include "io/tx_layer.h"
|
paulo@0
|
22 #include "io/io_buf.h"
|
paulo@0
|
23
|
paulo@0
|
24 /*****************************************************************************/
|
paulo@0
|
25
|
paulo@0
|
#define TX_PACKET_DEBUG 0   /* set to 1 to enable per-queue tracing below */
|
paulo@0
|
27
|
paulo@0
|
28 /*****************************************************************************/
|
paulo@0
|
29
|
paulo@0
|
30 /*
|
paulo@0
|
31 * Relative packet priority ratios. These control how many packets of a
|
paulo@0
|
32 * certain type are sent prior to looking for other types. For each type we
|
paulo@0
|
33 * maintain an independent FIFO queue. Each time a packet can be sent, each
|
paulo@0
|
34 * queue is checked.
|
paulo@0
|
35 *
|
paulo@0
|
36 * Packets in a queue will be sent while the ratio is greater than zero, and
|
paulo@0
|
37 * there are no higher priority packets waiting. Once there are no queues
|
paulo@0
|
38 * with both waiting packets and a non-zero ratio, the queue priority ratios
|
paulo@0
|
39 * are reset so that more packets can be sent. This process continues until
|
paulo@0
|
40 * the lower layer becomes saturated.
|
paulo@0
|
41 *
|
paulo@0
|
 * Note that it is a bad idea to reset the priority ratios only when all the
|
paulo@0
|
43 * ratios are zero, because this could lead to starvation for some packet
|
paulo@0
|
44 * types.
|
paulo@0
|
45 *
|
paulo@0
|
 * Pushes have the highest priority of normal messages.  Also, there is a
|
paulo@0
|
47 * special 'urgent' queue that has higher priority, that includes replies to
|
paulo@0
|
48 * keepalive pings and other important high-priority messages.
|
paulo@0
|
49 */
|
paulo@0
|
/* per-turn send quota for each priority queue (see comment above) */
#define URGENT_RATIO    INT_MAX   /* urgent queue is effectively unlimited */
#define PUSH_RATIO      5
#define QHIT_RATIO      4
#define QUERY_RATIO     3
#define PONG_RATIO      2
#define PING_RATIO      1
#define MISC_RATIO      1

#define NR_QUEUES       (7)       /* one FIFO queue per priority class */
|
paulo@0
|
59
|
paulo@0
|
60 /*****************************************************************************/
|
paulo@0
|
61
|
paulo@0
|
/* one priority class: a FIFO of queued message buffers plus its send quota */
struct packet_queue
{
    gt_packet_type_t msg_type;     /* message type carried by this queue
                                    * (0xff for the urgent and misc queues) */
    size_t ratio;                  /* how many packets left on this turn? */
    size_t bytes_queued;           /* total bytes queued */
    List *queue;                   /* FIFO of struct io_buf messages */
};
|
paulo@0
|
69
|
paulo@0
|
/* private state for this tx layer: the set of priority queues */
struct tx_packet
{
    struct packet_queue queues[NR_QUEUES]; /* index 0 is the most urgent */
    int total_pkts;                /* used to quickly test if empty */
};
|
paulo@0
|
75
|
paulo@0
|
76 /*****************************************************************************/
|
paulo@0
|
77 /* DEBUGGING/TRACING */
|
paulo@0
|
78
|
paulo@0
|
#if TX_PACKET_DEBUG
/* ripped from gt_packet.c */
/*
 * Return a short human-readable name for a Gnutella command byte.
 * NOTE: unknown commands are formatted into static storage, so the
 * result must be consumed before the next call (not thread-safe).
 */
static const char *packet_command_str (uint8_t cmd)
{
    static char buf[16];

    switch (cmd)
    {
     case GT_MSG_PING:        return "PING";
     case GT_MSG_PING_REPLY:  return "PONG";
     case GT_MSG_BYE:         return "BYE";
     case GT_MSG_QUERY_ROUTE: return "QROUTE";
     case GT_MSG_VENDOR:      return "VMSG";
     case GT_MSG_VENDOR_STD:  return "VMSG-S";
     case GT_MSG_PUSH:        return "PUSH";
     case GT_MSG_QUERY:       return "QUERY";
     case GT_MSG_QUERY_REPLY: return "HITS";

     default:
        snprintf (buf, sizeof (buf), "[<%02hx>]", cmd);
        return buf;
    }
}

/* append "<CMD>," for a single queued message buffer to the trace string */
static void dump_packet (struct io_buf *buf, String *str)
{
    uint8_t cmd = get_command (buf->data);
    string_appendf (str, "%s,", packet_command_str (cmd));
}

/* dump every message buffer on one priority queue into the trace string */
static void trace_queue_list (List *queue, String *str)
{
    list_foreach (queue, (ListForeachFunc)dump_packet, str);
}
#endif /* TX_PACKET_DEBUG */
|
paulo@0
|
114
|
paulo@0
|
/*
 * Log the contents of every priority queue, tagged with 'id' so the call
 * site can be identified.  Compiles to an empty function unless
 * TX_PACKET_DEBUG is set.
 */
static void trace_queue (struct tx_layer *tx, const char *id)
{
#if TX_PACKET_DEBUG
    struct tx_packet *tx_packet = tx->udata;
    int i;
    String *s;
    TCPC *c;

    if (!(s = string_new (NULL, 0, 0, TRUE)))
        return;

    c = tx->stack->c;
    string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts);

    for (i = 0; i < NR_QUEUES; i++)
        trace_queue_list (tx_packet->queues[i].queue, s);

    string_append (s, "}");

    GT->DBGSOCK (GT, c, "%s", s->str);
    string_free (s);
#endif
}
|
paulo@0
|
138
|
paulo@0
|
139 /*****************************************************************************/
|
paulo@0
|
140
|
paulo@0
|
141 /* return the queue on which this message should go */
|
paulo@0
|
142 static size_t get_queue (struct io_buf *msg)
|
paulo@0
|
143 {
|
paulo@0
|
144 uint8_t cmd;
|
paulo@0
|
145
|
paulo@0
|
146 cmd = get_command (msg->data);
|
paulo@0
|
147
|
paulo@0
|
148 switch (cmd)
|
paulo@0
|
149 {
|
paulo@0
|
150 default:
|
paulo@0
|
151 abort ();
|
paulo@0
|
152
|
paulo@0
|
153 case GT_MSG_VENDOR:
|
paulo@0
|
154 case GT_MSG_VENDOR_STD:
|
paulo@0
|
155 case GT_MSG_QUERY_ROUTE:
|
paulo@0
|
156 return 6;
|
paulo@0
|
157
|
paulo@0
|
158 case GT_MSG_PING:
|
paulo@0
|
159 {
|
paulo@0
|
160 /* queue keep-alive pings in the urgent queue */
|
paulo@0
|
161 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
|
paulo@0
|
162 return 0;
|
paulo@0
|
163 }
|
paulo@0
|
164 return 5;
|
paulo@0
|
165
|
paulo@0
|
166 case GT_MSG_PING_REPLY:
|
paulo@0
|
167 {
|
paulo@0
|
168 /*
|
paulo@0
|
169 * Queue replies to keep-alive ping in the urgent queue.
|
paulo@0
|
170 *
|
paulo@0
|
171 * This allows the remote end to starve it's own connection
|
paulo@0
|
172 * with a series of keep-alive pings. Only flow-control
|
paulo@0
|
173 * can handle this.
|
paulo@0
|
174 */
|
paulo@0
|
175 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
|
paulo@0
|
176 return 0;
|
paulo@0
|
177 }
|
paulo@0
|
178 return 4;
|
paulo@0
|
179
|
paulo@0
|
180 case GT_MSG_QUERY:
|
paulo@0
|
181 {
|
paulo@0
|
182 /* make queries from this node more important */
|
paulo@0
|
183 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
|
paulo@0
|
184 return 1;
|
paulo@0
|
185 }
|
paulo@0
|
186 return 3;
|
paulo@0
|
187
|
paulo@0
|
188 case GT_MSG_QUERY_REPLY:
|
paulo@0
|
189 return 2;
|
paulo@0
|
190
|
paulo@0
|
191 case GT_MSG_PUSH:
|
paulo@0
|
192 return 1;
|
paulo@0
|
193
|
paulo@0
|
194 case GT_MSG_BYE:
|
paulo@0
|
195 return 0;
|
paulo@0
|
196 }
|
paulo@0
|
197
|
paulo@0
|
198 abort ();
|
paulo@0
|
199 }
|
paulo@0
|
200
|
paulo@0
|
/* append a message buffer to the tail of the given priority queue */
static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg)
{
    pkt_queue->queue = list_append (pkt_queue->queue, msg);
}
|
paulo@0
|
205
|
paulo@0
|
206 /*
|
paulo@0
|
207 * Called from upper layer when it wants to send us a message buffer.
|
paulo@0
|
208 */
|
paulo@0
|
209 static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf)
|
paulo@0
|
210 {
|
paulo@0
|
211 struct tx_packet *tx_packet = tx->udata;
|
paulo@0
|
212 size_t queue_nr;
|
paulo@0
|
213
|
paulo@0
|
214 queue_nr = get_queue (io_buf);
|
paulo@0
|
215
|
paulo@0
|
216 assert (queue_nr < NR_QUEUES);
|
paulo@0
|
217 enqueue_packet (&tx_packet->queues[queue_nr], io_buf);
|
paulo@0
|
218
|
paulo@0
|
219 tx_packet->total_pkts++;
|
paulo@0
|
220 assert (tx_packet->total_pkts > 0);
|
paulo@0
|
221
|
paulo@0
|
222 trace_queue (tx, "*0*");
|
paulo@0
|
223
|
paulo@0
|
224 return TX_OK;
|
paulo@0
|
225 }
|
paulo@0
|
226
|
paulo@0
|
227 /*****************************************************************************/
|
paulo@0
|
228
|
paulo@0
|
/* set the message type and the per-turn send quota for one queue */
static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type,
                       size_t prio)
{
    queue->msg_type = msg_type;
    queue->ratio = prio;
}
|
paulo@0
|
235
|
paulo@0
|
236 static void reset_ratios (struct packet_queue *queue, size_t len)
|
paulo@0
|
237 {
|
paulo@0
|
238 set_queue (&queue[0], 0xff, URGENT_RATIO);
|
paulo@0
|
239 set_queue (&queue[1], GT_MSG_PUSH, PUSH_RATIO);
|
paulo@0
|
240 set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO);
|
paulo@0
|
241 set_queue (&queue[3], GT_MSG_QUERY, QUERY_RATIO);
|
paulo@0
|
242 set_queue (&queue[4], GT_MSG_PING_REPLY, PONG_RATIO);
|
paulo@0
|
243 set_queue (&queue[5], GT_MSG_PING, PING_RATIO);
|
paulo@0
|
244 set_queue (&queue[6], 0xff, MISC_RATIO);
|
paulo@0
|
245 }
|
paulo@0
|
246
|
paulo@0
|
247 /*****************************************************************************/
|
paulo@0
|
248
|
paulo@0
|
249 /*
|
paulo@0
|
250 * Try to send a single message buffer from the packet queue to the lower
|
paulo@0
|
251 * layer. If the lower layer has become saturated, return FALSE.
|
paulo@0
|
252 *
|
paulo@0
|
253 * The lower layer takes responsibility for the messages sent to it in
|
paulo@0
|
254 * entirety in gt_tx_layer_queue() unless it is full. In that case it
|
paulo@0
|
255 * returns TX_FULL.
|
paulo@0
|
256 */
|
paulo@0
|
257 static tx_status_t shift_queue (struct tx_layer *tx,
|
paulo@0
|
258 struct tx_packet *tx_packet,
|
paulo@0
|
259 struct packet_queue *pkt_queue)
|
paulo@0
|
260 {
|
paulo@0
|
261 List *msg_l;
|
paulo@0
|
262 struct io_buf *msg;
|
paulo@0
|
263 tx_status_t ret;
|
paulo@0
|
264
|
paulo@0
|
265 msg_l = list_nth (pkt_queue->queue, 0);
|
paulo@0
|
266 msg = msg_l->data;
|
paulo@0
|
267
|
paulo@0
|
268 ret = gt_tx_layer_queue (tx, msg);
|
paulo@0
|
269
|
paulo@0
|
270 if (ret != TX_OK)
|
paulo@0
|
271 {
|
paulo@0
|
272 assert (ret != TX_EMPTY); /* impossible to be empty */
|
paulo@0
|
273 return ret;
|
paulo@0
|
274 }
|
paulo@0
|
275
|
paulo@0
|
276 /* shift this packet off the queue */
|
paulo@0
|
277 pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l);
|
paulo@0
|
278
|
paulo@0
|
279 tx_packet->total_pkts--;
|
paulo@0
|
280 assert (tx_packet->total_pkts >= 0);
|
paulo@0
|
281
|
paulo@0
|
282 if (TX_PACKET_DEBUG)
|
paulo@0
|
283 trace_queue (tx, "*2*");
|
paulo@0
|
284
|
paulo@0
|
285 return ret;
|
paulo@0
|
286 }
|
paulo@0
|
287
|
paulo@0
|
288 static tx_status_t service_queues (struct tx_layer *layer,
|
paulo@0
|
289 struct tx_packet *tx_packet)
|
paulo@0
|
290 {
|
paulo@0
|
291 int i;
|
paulo@0
|
292 tx_status_t ret;
|
paulo@0
|
293
|
paulo@0
|
294 for (i = 0; i < NR_QUEUES; i++)
|
paulo@0
|
295 {
|
paulo@0
|
296 struct packet_queue *pkt_queue = &tx_packet->queues[i];
|
paulo@0
|
297
|
paulo@0
|
298 /* skip if ratio is small */
|
paulo@0
|
299 while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL)
|
paulo@0
|
300 {
|
paulo@0
|
301 ret = shift_queue (layer, tx_packet, pkt_queue);
|
paulo@0
|
302 pkt_queue->ratio--;
|
paulo@0
|
303
|
paulo@0
|
304 if (ret == TX_FULL)
|
paulo@0
|
305 return TX_OK;
|
paulo@0
|
306
|
paulo@0
|
307 if (ret != TX_OK)
|
paulo@0
|
308 return ret;
|
paulo@0
|
309 }
|
paulo@0
|
310 }
|
paulo@0
|
311
|
paulo@0
|
312 /* reset the ratios to write more data */
|
paulo@0
|
313 reset_ratios (tx_packet->queues, NR_QUEUES);
|
paulo@0
|
314
|
paulo@0
|
315 /* we wrote something, so return ok */
|
paulo@0
|
316 if (tx_packet->total_pkts == 0)
|
paulo@0
|
317 return TX_OK;
|
paulo@0
|
318
|
paulo@0
|
319 /* tail recurse until lower layer is saturated */
|
paulo@0
|
320 return service_queues (layer, tx_packet);
|
paulo@0
|
321 }
|
paulo@0
|
322
|
paulo@0
|
/*
 * Gets called when the lower layer is writable.  Returns TX_EMPTY when
 * nothing is queued, otherwise services the priority queues.
 */
static tx_status_t tx_packet_ready (struct tx_layer *tx)
{
    struct tx_packet *tx_packet = tx->udata;

    if (tx_packet->total_pkts == 0)
        return TX_EMPTY;

    if (TX_PACKET_DEBUG)
        trace_queue (tx, "*1*");

    return service_queues (tx, tx_packet);
}
|
paulo@0
|
338
|
paulo@0
|
339 /*****************************************************************************/
|
paulo@0
|
340
|
paulo@0
|
341 static BOOL tx_packet_init (struct tx_layer *tx)
|
paulo@0
|
342 {
|
paulo@0
|
343 struct tx_packet *tx_packet;
|
paulo@0
|
344 int i;
|
paulo@0
|
345
|
paulo@0
|
346 if (!(tx_packet = malloc (sizeof (struct tx_packet))))
|
paulo@0
|
347 return FALSE;
|
paulo@0
|
348
|
paulo@0
|
349 tx_packet->total_pkts = 0;
|
paulo@0
|
350
|
paulo@0
|
351 for (i = 0; i < NR_QUEUES; i++)
|
paulo@0
|
352 {
|
paulo@0
|
353 tx_packet->queues[i].queue = NULL;
|
paulo@0
|
354 tx_packet->queues[i].bytes_queued = 0;
|
paulo@0
|
355 }
|
paulo@0
|
356
|
paulo@0
|
357 reset_ratios (tx_packet->queues, NR_QUEUES);
|
paulo@0
|
358
|
paulo@0
|
359 tx->udata = tx_packet;
|
paulo@0
|
360
|
paulo@0
|
361 return TRUE;
|
paulo@0
|
362 }
|
paulo@0
|
363
|
paulo@0
|
/*
 * list_foreach_remove() callback: free one queued message buffer.
 * Always returns TRUE so every link is removed from the list.
 */
static BOOL free_io_buf (struct io_buf *io_buf, void *udata)
{
    io_buf_free (io_buf);
    return TRUE;
}
|
paulo@0
|
369
|
paulo@0
|
370 static void flush_packets (struct packet_queue *pkt_queue)
|
paulo@0
|
371 {
|
paulo@0
|
372 list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL);
|
paulo@0
|
373 pkt_queue = NULL;
|
paulo@0
|
374 }
|
paulo@0
|
375
|
paulo@0
|
376 static void tx_packet_destroy (struct tx_layer *tx)
|
paulo@0
|
377 {
|
paulo@0
|
378 struct tx_packet *tx_packet = tx->udata;
|
paulo@0
|
379 int i;
|
paulo@0
|
380
|
paulo@0
|
381 for (i = 0; i < NR_QUEUES; i++)
|
paulo@0
|
382 flush_packets (&tx_packet->queues[i]);
|
paulo@0
|
383
|
paulo@0
|
384 FREE (tx_packet);
|
paulo@0
|
385 }
|
paulo@0
|
386
|
paulo@0
|
/* consume notification from the stack: this layer buffers nothing that
 * needs flushing, so there is nothing to do */
static void tx_packet_consume (struct tx_layer *tx, BOOL stop)
{
    /* nothing */
}
|
paulo@0
|
391
|
paulo@0
|
392 /*****************************************************************************/
|
paulo@0
|
393
|
paulo@0
|
/* (re)enable this layer; no per-layer action needed yet */
static void tx_packet_enable (struct tx_layer *tx)
{
    /* TODO */
}
|
paulo@0
|
398
|
paulo@0
|
/* disable this layer; no per-layer action needed yet */
static void tx_packet_disable (struct tx_layer *tx)
{
    /* TODO */
}
|
paulo@0
|
403
|
paulo@0
|
404 /*****************************************************************************/
|
paulo@0
|
405
|
paulo@0
|
/*
 * Operations vtable for this layer.  Positional initializer: the order
 * must match the member order of struct tx_layer_ops (io/tx_layer.h) —
 * confirm against that header when adding members.
 */
struct tx_layer_ops gt_tx_packet_ops =
{
    tx_packet_init,      /* init */
    tx_packet_destroy,   /* destroy */
    tx_packet_consume,   /* consume */
    tx_packet_queue,     /* queue */
    tx_packet_ready,     /* ready */
    tx_packet_enable,    /* enable */
    tx_packet_disable,   /* disable */
};
|