paulo@0: /* paulo@0: * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $ paulo@0: * paulo@0: * Copyright (C) 2004 giFT project (gift.sourceforge.net) paulo@0: * paulo@0: * This program is free software; you can redistribute it and/or modify it paulo@0: * under the terms of the GNU General Public License as published by the paulo@0: * Free Software Foundation; either version 2, or (at your option) any paulo@0: * later version. paulo@0: * paulo@0: * This program is distributed in the hope that it will be useful, but paulo@0: * WITHOUT ANY WARRANTY; without even the implied warranty of paulo@0: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU paulo@0: * General Public License for more details. paulo@0: */ paulo@0: paulo@0: #include "gt_gnutella.h" paulo@0: #include "gt_packet.h" /* packet manipulation macros */ paulo@0: paulo@0: #include "io/tx_stack.h" paulo@0: #include "io/tx_layer.h" paulo@0: #include "io/io_buf.h" paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: #define TX_PACKET_DEBUG 0 paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* paulo@0: * Relative packet priority ratios. These control how many packets of a paulo@0: * certain type are sent prior to looking for other types. For each type we paulo@0: * maintain an independent FIFO queue. Each time a packet can be sent, each paulo@0: * queue is checked. paulo@0: * paulo@0: * Packets in a queue will be sent while the ratio is greater than zero, and paulo@0: * there are no higher priority packets waiting. Once there are no queues paulo@0: * with both waiting packets and a non-zero ratio, the queue priority ratios paulo@0: * are reset so that more packets can be sent. This process continues until paulo@0: * the lower layer becomes saturated. paulo@0: * paulo@0: * Note that it is bad idea to reset the priority ratios only when all the paulo@0: * ratios are zero, because this could lead to starvation for some packet paulo@0: * types. paulo@0: * paulo@0: * Pushes have the highest priorty of normal messages. Also, there is a paulo@0: * special 'urgent' queue that has higher priority, that includes replies to paulo@0: * keepalive pings and other important high-priority messages. paulo@0: */ paulo@0: #define URGENT_RATIO INT_MAX paulo@0: #define PUSH_RATIO 5 paulo@0: #define QHIT_RATIO 4 paulo@0: #define QUERY_RATIO 3 paulo@0: #define PONG_RATIO 2 paulo@0: #define PING_RATIO 1 paulo@0: #define MISC_RATIO 1 paulo@0: paulo@0: #define NR_QUEUES (7) paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: struct packet_queue paulo@0: { paulo@0: gt_packet_type_t msg_type; paulo@0: size_t ratio; /* how many packets left on this turn? */ paulo@0: size_t bytes_queued; /* total bytes queued */ paulo@0: List *queue; paulo@0: }; paulo@0: paulo@0: struct tx_packet paulo@0: { paulo@0: struct packet_queue queues[NR_QUEUES]; paulo@0: int total_pkts; /* used to quickly test if empty */ paulo@0: }; paulo@0: paulo@0: /*****************************************************************************/ paulo@0: /* DEBUGGING/TRACING */ paulo@0: paulo@0: #if TX_PACKET_DEBUG paulo@0: /* ripped from gt_packet.c */ paulo@0: static const char *packet_command_str (uint8_t cmd) paulo@0: { paulo@0: static char buf[16]; paulo@0: paulo@0: switch (cmd) paulo@0: { paulo@0: case GT_MSG_PING: return "PING"; paulo@0: case GT_MSG_PING_REPLY: return "PONG"; paulo@0: case GT_MSG_BYE: return "BYE"; paulo@0: case GT_MSG_QUERY_ROUTE: return "QROUTE"; paulo@0: case GT_MSG_VENDOR: return "VMSG"; paulo@0: case GT_MSG_VENDOR_STD: return "VMSG-S"; paulo@0: case GT_MSG_PUSH: return "PUSH"; paulo@0: case GT_MSG_QUERY: return "QUERY"; paulo@0: case GT_MSG_QUERY_REPLY: return "HITS"; paulo@0: paulo@0: default: paulo@0: snprintf (buf, sizeof (buf), "[<%02hx>]", cmd); paulo@0: return buf; paulo@0: } paulo@0: } paulo@0: paulo@0: static void dump_packet (struct io_buf *buf, String *str) paulo@0: { paulo@0: uint8_t cmd = get_command (buf->data); paulo@0: string_appendf (str, "%s,", packet_command_str (cmd)); paulo@0: } paulo@0: paulo@0: static void trace_queue_list (List *queue, String *str) paulo@0: { paulo@0: list_foreach (queue, (ListForeachFunc)dump_packet, str); paulo@0: } paulo@0: #endif /* TX_PACKET_DEBUG */ paulo@0: paulo@0: static void trace_queue (struct tx_layer *tx, const char *id) paulo@0: { paulo@0: #if TX_PACKET_DEBUG paulo@0: struct tx_packet *tx_packet = tx->udata; paulo@0: int i; paulo@0: String *s; paulo@0: TCPC *c; paulo@0: paulo@0: if (!(s = string_new (NULL, 0, 0, TRUE))) paulo@0: return; paulo@0: paulo@0: c = tx->stack->c; paulo@0: string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts); paulo@0: paulo@0: for (i = 0; i < NR_QUEUES; i++) paulo@0: trace_queue_list (tx_packet->queues[i].queue, s); paulo@0: paulo@0: string_append (s, "}"); paulo@0: paulo@0: GT->DBGSOCK (GT, c, "%s", s->str); paulo@0: string_free (s); paulo@0: #endif paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* return the queue on which this message should go */ paulo@0: static size_t get_queue (struct io_buf *msg) paulo@0: { paulo@0: uint8_t cmd; paulo@0: paulo@0: cmd = get_command (msg->data); paulo@0: paulo@0: switch (cmd) paulo@0: { paulo@0: default: paulo@0: abort (); paulo@0: paulo@0: case GT_MSG_VENDOR: paulo@0: case GT_MSG_VENDOR_STD: paulo@0: case GT_MSG_QUERY_ROUTE: paulo@0: return 6; paulo@0: paulo@0: case GT_MSG_PING: paulo@0: { paulo@0: /* queue keep-alive pings in the urgent queue */ paulo@0: if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) paulo@0: return 0; paulo@0: } paulo@0: return 5; paulo@0: paulo@0: case GT_MSG_PING_REPLY: paulo@0: { paulo@0: /* paulo@0: * Queue replies to keep-alive ping in the urgent queue. paulo@0: * paulo@0: * This allows the remote end to starve it's own connection paulo@0: * with a series of keep-alive pings. Only flow-control paulo@0: * can handle this. paulo@0: */ paulo@0: if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) paulo@0: return 0; paulo@0: } paulo@0: return 4; paulo@0: paulo@0: case GT_MSG_QUERY: paulo@0: { paulo@0: /* make queries from this node more important */ paulo@0: if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0) paulo@0: return 1; paulo@0: } paulo@0: return 3; paulo@0: paulo@0: case GT_MSG_QUERY_REPLY: paulo@0: return 2; paulo@0: paulo@0: case GT_MSG_PUSH: paulo@0: return 1; paulo@0: paulo@0: case GT_MSG_BYE: paulo@0: return 0; paulo@0: } paulo@0: paulo@0: abort (); paulo@0: } paulo@0: paulo@0: static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg) paulo@0: { paulo@0: pkt_queue->queue = list_append (pkt_queue->queue, msg); paulo@0: } paulo@0: paulo@0: /* paulo@0: * Called from upper layer when it wants to send us a message buffer. paulo@0: */ paulo@0: static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf) paulo@0: { paulo@0: struct tx_packet *tx_packet = tx->udata; paulo@0: size_t queue_nr; paulo@0: paulo@0: queue_nr = get_queue (io_buf); paulo@0: paulo@0: assert (queue_nr < NR_QUEUES); paulo@0: enqueue_packet (&tx_packet->queues[queue_nr], io_buf); paulo@0: paulo@0: tx_packet->total_pkts++; paulo@0: assert (tx_packet->total_pkts > 0); paulo@0: paulo@0: trace_queue (tx, "*0*"); paulo@0: paulo@0: return TX_OK; paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type, paulo@0: size_t prio) paulo@0: { paulo@0: queue->msg_type = msg_type; paulo@0: queue->ratio = prio; paulo@0: } paulo@0: paulo@0: static void reset_ratios (struct packet_queue *queue, size_t len) paulo@0: { paulo@0: set_queue (&queue[0], 0xff, URGENT_RATIO); paulo@0: set_queue (&queue[1], GT_MSG_PUSH, PUSH_RATIO); paulo@0: set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO); paulo@0: set_queue (&queue[3], GT_MSG_QUERY, QUERY_RATIO); paulo@0: set_queue (&queue[4], GT_MSG_PING_REPLY, PONG_RATIO); paulo@0: set_queue (&queue[5], GT_MSG_PING, PING_RATIO); paulo@0: set_queue (&queue[6], 0xff, MISC_RATIO); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: /* paulo@0: * Try to send a single message buffer from the packet queue to the lower paulo@0: * layer. If the lower layer has become saturated, return FALSE. paulo@0: * paulo@0: * The lower layer takes responsibility for the messages sent to it in paulo@0: * entirety in gt_tx_layer_queue() unless it is full. In that case it paulo@0: * returns TX_FULL. paulo@0: */ paulo@0: static tx_status_t shift_queue (struct tx_layer *tx, paulo@0: struct tx_packet *tx_packet, paulo@0: struct packet_queue *pkt_queue) paulo@0: { paulo@0: List *msg_l; paulo@0: struct io_buf *msg; paulo@0: tx_status_t ret; paulo@0: paulo@0: msg_l = list_nth (pkt_queue->queue, 0); paulo@0: msg = msg_l->data; paulo@0: paulo@0: ret = gt_tx_layer_queue (tx, msg); paulo@0: paulo@0: if (ret != TX_OK) paulo@0: { paulo@0: assert (ret != TX_EMPTY); /* impossible to be empty */ paulo@0: return ret; paulo@0: } paulo@0: paulo@0: /* shift this packet off the queue */ paulo@0: pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l); paulo@0: paulo@0: tx_packet->total_pkts--; paulo@0: assert (tx_packet->total_pkts >= 0); paulo@0: paulo@0: if (TX_PACKET_DEBUG) paulo@0: trace_queue (tx, "*2*"); paulo@0: paulo@0: return ret; paulo@0: } paulo@0: paulo@0: static tx_status_t service_queues (struct tx_layer *layer, paulo@0: struct tx_packet *tx_packet) paulo@0: { paulo@0: int i; paulo@0: tx_status_t ret; paulo@0: paulo@0: for (i = 0; i < NR_QUEUES; i++) paulo@0: { paulo@0: struct packet_queue *pkt_queue = &tx_packet->queues[i]; paulo@0: paulo@0: /* skip if ratio is small */ paulo@0: while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL) paulo@0: { paulo@0: ret = shift_queue (layer, tx_packet, pkt_queue); paulo@0: pkt_queue->ratio--; paulo@0: paulo@0: if (ret == TX_FULL) paulo@0: return TX_OK; paulo@0: paulo@0: if (ret != TX_OK) paulo@0: return ret; paulo@0: } paulo@0: } paulo@0: paulo@0: /* reset the ratios to write more data */ paulo@0: reset_ratios (tx_packet->queues, NR_QUEUES); paulo@0: paulo@0: /* we wrote something, so return ok */ paulo@0: if (tx_packet->total_pkts == 0) paulo@0: return TX_OK; paulo@0: paulo@0: /* tail recurse until lower layer is saturated */ paulo@0: return service_queues (layer, tx_packet); paulo@0: } paulo@0: paulo@0: /* paulo@0: * Gets called when the lower layer is writable. paulo@0: */ paulo@0: static tx_status_t tx_packet_ready (struct tx_layer *tx) paulo@0: { paulo@0: struct tx_packet *tx_packet = tx->udata; paulo@0: paulo@0: if (tx_packet->total_pkts == 0) paulo@0: return TX_EMPTY; paulo@0: paulo@0: if (TX_PACKET_DEBUG) paulo@0: trace_queue (tx, "*1*"); paulo@0: paulo@0: return service_queues (tx, tx_packet); paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static BOOL tx_packet_init (struct tx_layer *tx) paulo@0: { paulo@0: struct tx_packet *tx_packet; paulo@0: int i; paulo@0: paulo@0: if (!(tx_packet = malloc (sizeof (struct tx_packet)))) paulo@0: return FALSE; paulo@0: paulo@0: tx_packet->total_pkts = 0; paulo@0: paulo@0: for (i = 0; i < NR_QUEUES; i++) paulo@0: { paulo@0: tx_packet->queues[i].queue = NULL; paulo@0: tx_packet->queues[i].bytes_queued = 0; paulo@0: } paulo@0: paulo@0: reset_ratios (tx_packet->queues, NR_QUEUES); paulo@0: paulo@0: tx->udata = tx_packet; paulo@0: paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: static BOOL free_io_buf (struct io_buf *io_buf, void *udata) paulo@0: { paulo@0: io_buf_free (io_buf); paulo@0: return TRUE; paulo@0: } paulo@0: paulo@0: static void flush_packets (struct packet_queue *pkt_queue) paulo@0: { paulo@0: list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL); paulo@0: pkt_queue = NULL; paulo@0: } paulo@0: paulo@0: static void tx_packet_destroy (struct tx_layer *tx) paulo@0: { paulo@0: struct tx_packet *tx_packet = tx->udata; paulo@0: int i; paulo@0: paulo@0: for (i = 0; i < NR_QUEUES; i++) paulo@0: flush_packets (&tx_packet->queues[i]); paulo@0: paulo@0: FREE (tx_packet); paulo@0: } paulo@0: paulo@0: static void tx_packet_consume (struct tx_layer *tx, BOOL stop) paulo@0: { paulo@0: /* nothing */ paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: static void tx_packet_enable (struct tx_layer *tx) paulo@0: { paulo@0: /* TODO */ paulo@0: } paulo@0: paulo@0: static void tx_packet_disable (struct tx_layer *tx) paulo@0: { paulo@0: /* TODO */ paulo@0: } paulo@0: paulo@0: /*****************************************************************************/ paulo@0: paulo@0: struct tx_layer_ops gt_tx_packet_ops = paulo@0: { paulo@0: tx_packet_init, paulo@0: tx_packet_destroy, paulo@0: tx_packet_consume, paulo@0: tx_packet_queue, paulo@0: tx_packet_ready, paulo@0: tx_packet_enable, paulo@0: tx_packet_disable, paulo@0: };