view src/io/tx_link.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
line source
1 /*
2 * $Id: tx_link.c,v 1.10 2004/05/02 08:55:00 hipnod Exp $
3 *
4 * Copyright (C) 2003 giFT project (gift.sourceforge.net)
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2, or (at your option) any
9 * later version.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 */
17 #include "gt_gnutella.h"
19 #include "io/tx_stack.h"
20 #include "io/tx_layer.h"
21 #include "io/io_buf.h"
23 /*****************************************************************************/
/* Set to non-zero to emit this layer's GT->DBGSOCK debug messages. */
#define LINK_DEBUG 0
27 /*****************************************************************************/
29 struct tx_link
30 {
31 input_id id;
32 struct io_buf *buf;
33 #if 0
34 TCPC *c;
35 #endif
36 };
38 /*****************************************************************************/
/* Forward declarations: both helpers are defined after their first use. */
static void activate_queue (struct tx_layer *tx);
static void deactivate_queue (struct tx_layer *tx);
43 /*****************************************************************************/
45 static BOOL tx_link_init (struct tx_layer *tx)
46 {
47 struct tx_link *tx_link;
49 if (!(tx_link = NEW (struct tx_link)))
50 return FALSE;
52 tx_link->id = 0;
53 tx_link->buf = NULL;
55 /* store our layer-specific info in the toplevel layer */
56 tx->udata = tx_link;
58 return TRUE;
59 }
61 static void tx_link_destroy (struct tx_layer *tx)
62 {
63 struct tx_link *tx_link = tx->udata;
65 input_remove (tx_link->id);
66 tx_link->id = 0;
68 io_buf_free (tx_link->buf);
70 FREE (tx_link);
71 }
73 /*****************************************************************************/
75 static const char *tx_status_str (tx_status_t ret)
76 {
77 switch (ret)
78 {
79 case TX_EMPTY: return "TX_EMPTY";
80 case TX_FULL: return "TX_FULL";
81 case TX_ERROR: return "TX_ERROR";
82 case TX_OK: return "TX_OK";
83 case TX_PARTIAL: return "TX_PARTIAL";
84 default: return "TX_UNKNOWN";
85 }
86 }
88 static tx_status_t request_more_data (struct tx_layer *tx)
89 {
90 int ret;
92 /*
93 * Ask the layer above this one to send this layer data.
94 */
95 if ((ret = gt_tx_layer_ready (tx)) == TX_ERROR)
96 return TX_ERROR;
98 if (LINK_DEBUG)
99 GT->DBGSOCK (GT, tx->stack->c, "ret=%s", tx_status_str (ret));
101 return ret;
102 }
104 static tx_status_t tx_link_process (struct tx_layer *tx, struct io_buf *io_buf)
105 {
106 uint8_t *ptr;
107 size_t len;
108 int n;
110 ptr = io_buf_read_ptr (io_buf);
111 len = io_buf_read_avail (io_buf);
113 /*
114 * gt_tx_stack_send() calls tcp_send() to send the data on the
115 * connection. This is done because no interface for passing parameters
116 * like a TCPC is exposed anywhere to users of GtTxStack.
117 */
118 if ((n = gt_tx_stack_send (tx->stack, ptr, len)) <= 0)
119 return TX_ERROR;
121 /*
122 * Pop whatever bytes were written off the buffer. This may be less than
123 * the the whole buffer in the case of a short write. In that case we
124 * don't remove the buffer, but continue writing it later.
125 */
126 io_buf_pop (io_buf, n);
128 return TX_OK;
129 }
131 /*
132 * The packet-sending input callback.
133 */
134 static void tx_link_send_data (int fd, input_id id, struct tx_layer *tx)
135 {
136 struct tx_link *tx_link = tx->udata;
137 struct io_buf *io_buf;
138 size_t len;
140 /*
141 * If there's no data to write, request more from the upper layer.
142 */
143 if (!(io_buf = tx_link->buf))
144 {
145 tx_status_t ret;
147 if ((ret = request_more_data (tx)) == TX_ERROR)
148 {
149 gt_tx_stack_abort (tx->stack);
150 return;
151 }
153 /* deactivate the queue until more data arrives */
154 if (ret == TX_EMPTY)
155 {
156 if (LINK_DEBUG)
157 GT->DBGSOCK (GT, tx->stack->c, "empty, deactivating");
159 assert (tx_link->buf == NULL);
160 deactivate_queue (tx);
161 return;
162 }
164 /* upper layer must have called our queue function */
165 assert (tx_link->buf != NULL);
166 io_buf = tx_link->buf;
168 /* fall through and send the buffer */
169 }
171 len = io_buf_read_avail (io_buf);
173 /*
174 * It is safe to abort the tx-stack if we encountered an error because
175 * there are no other callers into it currently.
176 */
177 if (tx_link_process (tx, io_buf) == TX_ERROR)
178 {
179 gt_tx_stack_abort (tx->stack);
180 return;
181 }
183 if (io_buf_read_avail (io_buf) > 0)
184 {
185 assert (io_buf_read_avail (io_buf) < len);
186 return;
187 }
189 /*
190 * The complete buffer was written. This input callback will continue
191 * grabbing data from the upper layer until gt_tx_layer_ready() returns
192 * TX_EMPTY or TX_ERROR.
193 */
194 io_buf_free (io_buf);
195 tx_link->buf = NULL;
196 }
198 static void activate_queue (struct tx_layer *tx)
199 {
200 struct tx_link *tx_link = tx->udata;
202 /* skip if input already active */
203 if (tx_link->id)
204 return;
206 tx_link->id = input_add (tx->stack->c->fd, tx, INPUT_WRITE,
207 (InputCallback)tx_link_send_data, 0);
208 }
210 static void deactivate_queue (struct tx_layer *tx)
211 {
212 struct tx_link *tx_link = tx->udata;
214 if (!tx_link->id)
215 return;
217 input_remove (tx_link->id);
218 tx_link->id = 0;
219 }
221 /* begin or end consuming data in this layer */
222 static void tx_link_toggle (struct tx_layer *tx, BOOL stop)
223 {
224 if (stop)
225 deactivate_queue (tx);
226 else
227 activate_queue (tx);
228 }
230 /*****************************************************************************/
232 static tx_status_t tx_link_queue (struct tx_layer *tx, struct io_buf *io_buf)
233 {
234 struct tx_link *tx_link = tx->udata;
236 if (tx_link->buf != NULL)
237 {
238 /* this layer is "saturated" with its single packet */
239 return TX_FULL;
240 }
242 tx_link->buf = io_buf;
243 activate_queue (tx);
245 /* TODO: need to change this if we change to writing as much
246 * as possible synchronously instead of one message from the handler */
247 return TX_OK;
248 }
250 static tx_status_t tx_link_ready (struct tx_layer *tx)
251 {
252 abort (); /* can't handle layers underneath us */
253 return TX_ERROR;
254 }
256 /*****************************************************************************/
/*
 * Enable this layer by (re)registering the write input.
 */
static void tx_link_enable (struct tx_layer *tx)
{
	activate_queue (tx);
}
/*
 * Disable this layer by unregistering the write input.
 */
static void tx_link_disable (struct tx_layer *tx)
{
	deactivate_queue (tx);
}
268 /*****************************************************************************/
270 struct tx_layer_ops gt_tx_link_ops =
271 {
272 tx_link_init,
273 tx_link_destroy,
274 tx_link_toggle,
275 tx_link_queue,
276 tx_link_ready,
277 tx_link_enable,
278 tx_link_disable,
279 };