annotate src/io/tx_deflate.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
rev   line source
paulo@0 1 /*
paulo@0 2 * $Id: tx_deflate.c,v 1.15 2004/05/02 08:55:00 hipnod Exp $
paulo@0 3 *
paulo@0 4 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
paulo@0 5 *
paulo@0 6 * This program is free software; you can redistribute it and/or modify it
paulo@0 7 * under the terms of the GNU General Public License as published by the
paulo@0 8 * Free Software Foundation; either version 2, or (at your option) any
paulo@0 9 * later version.
paulo@0 10 *
paulo@0 11 * This program is distributed in the hope that it will be useful, but
paulo@0 12 * WITHOUT ANY WARRANTY; without even the implied warranty of
paulo@0 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
paulo@0 14 * General Public License for more details.
paulo@0 15 */
paulo@0 16
paulo@0 17 #include "gt_gnutella.h"
paulo@0 18 #include "gt_packet.h" /* packet manipulation macros */
paulo@0 19
paulo@0 20 #include "io/tx_stack.h"
paulo@0 21 #include "io/tx_layer.h"
paulo@0 22 #include "io/io_buf.h"
paulo@0 23
paulo@0 24 #include <zlib.h>
paulo@0 25
paulo@0 26 /*****************************************************************************/
paulo@0 27
#define DEFLATE_DEBUG 0

#if DEFLATE_DEBUG
#define DEFLATE_TRACEFN(tx) \
	GT->DBGSOCK (GT, tx->stack->c, "entered")

/*
 * Dump compression statistics for this layer.
 *
 * NOTE: expects a variable named `tx' (the struct tx_layer *) to be in
 * scope at the expansion site, in addition to the tx_deflate argument.
 *
 * Fixes vs. the original:
 *  - wrapped in do { } while (0) so the macro is a single statement and is
 *    safe inside unbraced if/else bodies;
 *  - guards against division by zero when no bytes have been compressed
 *    yet (nbytes_in == 0), which also subsumes the old redundant inner
 *    `if (DEFLATE_DEBUG)' check;
 *  - the printf-style casts now match the format specifiers: %lu takes
 *    unsigned long and %d takes int (the old (long) casts were undefined
 *    behavior per the C standard's fprintf rules).
 */
#define DEFLATE_DUMP(tx_deflate) \
	do { \
		if ((tx_deflate)->nbytes_in > 0) \
		{ \
			float percent = ((float)(tx_deflate)->nbytes_in - \
			    (tx_deflate)->nbytes_out - (tx_deflate)->nbytes_unflushed) / \
			    (float)(tx_deflate)->nbytes_in; \
			\
			GT->DBGSOCK (GT, tx->stack->c, "in %lu out %lu flushed %lu unflushed %lu (flushing %d) " \
			             "ratio %.2f%% avg %.2f", \
			             (unsigned long)(tx_deflate)->nbytes_in, \
			             (unsigned long)(tx_deflate)->nbytes_out, \
			             (unsigned long)(tx_deflate)->nbytes_flushed, \
			             (unsigned long)(tx_deflate)->nbytes_unflushed, \
			             (int)(tx_deflate)->flushing, percent * 100.0, \
			             (double)(tx_deflate)->nbytes_out / \
			             difftime (time (NULL), tx->stack->start_time)); \
		} \
	} while (0)
#else /* !DEFLATE_DEBUG */
#define DEFLATE_TRACEFN(tx)
#define DEFLATE_DUMP(tx_deflate)
#endif /* DEFLATE_DEBUG */
paulo@0 56
paulo@0 57 /*****************************************************************************/
paulo@0 58
/* size of the compressed staging buffer; -1 for auto-nullification */
#define TX_DEFLATE_BUFSIZE (1024 - 1)

/* begin a Z_SYNC_FLUSH after this many uncompressed bytes have been fed
 * to the z_stream without a flush */
#define FLUSH_AFTER (4096)

