From 0b101563e67c91bc660f6a16b5889901d9e8eeff Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Tue, 29 Dec 2009 00:42:47 +0100 Subject: [PATCH] btr: Make it kind of work for http recv + stdout. --- buffer_tree.c | 32 +++++++++++++++++--------- buffer_tree.h | 4 ++++ configure.ac | 4 ++-- error.h | 5 ++++- http_recv.c | 38 +++++++++++++++++++++---------- recv.c | 8 +++---- stdout.c | 62 ++++++++++++++++++++++++++++++++++++++++++++++++--- stdout.h | 4 ++-- 8 files changed, 123 insertions(+), 34 deletions(-) diff --git a/buffer_tree.c b/buffer_tree.c index 941d58db..1046460d 100644 --- a/buffer_tree.c +++ b/buffer_tree.c @@ -141,14 +141,6 @@ bool btr_inplace_ok(struct btr_node *btrn) return list_is_singular(&btrn->parent->children); } -struct btr_buffer_reference *btr_next_br(struct btr_node *btrn) -{ - if (list_empty(&btrn->input_queue)) - return NULL; - return list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node); -} - - static inline size_t br_available_bytes(struct btr_buffer_reference *br) { return br->btrb->size - br->consumed; @@ -160,9 +152,26 @@ size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf) return br_available_bytes(br); } -void btr_increase_used_bytes(struct btr_buffer_reference *br, size_t consumed) +size_t btr_next_buffer(struct btr_node *btrn, char **bufp) +{ + struct btr_buffer_reference *br; + + if (list_empty(&btrn->input_queue)) { + *bufp = NULL; + return 0; + } + br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node); + return btr_get_buffer_by_reference(br, bufp); +} + +void btr_consume(struct btr_node *btrn, size_t numbytes) { - br->consumed += consumed; + struct btr_buffer_reference *br; + + assert(!list_empty(&btrn->input_queue)); + br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node); + assert(br->consumed + numbytes <= br->btrb->size); + br->consumed += numbytes; if (br->consumed == br->btrb->size) btr_drop_buffer_reference(br); } @@ -178,6 +187,9 @@ void btr_del_node(struct btr_node *btrn) { struct btr_node *ch; + if (!btrn) + return; + PARA_NOTICE_LOG("deleting %s\n", btrn->name); FOR_EACH_CHILD(ch, btrn) ch->parent = NULL; flush_input_queue(btrn); diff --git a/buffer_tree.h b/buffer_tree.h index b066ad9e..d3409736 100644 --- a/buffer_tree.h +++ b/buffer_tree.h @@ -6,3 +6,7 @@ void btr_del_node(struct btr_node *btrn); void btr_add_output(char *buf, size_t size, struct btr_node *btrn); bool btr_is_leaf_node(struct btr_node *btrn); size_t btr_bytes_pending(struct btr_node *btrn); +size_t btr_get_input_queue_size(struct btr_node *btrn); +bool btr_no_parent(struct btr_node *btrn); +size_t btr_next_buffer(struct btr_node *btrn, char **bufp); +void btr_consume(struct btr_node *btrn, size_t numbytes); diff --git a/configure.ac b/configure.ac index f7e3f8d3..aa62ebc7 100644 --- a/configure.ac +++ b/configure.ac @@ -108,7 +108,7 @@ senders=" http dccp udp" filter_cmdline_objs="add_cmdline(filter compress_filter amp_filter prebuffer_filter)" filter_errlist_objs="filter_common wav_filter compress_filter filter string stdin stdout sched fd amp_filter ggo fecdec_filter fec - prebuffer_filter time bitstream imdct wma_common wmadec_filter" + prebuffer_filter time bitstream imdct wma_common wmadec_filter buffer_tree" filter_ldflags="-lm" filters=" compress wav amp fecdec wmadec prebuffer" @@ -147,7 +147,7 @@ default_writer="FILE_WRITE" client_cmdline_objs="add_cmdline(client)" client_errlist_objs="client net string crypt fd sched stdin stdout - client_common sha1" + client_common sha1 buffer_tree" client_ldflags="" gui_cmdline_objs="add_cmdline(gui)" diff --git a/error.h b/error.h index 6b5e300f..4aec60d8 100644 --- a/error.h +++ b/error.h @@ -23,7 +23,6 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define SHA1_ERRORS #define RBTREE_ERRORS #define RECV_ERRORS -#define STDOUT_ERRORS #define IPC_ERRORS #define DCCP_SEND_ERRORS #define HTTP_SEND_ERRORS @@ -39,6 +38,10 @@ DEFINE_ERRLIST_OBJECT_ENUM; extern const char **para_errlist[]; +#define STDOUT_ERRORS \ + PARA_ERROR(ORPHAN, "orphaned (EOF)"), \ + + #define BITSTREAM_ERRORS \ PARA_ERROR(VLC, "invalid vlc code"), \ diff --git a/http_recv.c b/http_recv.c index bedd989e..abb8c1e8 100644 --- a/http_recv.c +++ b/http_recv.c @@ -108,9 +108,9 @@ static void http_recv_post_select(struct sched *s, struct task *t) if (rn->output_error && *rn->output_error < 0) { t->error = *rn->output_error; - return; + goto err; } - if (phd->status == HTTP_CONNECTED) { + if (phd->status == HTTP_CONNECTED) { char *rq; if (!FD_ISSET(phd->fd, &s->wfds)) return; @@ -118,25 +118,27 @@ static void http_recv_post_select(struct sched *s, struct task *t) PARA_INFO_LOG("sending http request\n"); t->error = send_va_buffer(phd->fd, "%s", rq); free(rq); - if (t->error >= 0) - phd->status = HTTP_SENT_GET_REQUEST; + if (t->error < 0) + goto err; + phd->status = HTTP_SENT_GET_REQUEST; return; } if (!FD_ISSET(phd->fd, &s->rfds)) return; if (phd->status == HTTP_SENT_GET_REQUEST) { t->error = recv_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG)); - if (t->error >= 0) { - PARA_INFO_LOG("received ok msg, streaming\n"); - phd->status = HTTP_STREAMING; - } + if (t->error < 0) + goto err; + PARA_INFO_LOG("received ok msg, streaming\n"); + phd->status = HTTP_STREAMING; return; } if (conf->buffer_tree_given) { char *buf; + if (btr_bytes_pending(rn->btrn) > HTTP_RECV_MAX_PENDING) { t->error = -E_HTTP_RECV_OVERRUN; - return; + goto err; } buf = para_malloc(HTTP_RECV_READ_BUF_SIZE); t->error = recv_bin_buffer(phd->fd, buf, HTTP_RECV_READ_BUF_SIZE); @@ -144,26 +146,36 @@ static void http_recv_post_select(struct sched *s, struct task *t) t->error = -E_RECV_EOF; if (t->error < 0) { free(buf); - return; + goto err; } btr_add_output(buf, t->error, rn->btrn); return; } t->error = -E_HTTP_RECV_OVERRUN; if (rn->loaded >= BUFSIZE) - return; + goto err; t->error = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded); if (t->error == 0) t->error = -E_RECV_EOF; if (t->error < 0) - return; + goto err; rn->loaded += t->error; + return; +err: + if (conf->buffer_tree_given) { + btr_del_node(rn->btrn); + rn->btrn = NULL; + } } static void http_recv_close(struct receiver_node *rn) { struct private_http_recv_data *phd = rn->private_data; + struct http_recv_args_info *conf = rn->conf; + + if (conf->buffer_tree_given) + btr_del_node(rn->btrn); close(phd->fd); free(rn->buf); free(rn->private_data); @@ -198,6 +210,8 @@ static int http_recv_open(struct receiver_node *rn) rn->private_data = phd = para_calloc(sizeof(struct private_http_recv_data)); phd->fd = fd; phd->status = HTTP_CONNECTED; + if (conf->buffer_tree_given) + rn->btrn = btr_new_node("receiver", NULL); return 1; } diff --git a/recv.c b/recv.c index 1bb0532a..297fbe92 100644 --- a/recv.c +++ b/recv.c @@ -92,12 +92,14 @@ int main(int argc, char *argv[]) } r = &receivers[receiver_num]; rn.receiver = r; - rn.btrn = btr_new_node("receiver", NULL /* no parent */); ret = r->open(&rn); if (ret < 0) goto out; r_opened = 1; + if (conf.buffer_tree_given) + sot.btrn = btr_new_node("stdout", rn.btrn); + stdout_set_defaults(&sot); sot.bufp = &rn.buf; sot.loaded = &rn.loaded; @@ -111,10 +113,8 @@ int main(int argc, char *argv[]) ret = schedule(&s); out: - if (r_opened) { - btr_del_node(rn.btrn); + if (r_opened) r->close(&rn); - } if (r) r->shutdown(); if (ret < 0) diff --git a/stdout.c b/stdout.c index 4e3b6df9..6539b68f 100644 --- a/stdout.c +++ b/stdout.c @@ -16,6 +16,7 @@ #include "fd.h" #include "error.h" #include "stdout.h" +#include "buffer_tree.h" /** * The pre_select function of the stdout task. @@ -44,6 +45,26 @@ static void stdout_pre_select(struct sched *s, struct task *t) para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno); } +static void stdout_pre_select_btr(struct sched *s, struct task *t) +{ + struct stdout_task *sot = container_of(t, struct stdout_task, task); + size_t sz = btr_get_input_queue_size(sot->btrn); + + t->error = 0; + sot->check_fd = 0; + if (sz == 0) { + if (btr_no_parent(sot->btrn)) { + t->error = -E_ORPHAN; + btr_del_node(sot->btrn); + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 1; + } + return; + } + sot->check_fd = 1; + para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno); +} + /** * The post select function of the stdout task. * @@ -78,6 +99,37 @@ static void stdout_post_select(struct sched *s, struct task *t) memmove(*sot->bufp, *sot->bufp + ret, *sot->loaded); } +static void stdout_post_select_btr(struct sched *s, struct task *t) +{ + struct stdout_task *sot = container_of(t, struct stdout_task, task); + ssize_t ret; + size_t sz = btr_get_input_queue_size(sot->btrn); + bool orphan = btr_no_parent(sot->btrn); + char *buf; + + t->error = 0; + if (!sot->check_fd) { + if (sz == 0 && orphan) { + t->error = -E_ORPHAN; + goto err; + } + return; + } + if (!FD_ISSET(STDOUT_FILENO, &s->wfds)) + return; + sz = btr_next_buffer(sot->btrn, &buf); + if (sz == 0) + return; + ret = write(STDOUT_FILENO, buf, sz); + if (ret < 0) { + t->error = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + btr_consume(sot->btrn, ret); + return; +err: + btr_del_node(sot->btrn); +} /** * Initialize a stdout task structure with default values. * @@ -90,9 +142,13 @@ void stdout_set_defaults(struct stdout_task *sot) { int ret; - sot->use_buffer_tree = false; - sot->task.pre_select = stdout_pre_select; - sot->task.post_select = stdout_post_select; + if (sot->btrn) { + sot->task.pre_select = stdout_pre_select_btr; + sot->task.post_select = stdout_post_select_btr; + } else { + sot->task.pre_select = stdout_pre_select; + sot->task.post_select = stdout_post_select; + } sprintf(sot->task.status, "stdout writer"); ret = mark_fd_nonblocking(STDOUT_FILENO); if (ret >= 0) diff --git a/stdout.h b/stdout.h index c3eb311d..fee9800c 100644 --- a/stdout.h +++ b/stdout.h @@ -20,8 +20,8 @@ struct stdout_task { struct task task; /** Whether \p STDOUT_FILENO was included in the write fd set. */ int check_fd; - /** Whether to use the buffer tree API. */ - bool use_buffer_tree; + /** Non-null if buffer tree API should be used. */ + struct btr_node *btrn; }; void stdout_set_defaults(struct stdout_task *sot); -- 2.39.5