view src/io/tx_deflate.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
line source
1 /*
2 * $Id: tx_deflate.c,v 1.15 2004/05/02 08:55:00 hipnod Exp $
3 *
4 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 */
17 #include "gt_gnutella.h"
18 #include "gt_packet.h" /* packet manipulation macros */
20 #include "io/tx_stack.h"
21 #include "io/tx_layer.h"
22 #include "io/io_buf.h"
24 #include <zlib.h>
26 /*****************************************************************************/
/*
 * Compile-time switch for the tracing helpers in this file.  With the value
 * 0 both DEFLATE_TRACEFN() and DEFLATE_DUMP() expand to nothing.
 */
#define DEFLATE_DEBUG    0

#if DEFLATE_DEBUG

/* Log that a function was entered, tagged with the connection of `tx'. */
#define DEFLATE_TRACEFN(tx) \
	GT->DBGSOCK (GT, (tx)->stack->c, "entered")

/*
 * Log the byte counters of a struct tx_deflate together with the achieved
 * compression ratio and the average compressed output rate.
 *
 * NOTE: besides its argument this macro reads a variable named `tx'
 * (struct tx_layer *) that must be in scope where it is expanded.
 *
 * The counters are size_t; they are cast to the exact types the format
 * string names (%lu -> unsigned long, %d -> int).  The ratio and the average
 * are reported as 0 instead of dividing by zero when nothing has been
 * compressed yet or no time has elapsed.
 */
#define DEFLATE_DUMP(tx_deflate) \
	do \
	{ \
		double dump_in      = (double)(tx_deflate)->nbytes_in; \
		double dump_out     = (double)(tx_deflate)->nbytes_out; \
		double dump_pending = (double)(tx_deflate)->nbytes_unflushed; \
		double dump_secs    = difftime (time (NULL), tx->stack->start_time); \
		double dump_ratio   = 0.0; \
		double dump_avg     = 0.0; \
		\
		if (dump_in > 0.0) \
			dump_ratio = (dump_in - dump_out - dump_pending) / dump_in; \
		\
		if (dump_secs > 0.0) \
			dump_avg = dump_out / dump_secs; \
		\
		GT->DBGSOCK (GT, tx->stack->c, "in %lu out %lu flushed %lu unflushed %lu (flushing %d) " \
		             "ratio %.2f%% avg %.2f", \
		             (unsigned long)(tx_deflate)->nbytes_in, \
		             (unsigned long)(tx_deflate)->nbytes_out, \
		             (unsigned long)(tx_deflate)->nbytes_flushed, \
		             (unsigned long)(tx_deflate)->nbytes_unflushed, \
		             (int)(tx_deflate)->flushing, dump_ratio * 100.0, \
		             dump_avg); \
	} while (0)

#else /* !DEFLATE_DEBUG */

#define DEFLATE_TRACEFN(tx)
#define DEFLATE_DUMP(tx_deflate)

#endif /* DEFLATE_DEBUG */
57 /*****************************************************************************/
/*
 * Size requested from io_buf_new() for every compressed output buffer.
 * The original author describes the "- 1" as being "for auto-nullification";
 * presumably io_buf keeps one extra byte for a terminator -- TODO confirm
 * against io_buf.c.
 */
#define TX_DEFLATE_BUFSIZE    (1024 - 1)

/*
 * Once at least this many uncompressed bytes have gone into the z_stream
 * since the last completed flush, tx_deflate_queue() starts a sync flush.
 */
#define FLUSH_AFTER           (4096)

/* Delay handed to timer_add() for the Nagle timer: 200 milliseconds. */
#define NAGLE_TIMEOUT         (200 * MSEC)
65 /*****************************************************************************/
67 struct tx_deflate
68 {
69 /* zlib data */
70 z_stream z;
72 /* compressed buffer */
73 struct io_buf *buf;
75 /* Nagle timer that sends stored data after NAGLE_TIMEOUT milliseconds */
76 timer_id nagle_timer;
78 size_t nbytes_in; /* total uncompressed bytes */
79 size_t nbytes_out; /* total compressed bytes */
80 size_t nbytes_flushed; /* total bytes written to lower layer */
81 size_t nbytes_unflushed; /* bytes currently waiting in z_stream */
83 /*
84 * Whether the zstream is currently being flushed, and so whether deflate
85 * must receive a Z_SYNC_FLUSH parameter to continue flushing. The flush
86 * ends when deflate returns with avail_out > 0.
87 */
88 BOOL flushing;
90 /*
91 * When doing a flush, it's possible that there will be a partially
92 * filled buffer leftover. If there's no new data that comes in, the data
93 * will be delayed again until more data comes from the upper layer. This
94 * flag is set when this happens, so we know that we should flush the
95 * buffer to the lower layer as soon as possible, even if it isn't
96 * completely full.
97 */
98 BOOL delayed;
99 };
101 /*****************************************************************************/
/*
 * Forward declarations: the Nagle timer helpers are used by flush_buffer()
 * and deflate_nagle_timeout() before they are defined further down.
 *
 * The parameter is named tx_deflate, matching the definitions, so that it
 * does not shadow zlib's deflate() function.
 */
