paulo@0: /* paulo@0: * $Id: tx_link.c,v 1.10 2004/05/02 08:55:00 hipnod Exp $ paulo@0: * paulo@0: * Copyright (C) 2003 giFT project (gift.sourceforge.net) paulo@0: * paulo@0: * This program is free software; you can redistribute it and/or modify it paulo@0: * under the terms of the GNU General Public License as published by the paulo@0: * Free Software Foundation; either version 2, or (at your option) any paulo@0: * later version. paulo@0: * paulo@0: * This program is distributed in the hope that it will be useful, but paulo@0: * WITHOUT ANY WARRANTY; without even the implied warranty of paulo@0: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU paulo@0: * General Public License for more details. paulo@0: */ paulo@0: paulo@0: #include "gt_gnutella.h" paulo@0: paulo@0: #include "io/tx_stack.h" paulo@0: #include "io/tx_layer.h" paulo@0: #include "io/io_buf.h" paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: #define LINK_DEBUG 0 paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: struct tx_link paulo@0: { paulo@0: input_id id; paulo@0: struct io_buf *buf; paulo@0: #if 0 paulo@0: TCPC *c; paulo@0: #endif paulo@0: }; paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void deactivate_queue (struct tx_layer *tx); paulo@0: static void activate_queue (struct tx_layer *tx); paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static BOOL tx_link_init (struct tx_layer *tx) paulo@0: { paulo@0: struct tx_link *tx_link; paulo@0: paulo@0: if (!(tx_link = NEW (struct tx_link))) paulo@0: return FALSE; paulo@0: paulo@0: tx_link->id = 0; paulo@0: tx_link->buf = NULL; paulo@0: paulo@0: /* store our layer-specific info in the toplevel layer */ paulo@0: tx->udata = tx_link; paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: static void tx_link_destroy (struct tx_layer *tx) paulo@0: { paulo@0: struct tx_link *tx_link = tx->udata; paulo@0: paulo@0: input_remove (tx_link->id); paulo@0: tx_link->id = 0; paulo@0: paulo@0: io_buf_free (tx_link->buf); paulo@0: paulo@0: FREE (tx_link); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static const char *tx_status_str (tx_status_t ret) paulo@0: { paulo@0: switch (ret) paulo@0: { paulo@0: case TX_EMPTY: return "TX_EMPTY"; paulo@0: case TX_FULL: return "TX_FULL"; paulo@0: case TX_ERROR: return "TX_ERROR"; paulo@0: case TX_OK: return "TX_OK"; paulo@0: case TX_PARTIAL: return "TX_PARTIAL"; paulo@0: default: return "TX_UNKNOWN"; paulo@0: } paulo@0: } paulo@0: paulo@0: static tx_status_t request_more_data (struct tx_layer *tx) paulo@0: { paulo@0: int ret; paulo@0: paulo@0: /* paulo@0: * Ask the layer above this one to send this layer data. paulo@0: */ paulo@0: if ((ret = gt_tx_layer_ready (tx)) == TX_ERROR) paulo@0: return TX_ERROR; paulo@0: paulo@0: if (LINK_DEBUG) paulo@0: GT->DBGSOCK (GT, tx->stack->c, "ret=%s", tx_status_str (ret)); paulo@0: paulo@0: return ret; paulo@0: } paulo@0: paulo@0: static tx_status_t tx_link_process (struct tx_layer *tx, struct io_buf *io_buf) paulo@0: { paulo@0: uint8_t *ptr; paulo@0: size_t len; paulo@0: int n; paulo@0: paulo@0: ptr = io_buf_read_ptr (io_buf); paulo@0: len = io_buf_read_avail (io_buf); paulo@0: paulo@0: /* paulo@0: * gt_tx_stack_send() calls tcp_send() to send the data on the paulo@0: * connection. This is done because no interface for passing parameters paulo@0: * like a TCPC is exposed anywhere to users of GtTxStack. paulo@0: */ paulo@0: if ((n = gt_tx_stack_send (tx->stack, ptr, len)) <= 0) paulo@0: return TX_ERROR; paulo@0: paulo@0: /* paulo@0: * Pop whatever bytes were written off the buffer. This may be less than paulo@0: * the the whole buffer in the case of a short write. In that case we paulo@0: * don't remove the buffer, but continue writing it later. paulo@0: */ paulo@0: io_buf_pop (io_buf, n); paulo@0: paulo@0: return TX_OK; paulo@0: } paulo@0: paulo@0: /* paulo@0: * The packet-sending input callback. paulo@0: */ paulo@0: static void tx_link_send_data (int fd, input_id id, struct tx_layer *tx) paulo@0: { paulo@0: struct tx_link *tx_link = tx->udata; paulo@0: struct io_buf *io_buf; paulo@0: size_t len; paulo@0: paulo@0: /* paulo@0: * If there's no data to write, request more from the upper layer. paulo@0: */ paulo@0: if (!(io_buf = tx_link->buf)) paulo@0: { paulo@0: tx_status_t ret; paulo@0: paulo@0: if ((ret = request_more_data (tx)) == TX_ERROR) paulo@0: { paulo@0: gt_tx_stack_abort (tx->stack); paulo@0: return; paulo@0: } paulo@0: paulo@0: /* deactivate the queue until more data arrives */ paulo@0: if (ret == TX_EMPTY) paulo@0: { paulo@0: if (LINK_DEBUG) paulo@0: GT->DBGSOCK (GT, tx->stack->c, "empty, deactivating"); paulo@0: paulo@0: assert (tx_link->buf == NULL); paulo@0: deactivate_queue (tx); paulo@0: return; paulo@0: } paulo@0: paulo@0: /* upper layer must have called our queue function */ paulo@0: assert (tx_link->buf != NULL); paulo@0: io_buf = tx_link->buf; paulo@0: paulo@0: /* fall through and send the buffer */ paulo@0: } paulo@0: paulo@0: len = io_buf_read_avail (io_buf); paulo@0: paulo@0: /* paulo@0: * It is safe to abort the tx-stack if we encountered an error because paulo@0: * there are no other callers into it currently. paulo@0: */ paulo@0: if (tx_link_process (tx, io_buf) == TX_ERROR) paulo@0: { paulo@0: gt_tx_stack_abort (tx->stack); paulo@0: return; paulo@0: } paulo@0: paulo@0: if (io_buf_read_avail (io_buf) > 0) paulo@0: { paulo@0: assert (io_buf_read_avail (io_buf) < len); paulo@0: return; paulo@0: } paulo@0: paulo@0: /* paulo@0: * The complete buffer was written. This input callback will continue paulo@0: * grabbing data from the upper layer until gt_tx_layer_ready() returns paulo@0: * TX_EMPTY or TX_ERROR. paulo@0: */ paulo@0: io_buf_free (io_buf); paulo@0: tx_link->buf = NULL; paulo@0: } paulo@0: paulo@0: static void activate_queue (struct tx_layer *tx) paulo@0: { paulo@0: struct tx_link *tx_link = tx->udata; paulo@0: paulo@0: /* skip if input already active */ paulo@0: if (tx_link->id) paulo@0: return; paulo@0: paulo@0: tx_link->id = input_add (tx->stack->c->fd, tx, INPUT_WRITE, paulo@0: (InputCallback)tx_link_send_data, 0); paulo@0: } paulo@0: paulo@0: static void deactivate_queue (struct tx_layer *tx) paulo@0: { paulo@0: struct tx_link *tx_link = tx->udata; paulo@0: paulo@0: if (!tx_link->id) paulo@0: return; paulo@0: paulo@0: input_remove (tx_link->id); paulo@0: tx_link->id = 0; paulo@0: } paulo@0: paulo@0: /* begin or end consuming data in this layer */ paulo@0: static void tx_link_toggle (struct tx_layer *tx, BOOL stop) paulo@0: { paulo@0: if (stop) paulo@0: deactivate_queue (tx); paulo@0: else paulo@0: activate_queue (tx); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static tx_status_t tx_link_queue (struct tx_layer *tx, struct io_buf *io_buf) paulo@0: { paulo@0: struct tx_link *tx_link = tx->udata; paulo@0: paulo@0: if (tx_link->buf != NULL) paulo@0: { paulo@0: /* this layer is "saturated" with its single packet */ paulo@0: return TX_FULL; paulo@0: } paulo@0: paulo@0: tx_link->buf = io_buf; paulo@0: activate_queue (tx); paulo@0: paulo@0: /* TODO: need to change this if we change to writing as much paulo@0: * as possible synchronously instead of one message from the handler */ paulo@0: return TX_OK; paulo@0: } paulo@0: paulo@0: static tx_status_t tx_link_ready (struct tx_layer *tx) paulo@0: { paulo@0: abort (); /* can't handle layers underneath us */ paulo@0: return TX_ERROR; paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void tx_link_enable (struct tx_layer *tx) paulo@0: { paulo@0: activate_queue (tx); paulo@0: } paulo@0: paulo@0: static void tx_link_disable (struct tx_layer *tx) paulo@0: { paulo@0: deactivate_queue (tx); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: struct tx_layer_ops gt_tx_link_ops = paulo@0: { paulo@0: tx_link_init, paulo@0: tx_link_destroy, paulo@0: tx_link_toggle, paulo@0: tx_link_queue, paulo@0: tx_link_ready, paulo@0: tx_link_enable, paulo@0: tx_link_disable, paulo@0: };