From: Andre Date: Thu, 25 May 2006 12:49:03 +0000 (+0200) Subject: convert para_filter to the new scheduler X-Git-Tag: v0.2.14~101^2~23 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=40cbbb95e256e63e1e6958e9034311d4904604af;p=paraslash.git convert para_filter to the new scheduler This was again straight forward. --- diff --git a/aacdec.c b/aacdec.c index e07c146e..25facd53 100644 --- a/aacdec.c +++ b/aacdec.c @@ -25,6 +25,7 @@ #include "para.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "error.h" #include "string.h" @@ -60,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn) if (fn->loaded > fn->bufsize * 4 / 5) return 0; - if (len < 1000 && !*fc->eof) + if (len < 1000 && !*fc->reader_eof) return 0; if (!padd->initialized) { diff --git a/compress.c b/compress.c index 3e605d0e..936ddc3a 100644 --- a/compress.c +++ b/compress.c @@ -25,6 +25,7 @@ #include "para.h" #include "compress_filter.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "string.h" diff --git a/configure.ac b/configure.ac index 17028937..5b3e2b06 100644 --- a/configure.ac +++ b/configure.ac @@ -61,7 +61,7 @@ recv_errlist_objs="http_recv recv_common recv time string net dccp_recv recv_ldflags="" filter_cmdline_objs="filter.cmdline compress_filter.cmdline" -filter_errlist_objs="filter_chain wav compress filter string" +filter_errlist_objs="filter_chain wav compress filter string stdin stdout sched fd" filter_ldflags="" audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline diff --git a/error.h b/error.h index aa5666b3..c4d9e5fa 100644 --- a/error.h +++ b/error.h @@ -154,6 +154,7 @@ extern const char **para_errlist[]; #define FILTER_CHAIN_ERRORS \ PARA_ERROR(UNSUPPORTED_FILTER, "given filter not supported"), \ PARA_ERROR(BAD_FILTER_OPTIONS, "invalid filter option given"), \ + PARA_ERROR(FC_EOF, "filter chain: eof"), \ #define STAT_ERRORS \ diff --git a/filter.c b/filter.c index c84b4823..79ae4dcb 100644 --- a/filter.c +++ b/filter.c @@ -21,16 +21,23 @@ #include "filter.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" -#include "error.h" #include "string.h" +#include "stdin.h" +#include "stdout.h" +#include "error.h" INIT_FILTER_ERRLISTS; #define INBUF_SIZE 32 * 1024 +static struct stdin_task stdin_task_struct; +static struct stdin_task *sit = &stdin_task_struct; static struct filter_chain filter_chain_struct; static struct filter_chain *fc = &filter_chain_struct; +static struct stdout_task stdout_task_struct; +static struct stdout_task *sot = &stdout_task_struct; struct gengetopt_args_info conf; @@ -46,23 +53,38 @@ __printf_2_3 void para_log(int ll, const char* fmt,...) va_end(argp); } -static char *inbuf; -static size_t loaded; -static int eof; +void filter_event_handler(struct task *t) +{ + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); +} -static int init_active_filter_list(void) +static void open_filters(void) +{ + struct filter_node *fn; + + list_for_each_entry(fn, &fc->filters, node) { + fn->filter->open(fn); + PARA_INFO_LOG("opened %s filter\n", fn->filter->name); + fc->outbuf = fn->buf; + fc->out_loaded = &fn->loaded; + } +} + + +static int init_filter_chain(void) { int i, filter_num; struct filter_node *fn; INIT_LIST_HEAD(&fc->filters); - fc->inbuf = inbuf; - fc->in_loaded = &loaded; - fc->eof = &eof; + fc->inbuf = sit->buf; + fc->in_loaded = &sit->loaded; + fc->reader_eof = &sit->eof; for (i = 0; i < conf.filter_given; i++) { - char *fa = para_strdup(conf.filter_arg[i]); + char *fa = conf.filter_arg[i]; fn = para_calloc(sizeof(struct filter_node)); filter_num = check_filter_arg(fa, &fn->conf); if (filter_num < 0) { @@ -77,21 +99,14 @@ static int init_active_filter_list(void) } if (list_empty(&fc->filters)) return -E_NO_FILTERS; + fc->task.private_data = fc; + fc->task.pre_select = filter_pre_select; + fc->task.event_handler = filter_event_handler; + sprintf(fc->task.status, "filter chain"); + open_filters(); return 1; } -static void open_filters(void) -{ - struct filter_node *fn; - - list_for_each_entry(fn, &fc->filters, node) { - fn->filter->open(fn); - PARA_INFO_LOG("opened %s filter\n", fn->filter->name); - fc->outbuf = fn->buf; - fc->out_loaded = &fn->loaded; - } -} - static int parse_config(int argc, char *argv[]) { static char *cf; /* config file */ @@ -122,55 +137,36 @@ static int parse_config(int argc, char *argv[]) int main(int argc, char *argv[]) { - int converted, ret; - char *ib, *ob; /* input/output buffer */ - size_t *il, *ol; /* number of loaded bytes in input/output buffer */ + int ret; + struct sched s; + + init_sched(); + stdin_set_defaults(sit); + sit->buf = para_malloc(sit->bufsize), filter_init(filters); ret = parse_config(argc, argv); if (ret < 0) goto out; - inbuf = para_malloc(INBUF_SIZE); - ret = init_active_filter_list(); + ret = init_filter_chain(); if (ret < 0) goto out; - open_filters(); - ib = fc->inbuf; - ob = fc->outbuf; - il = fc->in_loaded; - ol = fc->out_loaded; - PARA_DEBUG_LOG("ib %p in, ob: %p\n", ib, ob); -again: - if (*il < INBUF_SIZE && !eof) { - ret = read(STDIN_FILENO, ib + *il, INBUF_SIZE - *il); - PARA_DEBUG_LOG("read %d/%zd\n", ret, INBUF_SIZE - *il); - if (ret < 0) - goto out; - if (!ret) - eof = 1; - *il += ret; - } - ret = filter_io(fc); - if (ret < 0) - goto out; - converted = ret; - if (*ol) { - ret = write(STDOUT_FILENO, ob, *ol); - PARA_DEBUG_LOG("wrote %d/%zd\n", ret, *ol); - if (ret <= 0) - goto out; - *ol -= ret; - if (*ol) { - PARA_NOTICE_LOG("short write: %zd bytes left\n", *ol); - memmove(ob, ob + ret, *ol); - } - } - if (!eof || converted) - goto again; - ret = 0; + + stdout_set_defaults(sot); + sot->buf = fc->outbuf; + sot->loaded = fc->out_loaded; + sot->eof = &fc->eof; + + register_task(&sot->task); + register_task(&fc->task); + register_task(&sit->task); + s.default_timeout.tv_sec = 1; + s.default_timeout.tv_usec = 0; + ret = sched(&s); out: + free(sit->buf); if (ret < 0) PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret)); close_filters(fc); - return ret; + return ret < 0? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/filter.h b/filter.h index a6b39071..10947b9e 100644 --- a/filter.h +++ b/filter.h @@ -75,18 +75,12 @@ struct filter_chain { * pointer to variable containing the number of bytes loaded in the output buffer */ size_t *out_loaded; - /** - * - * - * non-zero if end of file was encountered - */ - int *eof; - /** - * - * - * non-zero if an error occured - */ - int error; + /** non-zero if this filter wont' produce any more output */ + int eof; + /** pointer to the eof flag of the receiving application */ + int *reader_eof; + /** the task associated with the filter chain */ + struct task task; }; /** @@ -217,6 +211,7 @@ int filter_io(struct filter_chain *fc); void filter_init(struct filter *all_filters); int check_filter_arg(char *filter_arg, void **conf); int del_filter_callback(struct filter_callback *fcb); +void filter_pre_select(struct sched *s, struct task *t); /** * the structure associated with a paraslash filter diff --git a/filter_chain.c b/filter_chain.c index d755e434..2510e49d 100644 --- a/filter_chain.c +++ b/filter_chain.c @@ -20,6 +20,8 @@ #include "para.h" #include "list.h" +#include "sched.h" +#include "fd.h" #include "filter.h" #include "error.h" #include "string.h" @@ -95,22 +97,22 @@ static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen, /** * call the convert function of each filter * - * \param fc the filter chain containing the list of filter nodes. - * * This is the core function of the filter subsystem. It loops over the list of - * filter nodes determined by \a fc and calls the filter's convert function if + * filter nodes determined by \a t and calls the filter's convert function if * there is input available for the filter node in question. If the convert * function consumed some or all of its input data, all registered input * callbacks are called. Similarly, if a convert function produced output, all * registerd output callbacks get called. * - * \return The sum of output bytes produced by the convert functions on success, - * negative return value on errors. + * \return The sum of output bytes produced by the convert functions on + * success, negative return value on errors (the return value is stored in + * t->ret). * * \sa filter_node, filter#convert, filter_callback */ -int filter_io(struct filter_chain *fc) +void filter_pre_select(__a_unused struct sched *s, struct task *t) { + struct filter_chain *fc = t->private_data; struct filter_node *fn; char *ib; size_t *loaded; @@ -120,35 +122,40 @@ again: loaded = fc->in_loaded; conv = 0; list_for_each_entry(fn, &fc->filters, node) { - int ret; if (*loaded && fn->loaded < fn->bufsize) { size_t old_fn_loaded = fn->loaded; PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n", fc, *loaded, fn->filter->name); - ret = fn->filter->convert(ib, *loaded, fn); - if (ret < 0) { - if (!fc->error) - fc->error = -ret; - return ret; - } - call_callbacks(fn, ib, ret, fn->buf + old_fn_loaded, + t->ret = fn->filter->convert(ib, *loaded, fn); + if (t->ret < 0) + return; + call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded, fn->loaded - old_fn_loaded); - *loaded -= ret; - conv += ret; - if (*loaded && ret) { + *loaded -= t->ret; + conv += t->ret; + if (*loaded && t->ret) { PARA_DEBUG_LOG("moving %zd bytes in input buffer for %s filter\n", *loaded, fn->filter->name); - memmove(ib, ib + ret, *loaded); + memmove(ib, ib + t->ret, *loaded); } } ib = fn->buf; loaded = &fn->loaded; } -// PARA_DEBUG_LOG("loaded: %d\n", *loaded); conv_total += conv; + PARA_DEBUG_LOG("reader eof: %d, eof: %d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->reader_eof, + fc->eof, *fc->out_loaded, conv, conv_total); if (conv) goto again; - return conv_total; + t->ret = 1; + if (!*fc->reader_eof) + return; + if (*fc->out_loaded) + return; + if (*fc->in_loaded && conv_total) + return; + t->ret = -E_FC_EOF; + fc->eof = 1; } /** diff --git a/mp3dec.c b/mp3dec.c index c02432ee..dd7887d5 100644 --- a/mp3dec.c +++ b/mp3dec.c @@ -19,8 +19,8 @@ /** \file mp3dec.c paraslash's mp3 decoder */ #include "para.h" - #include "list.h" +#include "sched.h" #include "filter.h" #include "error.h" #include diff --git a/oggdec.c b/oggdec.c index 637fa965..481e42ac 100644 --- a/oggdec.c +++ b/oggdec.c @@ -22,6 +22,7 @@ #include "oggdec_filter.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "error.h" #include "string.h" @@ -53,11 +54,11 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource) size_t ret, have = pod->inbuf_len - pod->converted; char *p = pod->inbuf + pod->converted; - if (*fn->fc->eof) - return 0; // PARA_DEBUG_LOG("pod = %p\n", pod); // PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have); if (pod->inbuf_len < size) { + if (*fn->fc->reader_eof) + return 0; errno = EAGAIN; return -1; } @@ -133,7 +134,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn) if (!pod->vf) { int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */ - if (len fc->eof && !fn->fc->error) { + if (len fc->reader_eof) { PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n", len, ib); return 0; @@ -167,7 +168,7 @@ again: if (ret < 0) return -E_OGGDEC_BADLINK; fn->loaded += ret; - if (!*fn->fc->eof && !fn->fc->error && fn->loaded < fn->bufsize) + if (!*fn->fc->reader_eof && fn->loaded < fn->bufsize) goto again; return pod->converted; } diff --git a/recv.c b/recv.c index 4b84e8b6..f872fec8 100644 --- a/recv.c +++ b/recv.c @@ -133,22 +133,14 @@ out: void rn_event_handler(struct task *t) { - PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); - unregister_task(t); -} - -void stdout_event_handler(struct task *t) -{ - PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); unregister_task(t); } int main(int argc, char *argv[]) { - int ret, eof = 0, max, r_opened = 0, receiver_num; - struct timeval timeout; + int ret, r_opened = 0, receiver_num; struct receiver *r = NULL; - fd_set rfds, wfds; struct receiver_node rn; struct stdout_task sot; struct sched s; @@ -173,12 +165,7 @@ int main(int argc, char *argv[]) goto out; r_opened = 1; - sot.task.private_data = &sot; - sot.task.pre_select = stdout_pre_select; - sot.task.post_select = stdout_post_select; - sot.task.event_handler = stdout_event_handler; - sot.task.flags = 0; - sprintf(sot.task.status, "stdout writer"); + stdout_set_defaults(&sot); sot.buf = rn.buf; sot.loaded = &rn.loaded; sot.eof = &rn.eof; diff --git a/stdin.c b/stdin.c index 299c326e..ebfa6ccf 100644 --- a/stdin.c +++ b/stdin.c @@ -14,6 +14,12 @@ void stdin_pre_select(struct sched *s, struct task *t) t->ret = 1; /* success */ } +static void stdin_default_event_handler(struct task *t) +{ + PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret)); + unregister_task(t); +} + void stdin_post_select(struct sched *s, struct task *t) { struct stdin_task *sit = t->private_data; @@ -35,3 +41,15 @@ void stdin_post_select(struct sched *s, struct task *t) if (t->ret < 0) sit->eof = 1; } + +void stdin_set_defaults(struct stdin_task *sit) +{ + sit->bufsize = 16 * 1024, + sit->loaded = 0, + sit->task.flags = 0, + sit->task.pre_select = stdin_pre_select; + sit->task.post_select = stdin_post_select; + sit->task.event_handler = stdin_default_event_handler; + sit->task.private_data = sit; + sprintf(sit->task.status, "stdin reader"); +} diff --git a/stdin.h b/stdin.h index 753cef56..cb6cbfb6 100644 --- a/stdin.h +++ b/stdin.h @@ -8,3 +8,4 @@ struct stdin_task { void stdin_pre_select(struct sched *s, struct task *t); void stdin_post_select(struct sched *s, struct task *t); +void stdin_set_defaults(struct stdin_task *sit); diff --git a/stdout.c b/stdout.c index 83703055..af598070 100644 --- a/stdout.c +++ b/stdout.c @@ -38,3 +38,20 @@ void stdout_post_select(struct sched *s, struct task *t) *sot->loaded -= ret; t->ret = 1; } + +void stdout_default_event_handler(struct task *t) +{ + PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret)); + unregister_task(t); +} + + +void stdout_set_defaults(struct stdout_task *sot) +{ + sot->task.private_data = sot; + sot->task.pre_select = stdout_pre_select; + sot->task.post_select = stdout_post_select; + sot->task.event_handler = stdout_default_event_handler; + sot->task.flags = 0; + sprintf(sot->task.status, "stdout writer"); +} diff --git a/stdout.h b/stdout.h index f02483d1..e467f41d 100644 --- a/stdout.h +++ b/stdout.h @@ -9,3 +9,4 @@ struct stdout_task { void stdout_pre_select(struct sched *s, struct task *t); void stdout_post_select(struct sched *s, struct task *t); +void stdout_set_defaults(struct stdout_task *sot); diff --git a/wav.c b/wav.c index b955695f..0b249715 100644 --- a/wav.c +++ b/wav.c @@ -21,6 +21,7 @@ #include "para.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "string.h" diff --git a/write.c b/write.c index 5bcd1f60..13b15a1d 100644 --- a/write.c +++ b/write.c @@ -193,15 +193,6 @@ static void cwt_event_handler(struct task *t) register_task(&idt.task); } -static void stdin_event_handler(struct task *t) -{ - unregister_task(t); - if (t->ret != -E_STDIN_EOF) - PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); - else - PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); -} - int main(int argc, char *argv[]) { int ret = -E_WRITE_SYNTAX; @@ -214,14 +205,8 @@ int main(int argc, char *argv[]) init_supported_writers(); init_sched(); - sit.bufsize = 16 * 1024, - sit.buf = para_malloc(16 * 1024), - sit.loaded = 0, - sit.task.pre_select = stdin_pre_select; - sit.task.post_select = stdin_post_select; - sit.task.event_handler = stdin_event_handler; - sit.task.private_data = &sit; - sprintf(sit.task.status, "stdin reader"); + stdin_set_defaults(&sit); + sit.buf = para_malloc(sit.bufsize), register_task(&sit.task); cwt.task.pre_select = check_wav_pre_select;