From: Andre Noll Date: Tue, 5 Jan 2010 01:32:21 +0000 (+0100) Subject: Introduce btr_node_status() and add btr support to the file writer. X-Git-Tag: v0.4.2~202 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=d0f36435b0f81368a778fda33f3a7df86830f5ac;p=paraslash.git Introduce btr_node_status() and add btr support to the file writer. --- diff --git a/aacdec_filter.c b/aacdec_filter.c index cb27633c..ad1c06a0 100644 --- a/aacdec_filter.c +++ b/aacdec_filter.c @@ -220,11 +220,12 @@ static void aacdec_post_select(__a_unused struct sched *s, struct task *t) next_buffer: t->error = 0; - ret = prepare_filter_node(fn); + ret = btr_node_status(btrn, fn->min_iqs); if (ret < 0) goto err; if (ret == 0) return; + btr_merge(btrn, fn->min_iqs); len = btr_next_buffer(btrn, (char **)&inbuf); iqs = btr_get_input_queue_size(btrn); if (!padd->initialized) { diff --git a/alsa_write.c b/alsa_write.c index 8b56f31a..67c845d3 100644 --- a/alsa_write.c +++ b/alsa_write.c @@ -173,10 +173,10 @@ static int alsa_write_pre_select(struct sched *s, struct writer_node *wn) if (!pad->handle) return 1; if (wn->btrn) { - size_t sz = btr_get_input_queue_size(wn->btrn); - if (sz < pad->bytes_per_frame) { - if (!btr_no_parent(wn->btrn)) - return 1; + int ret = btr_node_status(wn->btrn, wn->min_iqs); + if (ret == 0) + return 1; + if (ret < 0) { underrun = 10; goto timeout; } @@ -213,7 +213,7 @@ timeout: static void alsa_write_pre_select_btr(struct sched *s, struct task *t) { struct writer_node *wn = container_of(t, struct writer_node, task); - t->error = alsa_write_pre_select(s, wn); + alsa_write_pre_select(s, wn); } static void xrun(snd_pcm_t *handle) @@ -291,6 +291,7 @@ static void alsa_close(struct writer_node *wn) snd_pcm_close(pad->handle); snd_config_update_free_global(); } + alsa_cmdline_parser_free(wn->conf); free(pad); } @@ -307,9 +308,10 @@ static void alsa_write_post_select_btr(__a_unused struct sched *s, again: t->error = 0; - ret = prepare_writer_node(wn); + ret = btr_node_status(btrn, wn->min_iqs); if (ret == 0) return; + btr_merge(btrn, wn->min_iqs); bytes = btr_next_buffer(btrn, &data); if (bytes < pad->bytes_per_frame) { /* eof */ assert(btr_no_parent(btrn)); @@ -388,8 +390,6 @@ again: ret = -E_ALSA_WRITE; err: assert(ret < 0); - btr_del_node(btrn); - alsa_close(wn); t->error = ret; } diff --git a/amp_filter.c b/amp_filter.c index c0835f80..10122464 100644 --- a/amp_filter.c +++ b/amp_filter.c @@ -116,14 +116,18 @@ static void amp_post_select(__a_unused struct sched *s, struct task *t) return; } next_buffer: - ret = prepare_filter_node(fn); - in_bytes = btr_next_buffer(btrn, (char **)&in); + ret = btr_node_status(btrn, fn->min_iqs); if (ret < 0) goto err; - len = in_bytes / 2; - /* len == 0 happens if eof and in_bytes == 1. */ - if (ret == 0 || len == 0) + if (ret == 0) return; + btr_merge(btrn, fn->min_iqs); + in_bytes = btr_next_buffer(btrn, (char **)&in); + len = in_bytes / 2; + if (len == 0) { /* eof and in_bytes == 1 */ + ret = -E_AMP_EOF; + goto err; + } if (inplace) out = in; diff --git a/buffer_tree.c b/buffer_tree.c index c826e8fa..3b001541 100644 --- a/buffer_tree.c +++ b/buffer_tree.c @@ -404,3 +404,22 @@ void btr_log_tree(struct btr_node *btrn, int loglevel) { return log_tree_recursively(btrn, loglevel, 0); } + +/** 640K ought to be enough for everybody ;) */ +#define BTRN_MAX_PENDING (640 * 1024) + +int btr_node_status(struct btr_node *btrn, size_t min_iqs) +{ + size_t iqs; + + if (btr_eof(btrn)) + return -E_BTR_EOF; + if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING) + return 0; + iqs = btr_get_input_queue_size(btrn); + if (iqs == 0) /* we have a parent, because not eof */ + return 0; + if (iqs < min_iqs && !btr_no_parent(btrn)) + return 0; + return 1; +} diff --git a/buffer_tree.h b/buffer_tree.h index 785cd272..a95d1671 100644 --- a/buffer_tree.h +++ b/buffer_tree.h @@ -24,3 +24,4 @@ bool btr_eof(struct btr_node *btrn); void btr_log_tree(struct btr_node *btrn, int loglevel); int btr_pushdown_one(struct btr_node *btrn); bool btr_inplace_ok(struct btr_node *btrn); +int btr_node_status(struct btr_node *btrn, size_t min_iqs); diff --git a/compress_filter.c b/compress_filter.c index b17549c3..142ad899 100644 --- a/compress_filter.c +++ b/compress_filter.c @@ -103,11 +103,12 @@ static void compress_post_select(__a_unused struct sched *s, struct task *t) //inplace = false; next_buffer: t->error = 0; - ret = prepare_filter_node(fn); + ret = btr_node_status(btrn, fn->min_iqs); if (ret < 0) goto err; if (ret == 0) return; + btr_merge(btrn, fn->min_iqs); length = btr_next_buffer(btrn, &inbuf) & ~(size_t)1; ip = (int16_t *)inbuf; if (inplace) diff --git a/error.h b/error.h index bf633f54..739eb6c1 100644 --- a/error.h +++ b/error.h @@ -33,11 +33,13 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define SERVER_COMMAND_LIST_ERRORS #define AFS_COMMAND_LIST_ERRORS #define AUDIOD_COMMAND_LIST_ERRORS -#define BUFFER_TREE_ERRORS extern const char **para_errlist[]; +#define BUFFER_TREE_ERRORS \ + PARA_ERROR(BTR_EOF, "buffer tree: end of file"), \ + #define STDOUT_ERRORS \ PARA_ERROR(ORPHAN, "orphaned (EOF)"), \ @@ -107,6 +109,7 @@ extern const char **para_errlist[]; #define AMP_FILTER_ERRORS \ PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \ PARA_ERROR(AMP_ZERO_AMP, "no amplification necessary"), \ + PARA_ERROR(AMP_EOF, "amp: end of file"), \ #define SEND_COMMON_ERRORS \ diff --git a/file_write.c b/file_write.c index 6ba80194..a9635ca2 100644 --- a/file_write.c +++ b/file_write.c @@ -18,6 +18,7 @@ #include "ggo.h" #include "buffer_tree.h" #include "write.h" +#include "write_common.h" #include "string.h" #include "fd.h" #include "file_write.cmdline.h" @@ -87,6 +88,25 @@ static int file_write_pre_select(struct sched *s, struct writer_node *wn) return 1; } +static void file_write_pre_select_btr(struct sched *s, struct task *t) +{ + struct writer_node *wn = container_of(t, struct writer_node, task); + struct private_file_write_data *pfwd = wn->private_data; + int ret; + + t->error = 0; + pfwd->check_fd = 0; + ret = btr_node_status(wn->btrn, wn->min_iqs); + if (ret >= 0) { + para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno); + pfwd->check_fd = 1; + } + if (ret != 0) { + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 1; + } +} + static int file_write_post_select(struct sched *s, struct writer_node *wn) { struct private_file_write_data *pfwd = wn->private_data; @@ -111,10 +131,45 @@ static int file_write_post_select(struct sched *s, struct writer_node *wn) static void file_write_close(struct writer_node *wn) { struct private_file_write_data *pfwd = wn->private_data; + close(pfwd->fd); + file_cmdline_parser_free(wn->conf); free(pfwd); } +static void file_write_post_select_btr(__a_unused struct sched *s, + struct task *t) +{ + struct writer_node *wn = container_of(t, struct writer_node, task); + struct private_file_write_data *pfwd = wn->private_data; + struct btr_node *btrn = wn->btrn; + int ret; + char *buf; + size_t bytes; + + t->error = 0; + ret = btr_node_status(btrn, wn->min_iqs); + if (ret == 0) + return; + if (ret < 0) + goto err; + if (!pfwd->check_fd) + return; + if (!FD_ISSET(pfwd->fd, &s->wfds)) + return; + bytes = btr_next_buffer(btrn, &buf); + assert(bytes > 0); + //PARA_INFO_LOG("writing %zu\n", bytes); + ret = write(pfwd->fd, buf, bytes); + if (ret < 0) + goto err; + btr_consume(btrn, ret); + return; +err: + assert(ret < 0); + t->error = ret; +} + __malloc static void *file_write_parse_config(const char *options) { struct file_write_args_info *conf @@ -136,7 +191,9 @@ void file_write_init(struct writer *w) file_cmdline_parser_init(&dummy); w->open = file_write_open; w->pre_select = file_write_pre_select; + w->pre_select_btr = file_write_pre_select_btr; w->post_select = file_write_post_select; + w->post_select_btr = file_write_post_select_btr; w->parse_config = file_write_parse_config; w->close = file_write_close; w->shutdown = NULL; /* nothing to do */ diff --git a/filter.h b/filter.h index b6901813..ae0a11e3 100644 --- a/filter.h +++ b/filter.h @@ -205,7 +205,6 @@ void filter_init(void); int check_filter_arg(char *filter_arg, void **conf); void filter_post_select(__a_unused struct sched *s, struct task *t); void print_filter_helps(int detailed); -int prepare_filter_node(struct filter_node *fn); void generic_filter_pre_select(struct sched *s, struct task *t); static inline void write_int16_host_endian(char *buf, int val) diff --git a/filter_common.c b/filter_common.c index a27c2d30..acd84d5c 100644 --- a/filter_common.c +++ b/filter_common.c @@ -276,38 +276,14 @@ void print_filter_helps(int detailed) } -/** 640K ought to be enough for everybody ;) */ -#define FILTER_MAX_PENDING (640 * 1024) - -int prepare_filter_node(struct filter_node *fn) -{ - struct btr_node *btrn = fn->btrn; - size_t iqs; - - if (btr_eof(btrn)) - return -E_FC_EOF; - if (btr_bytes_pending(btrn) > FILTER_MAX_PENDING) - return 0; - iqs = btr_get_input_queue_size(btrn); - if (iqs < fn->min_iqs && !btr_no_parent(btrn)) - return 0; - assert(iqs != 0); - /* avoid "buffer too small" errors from the decoder */ - btr_merge(btrn, fn->min_iqs); - return 1; -} - void generic_filter_pre_select(struct sched *s, struct task *t) { struct filter_node *fn = container_of(t, struct filter_node, task); - size_t iqs = btr_get_input_queue_size(fn->btrn); t->error = 0; - if (iqs < fn->min_iqs) - return; - if (btr_bytes_pending(fn->btrn) > FILTER_MAX_PENDING) - return; /* FIXME, should use reasonable bound on timeout */ - s->timeout.tv_sec = 0; - s->timeout.tv_usec = 1; + if (btr_node_status(fn->btrn, fn->min_iqs) != 0) { + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 1; + } } diff --git a/mp3dec_filter.c b/mp3dec_filter.c index 1e074a7e..a6a628ec 100644 --- a/mp3dec_filter.c +++ b/mp3dec_filter.c @@ -198,11 +198,12 @@ next_buffer: iqs = btr_get_input_queue_size(btrn); if (need_bad_data_delay(pmd, iqs)) return; - ret = prepare_filter_node(fn); + ret = btr_node_status(btrn, fn->min_iqs); if (ret < 0) goto err; if (ret == 0) return; + btr_merge(btrn, fn->min_iqs); len = btr_next_buffer(btrn, &inbuffer); mad_stream_buffer(&pmd->stream, (unsigned char *)inbuffer, len); next_frame: diff --git a/oggdec_filter.c b/oggdec_filter.c index ccc42559..34a04dbc 100644 --- a/oggdec_filter.c +++ b/oggdec_filter.c @@ -191,11 +191,12 @@ static void ogg_post_select(__a_unused struct sched *s, struct task *t) char *in; t->error = 0; - ret = prepare_filter_node(fn); + ret = btr_node_status(btrn, fn->min_iqs); if (ret < 0) goto err; if (ret == 0) return; + btr_merge(btrn, fn->min_iqs); len = btr_next_buffer(btrn, &in); iqs = btr_get_input_queue_size(btrn); if (!pod->vf) { diff --git a/wmadec_filter.c b/wmadec_filter.c index 214ceb1b..d214b421 100644 --- a/wmadec_filter.c +++ b/wmadec_filter.c @@ -1238,11 +1238,12 @@ static void wmadec_post_select(__a_unused struct sched *s, struct task *t) next_buffer: t->error = 0; - ret = prepare_filter_node(fn); + ret = btr_node_status(btrn, fn->min_iqs); if (ret < 0) goto err; if (ret == 0) return; + btr_merge(btrn, fn->min_iqs); len = btr_next_buffer(btrn, (char **)&in); ret = -E_WMADEC_EOF; if (len < fn->min_iqs) diff --git a/write.c b/write.c index c3848946..9577f040 100644 --- a/write.c +++ b/write.c @@ -176,22 +176,19 @@ static void check_wav_post_select_btr(__a_unused struct sched *s, struct task *t PARA_NOTICE_LOG("wav header not found\n"); cwt->state = CWS_NO_HEADER; sprintf(t->status, "check wav: no header"); - goto consume; + goto out; } PARA_INFO_LOG("found wav header\n"); cwt->state = CWS_HAVE_HEADER; sprintf(t->status, "check wav: have header"); cwt->channels = (unsigned) a[22]; cwt->samplerate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24); -consume: PARA_INFO_LOG("channels: %d, sample rate: %d\n", cwt->channels, cwt->samplerate); btr_consume(cwt->btrn, WAV_HEADER_LEN); out: - if (sz) { + if (sz) btr_pushdown(cwt->btrn); - s->timeout.tv_sec = 0; - s->timeout.tv_usec = 1; - } else { + else { if (btr_no_parent(cwt->btrn)) t->error = -E_WRITE_EOF; } @@ -308,6 +305,7 @@ static int main_btr(struct sched *s) wns[0] = setup_writer_node(NULL, cwt->btrn); if (!wns[0]) goto out; + i = 1; } else { wns = para_malloc(conf.writer_given * sizeof(*wns)); for (i = 0; i < conf.writer_given; i++) { @@ -326,8 +324,10 @@ out: for (i--; i >= 0; i--) { struct writer_node *wn = wns[i]; struct writer *w = writers + wn->writer_num; + w->close(wn); - free(wn->conf); /* FIXME should call gengetopt cleanup funtion */ + btr_del_node(wn->btrn); + free(wn->conf); free(wn); } free(wns); diff --git a/write_common.c b/write_common.c index ff8999f6..14ce8195 100644 --- a/write_common.c +++ b/write_common.c @@ -257,7 +257,7 @@ struct writer_node *setup_writer_node(const char *arg, struct btr_node *parent) { struct writer_node *wn = para_calloc(sizeof(*wn)); struct writer *w; - const char *name; + char *name; if (arg) wn->conf = check_writer_arg(arg, &wn->writer_num); @@ -270,9 +270,10 @@ struct writer_node *setup_writer_node(const char *arg, struct btr_node *parent) return NULL; } w = writers + wn->writer_num; - name = writer_names[wn->writer_num]; + name = make_message("%s writer", writer_names[wn->writer_num]); wn->btrn = btr_new_node(name, parent, w->execute, wn); - sprintf(wn->task.status, "%s", name); + strcpy(wn->task.status, name); + free(name); w->open(wn); wn->task.post_select = w->post_select_btr; wn->task.pre_select = w->pre_select_btr; @@ -303,20 +304,3 @@ void print_writer_helps(int detailed) ggo_print_help(&w->help, detailed); } } - -int prepare_writer_node(struct writer_node *wn) -{ - struct btr_node *btrn = wn->btrn; - size_t iqs; - - if (btr_eof(btrn)) - return -E_WRITE_COMMON_EOF; - iqs = btr_get_input_queue_size(btrn); - if (iqs < wn->min_iqs && !btr_no_parent(btrn)) - return 0; - assert(iqs != 0); - /* avoid "buffer too small" errors from the decoder */ - btr_merge(btrn, wn->min_iqs); - return 1; -} - diff --git a/write_common.h b/write_common.h index 34e21cee..f34ad3fe 100644 --- a/write_common.h +++ b/write_common.h @@ -14,4 +14,3 @@ void *check_writer_arg(const char *wa, int *writer_num); struct writer_node_group *setup_default_wng(void); void print_writer_helps(int detailed); struct writer_node *setup_writer_node(const char *arg, struct btr_node *parent); -int prepare_writer_node(struct writer_node *wn);