From 5496ca9d07ede9aaa3afd86b60cdf16ed8ccca2b Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Tue, 5 Jan 2010 20:38:39 +0100 Subject: [PATCH] btr support for the UDP receiver. --- udp_recv.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/udp_recv.c b/udp_recv.c index f9782985..bb166c51 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -22,6 +22,7 @@ #include "string.h" #include "net.h" #include "fd.h" +#include "buffer_tree.h" /** The size of the receiver node buffer. */ #define UDP_RECV_CHUNK_SIZE (128 * 1024) @@ -39,7 +40,13 @@ static void udp_recv_pre_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_udp_recv_data *purd = rn->private_data; + int ret; + if (rn->btrn) { + ret = generic_recv_pre_select(s, t); + if (ret <= 0) + return; + } para_fd_set(purd->fd, &s->rfds, &s->max_fileno); } @@ -59,7 +66,7 @@ static int add_rn_output(struct receiver_node *rn, char *buf, size_t len) return 1; } -static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) +static void udp_recv_post_select_nobtr(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_udp_recv_data *purd = rn->private_data; @@ -94,6 +101,54 @@ success: t->error = 1; } +#define UDP_RECV_READ_BUF_SIZE 1500 + +static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) +{ + struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct private_udp_recv_data *purd = rn->private_data; + struct btr_node *btrn = rn->btrn; + int ret; + char *buf = NULL; + size_t packet_size; + + t->error = 0; + ret = btr_node_status(btrn, 0, BTR_NT_ROOT); + if (ret < 0) + goto err; + if (ret == 0) + return; + if (!FD_ISSET(purd->fd, &s->rfds)) + return; + buf = para_malloc(UDP_RECV_READ_BUF_SIZE); + ret = recv_bin_buffer(purd->fd, buf, UDP_RECV_READ_BUF_SIZE); + if (ret == 0) + ret = -E_RECV_EOF; + if (ret < 0) + goto err; + packet_size = ret; + if (packet_size >= FEC_EOF_PACKET_LEN) { + if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) { + ret = -E_RECV_EOF; + goto err; + } + } + btr_add_output(buf, ret, btrn); + return; +err: + free(buf); + btr_remove_node(btrn); + t->error = ret; +} + +static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) +{ + struct receiver_node *rn = container_of(t, struct receiver_node, task); + if (rn->btrn) + return udp_recv_post_select_btr(s, t); + udp_recv_post_select_nobtr(s, t); +} + static void udp_shutdown(void) { return; @@ -107,6 +162,7 @@ static void udp_recv_close(struct receiver_node *rn) close(purd->fd); free(rn->private_data); free(rn->buf); + udp_recv_cmdline_parser_free(rn->conf); } static void *udp_recv_parse_config(int argc, char **argv) -- 2.39.5