/* Nagle delay: partially-filled buffers are forced out after this long */
#define NAGLE_TIMEOUT (200 * MSEC) /* 200 milliseconds */
paulo@0 64
paulo@0 65 /*****************************************************************************/
paulo@0 66
/*
 * Per-connection state for the deflate (compression) TX layer.
 */
struct tx_deflate
{
	/* zlib deflate stream state */
	z_stream z;

	/* compressed staging buffer, handed to the lower layer when full
	 * (or when flushed); NULL when no buffer is currently allocated */
	struct io_buf *buf;

	/* Nagle timer that sends stored data after NAGLE_TIMEOUT milliseconds */
	timer_id nagle_timer;

	size_t nbytes_in; /* total uncompressed bytes fed to deflate */
	size_t nbytes_out; /* total compressed bytes produced by deflate */
	size_t nbytes_flushed; /* total bytes written to lower layer */
	size_t nbytes_unflushed; /* bytes currently waiting in z_stream */

	/*
	 * Whether the zstream is currently being flushed, and so whether deflate
	 * must receive a Z_SYNC_FLUSH parameter to continue flushing. The flush
	 * ends when deflate returns with avail_out > 0.
	 */
	BOOL flushing;

	/*
	 * When doing a flush, it's possible that there will be a partially
	 * filled buffer leftover. If there's no new data that comes in, the data
	 * will be delayed again until more data comes from the upper layer. This
	 * flag is set when this happens, so we know that we should flush the
	 * buffer to the lower layer as soon as possible, even if it isn't
	 * completely full.
	 */
	BOOL delayed;
};
paulo@0 100
paulo@0 101 /*****************************************************************************/
paulo@0 102
paulo@0 103 static void start_nagle_timer (struct tx_layer *tx, struct tx_deflate *deflate);
paulo@0 104 static void stop_nagle_timer (struct tx_layer *tx, struct tx_deflate *deflate);
paulo@0 105
paulo@0 106 /*****************************************************************************/
paulo@0 107
/* Enable this layer (stack-layer hook); not yet implemented. */
static void tx_deflate_enable (struct tx_layer *tx)
{
	/* TODO */
}
paulo@0 112
/* Disable this layer (stack-layer hook); not yet implemented. */
static void tx_deflate_disable (struct tx_layer *tx)
{
	/* TODO */
}
paulo@0 117
paulo@0 118 /*****************************************************************************/
paulo@0 119
/*
 * Start/stop hook for the layer.  Intentionally empty: this layer does
 * not consume packets, it only compresses and passes them along.
 */
static void tx_deflate_toggle (struct tx_layer *tx, BOOL stop)
{
	/* nothing, we do not consume packets, only pass along */
}
paulo@0 124
paulo@0 125 /*****************************************************************************/
paulo@0 126
paulo@0 127 static BOOL alloc_buffer (struct tx_deflate *tx_deflate)
paulo@0 128 {
paulo@0 129 if (tx_deflate->buf)
paulo@0 130 return TRUE;
paulo@0 131
paulo@0 132 if (!(tx_deflate->buf = io_buf_new (TX_DEFLATE_BUFSIZE)))
paulo@0 133 return FALSE;
paulo@0 134
paulo@0 135 return TRUE;
paulo@0 136 }
paulo@0 137
paulo@0 138 static void finish_flush (struct tx_deflate *tx_deflate)
paulo@0 139 {
paulo@0 140 tx_deflate->nbytes_unflushed = 0;
paulo@0 141 tx_deflate->flushing = FALSE;
paulo@0 142 }
paulo@0 143
paulo@0 144 static tx_status_t flush_buffer (struct tx_layer *tx,
paulo@0 145 struct tx_deflate *tx_deflate)
paulo@0 146 {
paulo@0 147 tx_status_t ret;
paulo@0 148 size_t n;
paulo@0 149
paulo@0 150 DEFLATE_TRACEFN(tx);
paulo@0 151
paulo@0 152 n = io_buf_read_avail (tx_deflate->buf);
paulo@0 153
paulo@0 154 /*
paulo@0 155 * The buffer filled up. Try to send again until the lower
paulo@0 156 * layer is saturated.
paulo@0 157 */
paulo@0 158 ret = gt_tx_layer_queue (tx, tx_deflate->buf);
paulo@0 159 assert (ret != TX_EMPTY);
paulo@0 160
paulo@0 161 if (ret == TX_ERROR || ret == TX_FULL)
paulo@0 162 return ret;
paulo@0 163
paulo@0 164 tx_deflate->nbytes_flushed += n;
paulo@0 165 assert (ret == TX_OK);
paulo@0 166
paulo@0 167 stop_nagle_timer (tx, tx_deflate);
paulo@0 168
paulo@0 169 tx_deflate->buf = NULL;
paulo@0 170 tx_deflate->delayed = FALSE;
paulo@0 171
paulo@0 172 return TX_OK;
paulo@0 173 }
paulo@0 174
/*
 * Try to flush the data inside the z_stream and send it to the layer beneath
 * this one.
 *
 * Returns TX_OK when the flush completed, TX_EMPTY when there was nothing
 * left to flush, or TX_FULL/TX_ERROR as propagated from the lower layer.
 */
