next_buffer:
t->error = 0;
- ret = prepare_filter_node(fn);
+ ret = btr_node_status(btrn, fn->min_iqs);
if (ret < 0)
goto err;
if (ret == 0)
return;
+ btr_merge(btrn, fn->min_iqs);
len = btr_next_buffer(btrn, (char **)&inbuf);
iqs = btr_get_input_queue_size(btrn);
if (!padd->initialized) {
if (!pad->handle)
return 1;
if (wn->btrn) {
- size_t sz = btr_get_input_queue_size(wn->btrn);
- if (sz < pad->bytes_per_frame) {
- if (!btr_no_parent(wn->btrn))
- return 1;
+ int ret = btr_node_status(wn->btrn, wn->min_iqs);
+ if (ret == 0)
+ return 1;
+ if (ret < 0) {
underrun = 10;
goto timeout;
}
static void alsa_write_pre_select_btr(struct sched *s, struct task *t)
{
struct writer_node *wn = container_of(t, struct writer_node, task);
- t->error = alsa_write_pre_select(s, wn);
+ alsa_write_pre_select(s, wn);
}
static void xrun(snd_pcm_t *handle)
snd_pcm_close(pad->handle);
snd_config_update_free_global();
}
+ alsa_cmdline_parser_free(wn->conf);
free(pad);
}
again:
t->error = 0;
- ret = prepare_writer_node(wn);
+ ret = btr_node_status(btrn, wn->min_iqs);
if (ret == 0)
return;
+ btr_merge(btrn, wn->min_iqs);
bytes = btr_next_buffer(btrn, &data);
if (bytes < pad->bytes_per_frame) { /* eof */
assert(btr_no_parent(btrn));
ret = -E_ALSA_WRITE;
err:
assert(ret < 0);
- btr_del_node(btrn);
- alsa_close(wn);
t->error = ret;
}
return;
}
next_buffer:
- ret = prepare_filter_node(fn);
- in_bytes = btr_next_buffer(btrn, (char **)&in);
+ ret = btr_node_status(btrn, fn->min_iqs);
if (ret < 0)
goto err;
- len = in_bytes / 2;
- /* len == 0 happens if eof and in_bytes == 1. */
- if (ret == 0 || len == 0)
+ if (ret == 0)
return;
+ btr_merge(btrn, fn->min_iqs);
+ in_bytes = btr_next_buffer(btrn, (char **)&in);
+ len = in_bytes / 2;
+ if (len == 0) { /* eof and in_bytes == 1 */
+ ret = -E_AMP_EOF;
+ goto err;
+ }
if (inplace)
out = in;
{
return log_tree_recursively(btrn, loglevel, 0);
}
+
+/** 640K ought to be enough for everybody ;) */
+#define BTRN_MAX_PENDING (640 * 1024)
+
+int btr_node_status(struct btr_node *btrn, size_t min_iqs)
+{
+ size_t iqs;
+
+ if (btr_eof(btrn))
+ return -E_BTR_EOF;
+ if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
+ return 0;
+ iqs = btr_get_input_queue_size(btrn);
+ if (iqs == 0) /* we have a parent, because not eof */
+ return 0;
+ if (iqs < min_iqs && !btr_no_parent(btrn))
+ return 0;
+ return 1;
+}
void btr_log_tree(struct btr_node *btrn, int loglevel);
int btr_pushdown_one(struct btr_node *btrn);
bool btr_inplace_ok(struct btr_node *btrn);
+int btr_node_status(struct btr_node *btrn, size_t min_iqs);
//inplace = false;
next_buffer:
t->error = 0;
- ret = prepare_filter_node(fn);
+ ret = btr_node_status(btrn, fn->min_iqs);
if (ret < 0)
goto err;
if (ret == 0)
return;
+ btr_merge(btrn, fn->min_iqs);
length = btr_next_buffer(btrn, &inbuf) & ~(size_t)1;
ip = (int16_t *)inbuf;
if (inplace)
#define SERVER_COMMAND_LIST_ERRORS
#define AFS_COMMAND_LIST_ERRORS
#define AUDIOD_COMMAND_LIST_ERRORS
-#define BUFFER_TREE_ERRORS
extern const char **para_errlist[];
+#define BUFFER_TREE_ERRORS \
+ PARA_ERROR(BTR_EOF, "buffer tree: end of file"), \
+
#define STDOUT_ERRORS \
PARA_ERROR(ORPHAN, "orphaned (EOF)"), \
#define AMP_FILTER_ERRORS \
PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \
PARA_ERROR(AMP_ZERO_AMP, "no amplification necessary"), \
+ PARA_ERROR(AMP_EOF, "amp: end of file"), \
#define SEND_COMMON_ERRORS \
#include "ggo.h"
#include "buffer_tree.h"
#include "write.h"
+#include "write_common.h"
#include "string.h"
#include "fd.h"
#include "file_write.cmdline.h"
return 1;
}
+static void file_write_pre_select_btr(struct sched *s, struct task *t)
+{
+ struct writer_node *wn = container_of(t, struct writer_node, task);
+ struct private_file_write_data *pfwd = wn->private_data;
+ int ret;
+
+ t->error = 0;
+ pfwd->check_fd = 0;
+ ret = btr_node_status(wn->btrn, wn->min_iqs);
+ if (ret >= 0) {
+ para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
+ pfwd->check_fd = 1;
+ }
+ if (ret != 0) {
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 1;
+ }
+}
+
static int file_write_post_select(struct sched *s, struct writer_node *wn)
{
struct private_file_write_data *pfwd = wn->private_data;
static void file_write_close(struct writer_node *wn)
{
struct private_file_write_data *pfwd = wn->private_data;
+
close(pfwd->fd);
+ file_cmdline_parser_free(wn->conf);
free(pfwd);
}
+static void file_write_post_select_btr(__a_unused struct sched *s,
+ struct task *t)
+{
+ struct writer_node *wn = container_of(t, struct writer_node, task);
+ struct private_file_write_data *pfwd = wn->private_data;
+ struct btr_node *btrn = wn->btrn;
+ int ret;
+ char *buf;
+ size_t bytes;
+
+ t->error = 0;
+ ret = btr_node_status(btrn, wn->min_iqs);
+ if (ret == 0)
+ return;
+ if (ret < 0)
+ goto err;
+ if (!pfwd->check_fd)
+ return;
+ if (!FD_ISSET(pfwd->fd, &s->wfds))
+ return;
+ bytes = btr_next_buffer(btrn, &buf);
+ assert(bytes > 0);
+ //PARA_INFO_LOG("writing %zu\n", bytes);
+ ret = write(pfwd->fd, buf, bytes);
+ if (ret < 0)
+ goto err;
+ btr_consume(btrn, ret);
+ return;
+err:
+ assert(ret < 0);
+ t->error = ret;
+}
+
__malloc static void *file_write_parse_config(const char *options)
{
struct file_write_args_info *conf
file_cmdline_parser_init(&dummy);
w->open = file_write_open;
w->pre_select = file_write_pre_select;
+ w->pre_select_btr = file_write_pre_select_btr;
w->post_select = file_write_post_select;
+ w->post_select_btr = file_write_post_select_btr;
w->parse_config = file_write_parse_config;
w->close = file_write_close;
w->shutdown = NULL; /* nothing to do */
int check_filter_arg(char *filter_arg, void **conf);
void filter_post_select(__a_unused struct sched *s, struct task *t);
void print_filter_helps(int detailed);
-int prepare_filter_node(struct filter_node *fn);
void generic_filter_pre_select(struct sched *s, struct task *t);
static inline void write_int16_host_endian(char *buf, int val)
}
-/** 640K ought to be enough for everybody ;) */
-#define FILTER_MAX_PENDING (640 * 1024)
-
-int prepare_filter_node(struct filter_node *fn)
-{
- struct btr_node *btrn = fn->btrn;
- size_t iqs;
-
- if (btr_eof(btrn))
- return -E_FC_EOF;
- if (btr_bytes_pending(btrn) > FILTER_MAX_PENDING)
- return 0;
- iqs = btr_get_input_queue_size(btrn);
- if (iqs < fn->min_iqs && !btr_no_parent(btrn))
- return 0;
- assert(iqs != 0);
- /* avoid "buffer too small" errors from the decoder */
- btr_merge(btrn, fn->min_iqs);
- return 1;
-}
-
void generic_filter_pre_select(struct sched *s, struct task *t)
{
struct filter_node *fn = container_of(t, struct filter_node, task);
- size_t iqs = btr_get_input_queue_size(fn->btrn);
t->error = 0;
- if (iqs < fn->min_iqs)
- return;
- if (btr_bytes_pending(fn->btrn) > FILTER_MAX_PENDING)
- return; /* FIXME, should use reasonable bound on timeout */
- s->timeout.tv_sec = 0;
- s->timeout.tv_usec = 1;
+ if (btr_node_status(fn->btrn, fn->min_iqs) != 0) {
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 1;
+ }
}
iqs = btr_get_input_queue_size(btrn);
if (need_bad_data_delay(pmd, iqs))
return;
- ret = prepare_filter_node(fn);
+ ret = btr_node_status(btrn, fn->min_iqs);
if (ret < 0)
goto err;
if (ret == 0)
return;
+ btr_merge(btrn, fn->min_iqs);
len = btr_next_buffer(btrn, &inbuffer);
mad_stream_buffer(&pmd->stream, (unsigned char *)inbuffer, len);
next_frame:
char *in;
t->error = 0;
- ret = prepare_filter_node(fn);
+ ret = btr_node_status(btrn, fn->min_iqs);
if (ret < 0)
goto err;
if (ret == 0)
return;
+ btr_merge(btrn, fn->min_iqs);
len = btr_next_buffer(btrn, &in);
iqs = btr_get_input_queue_size(btrn);
if (!pod->vf) {
next_buffer:
t->error = 0;
- ret = prepare_filter_node(fn);
+ ret = btr_node_status(btrn, fn->min_iqs);
if (ret < 0)
goto err;
if (ret == 0)
return;
+ btr_merge(btrn, fn->min_iqs);
len = btr_next_buffer(btrn, (char **)&in);
ret = -E_WMADEC_EOF;
if (len < fn->min_iqs)
PARA_NOTICE_LOG("wav header not found\n");
cwt->state = CWS_NO_HEADER;
sprintf(t->status, "check wav: no header");
- goto consume;
+ goto out;
}
PARA_INFO_LOG("found wav header\n");
cwt->state = CWS_HAVE_HEADER;
sprintf(t->status, "check wav: have header");
cwt->channels = (unsigned) a[22];
cwt->samplerate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
-consume:
PARA_INFO_LOG("channels: %d, sample rate: %d\n", cwt->channels, cwt->samplerate);
btr_consume(cwt->btrn, WAV_HEADER_LEN);
out:
- if (sz) {
+ if (sz)
btr_pushdown(cwt->btrn);
- s->timeout.tv_sec = 0;
- s->timeout.tv_usec = 1;
- } else {
+ else {
if (btr_no_parent(cwt->btrn))
t->error = -E_WRITE_EOF;
}
wns[0] = setup_writer_node(NULL, cwt->btrn);
if (!wns[0])
goto out;
+ i = 1;
} else {
wns = para_malloc(conf.writer_given * sizeof(*wns));
for (i = 0; i < conf.writer_given; i++) {
for (i--; i >= 0; i--) {
struct writer_node *wn = wns[i];
struct writer *w = writers + wn->writer_num;
+
w->close(wn);
- free(wn->conf); /* FIXME should call gengetopt cleanup funtion */
+ btr_del_node(wn->btrn);
+ free(wn->conf);
free(wn);
}
free(wns);
{
struct writer_node *wn = para_calloc(sizeof(*wn));
struct writer *w;
- const char *name;
+ char *name;
if (arg)
wn->conf = check_writer_arg(arg, &wn->writer_num);
return NULL;
}
w = writers + wn->writer_num;
- name = writer_names[wn->writer_num];
+ name = make_message("%s writer", writer_names[wn->writer_num]);
wn->btrn = btr_new_node(name, parent, w->execute, wn);
- sprintf(wn->task.status, "%s", name);
+ strcpy(wn->task.status, name);
+ free(name);
w->open(wn);
wn->task.post_select = w->post_select_btr;
wn->task.pre_select = w->pre_select_btr;
ggo_print_help(&w->help, detailed);
}
}
-
-int prepare_writer_node(struct writer_node *wn)
-{
- struct btr_node *btrn = wn->btrn;
- size_t iqs;
-
- if (btr_eof(btrn))
- return -E_WRITE_COMMON_EOF;
- iqs = btr_get_input_queue_size(btrn);
- if (iqs < wn->min_iqs && !btr_no_parent(btrn))
- return 0;
- assert(iqs != 0);
- /* avoid "buffer too small" errors from the decoder */
- btr_merge(btrn, wn->min_iqs);
- return 1;
-}
-
struct writer_node_group *setup_default_wng(void);
void print_writer_helps(int detailed);
struct writer_node *setup_writer_node(const char *arg, struct btr_node *parent);
-int prepare_writer_node(struct writer_node *wn);