annotate src/io/tx_link.c @ 0:d39e1d0d75b6

initial add
author paulo@hit-nxdomain.opendns.com
date Sat, 20 Feb 2010 21:18:28 -0800
parents
children
rev   line source
paulo@0 1 /*
paulo@0 2 * $Id: tx_link.c,v 1.10 2004/05/02 08:55:00 hipnod Exp $
paulo@0 3 *
paulo@0 4 * Copyright (C) 2003 giFT project (gift.sourceforge.net)
paulo@0 5 *
paulo@0 6 * This program is free software; you can redistribute it and/or modify it
paulo@0 7 * under the terms of the GNU General Public License as published by the
paulo@0 8 * Free Software Foundation; either version 2, or (at your option) any
paulo@0 9 * later version.
paulo@0 10 *
paulo@0 11 * This program is distributed in the hope that it will be useful, but
paulo@0 12 * WITHOUT ANY WARRANTY; without even the implied warranty of
paulo@0 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
paulo@0 14 * General Public License for more details.
paulo@0 15 */
paulo@0 16
paulo@0 17 #include "gt_gnutella.h"
paulo@0 18
paulo@0 19 #include "io/tx_stack.h"
paulo@0 20 #include "io/tx_layer.h"
paulo@0 21 #include "io/io_buf.h"
paulo@0 22
paulo@0 23 /*****************************************************************************/
paulo@0 24
paulo@0 25 #define LINK_DEBUG 0
paulo@0 26
paulo@0 27 /*****************************************************************************/
paulo@0 28
paulo@0 29 struct tx_link
paulo@0 30 {
paulo@0 31 input_id id;
paulo@0 32 struct io_buf *buf;
paulo@0 33 #if 0
paulo@0 34 TCPC *c;
paulo@0 35 #endif
paulo@0 36 };
paulo@0 37
paulo@0 38 /*****************************************************************************/
paulo@0 39
paulo@0 40 static void deactivate_queue (struct tx_layer *tx);
paulo@0 41 static void activate_queue (struct tx_layer *tx);
paulo@0 42
paulo@0 43 /*****************************************************************************/
paulo@0 44
paulo@0 45 static BOOL tx_link_init (struct tx_layer *tx)
paulo@0 46 {
paulo@0 47 struct tx_link *tx_link;
paulo@0 48
paulo@0 49 if (!(tx_link = NEW (struct tx_link)))
paulo@0 50 return FALSE;
paulo@0 51
paulo@0 52 tx_link->id = 0;
paulo@0 53 tx_link->buf = NULL;
paulo@0 54
paulo@0 55 /* store our layer-specific info in the toplevel layer */
paulo@0 56 tx->udata = tx_link;
paulo@0 57
paulo@0 58 return TRUE;
paulo@0 59 }
paulo@0 60
paulo@0 61 static void tx_link_destroy (struct tx_layer *tx)
paulo@0 62 {
paulo@0 63 struct tx_link *tx_link = tx->udata;
paulo@0 64
paulo@0 65 input_remove (tx_link->id);
paulo@0 66 tx_link->id = 0;
paulo@0 67
paulo@0 68 io_buf_free (tx_link->buf);
paulo@0 69
paulo@0 70 FREE (tx_link);
paulo@0 71 }
paulo@0 72
paulo@0 73 /*****************************************************************************/
paulo@0 74
paulo@0 75 static const char *tx_status_str (tx_status_t ret)
paulo@0 76 {
paulo@0 77 switch (ret)
paulo@0 78 {
paulo@0 79 case TX_EMPTY: return "TX_EMPTY";
paulo@0 80 case TX_FULL: return "TX_FULL";
paulo@0 81 case TX_ERROR: return "TX_ERROR";
paulo@0 82 case TX_OK: return "TX_OK";
paulo@0 83 case TX_PARTIAL: return "TX_PARTIAL";
paulo@0 84 default: return "TX_UNKNOWN";
paulo@0 85 }
paulo@0 86 }
paulo@0 87
paulo@0 88 static tx_status_t request_more_data (struct tx_layer *tx)
paulo@0 89 {
paulo@0 90 int ret;
paulo@0 91
paulo@0 92 /*
paulo@0 93 * Ask the layer above this one to send this layer data.
paulo@0 94 */
paulo@0 95 if ((ret = gt_tx_layer_ready (tx)) == TX_ERROR)
paulo@0 96 return TX_ERROR;
paulo@0 97
paulo@0 98 if (LINK_DEBUG)
paulo@0 99 GT->DBGSOCK (GT, tx->stack->c, "ret=%s", tx_status_str (ret));
paulo@0 100
paulo@0 101 return ret;
paulo@0 102 }
paulo@0 103
paulo@0 104 static tx_status_t tx_link_process (struct tx_layer *tx, struct io_buf *io_buf)
paulo@0 105 {
paulo@0 106 uint8_t *ptr;
paulo@0 107 size_t len;
paulo@0 108 int n;
paulo@0 109
paulo@0 110 ptr = io_buf_read_ptr (io_buf);
paulo@0 111 len = io_buf_read_avail (io_buf);
paulo@0 112
paulo@0 113 /*
paulo@0 114 * gt_tx_stack_send() calls tcp_send() to send the data on the
paulo@0 115 * connection. This is done because no interface for passing parameters
paulo@0 116 * like a TCPC is exposed anywhere to users of GtTxStack.
paulo@0 117 */
paulo@0 118 if ((n = gt_tx_stack_send (tx->stack, ptr, len)) <= 0)
paulo@0 119 return TX_ERROR;
paulo@0 120
paulo@0 121 /*
paulo@0 122 * Pop whatever bytes were written off the buffer. This may be less than
paulo@0 123 * the the whole buffer in the case of a short write. In that case we
paulo@0 124 * don't remove the buffer, but continue writing it later.
paulo@0 125 */
paulo@0 126 io_buf_pop (io_buf, n);
paulo@0 127
paulo@0 128 return TX_OK;
paulo@0 129 }
paulo@0 130
paulo@0 131 /*
paulo@0 132 * The packet-sending input callback.
paulo@0 133 */
paulo@0 134 static void tx_link_send_data (int fd, input_id id, struct tx_layer *tx)
paulo@0 135 {
paulo@0 136 struct tx_link *tx_link = tx->udata;
paulo@0 137 struct io_buf *io_buf;
paulo@0 138 size_t len;
paulo@0 139
paulo@0 140 /*
paulo@0 141 * If there's no data to write, request more from the upper layer.
paulo@0 142 */
paulo@0 143 if (!(io_buf = tx_link->buf))
paulo@0 144 {
paulo@0 145 tx_status_t ret;
paulo@0 146
paulo@0 147 if ((ret = request_more_data (tx)) == TX_ERROR)
paulo@0 148 {
paulo@0 149 gt_tx_stack_abort (tx->stack);
paulo@0 150 return;
paulo@0 151 }
paulo@0 152
paulo@0 153 /* deactivate the queue until more data arrives */
paulo@0 154 if (ret == TX_EMPTY)
paulo@0 155 {
paulo@0 156 if (LINK_DEBUG)
paulo@0 157 GT->DBGSOCK (GT, tx->stack->c, "empty, deactivating");
paulo@0 158
paulo@0 159 assert (tx_link->buf == NULL);
paulo@0 160 deactivate_queue (tx);
paulo@0 161 return;
paulo@0 162 }
paulo@0 163
paulo@0 164 /* upper layer must have called our queue function */
paulo@0 165 assert (tx_link->buf != NULL);
paulo@0 166 io_buf = tx_link->buf;
paulo@0 167
paulo@0 168 /* fall through and send the buffer */
paulo@0 169 }
paulo@0 170
paulo@0 171 len = io_buf_read_avail (io_buf);
paulo@0 172
paulo@0 173 /*
paulo@0 174 * It is safe to abort the tx-stack if we encountered an error because
paulo@0 175 * there are no other callers into it currently.
paulo@0 176 */
paulo@0 177 if (tx_link_process (tx, io_buf) == TX_ERROR)
paulo@0 178 {
paulo@0 179 gt_tx_stack_abort (tx->stack);
paulo@0 180 return;
paulo@0 181 }
paulo@0 182
paulo@0 183 if (io_buf_read_avail (io_buf) > 0)
paulo@0 184 {
paulo@0 185 assert (io_buf_read_avail (io_buf) < len);
paulo@0 186 return;
paulo@0 187 }
paulo@0 188
paulo@0 189 /*
paulo@0 190 * The complete buffer was written. This input callback will continue
paulo@0 191 * grabbing data from the upper layer until gt_tx_layer_ready() returns
paulo@0 192 * TX_EMPTY or TX_ERROR.
paulo@0 193 */
paulo@0 194 io_buf_free (io_buf);
paulo@0 195 tx_link->buf = NULL;
paulo@0 196 }
paulo@0 197
paulo@0 198 static void activate_queue (struct tx_layer *tx)
paulo@0 199 {
paulo@0 200 struct tx_link *tx_link = tx->udata;
paulo@0 201
paulo@0 202 /* skip if input already active */
paulo@0 203 if (tx_link->id)
paulo@0 204 return;
paulo@0 205
paulo@0 206 tx_link->id = input_add (tx->stack->c->fd, tx, INPUT_WRITE,
paulo@0 207 (InputCallback)tx_link_send_data, 0);
paulo@0 208 }
paulo@0 209
paulo@0 210 static void deactivate_queue (struct tx_layer *tx)
paulo@0 211 {
paulo@0 212 struct tx_link *tx_link = tx->udata;
paulo@0 213
paulo@0 214 if (!tx_link->id)
paulo@0 215 return;
paulo@0 216
paulo@0 217 input_remove (tx_link->id);
paulo@0 218 tx_link->id = 0;
paulo@0 219 }
paulo@0 220
paulo@0 221 /* begin or end consuming data in this layer */
paulo@0 222 static void tx_link_toggle (struct tx_layer *tx, BOOL stop)
paulo@0 223 {
paulo@0 224 if (stop)
paulo@0 225 deactivate_queue (tx);
paulo@0 226 else
paulo@0 227 activate_queue (tx);
paulo@0 228 }
paulo@0 229
paulo@0 230 /*****************************************************************************/
paulo@0 231
paulo@0 232 static tx_status_t tx_link_queue (struct tx_layer *tx, struct io_buf *io_buf)
paulo@0 233 {
paulo@0 234 struct tx_link *tx_link = tx->udata;
paulo@0 235
paulo@0 236 if (tx_link->buf != NULL)
paulo@0 237 {
paulo@0 238 /* this layer is "saturated" with its single packet */
paulo@0 239 return TX_FULL;
paulo@0 240 }
paulo@0 241
paulo@0 242 tx_link->buf = io_buf;
paulo@0 243 activate_queue (tx);
paulo@0 244
paulo@0 245 /* TODO: need to change this if we change to writing as much
paulo@0 246 * as possible synchronously instead of one message from the handler */
paulo@0 247 return TX_OK;
paulo@0 248 }
paulo@0 249
paulo@0 250 static tx_status_t tx_link_ready (struct tx_layer *tx)
paulo@0 251 {
paulo@0 252 abort (); /* can't handle layers underneath us */
paulo@0 253 return TX_ERROR;
paulo@0 254 }
paulo@0 255
paulo@0 256 /*****************************************************************************/
paulo@0 257
paulo@0 258 static void tx_link_enable (struct tx_layer *tx)
paulo@0 259 {
paulo@0 260 activate_queue (tx);
paulo@0 261 }
paulo@0 262
paulo@0 263 static void tx_link_disable (struct tx_layer *tx)
paulo@0 264 {
paulo@0 265 deactivate_queue (tx);
paulo@0 266 }
paulo@0 267
paulo@0 268 /*****************************************************************************/
paulo@0 269
paulo@0 270 struct tx_layer_ops gt_tx_link_ops =
paulo@0 271 {
paulo@0 272 tx_link_init,
paulo@0 273 tx_link_destroy,
paulo@0 274 tx_link_toggle,
paulo@0 275 tx_link_queue,
paulo@0 276 tx_link_ready,
paulo@0 277 tx_link_enable,
paulo@0 278 tx_link_disable,
paulo@0 279 };