This was again straight forward.
#include "para.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "error.h"
#include "string.h"
if (fn->loaded > fn->bufsize * 4 / 5)
return 0;
- if (len < 1000 && !*fc->eof)
+ if (len < 1000 && !*fc->reader_eof)
return 0;
if (!padd->initialized) {
#include "para.h"
#include "compress_filter.cmdline.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "string.h"
recv_ldflags=""
filter_cmdline_objs="filter.cmdline compress_filter.cmdline"
-filter_errlist_objs="filter_chain wav compress filter string"
+filter_errlist_objs="filter_chain wav compress filter string stdin stdout sched fd"
filter_ldflags=""
audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline
#define FILTER_CHAIN_ERRORS \
PARA_ERROR(UNSUPPORTED_FILTER, "given filter not supported"), \
PARA_ERROR(BAD_FILTER_OPTIONS, "invalid filter option given"), \
+ PARA_ERROR(FC_EOF, "filter chain: eof"), \
#define STAT_ERRORS \
#include "filter.cmdline.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
-#include "error.h"
#include "string.h"
+#include "stdin.h"
+#include "stdout.h"
+#include "error.h"
INIT_FILTER_ERRLISTS;
#define INBUF_SIZE 32 * 1024
+static struct stdin_task stdin_task_struct;
+static struct stdin_task *sit = &stdin_task_struct;
static struct filter_chain filter_chain_struct;
static struct filter_chain *fc = &filter_chain_struct;
+static struct stdout_task stdout_task_struct;
+static struct stdout_task *sot = &stdout_task_struct;
struct gengetopt_args_info conf;
va_end(argp);
}
-static char *inbuf;
-static size_t loaded;
-static int eof;
+void filter_event_handler(struct task *t)
+{
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
-static int init_active_filter_list(void)
+static void open_filters(void)
+{
+ struct filter_node *fn;
+
+ list_for_each_entry(fn, &fc->filters, node) {
+ fn->filter->open(fn);
+ PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
+ fc->outbuf = fn->buf;
+ fc->out_loaded = &fn->loaded;
+ }
+}
+
+
+static int init_filter_chain(void)
{
int i, filter_num;
struct filter_node *fn;
INIT_LIST_HEAD(&fc->filters);
- fc->inbuf = inbuf;
- fc->in_loaded = &loaded;
- fc->eof = &eof;
+ fc->inbuf = sit->buf;
+ fc->in_loaded = &sit->loaded;
+ fc->reader_eof = &sit->eof;
for (i = 0; i < conf.filter_given; i++) {
- char *fa = para_strdup(conf.filter_arg[i]);
+ char *fa = conf.filter_arg[i];
fn = para_calloc(sizeof(struct filter_node));
filter_num = check_filter_arg(fa, &fn->conf);
if (filter_num < 0) {
}
if (list_empty(&fc->filters))
return -E_NO_FILTERS;
+ fc->task.private_data = fc;
+ fc->task.pre_select = filter_pre_select;
+ fc->task.event_handler = filter_event_handler;
+ sprintf(fc->task.status, "filter chain");
+ open_filters();
return 1;
}
-static void open_filters(void)
-{
- struct filter_node *fn;
-
- list_for_each_entry(fn, &fc->filters, node) {
- fn->filter->open(fn);
- PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
- fc->outbuf = fn->buf;
- fc->out_loaded = &fn->loaded;
- }
-}
-
static int parse_config(int argc, char *argv[])
{
static char *cf; /* config file */
int main(int argc, char *argv[])
{
- int converted, ret;
- char *ib, *ob; /* input/output buffer */
- size_t *il, *ol; /* number of loaded bytes in input/output buffer */
+ int ret;
+ struct sched s;
+
+ init_sched();
+ stdin_set_defaults(sit);
+ sit->buf = para_malloc(sit->bufsize),
filter_init(filters);
ret = parse_config(argc, argv);
if (ret < 0)
goto out;
- inbuf = para_malloc(INBUF_SIZE);
- ret = init_active_filter_list();
+ ret = init_filter_chain();
if (ret < 0)
goto out;
- open_filters();
- ib = fc->inbuf;
- ob = fc->outbuf;
- il = fc->in_loaded;
- ol = fc->out_loaded;
- PARA_DEBUG_LOG("ib %p in, ob: %p\n", ib, ob);
-again:
- if (*il < INBUF_SIZE && !eof) {
- ret = read(STDIN_FILENO, ib + *il, INBUF_SIZE - *il);
- PARA_DEBUG_LOG("read %d/%zd\n", ret, INBUF_SIZE - *il);
- if (ret < 0)
- goto out;
- if (!ret)
- eof = 1;
- *il += ret;
- }
- ret = filter_io(fc);
- if (ret < 0)
- goto out;
- converted = ret;
- if (*ol) {
- ret = write(STDOUT_FILENO, ob, *ol);
- PARA_DEBUG_LOG("wrote %d/%zd\n", ret, *ol);
- if (ret <= 0)
- goto out;
- *ol -= ret;
- if (*ol) {
- PARA_NOTICE_LOG("short write: %zd bytes left\n", *ol);
- memmove(ob, ob + ret, *ol);
- }
- }
- if (!eof || converted)
- goto again;
- ret = 0;
+
+ stdout_set_defaults(sot);
+ sot->buf = fc->outbuf;
+ sot->loaded = fc->out_loaded;
+ sot->eof = &fc->eof;
+
+ register_task(&sot->task);
+ register_task(&fc->task);
+ register_task(&sit->task);
+ s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_usec = 0;
+ ret = sched(&s);
out:
+ free(sit->buf);
if (ret < 0)
PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret));
close_filters(fc);
- return ret;
+ return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
}
* pointer to variable containing the number of bytes loaded in the output buffer
*/
size_t *out_loaded;
- /**
- *
- *
- * non-zero if end of file was encountered
- */
- int *eof;
- /**
- *
- *
- * non-zero if an error occured
- */
- int error;
+ /** non-zero if this filter wont' produce any more output */
+ int eof;
+ /** pointer to the eof flag of the receiving application */
+ int *reader_eof;
+ /** the task associated with the filter chain */
+ struct task task;
};
/**
void filter_init(struct filter *all_filters);
int check_filter_arg(char *filter_arg, void **conf);
int del_filter_callback(struct filter_callback *fcb);
+void filter_pre_select(struct sched *s, struct task *t);
/**
* the structure associated with a paraslash filter
#include "para.h"
#include "list.h"
+#include "sched.h"
+#include "fd.h"
#include "filter.h"
#include "error.h"
#include "string.h"
/**
* call the convert function of each filter
*
- * \param fc the filter chain containing the list of filter nodes.
- *
* This is the core function of the filter subsystem. It loops over the list of
- * filter nodes determined by \a fc and calls the filter's convert function if
+ * filter nodes determined by \a t and calls the filter's convert function if
* there is input available for the filter node in question. If the convert
* function consumed some or all of its input data, all registered input
* callbacks are called. Similarly, if a convert function produced output, all
* registerd output callbacks get called.
*
- * \return The sum of output bytes produced by the convert functions on success,
- * negative return value on errors.
+ * \return The sum of output bytes produced by the convert functions on
+ * success, negative return value on errors (the return value is stored in
+ * t->ret).
*
* \sa filter_node, filter#convert, filter_callback
*/
-int filter_io(struct filter_chain *fc)
+void filter_pre_select(__a_unused struct sched *s, struct task *t)
{
+ struct filter_chain *fc = t->private_data;
struct filter_node *fn;
char *ib;
size_t *loaded;
loaded = fc->in_loaded;
conv = 0;
list_for_each_entry(fn, &fc->filters, node) {
- int ret;
if (*loaded && fn->loaded < fn->bufsize) {
size_t old_fn_loaded = fn->loaded;
PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n",
fc, *loaded, fn->filter->name);
- ret = fn->filter->convert(ib, *loaded, fn);
- if (ret < 0) {
- if (!fc->error)
- fc->error = -ret;
- return ret;
- }
- call_callbacks(fn, ib, ret, fn->buf + old_fn_loaded,
+ t->ret = fn->filter->convert(ib, *loaded, fn);
+ if (t->ret < 0)
+ return;
+ call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded,
fn->loaded - old_fn_loaded);
- *loaded -= ret;
- conv += ret;
- if (*loaded && ret) {
+ *loaded -= t->ret;
+ conv += t->ret;
+ if (*loaded && t->ret) {
PARA_DEBUG_LOG("moving %zd bytes in input buffer for %s filter\n",
*loaded, fn->filter->name);
- memmove(ib, ib + ret, *loaded);
+ memmove(ib, ib + t->ret, *loaded);
}
}
ib = fn->buf;
loaded = &fn->loaded;
}
-// PARA_DEBUG_LOG("loaded: %d\n", *loaded);
conv_total += conv;
+ PARA_DEBUG_LOG("reader eof: %d, eof: %d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->reader_eof,
+ fc->eof, *fc->out_loaded, conv, conv_total);
if (conv)
goto again;
- return conv_total;
+ t->ret = 1;
+ if (!*fc->reader_eof)
+ return;
+ if (*fc->out_loaded)
+ return;
+ if (*fc->in_loaded && conv_total)
+ return;
+ t->ret = -E_FC_EOF;
+ fc->eof = 1;
}
/**
/** \file mp3dec.c paraslash's mp3 decoder */
#include "para.h"
-
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "error.h"
#include <mad.h>
#include "oggdec_filter.cmdline.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "error.h"
#include "string.h"
size_t ret, have = pod->inbuf_len - pod->converted;
char *p = pod->inbuf + pod->converted;
- if (*fn->fc->eof)
- return 0;
// PARA_DEBUG_LOG("pod = %p\n", pod);
// PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
if (pod->inbuf_len < size) {
+ if (*fn->fc->reader_eof)
+ return 0;
errno = EAGAIN;
return -1;
}
if (!pod->vf) {
int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
- if (len <ib && !*fn->fc->eof && !fn->fc->error) {
+ if (len <ib && !*fn->fc->reader_eof) {
PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n",
len, ib);
return 0;
if (ret < 0)
return -E_OGGDEC_BADLINK;
fn->loaded += ret;
- if (!*fn->fc->eof && !fn->fc->error && fn->loaded < fn->bufsize)
+ if (!*fn->fc->reader_eof && fn->loaded < fn->bufsize)
goto again;
return pod->converted;
}
void rn_event_handler(struct task *t)
{
- PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
- unregister_task(t);
-}
-
-void stdout_event_handler(struct task *t)
-{
- PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
unregister_task(t);
}
int main(int argc, char *argv[])
{
- int ret, eof = 0, max, r_opened = 0, receiver_num;
- struct timeval timeout;
+ int ret, r_opened = 0, receiver_num;
struct receiver *r = NULL;
- fd_set rfds, wfds;
struct receiver_node rn;
struct stdout_task sot;
struct sched s;
goto out;
r_opened = 1;
- sot.task.private_data = &sot;
- sot.task.pre_select = stdout_pre_select;
- sot.task.post_select = stdout_post_select;
- sot.task.event_handler = stdout_event_handler;
- sot.task.flags = 0;
- sprintf(sot.task.status, "stdout writer");
+ stdout_set_defaults(&sot);
sot.buf = rn.buf;
sot.loaded = &rn.loaded;
sot.eof = &rn.eof;
t->ret = 1; /* success */
}
+static void stdin_default_event_handler(struct task *t)
+{
+ PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
void stdin_post_select(struct sched *s, struct task *t)
{
struct stdin_task *sit = t->private_data;
if (t->ret < 0)
sit->eof = 1;
}
+
+void stdin_set_defaults(struct stdin_task *sit)
+{
+ sit->bufsize = 16 * 1024,
+ sit->loaded = 0,
+ sit->task.flags = 0,
+ sit->task.pre_select = stdin_pre_select;
+ sit->task.post_select = stdin_post_select;
+ sit->task.event_handler = stdin_default_event_handler;
+ sit->task.private_data = sit;
+ sprintf(sit->task.status, "stdin reader");
+}
void stdin_pre_select(struct sched *s, struct task *t);
void stdin_post_select(struct sched *s, struct task *t);
+void stdin_set_defaults(struct stdin_task *sit);
*sot->loaded -= ret;
t->ret = 1;
}
+
+void stdout_default_event_handler(struct task *t)
+{
+ PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
+
+void stdout_set_defaults(struct stdout_task *sot)
+{
+ sot->task.private_data = sot;
+ sot->task.pre_select = stdout_pre_select;
+ sot->task.post_select = stdout_post_select;
+ sot->task.event_handler = stdout_default_event_handler;
+ sot->task.flags = 0;
+ sprintf(sot->task.status, "stdout writer");
+}
void stdout_pre_select(struct sched *s, struct task *t);
void stdout_post_select(struct sched *s, struct task *t);
+void stdout_set_defaults(struct stdout_task *sot);
#include "para.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "string.h"
register_task(&idt.task);
}
-static void stdin_event_handler(struct task *t)
-{
- unregister_task(t);
- if (t->ret != -E_STDIN_EOF)
- PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
- else
- PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
-}
-
int main(int argc, char *argv[])
{
int ret = -E_WRITE_SYNTAX;
init_supported_writers();
init_sched();
- sit.bufsize = 16 * 1024,
- sit.buf = para_malloc(16 * 1024),
- sit.loaded = 0,
- sit.task.pre_select = stdin_pre_select;
- sit.task.post_select = stdin_post_select;
- sit.task.event_handler = stdin_event_handler;
- sit.task.private_data = &sit;
- sprintf(sit.task.status, "stdin reader");
+ stdin_set_defaults(&sit);
+ sit.buf = para_malloc(sit.bufsize),
register_task(&sit.task);
cwt.task.pre_select = check_wav_pre_select;