return list_is_singular(&btrn->parent->children);
}
-struct btr_buffer_reference *btr_next_br(struct btr_node *btrn)
-{
- if (list_empty(&btrn->input_queue))
- return NULL;
- return list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-}
-
-
static inline size_t br_available_bytes(struct btr_buffer_reference *br)
{
return br->btrb->size - br->consumed;
return br_available_bytes(br);
}
-void btr_increase_used_bytes(struct btr_buffer_reference *br, size_t consumed)
+size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
+{
+ struct btr_buffer_reference *br;
+
+ if (list_empty(&btrn->input_queue)) {
+ *bufp = NULL;
+ return 0;
+ }
+ br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
+ return btr_get_buffer_by_reference(br, bufp);
+}
+
+void btr_consume(struct btr_node *btrn, size_t numbytes)
{
- br->consumed += consumed;
+ struct btr_buffer_reference *br;
+
+ assert(!list_empty(&btrn->input_queue));
+ br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
+ assert(br->consumed + numbytes <= br->btrb->size);
+ br->consumed += numbytes;
if (br->consumed == br->btrb->size)
btr_drop_buffer_reference(br);
}
{
struct btr_node *ch;
+ if (!btrn)
+ return;
+ PARA_NOTICE_LOG("deleting %s\n", btrn->name);
FOR_EACH_CHILD(ch, btrn)
ch->parent = NULL;
flush_input_queue(btrn);
void btr_add_output(char *buf, size_t size, struct btr_node *btrn);
bool btr_is_leaf_node(struct btr_node *btrn);
size_t btr_bytes_pending(struct btr_node *btrn);
+size_t btr_get_input_queue_size(struct btr_node *btrn);
+bool btr_no_parent(struct btr_node *btrn);
+size_t btr_next_buffer(struct btr_node *btrn, char **bufp);
+void btr_consume(struct btr_node *btrn, size_t numbytes);
filter_cmdline_objs="add_cmdline(filter compress_filter amp_filter prebuffer_filter)"
filter_errlist_objs="filter_common wav_filter compress_filter filter string
stdin stdout sched fd amp_filter ggo fecdec_filter fec
- prebuffer_filter time bitstream imdct wma_common wmadec_filter"
+ prebuffer_filter time bitstream imdct wma_common wmadec_filter buffer_tree"
filter_ldflags="-lm"
filters=" compress wav amp fecdec wmadec prebuffer"
client_cmdline_objs="add_cmdline(client)"
client_errlist_objs="client net string crypt fd sched stdin stdout
- client_common sha1"
+ client_common sha1 buffer_tree"
client_ldflags=""
gui_cmdline_objs="add_cmdline(gui)"
#define SHA1_ERRORS
#define RBTREE_ERRORS
#define RECV_ERRORS
-#define STDOUT_ERRORS
#define IPC_ERRORS
#define DCCP_SEND_ERRORS
#define HTTP_SEND_ERRORS
extern const char **para_errlist[];
+#define STDOUT_ERRORS \
+ PARA_ERROR(ORPHAN, "orphaned (EOF)"), \
+
+
#define BITSTREAM_ERRORS \
PARA_ERROR(VLC, "invalid vlc code"), \
if (rn->output_error && *rn->output_error < 0) {
t->error = *rn->output_error;
- return;
+ goto err;
}
- if (phd->status == HTTP_CONNECTED) {
+ if (phd->status == HTTP_CONNECTED) {
char *rq;
if (!FD_ISSET(phd->fd, &s->wfds))
return;
PARA_INFO_LOG("sending http request\n");
t->error = send_va_buffer(phd->fd, "%s", rq);
free(rq);
- if (t->error >= 0)
- phd->status = HTTP_SENT_GET_REQUEST;
+ if (t->error < 0)
+ goto err;
+ phd->status = HTTP_SENT_GET_REQUEST;
return;
}
if (!FD_ISSET(phd->fd, &s->rfds))
return;
if (phd->status == HTTP_SENT_GET_REQUEST) {
t->error = recv_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG));
- if (t->error >= 0) {
- PARA_INFO_LOG("received ok msg, streaming\n");
- phd->status = HTTP_STREAMING;
- }
+ if (t->error < 0)
+ goto err;
+ PARA_INFO_LOG("received ok msg, streaming\n");
+ phd->status = HTTP_STREAMING;
return;
}
if (conf->buffer_tree_given) {
char *buf;
+
if (btr_bytes_pending(rn->btrn) > HTTP_RECV_MAX_PENDING) {
t->error = -E_HTTP_RECV_OVERRUN;
- return;
+ goto err;
}
buf = para_malloc(HTTP_RECV_READ_BUF_SIZE);
t->error = recv_bin_buffer(phd->fd, buf, HTTP_RECV_READ_BUF_SIZE);
t->error = -E_RECV_EOF;
if (t->error < 0) {
free(buf);
- return;
+ goto err;
}
btr_add_output(buf, t->error, rn->btrn);
return;
}
t->error = -E_HTTP_RECV_OVERRUN;
if (rn->loaded >= BUFSIZE)
- return;
+ goto err;
t->error = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
BUFSIZE - rn->loaded);
if (t->error == 0)
t->error = -E_RECV_EOF;
if (t->error < 0)
- return;
+ goto err;
rn->loaded += t->error;
+ return;
+err:
+ if (conf->buffer_tree_given) {
+ btr_del_node(rn->btrn);
+ rn->btrn = NULL;
+ }
}
static void http_recv_close(struct receiver_node *rn)
{
struct private_http_recv_data *phd = rn->private_data;
+ struct http_recv_args_info *conf = rn->conf;
+
+ if (conf->buffer_tree_given)
+ btr_del_node(rn->btrn);
close(phd->fd);
free(rn->buf);
free(rn->private_data);
rn->private_data = phd = para_calloc(sizeof(struct private_http_recv_data));
phd->fd = fd;
phd->status = HTTP_CONNECTED;
+ if (conf->buffer_tree_given)
+ rn->btrn = btr_new_node("receiver", NULL);
return 1;
}
}
r = &receivers[receiver_num];
rn.receiver = r;
- rn.btrn = btr_new_node("receiver", NULL /* no parent */);
ret = r->open(&rn);
if (ret < 0)
goto out;
r_opened = 1;
+ if (conf.buffer_tree_given)
+ sot.btrn = btr_new_node("stdout", rn.btrn);
+
stdout_set_defaults(&sot);
sot.bufp = &rn.buf;
sot.loaded = &rn.loaded;
ret = schedule(&s);
out:
- if (r_opened) {
- btr_del_node(rn.btrn);
+ if (r_opened)
r->close(&rn);
- }
if (r)
r->shutdown();
if (ret < 0)
#include "fd.h"
#include "error.h"
#include "stdout.h"
+#include "buffer_tree.h"
/**
* The pre_select function of the stdout task.
para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
}
+static void stdout_pre_select_btr(struct sched *s, struct task *t)
+{
+ struct stdout_task *sot = container_of(t, struct stdout_task, task);
+ size_t sz = btr_get_input_queue_size(sot->btrn);
+
+ t->error = 0;
+ sot->check_fd = 0;
+ if (sz == 0) {
+ if (btr_no_parent(sot->btrn)) {
+ t->error = -E_ORPHAN;
+ btr_del_node(sot->btrn);
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 1;
+ }
+ return;
+ }
+ sot->check_fd = 1;
+ para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+}
+
/**
* The post select function of the stdout task.
*
memmove(*sot->bufp, *sot->bufp + ret, *sot->loaded);
}
+static void stdout_post_select_btr(struct sched *s, struct task *t)
+{
+ struct stdout_task *sot = container_of(t, struct stdout_task, task);
+ ssize_t ret;
+ size_t sz = btr_get_input_queue_size(sot->btrn);
+ bool orphan = btr_no_parent(sot->btrn);
+ char *buf;
+
+ t->error = 0;
+ if (!sot->check_fd) {
+ if (sz == 0 && orphan) {
+ t->error = -E_ORPHAN;
+ goto err;
+ }
+ return;
+ }
+ if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
+ return;
+ sz = btr_next_buffer(sot->btrn, &buf);
+ if (sz == 0)
+ return;
+ ret = write(STDOUT_FILENO, buf, sz);
+ if (ret < 0) {
+ t->error = -ERRNO_TO_PARA_ERROR(errno);
+ goto err;
+ }
+ btr_consume(sot->btrn, ret);
+ return;
+err:
+ btr_del_node(sot->btrn);
+}
/**
* Initialize a stdout task structure with default values.
*
{
int ret;
- sot->use_buffer_tree = false;
- sot->task.pre_select = stdout_pre_select;
- sot->task.post_select = stdout_post_select;
+ if (sot->btrn) {
+ sot->task.pre_select = stdout_pre_select_btr;
+ sot->task.post_select = stdout_post_select_btr;
+ } else {
+ sot->task.pre_select = stdout_pre_select;
+ sot->task.post_select = stdout_post_select;
+ }
sprintf(sot->task.status, "stdout writer");
ret = mark_fd_nonblocking(STDOUT_FILENO);
if (ret >= 0)
struct task task;
/** Whether \p STDOUT_FILENO was included in the write fd set. */
int check_fd;
- /** Whether to use the buffer tree API. */
- bool use_buffer_tree;
+ /** Non-null if buffer tree API should be used. */
+ struct btr_node *btrn;
};
void stdout_set_defaults(struct stdout_task *sot);