Mercurial > hg > index.fcgi > gift-gnutella > gift-gnutella-0.0.11-1pba
comparison src/io/tx_packet.c @ 0:d39e1d0d75b6
initial add
author | paulo@hit-nxdomain.opendns.com |
---|---|
date | Sat, 20 Feb 2010 21:18:28 -0800 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:b023a262bbe9 |
---|---|
1 /* | |
2 * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $ | |
3 * | |
4 * Copyright (C) 2004 giFT project (gift.sourceforge.net) | |
5 * | |
6 * This program is free software; you can redistribute it and/or modify it | |
7 * under the terms of the GNU General Public License as published by the | |
8 * Free Software Foundation; either version 2, or (at your option) any | |
9 * later version. | |
10 * | |
11 * This program is distributed in the hope that it will be useful, but | |
12 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
14 * General Public License for more details. | |
15 */ | |
16 | |
17 #include "gt_gnutella.h" | |
18 #include "gt_packet.h" /* packet manipulation macros */ | |
19 | |
20 #include "io/tx_stack.h" | |
21 #include "io/tx_layer.h" | |
22 #include "io/io_buf.h" | |
23 | |
24 /*****************************************************************************/ | |
25 | |
26 #define TX_PACKET_DEBUG 0 | |
27 | |
28 /*****************************************************************************/ | |
29 | |
30 /* | |
31 * Relative packet priority ratios. These control how many packets of a | |
32 * certain type are sent prior to looking for other types. For each type we | |
33 * maintain an independent FIFO queue. Each time a packet can be sent, each | |
34 * queue is checked. | |
35 * | |
36 * Packets in a queue will be sent while the ratio is greater than zero, and | |
37 * there are no higher priority packets waiting. Once there are no queues | |
38 * with both waiting packets and a non-zero ratio, the queue priority ratios | |
39 * are reset so that more packets can be sent. This process continues until | |
40 * the lower layer becomes saturated. | |
41 * | |
42 * Note that it is bad idea to reset the priority ratios only when all the | |
43 * ratios are zero, because this could lead to starvation for some packet | |
44 * types. | |
45 * | |
46 * Pushes have the highest priorty of normal messages. Also, there is a | |
47 * special 'urgent' queue that has higher priority, that includes replies to | |
48 * keepalive pings and other important high-priority messages. | |
49 */ | |
50 #define URGENT_RATIO INT_MAX | |
51 #define PUSH_RATIO 5 | |
52 #define QHIT_RATIO 4 | |
53 #define QUERY_RATIO 3 | |
54 #define PONG_RATIO 2 | |
55 #define PING_RATIO 1 | |
56 #define MISC_RATIO 1 | |
57 | |
58 #define NR_QUEUES (7) | |
59 | |
60 /*****************************************************************************/ | |
61 | |
62 struct packet_queue | |
63 { | |
64 gt_packet_type_t msg_type; | |
65 size_t ratio; /* how many packets left on this turn? */ | |
66 size_t bytes_queued; /* total bytes queued */ | |
67 List *queue; | |
68 }; | |
69 | |
70 struct tx_packet | |
71 { | |
72 struct packet_queue queues[NR_QUEUES]; | |
73 int total_pkts; /* used to quickly test if empty */ | |
74 }; | |
75 | |
76 /*****************************************************************************/ | |
77 /* DEBUGGING/TRACING */ | |
78 | |
79 #if TX_PACKET_DEBUG | |
80 /* ripped from gt_packet.c */ | |
81 static const char *packet_command_str (uint8_t cmd) | |
82 { | |
83 static char buf[16]; | |
84 | |
85 switch (cmd) | |
86 { | |
87 case GT_MSG_PING: return "PING"; | |
88 case GT_MSG_PING_REPLY: return "PONG"; | |
89 case GT_MSG_BYE: return "BYE"; | |
90 case GT_MSG_QUERY_ROUTE: return "QROUTE"; | |
91 case GT_MSG_VENDOR: return "VMSG"; | |
92 case GT_MSG_VENDOR_STD: return "VMSG-S"; | |
93 case GT_MSG_PUSH: return "PUSH"; | |
94 case GT_MSG_QUERY: return "QUERY"; | |
95 case GT_MSG_QUERY_REPLY: return "HITS"; | |
96 | |
97 default: | |
98 snprintf (buf, sizeof (buf), "[<%02hx>]", cmd); | |
99 return buf; | |
100 } | |
101 } | |
102 | |
103 static void dump_packet (struct io_buf *buf, String *str) | |
104 { | |
105 uint8_t cmd = get_command (buf->data); | |
106 string_appendf (str, "%s,", packet_command_str (cmd)); | |
107 } | |
108 | |
109 static void trace_queue_list (List *queue, String *str) | |
110 { | |
111 list_foreach (queue, (ListForeachFunc)dump_packet, str); | |
112 } | |
113 #endif /* TX_PACKET_DEBUG */ | |
114 | |
115 static void trace_queue (struct tx_layer *tx, const char *id) | |
116 { | |
117 #if TX_PACKET_DEBUG | |
118 struct tx_packet *tx_packet = tx->udata; | |
119 int i; | |
120 String *s; | |
121 TCPC *c; | |
122 | |
123 if (!(s = string_new (NULL, 0, 0, TRUE))) | |
124 return; | |
125 | |
126 c = tx->stack->c; | |
127 string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts); | |
128 | |
129 for (i = 0; i < NR_QUEUES; i++) | |
130 trace_queue_list (tx_packet->queues[i].queue, s); | |
131 | |
132 string_append (s, "}"); | |
133 | |
134 GT->DBGSOCK (GT, c, "%s", s->str); | |
135 string_free (s); | |
136 #endif | |
137 } | |
138 | |
139 /*****************************************************************************/ | |
140 | |
141 /* return the queue on which this message should go */ | |
142 static size_t get_queue (struct io_buf *msg) | |
143 { | |
144 uint8_t cmd; | |
145 | |
146 cmd = get_command (msg->data); | |
147 | |
148 switch (cmd) | |
149 { | |
150 default: | |
151 abort (); | |
152 | |
153 case GT_MSG_VENDOR: | |
154 case GT_MSG_VENDOR_STD: | |
155 case GT_MSG_QUERY_ROUTE: | |
156 return 6; | |
157 | |
158 case GT_MSG_PING: | |
159 { | |
160 /* queue keep-alive pings in the urgent queue */ | |
161 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) | |
162 return 0; | |
163 } | |
164 return 5; | |
165 | |
166 case GT_MSG_PING_REPLY: | |
167 { | |
168 /* | |
169 * Queue replies to keep-alive ping in the urgent queue. | |
170 * | |
171 * This allows the remote end to starve it's own connection | |
172 * with a series of keep-alive pings. Only flow-control | |
173 * can handle this. | |
174 */ | |
175 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) | |
176 return 0; | |
177 } | |
178 return 4; | |
179 | |
180 case GT_MSG_QUERY: | |
181 { | |
182 /* make queries from this node more important */ | |
183 if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) | |
184 return 1; | |
185 } | |
186 return 3; | |
187 | |
188 case GT_MSG_QUERY_REPLY: | |
189 return 2; | |
190 | |
191 case GT_MSG_PUSH: | |
192 return 1; | |
193 | |
194 case GT_MSG_BYE: | |
195 return 0; | |
196 } | |
197 | |
198 abort (); | |
199 } | |
200 | |
201 static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg) | |
202 { | |
203 pkt_queue->queue = list_append (pkt_queue->queue, msg); | |
204 } | |
205 | |
206 /* | |
207 * Called from upper layer when it wants to send us a message buffer. | |
208 */ | |
209 static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf) | |
210 { | |
211 struct tx_packet *tx_packet = tx->udata; | |
212 size_t queue_nr; | |
213 | |
214 queue_nr = get_queue (io_buf); | |
215 | |
216 assert (queue_nr < NR_QUEUES); | |
217 enqueue_packet (&tx_packet->queues[queue_nr], io_buf); | |
218 | |
219 tx_packet->total_pkts++; | |
220 assert (tx_packet->total_pkts > 0); | |
221 | |
222 trace_queue (tx, "*0*"); | |
223 | |
224 return TX_OK; | |
225 } | |
226 | |
227 /*****************************************************************************/ | |
228 | |
229 static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type, | |
230 size_t prio) | |
231 { | |
232 queue->msg_type = msg_type; | |
233 queue->ratio = prio; | |
234 } | |
235 | |
236 static void reset_ratios (struct packet_queue *queue, size_t len) | |
237 { | |
238 set_queue (&queue[0], 0xff, URGENT_RATIO); | |
239 set_queue (&queue[1], GT_MSG_PUSH, PUSH_RATIO); | |
240 set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO); | |
241 set_queue (&queue[3], GT_MSG_QUERY, QUERY_RATIO); | |
242 set_queue (&queue[4], GT_MSG_PING_REPLY, PONG_RATIO); | |
243 set_queue (&queue[5], GT_MSG_PING, PING_RATIO); | |
244 set_queue (&queue[6], 0xff, MISC_RATIO); | |
245 } | |
246 | |
247 /*****************************************************************************/ | |
248 | |
249 /* | |
250 * Try to send a single message buffer from the packet queue to the lower | |
251 * layer. If the lower layer has become saturated, return FALSE. | |
252 * | |
253 * The lower layer takes responsibility for the messages sent to it in | |
254 * entirety in gt_tx_layer_queue() unless it is full. In that case it | |
255 * returns TX_FULL. | |
256 */ | |
257 static tx_status_t shift_queue (struct tx_layer *tx, | |
258 struct tx_packet *tx_packet, | |
259 struct packet_queue *pkt_queue) | |
260 { | |
261 List *msg_l; | |
262 struct io_buf *msg; | |
263 tx_status_t ret; | |
264 | |
265 msg_l = list_nth (pkt_queue->queue, 0); | |
266 msg = msg_l->data; | |
267 | |
268 ret = gt_tx_layer_queue (tx, msg); | |
269 | |
270 if (ret != TX_OK) | |
271 { | |
272 assert (ret != TX_EMPTY); /* impossible to be empty */ | |
273 return ret; | |
274 } | |
275 | |
276 /* shift this packet off the queue */ | |
277 pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l); | |
278 | |
279 tx_packet->total_pkts--; | |
280 assert (tx_packet->total_pkts >= 0); | |
281 | |
282 if (TX_PACKET_DEBUG) | |
283 trace_queue (tx, "*2*"); | |
284 | |
285 return ret; | |
286 } | |
287 | |
288 static tx_status_t service_queues (struct tx_layer *layer, | |
289 struct tx_packet *tx_packet) | |
290 { | |
291 int i; | |
292 tx_status_t ret; | |
293 | |
294 for (i = 0; i < NR_QUEUES; i++) | |
295 { | |
296 struct packet_queue *pkt_queue = &tx_packet->queues[i]; | |
297 | |
298 /* skip if ratio is small */ | |
299 while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL) | |
300 { | |
301 ret = shift_queue (layer, tx_packet, pkt_queue); | |
302 pkt_queue->ratio--; | |
303 | |
304 if (ret == TX_FULL) | |
305 return TX_OK; | |
306 | |
307 if (ret != TX_OK) | |
308 return ret; | |
309 } | |
310 } | |
311 | |
312 /* reset the ratios to write more data */ | |
313 reset_ratios (tx_packet->queues, NR_QUEUES); | |
314 | |
315 /* we wrote something, so return ok */ | |
316 if (tx_packet->total_pkts == 0) | |
317 return TX_OK; | |
318 | |
319 /* tail recurse until lower layer is saturated */ | |
320 return service_queues (layer, tx_packet); | |
321 } | |
322 | |
323 /* | |
324 * Gets called when the lower layer is writable. | |
325 */ | |
326 static tx_status_t tx_packet_ready (struct tx_layer *tx) | |
327 { | |
328 struct tx_packet *tx_packet = tx->udata; | |
329 | |
330 if (tx_packet->total_pkts == 0) | |
331 return TX_EMPTY; | |
332 | |
333 if (TX_PACKET_DEBUG) | |
334 trace_queue (tx, "*1*"); | |
335 | |
336 return service_queues (tx, tx_packet); | |
337 } | |
338 | |
339 /*****************************************************************************/ | |
340 | |
341 static BOOL tx_packet_init (struct tx_layer *tx) | |
342 { | |
343 struct tx_packet *tx_packet; | |
344 int i; | |
345 | |
346 if (!(tx_packet = malloc (sizeof (struct tx_packet)))) | |
347 return FALSE; | |
348 | |
349 tx_packet->total_pkts = 0; | |
350 | |
351 for (i = 0; i < NR_QUEUES; i++) | |
352 { | |
353 tx_packet->queues[i].queue = NULL; | |
354 tx_packet->queues[i].bytes_queued = 0; | |
355 } | |
356 | |
357 reset_ratios (tx_packet->queues, NR_QUEUES); | |
358 | |
359 tx->udata = tx_packet; | |
360 | |
361 return TRUE; | |
362 } | |
363 | |
364 static BOOL free_io_buf (struct io_buf *io_buf, void *udata) | |
365 { | |
366 io_buf_free (io_buf); | |
367 return TRUE; | |
368 } | |
369 | |
370 static void flush_packets (struct packet_queue *pkt_queue) | |
371 { | |
372 list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL); | |
373 pkt_queue = NULL; | |
374 } | |
375 | |
376 static void tx_packet_destroy (struct tx_layer *tx) | |
377 { | |
378 struct tx_packet *tx_packet = tx->udata; | |
379 int i; | |
380 | |
381 for (i = 0; i < NR_QUEUES; i++) | |
382 flush_packets (&tx_packet->queues[i]); | |
383 | |
384 FREE (tx_packet); | |
385 } | |
386 | |
387 static void tx_packet_consume (struct tx_layer *tx, BOOL stop) | |
388 { | |
389 /* nothing */ | |
390 } | |
391 | |
392 /*****************************************************************************/ | |
393 | |
394 static void tx_packet_enable (struct tx_layer *tx) | |
395 { | |
396 /* TODO */ | |
397 } | |
398 | |
399 static void tx_packet_disable (struct tx_layer *tx) | |
400 { | |
401 /* TODO */ | |
402 } | |
403 | |
404 /*****************************************************************************/ | |
405 | |
406 struct tx_layer_ops gt_tx_packet_ops = | |
407 { | |
408 tx_packet_init, | |
409 tx_packet_destroy, | |
410 tx_packet_consume, | |
411 tx_packet_queue, | |
412 tx_packet_ready, | |
413 tx_packet_enable, | |
414 tx_packet_disable, | |
415 }; |