static tx_status_t flush_stream (struct tx_layer *tx,
                                 struct tx_deflate *tx_deflate)
{
	z_stream *z = &tx_deflate->z;
	tx_status_t ret;
	int zret;
	size_t wlen, old_avail;

	DEFLATE_TRACEFN(tx);

	if (!alloc_buffer (tx_deflate))
		return TX_ERROR;

	old_avail = io_buf_write_avail (tx_deflate->buf);

	/* no new input: we only want deflate to emit what it has stored */
	z->avail_in = 0;
	z->next_in = NULL; /* don't disrupt anything else */
	z->next_out = io_buf_write_ptr (tx_deflate->buf);
	z->avail_out = old_avail;

	zret = deflate (z, Z_SYNC_FLUSH);

	/*
	 * Z_BUF_ERROR means deflate could make no progress: we've already
	 * flushed all possible data.
	 */
	if (zret == Z_BUF_ERROR)
	{
		tx_deflate->flushing = FALSE;

		/* send the stored data */
		if (io_buf_read_avail (tx_deflate->buf) > 0)
			return flush_buffer (tx, tx_deflate);

		return TX_EMPTY;
	}

	if (zret != Z_OK)
		return TX_ERROR;

	/* account for the bytes deflate just wrote into our buffer */
	wlen = old_avail - z->avail_out;

	io_buf_push (tx_deflate->buf, wlen);
	tx_deflate->nbytes_out += wlen;

	tx_deflate->flushing = TRUE;

	/* if there is space, the flush completed successfully */
	if (z->avail_out > 0)
		finish_flush (tx_deflate);

	/*
	 * BUGFIX: this previously read `if ((ret = flush_buffer (tx,
	 * tx_deflate) != TX_OK))'.  Because `!=' binds tighter than `=', that
	 * assigned the boolean comparison result (0 or 1) to `ret' instead of
	 * the actual status, so callers received a bogus tx_status_t whenever
	 * the flush failed.
	 */
	if ((ret = flush_buffer (tx, tx_deflate)) != TX_OK)
		return ret;

	/* stop when the flush completes */
	if (!tx_deflate->flushing)
		return TX_OK;

