rev |
line source |
paulo@0
|
1 /*
|
paulo@0
|
2 * $Id: tx_deflate.c,v 1.15 2004/05/02 08:55:00 hipnod Exp $
|
paulo@0
|
3 *
|
paulo@0
|
4 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
|
paulo@0
|
5 *
|
paulo@0
|
6 * This program is free software; you can redistribute it and/or modify it
|
paulo@0
|
7 * under the terms of the GNU General Public License as published by the
|
paulo@0
|
8 * Free Software Foundation; either version 2, or (at your option) any
|
paulo@0
|
9 * later version.
|
paulo@0
|
10 *
|
paulo@0
|
11 * This program is distributed in the hope that it will be useful, but
|
paulo@0
|
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
|
paulo@0
|
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
paulo@0
|
14 * General Public License for more details.
|
paulo@0
|
15 */
|
paulo@0
|
16
|
paulo@0
|
17 #include "gt_gnutella.h"
|
paulo@0
|
18 #include "gt_packet.h" /* packet manipulation macros */
|
paulo@0
|
19
|
paulo@0
|
20 #include "io/tx_stack.h"
|
paulo@0
|
21 #include "io/tx_layer.h"
|
paulo@0
|
22 #include "io/io_buf.h"
|
paulo@0
|
23
|
paulo@0
|
24 #include <zlib.h>
|
paulo@0
|
25
|
paulo@0
|
26 /*****************************************************************************/
|
paulo@0
|
27
|
paulo@0
|
28 #define DEFLATE_DEBUG 0
|
paulo@0
|
29
|
paulo@0
|
30 #if DEFLATE_DEBUG
|
paulo@0
|
31 #define DEFLATE_TRACEFN(tx) \
|
paulo@0
|
32 GT->DBGSOCK (GT, tx->stack->c, "entered")
|
paulo@0
|
33
|
paulo@0
|
34 #define DEFLATE_DUMP(tx_deflate) \
|
paulo@0
|
35 { \
|
paulo@0
|
36 if (DEFLATE_DEBUG) \
|
paulo@0
|
37 { \
|
paulo@0
|
38 float percent = ((float)tx_deflate->nbytes_in - \
|
paulo@0
|
39 tx_deflate->nbytes_out - tx_deflate->nbytes_unflushed) / \
|
paulo@0
|
40 (float)tx_deflate->nbytes_in; \
|
paulo@0
|
41 \
|
paulo@0
|
42 GT->DBGSOCK (GT, tx->stack->c, "in %lu out %lu flushed %lu unflushed %lu (flushing %d) " \
|
paulo@0
|
43 "ratio %.2f%% avg %.2f", \
|
paulo@0
|
44 (long)tx_deflate->nbytes_in, (long)tx_deflate->nbytes_out, \
|
paulo@0
|
45 (long)tx_deflate->nbytes_flushed, \
|
paulo@0
|
46 (long)tx_deflate->nbytes_unflushed, \
|
paulo@0
|
47 (long)tx_deflate->flushing, percent * 100.0, \
|
paulo@0
|
48 (double)tx_deflate->nbytes_out / \
|
paulo@0
|
49 difftime (time (NULL), tx->stack->start_time)); \
|
paulo@0
|
50 } \
|
paulo@0
|
51 }
|
paulo@0
|
52 #else /* !DEFLATE_DEBUG */
|
paulo@0
|
53 #define DEFLATE_TRACEFN(tx)
|
paulo@0
|
54 #define DEFLATE_DUMP(tx_deflate)
|
paulo@0
|
55 #endif /* DEFLATE_DEBUG */
|
paulo@0
|
56
|
paulo@0
|
57 /*****************************************************************************/
|
paulo@0
|
58
|
paulo@0
|
59 #define TX_DEFLATE_BUFSIZE (1024 - 1) /* -1 for auto-nullification */
|
paulo@0
|
60
|
paulo@0
|
61 #define FLUSH_AFTER (4096) /* flush after this many bytes */
|
paulo@0
|
62
|
paulo@0
|
63 #define NAGLE_TIMEOUT (200 * MSEC) /* 200 milliseconds */
|
paulo@0
|
64
|
paulo@0
|
65 /*****************************************************************************/
|
paulo@0
|
66
|
paulo@0
|
67 struct tx_deflate
|
paulo@0
|
68 {
|
paulo@0
|
69 /* zlib data */
|
paulo@0
|
70 z_stream z;
|
paulo@0
|
71
|
paulo@0
|
72 /* compressed buffer */
|
paulo@0
|
73 struct io_buf *buf;
|
paulo@0
|
74
|
paulo@0
|
75 /* Nagle timer that sends stored data after NAGLE_TIMEOUT milliseconds */
|
paulo@0
|
76 timer_id nagle_timer;
|
paulo@0
|
77
|
paulo@0
|
78 size_t nbytes_in; /* total uncompressed bytes */
|
paulo@0
|
79 size_t nbytes_out; /* total compressed bytes */
|
paulo@0
|
80 size_t nbytes_flushed; /* total bytes written to lower layer */
|
paulo@0
|
81 size_t nbytes_unflushed; /* bytes currently waiting in z_stream */
|
paulo@0
|
82
|
paulo@0
|
83 /*
|
paulo@0
|
84 * Whether the zstream is currently being flushed, and so whether deflate
|
paulo@0
|
85 * must receive a Z_SYNC_FLUSH parameter to continue flushing. The flush
|
paulo@0
|
86 * ends when deflate returns with avail_out > 0.
|
paulo@0
|
87 */
|
paulo@0
|
88 BOOL flushing;
|
paulo@0
|
89
|
paulo@0
|
90 /*
|
paulo@0
|
91 * When doing a flush, it's possible that there will be a partially
|
paulo@0
|
92 * filled buffer leftover. If there's no new data that comes in, the data
|
paulo@0
|
93 * will be delayed again until more data comes from the upper layer. This
|
paulo@0
|
94 * flag is set when this happens, so we know that we should flush the
|
paulo@0
|
95 * buffer to the lower layer as soon as possible, even if it isn't
|
paulo@0
|
96 * completely full.
|
paulo@0
|
97 */
|
paulo@0
|
98 BOOL delayed;
|
paulo@0
|
99 };
|
paulo@0
|
100
|
paulo@0
|
101 /*****************************************************************************/
|
paulo@0
|
102
|
paulo@0
|
103 static void start_nagle_timer (struct tx_layer *tx, struct tx_deflate *deflate);
|
paulo@0
|
104 static void stop_nagle_timer (struct tx_layer *tx, struct tx_deflate *deflate);
|
paulo@0
|
105
|
paulo@0
|
106 /*****************************************************************************/
|
paulo@0
|
107
|
paulo@0
|
108 static void tx_deflate_enable (struct tx_layer *tx)
|
paulo@0
|
109 {
|
paulo@0
|
110 /* TODO */
|
paulo@0
|
111 }
|
paulo@0
|
112
|
paulo@0
|
113 static void tx_deflate_disable (struct tx_layer *tx)
|
paulo@0
|
114 {
|
paulo@0
|
115 /* TODO */
|
paulo@0
|
116 }
|
paulo@0
|
117
|
paulo@0
|
118 /*****************************************************************************/
|
paulo@0
|
119
|
paulo@0
|
120 static void tx_deflate_toggle (struct tx_layer *tx, BOOL stop)
|
paulo@0
|
121 {
|
paulo@0
|
122 /* nothing, we do not consume packets, only pass along */
|
paulo@0
|
123 }
|
paulo@0
|
124
|
paulo@0
|
125 /*****************************************************************************/
|
paulo@0
|
126
|
paulo@0
|
127 static BOOL alloc_buffer (struct tx_deflate *tx_deflate)
|
paulo@0
|
128 {
|
paulo@0
|
129 if (tx_deflate->buf)
|
paulo@0
|
130 return TRUE;
|
paulo@0
|
131
|
paulo@0
|
132 if (!(tx_deflate->buf = io_buf_new (TX_DEFLATE_BUFSIZE)))
|
paulo@0
|
133 return FALSE;
|
paulo@0
|
134
|
paulo@0
|
135 return TRUE;
|
paulo@0
|
136 }
|
paulo@0
|
137
|
paulo@0
|
138 static void finish_flush (struct tx_deflate *tx_deflate)
|
paulo@0
|
139 {
|
paulo@0
|
140 tx_deflate->nbytes_unflushed = 0;
|
paulo@0
|
141 tx_deflate->flushing = FALSE;
|
paulo@0
|
142 }
|
paulo@0
|
143
|
paulo@0
|
144 static tx_status_t flush_buffer (struct tx_layer *tx,
|
paulo@0
|
145 struct tx_deflate *tx_deflate)
|
paulo@0
|
146 {
|
paulo@0
|
147 tx_status_t ret;
|
paulo@0
|
148 size_t n;
|
paulo@0
|
149
|
paulo@0
|
150 DEFLATE_TRACEFN(tx);
|
paulo@0
|
151
|
paulo@0
|
152 n = io_buf_read_avail (tx_deflate->buf);
|
paulo@0
|
153
|
paulo@0
|
154 /*
|
paulo@0
|
155 * The buffer filled up. Try to send again until the lower
|
paulo@0
|
156 * layer is saturated.
|
paulo@0
|
157 */
|
paulo@0
|
158 ret = gt_tx_layer_queue (tx, tx_deflate->buf);
|
paulo@0
|
159 assert (ret != TX_EMPTY);
|
paulo@0
|
160
|
paulo@0
|
161 if (ret == TX_ERROR || ret == TX_FULL)
|
paulo@0
|
162 return ret;
|
paulo@0
|
163
|
paulo@0
|
164 tx_deflate->nbytes_flushed += n;
|
paulo@0
|
165 assert (ret == TX_OK);
|
paulo@0
|
166
|
paulo@0
|
167 stop_nagle_timer (tx, tx_deflate);
|
paulo@0
|
168
|
paulo@0
|
169 tx_deflate->buf = NULL;
|
paulo@0
|
170 tx_deflate->delayed = FALSE;
|
paulo@0
|
171
|
paulo@0
|
172 return TX_OK;
|
paulo@0
|
173 }
|
paulo@0
|
174
|
paulo@0
|
175 /*
|
paulo@0
|
176 * Try to flush the data inside the z_stream and send it to the layer beneath
|
paulo@0
|
177 * this one.
|
paulo@0
|
178 */
|
paulo@0
|
179 static tx_status_t flush_stream (struct tx_layer *tx,
|
paulo@0
|
180 struct tx_deflate *tx_deflate)
|
paulo@0
|
181 {
|
paulo@0
|
182 z_stream *z = &tx_deflate->z;
|
paulo@0
|
183 tx_status_t ret;
|
paulo@0
|
184 int zret;
|
paulo@0
|
185 size_t wlen, old_avail;
|
paulo@0
|
186
|
paulo@0
|
187 DEFLATE_TRACEFN(tx);
|
paulo@0
|
188
|
paulo@0
|
189 if (!alloc_buffer (tx_deflate))
|
paulo@0
|
190 return TX_ERROR;
|
paulo@0
|
191
|
paulo@0
|
192 old_avail = io_buf_write_avail (tx_deflate->buf);
|
paulo@0
|
193
|
paulo@0
|
194 z->avail_in = 0;
|
paulo@0
|
195 z->next_in = NULL; /* don't disrupt anything else */
|
paulo@0
|
196 z->next_out = io_buf_write_ptr (tx_deflate->buf);
|
paulo@0
|
197 z->avail_out = old_avail;
|
paulo@0
|
198
|
paulo@0
|
199 zret = deflate (z, Z_SYNC_FLUSH);
|
paulo@0
|
200
|
paulo@0
|
201 /*
|
paulo@0
|
202 * If this is true we've already flushed all possible data.
|
paulo@0
|
203 */
|
paulo@0
|
204 if (zret == Z_BUF_ERROR)
|
paulo@0
|
205 {
|
paulo@0
|
206 tx_deflate->flushing = FALSE;
|
paulo@0
|
207
|
paulo@0
|
208 /* send the stored data */
|
paulo@0
|
209 if (io_buf_read_avail (tx_deflate->buf) > 0)
|
paulo@0
|
210 return flush_buffer (tx, tx_deflate);
|
paulo@0
|
211
|
paulo@0
|
212 return TX_EMPTY;
|
paulo@0
|
213 }
|
paulo@0
|
214
|
paulo@0
|
215 if (zret != Z_OK)
|
paulo@0
|
216 return TX_ERROR;
|
paulo@0
|
217
|
paulo@0
|
218 wlen = old_avail - z->avail_out;
|
paulo@0
|
219
|
paulo@0
|
220 io_buf_push (tx_deflate->buf, wlen);
|
paulo@0
|
221 tx_deflate->nbytes_out += wlen;
|
paulo@0
|
222
|
paulo@0
|
223 tx_deflate->flushing = TRUE;
|
paulo@0
|
224
|
paulo@0
|
225 /* if there is space, the flush completed successfully */
|
paulo@0
|
226 if (z->avail_out > 0)
|
paulo@0
|
227 finish_flush (tx_deflate);
|
paulo@0
|
228
|
paulo@0
|
229 if ((ret = flush_buffer (tx, tx_deflate) != TX_OK))
|
paulo@0
|
230 return ret;
|
paulo@0
|
231
|
paulo@0
|
232 /* stop when the flush completes */
|
paulo@0
|
233 if (!tx_deflate->flushing)
|
paulo@0
|
234 return TX_OK;
|
paulo@0
|
235
|
paulo@0
|
236 /* tail recurse until the flush completes */
|
paulo@0
|
237 return flush_stream (tx, tx_deflate);
|
paulo@0
|
238 }
|
paulo@0
|
239
|
paulo@0
|
240 static BOOL deflate_nagle_timeout (struct tx_layer *tx)
|
paulo@0
|
241 {
|
paulo@0
|
242 struct tx_deflate *tx_deflate = tx->udata;
|
paulo@0
|
243 tx_status_t ret;
|
paulo@0
|
244
|
paulo@0
|
245 DEFLATE_TRACEFN(tx);
|
paulo@0
|
246
|
paulo@0
|
247 /* this assertion means we have to disarm the timer when sending the
|
paulo@0
|
248 * buffer */
|
paulo@0
|
249 assert (tx_deflate->buf != NULL);
|
paulo@0
|
250
|
paulo@0
|
251 ret = flush_stream (tx, tx_deflate);
|
paulo@0
|
252
|
paulo@0
|
253 /* no matter what, we disable the Nagle timer after this */
|
paulo@0
|
254 stop_nagle_timer (tx, tx_deflate);
|
paulo@0
|
255
|
paulo@0
|
256 if (ret == TX_ERROR)
|
paulo@0
|
257 {
|
paulo@0
|
258 gt_tx_stack_abort (tx->stack);
|
paulo@0
|
259 return FALSE;
|
paulo@0
|
260 }
|
paulo@0
|
261
|
paulo@0
|
262 if (DEFLATE_DEBUG)
|
paulo@0
|
263 GT->DBGSOCK (GT, tx->stack->c, "buffer delayed?: %d", tx_deflate->delayed);
|
paulo@0
|
264
|
paulo@0
|
265 return FALSE;
|
paulo@0
|
266 }
|
paulo@0
|
267
|
paulo@0
|
268 static void start_nagle_timer (struct tx_layer *tx,
|
paulo@0
|
269 struct tx_deflate *tx_deflate)
|
paulo@0
|
270 {
|
paulo@0
|
271 if (DEFLATE_DEBUG)
|
paulo@0
|
272 GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
|
paulo@0
|
273
|
paulo@0
|
274 if (tx_deflate->nagle_timer != 0)
|
paulo@0
|
275 return;
|
paulo@0
|
276
|
paulo@0
|
277 tx_deflate->nagle_timer = timer_add (NAGLE_TIMEOUT,
|
paulo@0
|
278 (TimerCallback)deflate_nagle_timeout,
|
paulo@0
|
279 tx);
|
paulo@0
|
280 }
|
paulo@0
|
281
|
paulo@0
|
282 static void stop_nagle_timer (struct tx_layer *tx,
|
paulo@0
|
283 struct tx_deflate *tx_deflate)
|
paulo@0
|
284 {
|
paulo@0
|
285 if (DEFLATE_DEBUG)
|
paulo@0
|
286 GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
|
paulo@0
|
287
|
paulo@0
|
288 timer_remove_zero (&tx_deflate->nagle_timer);
|
paulo@0
|
289 }
|
paulo@0
|
290
|
paulo@0
|
291 /*****************************************************************************/
|
paulo@0
|
292
|
paulo@0
|
293 /*
|
paulo@0
|
294 * The upper layer has sent us a buffer to process.
|
paulo@0
|
295 */
|
paulo@0
|
296 static tx_status_t tx_deflate_queue (struct tx_layer *tx, struct io_buf *msg)
|
paulo@0
|
297 {
|
paulo@0
|
298 struct tx_deflate *tx_deflate = tx->udata;
|
paulo@0
|
299 z_stream *z = &tx_deflate->z;
|
paulo@0
|
300 BOOL flush_completed = FALSE;
|
paulo@0
|
301 int ret;
|
paulo@0
|
302
|
paulo@0
|
303 DEFLATE_TRACEFN(tx);
|
paulo@0
|
304
|
paulo@0
|
305 /*
|
paulo@0
|
306 * Deflate the incoming message, adding it to the buffer.
|
paulo@0
|
307 *
|
paulo@0
|
308 * If our buffer is currently full, return TX_FULL.
|
paulo@0
|
309 */
|
paulo@0
|
310
|
paulo@0
|
311 if (!alloc_buffer (tx_deflate))
|
paulo@0
|
312 {
|
paulo@0
|
313 io_buf_free (msg);
|
paulo@0
|
314 return TX_ERROR;
|
paulo@0
|
315 }
|
paulo@0
|
316
|
paulo@0
|
317 z->next_in = io_buf_read_ptr (msg);
|
paulo@0
|
318 z->avail_in = io_buf_read_avail (msg);
|
paulo@0
|
319 z->next_out = io_buf_write_ptr (tx_deflate->buf);
|
paulo@0
|
320 z->avail_out = io_buf_write_avail (tx_deflate->buf);
|
paulo@0
|
321
|
paulo@0
|
322 if (z->avail_out == 0)
|
paulo@0
|
323 return TX_FULL;
|
paulo@0
|
324
|
paulo@0
|
325 while (io_buf_read_avail (msg) > 0 && z->avail_out > 0)
|
paulo@0
|
326 {
|
paulo@0
|
327 size_t rlen, wlen;
|
paulo@0
|
328
|
paulo@0
|
329 assert (z->next_in == io_buf_read_ptr (msg));
|
paulo@0
|
330 assert (z->next_out == io_buf_write_ptr (tx_deflate->buf));
|
paulo@0
|
331
|
paulo@0
|
332 /* begin flushing after a certain amount */
|
paulo@0
|
333 if (tx_deflate->nbytes_unflushed >= FLUSH_AFTER)
|
paulo@0
|
334 tx_deflate->flushing = TRUE;
|
paulo@0
|
335
|
paulo@0
|
336 ret = deflate (z, tx_deflate->flushing ? Z_SYNC_FLUSH : 0);
|
paulo@0
|
337
|
paulo@0
|
338 if (ret != Z_OK)
|
paulo@0
|
339 {
|
paulo@0
|
340 GT->DBGFN (GT, "deflate: error %d", ret);
|
paulo@0
|
341 io_buf_free (msg);
|
paulo@0
|
342 return TX_ERROR;
|
paulo@0
|
343 }
|
paulo@0
|
344
|
paulo@0
|
345 rlen = io_buf_read_avail (msg) - z->avail_in;
|
paulo@0
|
346 wlen = io_buf_write_avail (tx_deflate->buf) - z->avail_out;
|
paulo@0
|
347 assert (rlen > 0 || wlen > 0); /* hmm, is this true when flushing? */
|
paulo@0
|
348 #if 0
|
paulo@0
|
349 assert (wlen > 0);
|
paulo@0
|
350 #endif
|
paulo@0
|
351
|
paulo@0
|
352 tx_deflate->nbytes_in += rlen;
|
paulo@0
|
353 tx_deflate->nbytes_unflushed += rlen;
|
paulo@0
|
354 tx_deflate->nbytes_out += wlen;
|
paulo@0
|
355
|
paulo@0
|
356 DEFLATE_DUMP(tx_deflate);
|
paulo@0
|
357
|
paulo@0
|
358 /* update the buffer lengths */
|
paulo@0
|
359 io_buf_push (tx_deflate->buf, wlen);
|
paulo@0
|
360 io_buf_pop (msg, rlen);
|
paulo@0
|
361
|
paulo@0
|
362 if (z->avail_out == 0)
|
paulo@0
|
363 break;
|
paulo@0
|
364
|
paulo@0
|
365 /*
|
paulo@0
|
366 * If we have available output space and no more input space,
|
paulo@0
|
367 * we know the flush completed, so unset flush mode.
|
paulo@0
|
368 *
|
paulo@0
|
369 * NOTE: there might be a bug here. The flush may fit exactly
|
paulo@0
|
370 * everytime, causing us to never leave flush mode. I think zlib may
|
paulo@0
|
371 * try to prevent this itself, though.
|
paulo@0
|
372 */
|
paulo@0
|
373 if (tx_deflate->flushing && z->avail_in == 0)
|
paulo@0
|
374 {
|
paulo@0
|
375 flush_completed = TRUE;
|
paulo@0
|
376 finish_flush (tx_deflate);
|
paulo@0
|
377 }
|
paulo@0
|
378 }
|
paulo@0
|
379
|
paulo@0
|
380 /*
|
paulo@0
|
381 * If we completed a flush, and the buffer isn't full, set the delayed
|
paulo@0
|
382 * flag so that service_deflate() will write the buffer immediately to
|
paulo@0
|
383 * reduce latency, as it has already endured a Nagle timeout period.
|
paulo@0
|
384 */
|
paulo@0
|
385 if (flush_completed &&
|
paulo@0
|
386 io_buf_read_avail (tx_deflate->buf) < TX_DEFLATE_BUFSIZE)
|
paulo@0
|
387 {
|
paulo@0
|
388 if (DEFLATE_DEBUG)
|
paulo@0
|
389 {
|
paulo@0
|
390 GT->DBGSOCK (GT, tx->stack->c, "setting ->delayed flag on buf(%d)",
|
paulo@0
|
391 io_buf_read_avail (tx_deflate->buf));
|
paulo@0
|
392 }
|
paulo@0
|
393
|
paulo@0
|
394 tx_deflate->delayed = TRUE;
|
paulo@0
|
395 }
|
paulo@0
|
396
|
paulo@0
|
397 /*
|
paulo@0
|
398 * If the message buffer was only partially emptied, don't free
|
paulo@0
|
399 * it and let tx_layer.c know to handle it specially.
|
paulo@0
|
400 */
|
paulo@0
|
401 if (io_buf_read_avail (msg) > 0)
|
paulo@0
|
402 return TX_PARTIAL;
|
paulo@0
|
403
|
paulo@0
|
404 io_buf_free (msg);
|
paulo@0
|
405
|
paulo@0
|
406 return TX_OK;
|
paulo@0
|
407 }
|
paulo@0
|
408
|
paulo@0
|
409 /*****************************************************************************/
|
paulo@0
|
410
|
paulo@0
|
411 /*
|
paulo@0
|
412 * Get more data to write.
|
paulo@0
|
413 */
|
paulo@0
|
414 static tx_status_t get_buffers (struct tx_layer *tx,
|
paulo@0
|
415 struct tx_deflate *tx_deflate)
|
paulo@0
|
416 {
|
paulo@0
|
417 if (tx_deflate->buf && io_buf_write_avail (tx_deflate->buf) == 0)
|
paulo@0
|
418 return TX_OK;
|
paulo@0
|
419
|
paulo@0
|
420 return gt_tx_layer_ready (tx);
|
paulo@0
|
421 }
|
paulo@0
|
422
|
paulo@0
|
423 /*
|
paulo@0
|
424 * This is the most complicated part of the whole stack:
|
paulo@0
|
425 *
|
paulo@0
|
426 * [1] Call upper layer's ready routine to grab a buffer (gt_tx_layer_ready).
|
paulo@0
|
427 *
|
paulo@0
|
428 * [2] That function will call tx_deflate_queue, which compresses the data to
|
paulo@0
|
429 * a buffer, as many times as it can while there's more data to process.
|
paulo@0
|
430 *
|
paulo@0
|
431 * [3] If we didn't fill the buffer, or there was no data, return TX_EMPTY
|
paulo@0
|
432 * telling the lower layer there is no data.
|
paulo@0
|
433 *
|
paulo@0
|
434 * [4] If there's no data in the upper layer, but we're in flush mode, call
|
paulo@0
|
435 * flush_stream() to send whatever data is stored inside the z_stream,
|
paulo@0
|
436 * and stop.
|
paulo@0
|
437 *
|
paulo@0
|
438 * [5] If we filled the buffer, or if we have a paritally filled buffer that
|
paulo@0
|
439 * was delayed in deflate_nagle_timeout(), send it to the lower layer with
|
paulo@0
|
440 * flush_buffer(). If the lower layer returns TX_FULL, stop and return
|
paulo@0
|
441 * TX_OK. Otherwise, continue by calling this function recursively.
|
paulo@0
|
442 *
|
paulo@0
|
443 * NOTE: The buffer is filled in tx_deflate_queue but sent in this
|
paulo@0
|
444 * function (or from the Nagle timer if the buffer isn't full).
|
paulo@0
|
445 *
|
paulo@0
|
446 * The caller of this function has to setup a Nagle timer if any data was
|
paulo@0
|
447 * written and TX_FULL was not encountered.
|
paulo@0
|
448 */
|
paulo@0
|
449 static tx_status_t service_deflate (struct tx_layer *tx,
|
paulo@0
|
450 struct tx_deflate *tx_deflate)
|
paulo@0
|
451 {
|
paulo@0
|
452 tx_status_t ret;
|
paulo@0
|
453
|
paulo@0
|
454 DEFLATE_TRACEFN(tx);
|
paulo@0
|
455
|
paulo@0
|
456 /* [1] + [2] */
|
paulo@0
|
457 ret = get_buffers (tx, tx_deflate);
|
paulo@0
|
458
|
paulo@0
|
459 if (ret == TX_ERROR)
|
paulo@0
|
460 return TX_ERROR;
|
paulo@0
|
461
|
paulo@0
|
462 /* [3] */
|
paulo@0
|
463 if (ret == TX_EMPTY)
|
paulo@0
|
464 {
|
paulo@0
|
465 assert (ret == TX_EMPTY);
|
paulo@0
|
466
|
paulo@0
|
467 /* [4]: continue flush even if no data avail */
|
paulo@0
|
468 if (tx_deflate->flushing)
|
paulo@0
|
469 ret = flush_stream (tx, tx_deflate);
|
paulo@0
|
470
|
paulo@0
|
471 return ret;
|
paulo@0
|
472 }
|
paulo@0
|
473
|
paulo@0
|
474 assert (tx_deflate->buf != NULL);
|
paulo@0
|
475
|
paulo@0
|
476 if (DEFLATE_DEBUG)
|
paulo@0
|
477 {
|
paulo@0
|
478 if (tx_deflate->delayed)
|
paulo@0
|
479 {
|
paulo@0
|
480 GT->DBGSOCK (GT, tx->stack->c, "flushing delayed buf(%d)",
|
paulo@0
|
481 io_buf_read_avail (tx_deflate->buf));
|
paulo@0
|
482 }
|
paulo@0
|
483 }
|
paulo@0
|
484
|
paulo@0
|
485 assert (ret == TX_OK);
|
paulo@0
|
486
|
paulo@0
|
487 /*
|
paulo@0
|
488 * [5]
|
paulo@0
|
489 *
|
paulo@0
|
490 * flush_buffer will stop the Nagle timer if the buffer was
|
paulo@0
|
491 * successfully sent.
|
paulo@0
|
492 *
|
paulo@0
|
493 * We must also flush the buffer if it contains partial data from a
|
paulo@0
|
494 * previous flush that was delayed in the Nagle timer due to having no
|
paulo@0
|
495 * space.
|
paulo@0
|
496 */
|
paulo@0
|
497 if (tx_deflate->delayed || io_buf_write_avail (tx_deflate->buf) == 0)
|
paulo@0
|
498 ret = flush_buffer (tx, tx_deflate);
|
paulo@0
|
499
|
paulo@0
|
500 if (ret != TX_OK)
|
paulo@0
|
501 return ret;
|
paulo@0
|
502
|
paulo@0
|
503 /* tail recurse until the lower layer is saturated */
|
paulo@0
|
504 return service_deflate (tx, tx_deflate);
|
paulo@0
|
505 }
|
paulo@0
|
506
|
paulo@0
|
507 /*
|
paulo@0
|
508 * The lower layer is ready to write.
|
paulo@0
|
509 */
|
paulo@0
|
510 static tx_status_t tx_deflate_ready (struct tx_layer *tx)
|
paulo@0
|
511 {
|
paulo@0
|
512 struct tx_deflate *tx_deflate = tx->udata;
|
paulo@0
|
513 size_t old_flushed;
|
paulo@0
|
514 tx_status_t ret;
|
paulo@0
|
515
|
paulo@0
|
516 /* keep track of how much was previously flushed */
|
paulo@0
|
517 old_flushed = tx_deflate->nbytes_flushed;
|
paulo@0
|
518
|
paulo@0
|
519 ret = service_deflate (tx, tx_deflate);
|
paulo@0
|
520
|
paulo@0
|
521 if (ret == TX_ERROR || ret == TX_FULL)
|
paulo@0
|
522 {
|
paulo@0
|
523 if (ret == TX_FULL)
|
paulo@0
|
524 {
|
paulo@0
|
525 /* flush buffer shouldve deactivated the Nagle timer */
|
paulo@0
|
526 assert (tx_deflate->nagle_timer == 0);
|
paulo@0
|
527
|
paulo@0
|
528 /* we wrote something -- let caller know it's ok */
|
paulo@0
|
529 ret = TX_OK;
|
paulo@0
|
530 }
|
paulo@0
|
531
|
paulo@0
|
532 return ret;
|
paulo@0
|
533 }
|
paulo@0
|
534
|
paulo@0
|
535 assert (ret == TX_OK || ret == TX_EMPTY);
|
paulo@0
|
536
|
paulo@0
|
537 /*
|
paulo@0
|
538 * If the lower layer was not saturated (evidenced by _not_ returning
|
paulo@0
|
539 * TX_FULL), and there is a partially completed buffer, the Nagle
|
paulo@0
|
540 * timer must be armed. This ensures the data waiting in this layer will
|
paulo@0
|
541 * go out in a timely manner. If the lower layer was saturated, we don't
|
paulo@0
|
542 * need to arm the timer because there is no buffer space to flush to
|
paulo@0
|
543 * anyway, and when the lower layer unsaturates it will reinvoke this
|
paulo@0
|
544 * layer to write more data.
|
paulo@0
|
545 *
|
paulo@0
|
546 * TODO: Still need to flush if there is some urgent data waiting. So,
|
paulo@0
|
547 * should add a ->flush callback.
|
paulo@0
|
548 *
|
paulo@0
|
549 * XXX: Using tx_deflate->buf != NULL as a hacky way to recognize that
|
paulo@0
|
550 * some data was written to the z_stream.
|
paulo@0
|
551 */
|
paulo@0
|
552 if (tx_deflate->buf != NULL)
|
paulo@0
|
553 start_nagle_timer (tx, tx_deflate);
|
paulo@0
|
554
|
paulo@0
|
555 if (DEFLATE_DEBUG)
|
paulo@0
|
556 {
|
paulo@0
|
557 GT->DBGSOCK (GT, tx->stack->c, "buf waiting=[%d] ret=%s",
|
paulo@0
|
558 tx_deflate->buf ? io_buf_read_avail (tx_deflate->buf) : 0,
|
paulo@0
|
559 ret == TX_EMPTY ? "TX_EMPTY" : "TX_OK");
|
paulo@0
|
560 }
|
paulo@0
|
561
|
paulo@0
|
562 DEFLATE_DUMP(tx_deflate);
|
paulo@0
|
563
|
paulo@0
|
564 /*
|
paulo@0
|
565 * For the return value from this function, decipher whether
|
paulo@0
|
566 * service_deflate() wrote some data.
|
paulo@0
|
567 *
|
paulo@0
|
568 * If nothing was written, then we should stop sending now, by returning
|
paulo@0
|
569 * TX_EMPTY. That will remove the input in tx_link.c that's calling this
|
paulo@0
|
570 * layer, which kind of sucks, because this could be the case a lot of the
|
paulo@0
|
571 * time when the whole buffer hasn't been filled up, leading to a removing
|
paulo@0
|
572 * and adding the input a lot.
|
paulo@0
|
573 *
|
paulo@0
|
574 * Otherwise, return TX_OK if something was sent to the lower layer.
|
paulo@0
|
575 */
|
paulo@0
|
576 if (old_flushed == tx_deflate->nbytes_flushed)
|
paulo@0
|
577 return TX_EMPTY;
|
paulo@0
|
578
|
paulo@0
|
579 return TX_OK;
|
paulo@0
|
580 }
|
paulo@0
|
581
|
paulo@0
|
582 /*****************************************************************************/
|
paulo@0
|
583
|
paulo@0
|
584 static BOOL tx_deflate_init (struct tx_layer *tx)
|
paulo@0
|
585 {
|
paulo@0
|
586 struct tx_deflate *tx_deflate;
|
paulo@0
|
587
|
paulo@0
|
588 if (!(tx_deflate = malloc (sizeof(*tx_deflate))))
|
paulo@0
|
589 return FALSE;
|
paulo@0
|
590
|
paulo@0
|
591 /* zlib documents these variables as needing initialization before
|
paulo@0
|
592 * deflateInit() */
|
paulo@0
|
593 tx_deflate->z.zalloc = Z_NULL;
|
paulo@0
|
594 tx_deflate->z.zfree = Z_NULL;
|
paulo@0
|
595 tx_deflate->z.opaque = Z_NULL;
|
paulo@0
|
596
|
paulo@0
|
597 if (deflateInit (&tx_deflate->z, Z_DEFAULT_COMPRESSION) != Z_OK)
|
paulo@0
|
598 {
|
paulo@0
|
599 FREE (tx_deflate);
|
paulo@0
|
600 return FALSE;
|
paulo@0
|
601 }
|
paulo@0
|
602
|
paulo@0
|
603 tx_deflate->buf = NULL;
|
paulo@0
|
604 tx_deflate->nagle_timer = 0;
|
paulo@0
|
605 tx_deflate->nbytes_in = 0;
|
paulo@0
|
606 tx_deflate->nbytes_out = 0;
|
paulo@0
|
607 tx_deflate->nbytes_flushed = 0;
|
paulo@0
|
608 tx_deflate->nbytes_unflushed = 0;
|
paulo@0
|
609 tx_deflate->flushing = FALSE;
|
paulo@0
|
610 tx_deflate->delayed = FALSE;
|
paulo@0
|
611
|
paulo@0
|
612 tx->udata = tx_deflate;
|
paulo@0
|
613 return TRUE;
|
paulo@0
|
614 }
|
paulo@0
|
615
|
paulo@0
|
616 static void tx_deflate_destroy (struct tx_layer *tx)
|
paulo@0
|
617 {
|
paulo@0
|
618 struct tx_deflate *tx_deflate = tx->udata;
|
paulo@0
|
619
|
paulo@0
|
620 io_buf_free (tx_deflate->buf);
|
paulo@0
|
621 timer_remove (tx_deflate->nagle_timer);
|
paulo@0
|
622
|
paulo@0
|
623 deflateEnd (&tx_deflate->z);
|
paulo@0
|
624 FREE (tx_deflate);
|
paulo@0
|
625 }
|
paulo@0
|
626
|
paulo@0
|
627 /*****************************************************************************/
|
paulo@0
|
628
|
paulo@0
|
629 struct tx_layer_ops gt_tx_deflate_ops =
|
paulo@0
|
630 {
|
paulo@0
|
631 tx_deflate_init,
|
paulo@0
|
632 tx_deflate_destroy,
|
paulo@0
|
633 tx_deflate_toggle,
|
paulo@0
|
634 tx_deflate_queue,
|
paulo@0
|
635 tx_deflate_ready,
|
paulo@0
|
636 tx_deflate_enable,
|
paulo@0
|
637 tx_deflate_disable,
|
paulo@0
|
638 };
|