From d17f43b13cffb458a9931e878086e3d2caa28d25 Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Sat, 9 Jan 2010 18:41:07 +0100 Subject: [PATCH] udp_recv: Use buffer pool API. Although fecdec, the filter receiving the output of the udp receiver, does not strictly benefit from this change, using buffer pools in receivers has the advantage that we don't need to guess the input buffer size or call realloc() after the receive to shrink the buffer. --- udp_recv.c | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/udp_recv.c b/udp_recv.c index f68a7100..0c5a3abf 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -33,6 +33,7 @@ struct private_udp_recv_data { /** The socket file descriptor. */ int fd; + struct btr_pool *btrp; }; static void udp_recv_pre_select(struct sched *s, struct task *t) @@ -100,8 +101,6 @@ success: t->error = 1; } -#define UDP_RECV_READ_BUF_SIZE 1500 - static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); @@ -109,7 +108,7 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) struct btr_node *btrn = rn->btrn; int ret; char *buf = NULL; - size_t packet_size; + size_t packet_size, bufsize; t->error = 0; ret = btr_node_status(btrn, 0, BTR_NT_ROOT); @@ -119,8 +118,11 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) return; if (!FD_ISSET(purd->fd, &s->rfds)) return; - buf = para_malloc(UDP_RECV_READ_BUF_SIZE); - ret = recv_bin_buffer(purd->fd, buf, UDP_RECV_READ_BUF_SIZE); + bufsize = btr_pool_get_buffer(purd->btrp, &buf); + ret = -E_UDP_OVERRUN; + if (bufsize == 0) + goto err; + ret = recv_bin_buffer(purd->fd, buf, bufsize); if (ret == 0) ret = -E_RECV_EOF; if (ret < 0) @@ -128,16 +130,19 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) packet_size = ret; if (packet_size >= FEC_EOF_PACKET_LEN) { if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) { + PARA_CRIT_LOG("%p: eof\n", rn); ret = -E_RECV_EOF; goto err; } } - btr_add_output(buf, ret, btrn); + btr_pool_allocate(purd->btrp, packet_size); + btr_add_output_pool(purd->btrp, buf, packet_size, btrn); return; err: - free(buf); btr_remove_node(btrn); t->error = ret; + close(purd->fd); + purd->fd = -1; } static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) @@ -159,9 +164,10 @@ static void udp_recv_close(struct receiver_node *rn) if (purd->fd >= 0) close(purd->fd); + PARA_CRIT_LOG("%p: close\n", rn); + btr_pool_free(purd->btrp); free(rn->private_data); free(rn->buf); - udp_recv_cmdline_parser_free(rn->conf); } static void *udp_recv_parse_config(int argc, char **argv) @@ -261,10 +267,13 @@ static int udp_recv_open(struct receiver_node *rn) } ret = mark_fd_nonblocking(purd->fd); - if (ret < 0) + if (ret < 0) { + close(purd->fd); goto err; - PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, + } + PARA_CRIT_LOG("rn %p: receiving from %s:%d, fd=%d\n", rn, c->host_arg, c->port_arg, purd->fd); + purd->btrp = btr_pool_new(320 * 1024); return purd->fd; err: free(rn->private_data); -- 2.39.5