	/* tail recurse until the flush completes */
	return flush_stream (tx, tx_deflate);
}
paulo@0 239
paulo@0 240 static BOOL deflate_nagle_timeout (struct tx_layer *tx)
paulo@0 241 {
paulo@0 242 struct tx_deflate *tx_deflate = tx->udata;
paulo@0 243 tx_status_t ret;
paulo@0 244
paulo@0 245 DEFLATE_TRACEFN(tx);
paulo@0 246
paulo@0 247 /* this assertion means we have to disarm the timer when sending the
paulo@0 248 * buffer */
paulo@0 249 assert (tx_deflate->buf != NULL);
paulo@0 250
paulo@0 251 ret = flush_stream (tx, tx_deflate);
paulo@0 252
paulo@0 253 /* no matter what, we disable the Nagle timer after this */
paulo@0 254 stop_nagle_timer (tx, tx_deflate);
paulo@0 255
paulo@0 256 if (ret == TX_ERROR)
paulo@0 257 {
paulo@0 258 gt_tx_stack_abort (tx->stack);
paulo@0 259 return FALSE;
paulo@0 260 }
paulo@0 261
paulo@0 262 if (DEFLATE_DEBUG)
paulo@0 263 GT->DBGSOCK (GT, tx->stack->c, "buffer delayed?: %d", tx_deflate->delayed);
paulo@0 264
paulo@0 265 return FALSE;
paulo@0 266 }
paulo@0 267
paulo@0 268 static void start_nagle_timer (struct tx_layer *tx,
paulo@0 269 struct tx_deflate *tx_deflate)
paulo@0 270 {
paulo@0 271 if (DEFLATE_DEBUG)
paulo@0 272 GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
paulo@0 273
paulo@0 274 if (tx_deflate->nagle_timer != 0)
paulo@0 275 return;
paulo@0 276
paulo@0 277 tx_deflate->nagle_timer = timer_add (NAGLE_TIMEOUT,
paulo@0 278 (TimerCallback)deflate_nagle_timeout,
paulo@0 279 tx);
paulo@0 280 }
paulo@0 281
paulo@0 282 static void stop_nagle_timer (struct tx_layer *tx,
paulo@0 283 struct tx_deflate *tx_deflate)
paulo@0 284 {
paulo@0 285 if (DEFLATE_DEBUG)
paulo@0 286 GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
paulo@0 287
paulo@0 288 timer_remove_zero (&tx_deflate->nagle_timer);
paulo@0 289 }
paulo@0 290
paulo@0 291 /*****************************************************************************/
paulo@0 292
/*
 * The upper layer has sent us a buffer to process.
 *
 * Compresses `msg' into the staging buffer and updates the byte counters.
 * Return values and `msg' ownership:
 *   TX_FULL    - no output space at all; msg NOT freed (caller retains it)
 *   TX_PARTIAL - msg only partially consumed; msg NOT freed
 *   TX_OK      - msg fully consumed and freed here
 *   TX_ERROR   - allocation or deflate failure; msg freed here
 */