static void start_nagle_timer (struct tx_layer *tx,
                               struct tx_deflate *tx_deflate);
static void stop_nagle_timer  (struct tx_layer *tx,
                               struct tx_deflate *tx_deflate);
106 /*****************************************************************************/
/*
 * "enable" hook of the layer ops table.  Not implemented yet: the layer
 * keeps no enable/disable state.
 */
static void tx_deflate_enable (struct tx_layer *tx)
{
	(void)tx; /* TODO */
}
/*
 * "disable" hook of the layer ops table.  Not implemented yet: the layer
 * keeps no enable/disable state.
 */
static void tx_deflate_disable (struct tx_layer *tx)
{
	(void)tx; /* TODO */
}
118 /*****************************************************************************/
120 static void tx_deflate_toggle (struct tx_layer *tx, BOOL stop)
121 {
122 /* nothing, we do not consume packets, only pass along */
123 }
125 /*****************************************************************************/
127 static BOOL alloc_buffer (struct tx_deflate *tx_deflate)
128 {
129 if (tx_deflate->buf)
130 return TRUE;
132 if (!(tx_deflate->buf = io_buf_new (TX_DEFLATE_BUFSIZE)))
133 return FALSE;
135 return TRUE;
136 }
138 static void finish_flush (struct tx_deflate *tx_deflate)
139 {
140 tx_deflate->nbytes_unflushed = 0;
141 tx_deflate->flushing = FALSE;
142 }
144 static tx_status_t flush_buffer (struct tx_layer *tx,
145 struct tx_deflate *tx_deflate)
146 {
147 tx_status_t ret;
148 size_t n;
150 DEFLATE_TRACEFN(tx);
152 n = io_buf_read_avail (tx_deflate->buf);
154 /*
155 * The buffer filled up. Try to send again until the lower
156 * layer is saturated.
157 */
158 ret = gt_tx_layer_queue (tx, tx_deflate->buf);
159 assert (ret != TX_EMPTY);
161 if (ret == TX_ERROR || ret == TX_FULL)
162 return ret;
164 tx_deflate->nbytes_flushed += n;
165 assert (ret == TX_OK);
167 stop_nagle_timer (tx, tx_deflate);
169 tx_deflate->buf = NULL;
170 tx_deflate->delayed = FALSE;
172 return TX_OK;
173 }
175 /*
176 * Try to flush the data inside the z_stream and send it to the layer beneath
177 * this one.
178 */
179 static tx_status_t flush_stream (struct tx_layer *tx,
180 struct tx_deflate *tx_deflate)
181 {
182 z_stream *z = &tx_deflate->z;
183 tx_status_t ret;
184 int zret;
185 size_t wlen, old_avail;
187 DEFLATE_TRACEFN(tx);
189 if (!alloc_buffer (tx_deflate))
190 return TX_ERROR;
192 old_avail = io_buf_write_avail (tx_deflate->buf);
194 z->avail_in = 0;
195 z->next_in = NULL; /* don't disrupt anything else */
196 z->next_out = io_buf_write_ptr (tx_deflate->buf);
197 z->avail_out = old_avail;
199 zret = deflate (z, Z_SYNC_FLUSH);
201 /*
202 * If this is true we've already flushed all possible data.
203 */
204 if (zret == Z_BUF_ERROR)
205 {
206 tx_deflate->flushing = FALSE;
208 /* send the stored data */
209 if (io_buf_read_avail (tx_deflate->buf) > 0)
210 return flush_buffer (tx, tx_deflate);
212 return TX_EMPTY;
213 }
215 if (zret != Z_OK)
216 return TX_ERROR;
218 wlen = old_avail - z->avail_out;
220 io_buf_push (tx_deflate->buf, wlen);
221 tx_deflate->nbytes_out += wlen;
223 tx_deflate->flushing = TRUE;
225 /* if there is space, the flush completed successfully */
226 if (z->avail_out > 0)
227 finish_flush (tx_deflate);
229 if ((ret = flush_buffer (tx, tx_deflate) != TX_OK))
230 return ret;
232 /* stop when the flush completes */
233 if (!tx_deflate->flushing)
234 return TX_OK;
236 /* tail recurse until the flush completes */
237 return flush_stream (tx, tx_deflate);
238 }
240 static BOOL deflate_nagle_timeout (struct tx_layer *tx)
241 {
242 struct tx_deflate *tx_deflate = tx->udata;
243 tx_status_t ret;
245 DEFLATE_TRACEFN(tx);
247 /* this assertion means we have to disarm the timer when sending the
248 * buffer */
249 assert (tx_deflate->buf != NULL);
251 ret = flush_stream (tx, tx_deflate);
253 /* no matter what, we disable the Nagle timer after this */
254 stop_nagle_timer (tx, tx_deflate);
256 if (ret == TX_ERROR)
257 {
258 gt_tx_stack_abort (tx->stack);
259 return FALSE;
260 }
262 if (DEFLATE_DEBUG)
263 GT->DBGSOCK (GT, tx->stack->c, "buffer delayed?: %d", tx_deflate->delayed);
265 return FALSE;
266 }
268 static void start_nagle_timer (struct tx_layer *tx,
269 struct tx_deflate *tx_deflate)
270 {
271 if (DEFLATE_DEBUG)
272 GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
274 if (tx_deflate->nagle_timer != 0)
275 return;
277 tx_deflate->nagle_timer = timer_add (NAGLE_TIMEOUT,
278 (TimerCallback)deflate_nagle_timeout,
279 tx);
280 }
282 static void stop_nagle_timer (struct tx_layer *tx,
283 struct tx_deflate *tx_deflate)
284 {
285 if (DEFLATE_DEBUG)
286 GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
288 timer_remove_zero (&tx_deflate->nagle_timer);
289 }
291 /*****************************************************************************/
293 /*
294 * The upper layer has sent us a buffer to process.
295 */
296 static tx_status_t tx_deflate_queue (struct tx_layer *tx, struct io_buf *msg)
297 {
298 struct tx_deflate *tx_deflate = tx->udata;
299 z_stream *z = &tx_deflate->z;
300 BOOL flush_completed = FALSE;
301 int ret;
303 DEFLATE_TRACEFN(tx);
305 /*
306 * Deflate the incoming message, adding it to the buffer.
307 *
308 * If our buffer is currently full, return TX_FULL.
309 */
311 if (!alloc_buffer (tx_deflate))
312 {
313 io_buf_free (msg);
314 return TX_ERROR;
315 }
317 z->next_in = io_buf_read_ptr (msg);
318 z->avail_in = io_buf_read_avail (msg);
319 z->next_out = io_buf_write_ptr (tx_deflate->buf);
320 z->avail_out = io_buf_write_avail (tx_deflate->buf);
322 if (z->avail_out == 0)
323 return TX_FULL;
325 while (io_buf_read_avail (msg) > 0 && z->avail_out > 0)
326 {
327 size_t rlen, wlen;
329 assert (z->next_in == io_buf_read_ptr (msg));
330 assert (z->next_out == io_buf_write_ptr (tx_deflate->buf));
332 /* begin flushing after a certain amount */
333 if (tx_deflate->nbytes_unflushed >= FLUSH_AFTER)
334 tx_deflate->flushing = TRUE;
336 ret = deflate (z, tx_deflate->flushing ? Z_SYNC_FLUSH : 0);
338 if (ret != Z_OK)
339 {
340 GT->DBGFN (GT, "deflate: error %d", ret);
341 io_buf_free (msg);
342 return TX_ERROR;
343 }
345 rlen = io_buf_read_avail (msg) - z->avail_in;
346 wlen = io_buf_write_avail (tx_deflate->buf) - z->avail_out;
347 assert (rlen > 0 || wlen > 0); /* hmm, is this true when flushing? */
348 #if 0
349 assert (wlen > 0);
350 #endif
352 tx_deflate->nbytes_in += rlen;
353 tx_deflate->nbytes_unflushed += rlen;
354 tx_deflate->nbytes_out += wlen;
356 DEFLATE_DUMP(tx_deflate);
358 /* update the buffer lengths */
359 io_buf_push (tx_deflate->buf, wlen);
360 io_buf_pop (msg, rlen);
362 if (z->avail_out == 0)
363 break;
365 /*
366 * If we have available output space and no more input space,
367 * we know the flush completed, so unset flush mode.
368 *
369 * NOTE: there might be a bug here. The flush may fit exactly
370 * everytime, causing us to never leave flush mode. I think zlib may
371 * try to prevent this itself, though.
372 */
373 if (tx_deflate->flushing && z->avail_in == 0)
374 {
375 flush_completed = TRUE;
376 finish_flush (tx_deflate);
377 }
378 }
380 /*
381 * If we completed a flush, and the buffer isn't full, set the delayed
382 * flag so that service_deflate() will write the buffer immediately to
383 * reduce latency, as it has already endured a Nagle timeout period.
384 */
385 if (flush_completed &&
386 io_buf_read_avail (tx_deflate->buf) < TX_DEFLATE_BUFSIZE)
387 {
388 if (DEFLATE_DEBUG)
389 {
390 GT->DBGSOCK (GT, tx->stack->c, "setting ->delayed flag on buf(%d)",
391 io_buf_read_avail (tx_deflate->buf));
392 }
394 tx_deflate->delayed = TRUE;
395 }
397 /*
398 * If the message buffer was only partially emptied, don't free
399 * it and let tx_layer.c know to handle it specially.
400 */
401 if (io_buf_read_avail (msg) > 0)
402 return TX_PARTIAL;
404 io_buf_free (msg);
406 return TX_OK;
407 }
409 /*****************************************************************************/
411 /*
412 * Get more data to write.
413 */
414 static tx_status_t get_buffers (struct tx_layer *tx,
415 struct tx_deflate *tx_deflate)
416 {
417 if (tx_deflate->buf && io_buf_write_avail (tx_deflate->buf) == 0)
418 return TX_OK;
420 return gt_tx_layer_ready (tx);
421 }
423 /*
424 * This is the most complicated part of the whole stack:
425 *
426 * [1] Call upper layer's ready routine to grab a buffer (gt_tx_layer_ready).
427 *
428 * [2] That function will call tx_deflate_queue, which compresses the data to
429 * a buffer, as many times as it can while there's more data to process.
430 *
431 * [3] If we didn't fill the buffer, or there was no data, return TX_EMPTY
432 * telling the lower layer there is no data.
433 *
434 * [4] If there's no data in the upper layer, but we're in flush mode, call
435 * flush_stream() to send whatever data is stored inside the z_stream,
436 * and stop.
437 *
438 * [5] If we filled the buffer, or if we have a paritally filled buffer that
439 * was delayed in deflate_nagle_timeout(), send it to the lower layer with
440 * flush_buffer(). If the lower layer returns TX_FULL, stop and return
441 * TX_OK. Otherwise, continue by calling this function recursively.
442 *
443 * NOTE: The buffer is filled in tx_deflate_queue but sent in this
444 * function (or from the Nagle timer if the buffer isn't full).
445 *
446 * The caller of this function has to setup a Nagle timer if any data was
447 * written and TX_FULL was not encountered.
448 */
449 static tx_status_t service_deflate (struct tx_layer *tx,
450 struct tx_deflate *tx_deflate)
451 {
452 tx_status_t ret;
454 DEFLATE_TRACEFN(tx);
456 /* [1] + [2] */
457 ret = get_buffers (tx, tx_deflate);
459 if (ret == TX_ERROR)
460 return TX_ERROR;
462 /* [3] */
463 if (ret == TX_EMPTY)
464 {
465 assert (ret == TX_EMPTY);
467 /* [4]: continue flush even if no data avail */
468 if (tx_deflate->flushing)
469 ret = flush_stream (tx, tx_deflate);
471 return ret;
472 }
474 assert (tx_deflate->buf != NULL);
476 if (DEFLATE_DEBUG)
477 {
478 if (tx_deflate->delayed)
479 {
480 GT->DBGSOCK (GT, tx->stack->c, "flushing delayed buf(%d)",
481 io_buf_read_avail (tx_deflate->buf));
482 }
483 }
485 assert (ret == TX_OK);
487 /*
488 * [5]
489 *
490 * flush_buffer will stop the Nagle timer if the buffer was
491 * successfully sent.
492 *
493 * We must also flush the buffer if it contains partial data from a
494 * previous flush that was delayed in the Nagle timer due to having no
495 * space.
496 */
497 if (tx_deflate->delayed || io_buf_write_avail (tx_deflate->buf) == 0)
498 ret = flush_buffer (tx, tx_deflate);
500 if (ret != TX_OK)
501 return ret;
503 /* tail recurse until the lower layer is saturated */
504 return service_deflate (tx, tx_deflate);
505 }
507 /*
508 * The lower layer is ready to write.
509 */
510 static tx_status_t tx_deflate_ready (struct tx_layer *tx)
511 {
512 struct tx_deflate *tx_deflate = tx->udata;
513 size_t old_flushed;
514 tx_status_t ret;
516 /* keep track of how much was previously flushed */
517 old_flushed = tx_deflate->nbytes_flushed;
519 ret = service_deflate (tx, tx_deflate);
521 if (ret == TX_ERROR || ret == TX_FULL)
522 {
523 if (ret == TX_FULL)
524 {
525 /* flush buffer shouldve deactivated the Nagle timer */
526 assert (tx_deflate->nagle_timer == 0);
528 /* we wrote something -- let caller know it's ok */
529 ret = TX_OK;
530 }
532 return ret;
533 }
535 assert (ret == TX_OK || ret == TX_EMPTY);
537 /*
538 * If the lower layer was not saturated (evidenced by _not_ returning
539 * TX_FULL), and there is a partially completed buffer, the Nagle
540 * timer must be armed. This ensures the data waiting in this layer will
541 * go out in a timely manner. If the lower layer was saturated, we don't
542 * need to arm the timer because there is no buffer space to flush to
543 * anyway, and when the lower layer unsaturates it will reinvoke this
544 * layer to write more data.
545 *
546 * TODO: Still need to flush if there is some urgent data waiting. So,
547 * should add a ->flush callback.
548 *
549 * XXX: Using tx_deflate->buf != NULL as a hacky way to recognize that
550 * some data was written to the z_stream.
551 */
552 if (tx_deflate->buf != NULL)
553 start_nagle_timer (tx, tx_deflate);
555 if (DEFLATE_DEBUG)
556 {
557 GT->DBGSOCK (GT, tx->stack->c, "buf waiting=[%d] ret=%s",
558 tx_deflate->buf ? io_buf_read_avail (tx_deflate->buf) : 0,
559 ret == TX_EMPTY ? "TX_EMPTY" : "TX_OK");
560 }
562 DEFLATE_DUMP(tx_deflate);
564 /*
565 * For the return value from this function, decipher whether
566 * service_deflate() wrote some data.
567 *
568 * If nothing was written, then we should stop sending now, by returning
569 * TX_EMPTY. That will remove the input in tx_link.c that's calling this
570 * layer, which kind of sucks, because this could be the case a lot of the
571 * time when the whole buffer hasn't been filled up, leading to a removing
572 * and adding the input a lot.
573 *
574 * Otherwise, return TX_OK if something was sent to the lower layer.
575 */
576 if (old_flushed == tx_deflate->nbytes_flushed)
577 return TX_EMPTY;
579 return TX_OK;
580 }
582 /*****************************************************************************/
584 static BOOL tx_deflate_init (struct tx_layer *tx)
585 {
586 struct tx_deflate *tx_deflate;
588 if (!(tx_deflate = malloc (sizeof(*tx_deflate))))
589 return FALSE;
591 /* zlib documents these variables as needing initialization before
592 * deflateInit() */
593 tx_deflate->z.zalloc = Z_NULL;
594 tx_deflate->z.zfree = Z_NULL;
595 tx_deflate->z.opaque = Z_NULL;
597 if (deflateInit (&tx_deflate->z, Z_DEFAULT_COMPRESSION) != Z_OK)
598 {
599 FREE (tx_deflate);
600 return FALSE;
601 }
603 tx_deflate->buf = NULL;
604 tx_deflate->nagle_timer = 0;
605 tx_deflate->nbytes_in = 0;
606 tx_deflate->nbytes_out = 0;
607 tx_deflate->nbytes_flushed = 0;
608 tx_deflate->nbytes_unflushed = 0;
609 tx_deflate->flushing = FALSE;
610 tx_deflate->delayed = FALSE;
612 tx->udata = tx_deflate;
613 return TRUE;
614 }
616 static void tx_deflate_destroy (struct tx_layer *tx)
617 {
618 struct tx_deflate *tx_deflate = tx->udata;
620 io_buf_free (tx_deflate->buf);
621 timer_remove (tx_deflate->nagle_timer);
623 deflateEnd (&tx_deflate->z);
624 FREE (tx_deflate);
625 }
627 /*****************************************************************************/
629 struct tx_layer_ops gt_tx_deflate_ops =
630 {
631 tx_deflate_init,
632 tx_deflate_destroy,
633 tx_deflate_toggle,
634 tx_deflate_queue,
635 tx_deflate_ready,
636 tx_deflate_enable,
637 tx_deflate_disable,
638 };