static tx_status_t tx_deflate_queue (struct tx_layer *tx, struct io_buf *msg)
{
	struct tx_deflate *tx_deflate = tx->udata;
	z_stream *z = &tx_deflate->z;
	BOOL flush_completed = FALSE;
	int ret;

	DEFLATE_TRACEFN(tx);

	/*
	 * Deflate the incoming message, adding it to the buffer.
	 *
	 * If our buffer is currently full, return TX_FULL.
	 */

	if (!alloc_buffer (tx_deflate))
	{
		io_buf_free (msg);
		return TX_ERROR;
	}

	/* point the z_stream at the message (input) and our buffer (output) */
	z->next_in = io_buf_read_ptr (msg);
	z->avail_in = io_buf_read_avail (msg);
	z->next_out = io_buf_write_ptr (tx_deflate->buf);
	z->avail_out = io_buf_write_avail (tx_deflate->buf);

	if (z->avail_out == 0)
		return TX_FULL;

	/* compress until the input drains or the output buffer fills */
	while (io_buf_read_avail (msg) > 0 && z->avail_out > 0)
	{
		size_t rlen, wlen;

		assert (z->next_in == io_buf_read_ptr (msg));
		assert (z->next_out == io_buf_write_ptr (tx_deflate->buf));

		/* begin flushing after a certain amount */
		if (tx_deflate->nbytes_unflushed >= FLUSH_AFTER)
			tx_deflate->flushing = TRUE;

		/* 0 here is Z_NO_FLUSH */
		ret = deflate (z, tx_deflate->flushing ? Z_SYNC_FLUSH : 0);

		if (ret != Z_OK)
		{
			GT->DBGFN (GT, "deflate: error %d", ret);
			io_buf_free (msg);
			return TX_ERROR;
		}

		/* bytes consumed from msg / produced into the staging buffer,
		 * derived from how far deflate moved avail_in/avail_out */
		rlen = io_buf_read_avail (msg) - z->avail_in;
		wlen = io_buf_write_avail (tx_deflate->buf) - z->avail_out;
		assert (rlen > 0 || wlen > 0); /* hmm, is this true when flushing? */
#if 0
		assert (wlen > 0);
#endif

		tx_deflate->nbytes_in += rlen;
		tx_deflate->nbytes_unflushed += rlen;
		tx_deflate->nbytes_out += wlen;

		DEFLATE_DUMP(tx_deflate);

		/* update the buffer lengths */
		io_buf_push (tx_deflate->buf, wlen);
		io_buf_pop (msg, rlen);

		if (z->avail_out == 0)
			break;

		/*
		 * If we have available output space and no more input space,
		 * we know the flush completed, so unset flush mode.
		 *
		 * NOTE: there might be a bug here. The flush may fit exactly
		 * everytime, causing us to never leave flush mode. I think zlib may
		 * try to prevent this itself, though.
		 */
		if (tx_deflate->flushing && z->avail_in == 0)
		{
			flush_completed = TRUE;
			finish_flush (tx_deflate);
		}
	}

	/*
	 * If we completed a flush, and the buffer isn't full, set the delayed
	 * flag so that service_deflate() will write the buffer immediately to
	 * reduce latency, as it has already endured a Nagle timeout period.
	 */
	if (flush_completed &&
	    io_buf_read_avail (tx_deflate->buf) < TX_DEFLATE_BUFSIZE)
	{
		if (DEFLATE_DEBUG)
		{
			GT->DBGSOCK (GT, tx->stack->c, "setting ->delayed flag on buf(%d)",
			             io_buf_read_avail (tx_deflate->buf));
		}

		tx_deflate->delayed = TRUE;
	}

	/*
	 * If the message buffer was only partially emptied, don't free
	 * it and let tx_layer.c know to handle it specially.
	 */
	if (io_buf_read_avail (msg) > 0)
		return TX_PARTIAL;

	io_buf_free (msg);

	return TX_OK;
}
paulo@0 408
paulo@0 409 /*****************************************************************************/
paulo@0 410
paulo@0 411 /*
paulo@0 412 * Get more data to write.
paulo@0 413 */
paulo@0 414 static tx_status_t get_buffers (struct tx_layer *tx,
paulo@0 415 struct tx_deflate *tx_deflate)
paulo@0 416 {
paulo@0 417 if (tx_deflate->buf && io_buf_write_avail (tx_deflate->buf) == 0)
paulo@0 418 return TX_OK;
paulo@0 419
paulo@0 420 return gt_tx_layer_ready (tx);
paulo@0 421 }
paulo@0 422
paulo@0 423 /*
paulo@0 424 * This is the most complicated part of the whole stack:
paulo@0 425 *
paulo@0 426 * [1] Call upper layer's ready routine to grab a buffer (gt_tx_layer_ready).
paulo@0 427 *
paulo@0 428 * [2] That function will call tx_deflate_queue, which compresses the data to
paulo@0 429 * a buffer, as many times as it can while there's more data to process.
paulo@0 430 *
paulo@0 431 * [3] If we didn't fill the buffer, or there was no data, return TX_EMPTY
paulo@0 432 * telling the lower layer there is no data.
paulo@0 433 *
paulo@0 434 * [4] If there's no data in the upper layer, but we're in flush mode, call
paulo@0 435 * flush_stream() to send whatever data is stored inside the z_stream,
paulo@0 436 * and stop.
paulo@0 437 *
paulo@0 438 * [5] If we filled the buffer, or if we have a paritally filled buffer that
paulo@0 439 * was delayed in deflate_nagle_timeout(), send it to the lower layer with
paulo@0 440 * flush_buffer(). If the lower layer returns TX_FULL, stop and return
paulo@0 441 * TX_OK. Otherwise, continue by calling this function recursively.
paulo@0 442 *
paulo@0 443 * NOTE: The buffer is filled in tx_deflate_queue but sent in this
paulo@0 444 * function (or from the Nagle timer if the buffer isn't full).
paulo@0 445 *
paulo@0 446 * The caller of this function has to setup a Nagle timer if any data was
paulo@0 447 * written and TX_FULL was not encountered.
paulo@0 448 */
/*
 * Pump data through the stack: pull from the upper layer, compress, and
 * push full (or delayed) buffers to the lower layer.  See the numbered
 * algorithm description in the comment block above.  Tail-recurses until
 * the lower layer saturates (TX_FULL) or the upper layer runs dry.
 */
static tx_status_t service_deflate (struct tx_layer *tx,
                                    struct tx_deflate *tx_deflate)
{
	tx_status_t ret;

	DEFLATE_TRACEFN(tx);

	/* [1] + [2] */
	ret = get_buffers (tx, tx_deflate);

	if (ret == TX_ERROR)
		return TX_ERROR;

	/* [3] */
	if (ret == TX_EMPTY)
	{
		assert (ret == TX_EMPTY);

		/* [4]: continue flush even if no data avail */
		if (tx_deflate->flushing)
			ret = flush_stream (tx, tx_deflate);

		return ret;
	}

	/* get_buffers() returned TX_OK, so data was compressed into the
	 * staging buffer and it must exist */
	assert (tx_deflate->buf != NULL);

	if (DEFLATE_DEBUG)
	{
		if (tx_deflate->delayed)
		{
			GT->DBGSOCK (GT, tx->stack->c, "flushing delayed buf(%d)",
			             io_buf_read_avail (tx_deflate->buf));
		}
	}

	assert (ret == TX_OK);

	/*
	 * [5]
	 *
	 * flush_buffer will stop the Nagle timer if the buffer was
	 * successfully sent.
	 *
	 * We must also flush the buffer if it contains partial data from a
	 * previous flush that was delayed in the Nagle timer due to having no
	 * space.
	 */
	if (tx_deflate->delayed || io_buf_write_avail (tx_deflate->buf) == 0)
		ret = flush_buffer (tx, tx_deflate);

	if (ret != TX_OK)
		return ret;

	/* tail recurse until the lower layer is saturated */
	return service_deflate (tx, tx_deflate);
}
paulo@0 506
/*
 * The lower layer is ready to write.
 *
 * Drives service_deflate() and translates the result for the caller:
 * returns TX_OK if any bytes actually reached the lower layer during this
 * call, TX_EMPTY if nothing was flushed, or TX_ERROR on failure.
 */
static tx_status_t tx_deflate_ready (struct tx_layer *tx)
{
	struct tx_deflate *tx_deflate = tx->udata;
	size_t old_flushed;
	tx_status_t ret;

	/* keep track of how much was previously flushed */
	old_flushed = tx_deflate->nbytes_flushed;

	ret = service_deflate (tx, tx_deflate);

	if (ret == TX_ERROR || ret == TX_FULL)
	{
		if (ret == TX_FULL)
		{
			/* flush buffer shouldve deactivated the Nagle timer */
			assert (tx_deflate->nagle_timer == 0);

			/* we wrote something -- let caller know it's ok */
			ret = TX_OK;
		}

		return ret;
	}

	assert (ret == TX_OK || ret == TX_EMPTY);

	/*
	 * If the lower layer was not saturated (evidenced by _not_ returning
	 * TX_FULL), and there is a partially completed buffer, the Nagle
	 * timer must be armed. This ensures the data waiting in this layer will
	 * go out in a timely manner. If the lower layer was saturated, we don't
	 * need to arm the timer because there is no buffer space to flush to
	 * anyway, and when the lower layer unsaturates it will reinvoke this
	 * layer to write more data.
	 *
	 * TODO: Still need to flush if there is some urgent data waiting. So,
	 * should add a ->flush callback.
	 *
	 * XXX: Using tx_deflate->buf != NULL as a hacky way to recognize that
	 * some data was written to the z_stream.
	 */
	if (tx_deflate->buf != NULL)
		start_nagle_timer (tx, tx_deflate);

	if (DEFLATE_DEBUG)
	{
		GT->DBGSOCK (GT, tx->stack->c, "buf waiting=[%d] ret=%s",
		             tx_deflate->buf ? io_buf_read_avail (tx_deflate->buf) : 0,
		             ret == TX_EMPTY ? "TX_EMPTY" : "TX_OK");
	}

	DEFLATE_DUMP(tx_deflate);

	/*
	 * For the return value from this function, decipher whether
	 * service_deflate() wrote some data.
	 *
	 * If nothing was written, then we should stop sending now, by returning
	 * TX_EMPTY. That will remove the input in tx_link.c that's calling this
	 * layer, which kind of sucks, because this could be the case a lot of the
	 * time when the whole buffer hasn't been filled up, leading to a removing
	 * and adding the input a lot.
	 *
	 * Otherwise, return TX_OK if something was sent to the lower layer.
	 */
	if (old_flushed == tx_deflate->nbytes_flushed)
		return TX_EMPTY;

	return TX_OK;
}
paulo@0 581
paulo@0 582 /*****************************************************************************/
paulo@0 583
/*
 * Allocate and initialize the deflate layer's private state, including
 * the zlib stream.  Stores the state in tx->udata.  Returns FALSE on
 * allocation or deflateInit() failure (nothing is leaked in that case).
 */
static BOOL tx_deflate_init (struct tx_layer *tx)
{
	struct tx_deflate *state = malloc (sizeof (*state));

	if (state == NULL)
		return FALSE;

	/* zlib documents these fields as needing initialization before
	 * deflateInit() */
	state->z.zalloc = Z_NULL;
	state->z.zfree  = Z_NULL;
	state->z.opaque = Z_NULL;

	if (deflateInit (&state->z, Z_DEFAULT_COMPRESSION) != Z_OK)
	{
		FREE (state);
		return FALSE;
	}

	state->buf              = NULL;
	state->nagle_timer      = 0;
	state->nbytes_in        = 0;
	state->nbytes_out       = 0;
	state->nbytes_flushed   = 0;
	state->nbytes_unflushed = 0;
	state->flushing         = FALSE;
	state->delayed          = FALSE;

	tx->udata = state;
	return TRUE;
}
paulo@0 615
/*
 * Tear down the layer: release any unsent compressed data, cancel the
 * pending Nagle timer, close the zlib stream, and free the state.
 */
static void tx_deflate_destroy (struct tx_layer *tx)
{
	struct tx_deflate *state = tx->udata;

	/* buf may still be NULL here; presumably io_buf_free tolerates that
	 * -- verify in io_buf.c */
	io_buf_free (state->buf);
	timer_remove (state->nagle_timer);

	deflateEnd (&state->z);
	FREE (state);
}
paulo@0 626
paulo@0 627 /*****************************************************************************/
paulo@0 628
/*
 * Operations vtable for the deflate TX layer.  Entry order must match
 * struct tx_layer_ops as declared in io/tx_layer.h; judging from the
 * function names, the order appears to be init, destroy, toggle, queue,
 * ready, enable, disable -- verify against the header before reordering.
 */
struct tx_layer_ops gt_tx_deflate_ops =
{
	tx_deflate_init,
	tx_deflate_destroy,
	tx_deflate_toggle,
	tx_deflate_queue,
	tx_deflate_ready,
	tx_deflate_enable,
	tx_deflate_disable,
};