if (fn->loaded > fn->bufsize * 4 / 5)
return 0;
- if (len < 1000 && !*fc->reader_eof)
+ if (len < 1000 && !*fc->input_eof)
return 0;
if (!padd->initialized) {
struct timeval diff;
t->ret = 0;
- if (*wng->eof && *wng->loaded < pad->bytes_per_frame)
+ if (*wng->input_eof && *wng->loaded < pad->bytes_per_frame)
return;
t->ret = 1;
if (*wng->loaded < pad->bytes_per_frame)
t->ret = 0;
if (!frames) {
- if (*wng->eof)
+ if (*wng->input_eof)
t->ret = *wng->loaded;
return;
}
s->wpid, audio_formats[s->format], slot_num);
kill(s->wpid, SIGTERM);
s->wkilled = 1;
- s->fci->error = 1;
}
static void set_restart_barrier(int format, struct timeval *now)
return;
if (!s->fci)
return;
- if (!rn->eof && !s->fci->error && s->wpid > 0)
+ if (!rn->eof && !s->fc->eof && s->wpid > 0)
return;
- if (!s->fci->error && s->wpid > 0) { /* eof */
+ if (!s->fci->eof && s->wpid > 0) { /* eof */
if (filter_io(s->fci) > 0)
return;
if (get_loaded_bytes(slot_num))
return;
}
if (s->write_fd > 0) {
- PARA_INFO_LOG("err: %d\n", s->fci->error);
PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num,
s->write_fd);
close(s->write_fd);
void filter_event_handler(struct task *t)
{
- PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
unregister_task(t);
}
fc->inbuf = sit->buf;
fc->in_loaded = &sit->loaded;
- fc->reader_eof = &sit->eof;
+ fc->input_eof = &sit->eof;
+ fc->eof = 0;
+ fc->output_eof = &sot->eof;
+ fc->task.private_data = fc;
+ fc->task.pre_select = filter_pre_select;
+ fc->task.event_handler = filter_event_handler;
+ sprintf(fc->task.status, "filter chain");
for (i = 0; i < conf.filter_given; i++) {
char *fa = conf.filter_arg[i];
}
if (list_empty(&fc->filters))
return -E_NO_FILTERS;
- fc->task.private_data = fc;
- fc->task.pre_select = filter_pre_select;
- fc->task.event_handler = filter_event_handler;
- sprintf(fc->task.status, "filter chain");
open_filters();
return 1;
}
goto out;
stdout_set_defaults(sot);
+ PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
sot->buf = fc->outbuf;
sot->loaded = fc->out_loaded;
- sot->eof = &fc->eof;
+ sot->input_eof = &fc->eof;
register_task(&sot->task);
register_task(&fc->task);
register_task(&sit->task);
s.default_timeout.tv_sec = 1;
s.default_timeout.tv_usec = 0;
+ PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
ret = sched(&s);
out:
free(sit->buf);
/** non-zero if this filter wont' produce any more output */
int eof;
/** pointer to the eof flag of the receiving application */
- int *reader_eof;
+ int *input_eof;
+ /** pointer to the eof flag of the writing application */
+ int *output_eof;
/** the task associated with the filter chain */
struct task task;
};
char *ib;
size_t *loaded;
int conv, conv_total = 0;
+
+ t->ret = -E_FC_EOF;
+ if (*fc->output_eof)
+ goto err_out;
again:
ib = fc->inbuf;
loaded = fc->in_loaded;
fc, *loaded, fn->filter->name);
t->ret = fn->filter->convert(ib, *loaded, fn);
if (t->ret < 0)
- return;
+ goto err_out;
call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded,
fn->loaded - old_fn_loaded);
*loaded -= t->ret;
loaded = &fn->loaded;
}
conv_total += conv;
- PARA_DEBUG_LOG("reader eof: %d, eof: %d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->reader_eof,
- fc->eof, *fc->out_loaded, conv, conv_total);
+ PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
+ *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
if (conv)
goto again;
t->ret = 1;
- if (!*fc->reader_eof)
+ if (!*fc->input_eof)
return;
if (*fc->out_loaded)
return;
if (*fc->in_loaded && conv_total)
return;
t->ret = -E_FC_EOF;
+err_out:
fc->eof = 1;
}
// PARA_DEBUG_LOG("pod = %p\n", pod);
// PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
if (pod->inbuf_len < size) {
- if (*fn->fc->reader_eof)
+ if (*fn->fc->input_eof)
return 0;
errno = EAGAIN;
return -1;
if (!pod->vf) {
int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
- if (len <ib && !*fn->fc->reader_eof) {
+ if (len <ib && !*fn->fc->input_eof) {
PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n",
len, ib);
return 0;
if (ret < 0)
return -E_OGGDEC_BADLINK;
fn->loaded += ret;
- if (!*fn->fc->reader_eof && fn->loaded < fn->bufsize)
+ if (!*fn->fc->input_eof && fn->loaded < fn->bufsize)
goto again;
return pod->converted;
}
return check_receiver_arg(conf.receiver_arg, receiver_num);
}
-#if 0
-int main(int argc, char *argv[])
-{
- int ret, eof = 0, max, r_opened = 0, receiver_num;
- struct timeval timeout;
- struct receiver *r = NULL;
- fd_set rfds, wfds;
- struct receiver_node rn;
-
- memset(&rn, 0, sizeof(struct receiver_node));
- for (ret = 0; receivers[ret].name; ret++)
- receivers[ret].init(&receivers[ret]);
- ret = -E_RECV_SYNTAX;
- rn.conf = parse_config(argc, argv, &receiver_num);
- if (!rn.conf) {
- PARA_EMERG_LOG("%s", "parse failed\n");
- goto out;
- }
- r = &receivers[receiver_num];
- rn.receiver = r;
- ret = r->open(&rn);
- if (ret < 0)
- goto out;
- r_opened = 1;
-recv:
- FD_ZERO(&rfds);
- FD_ZERO(&wfds);
- timeout.tv_sec = 0;
- timeout.tv_usec = 999 * 1000;
- max = -1;
- ret = r->pre_select(&rn, &rfds, &wfds, &timeout);
- max = PARA_MAX(max, ret);
-
- PARA_DEBUG_LOG("timeout: %lums, max: %d\n", tv2ms(&timeout), max);
- ret = para_select(max + 1, &rfds, &wfds, &timeout);
- if (ret < 0) {
- ret = -E_RECV_SELECT;
- goto out;
- }
- ret = r->post_select(&rn, ret, &rfds, &wfds);
- if (ret < 0)
- goto out;
- if (!ret)
- eof = 1;
- if (!rn.loaded) {
- if (eof)
- goto out;
- goto recv;
- }
- ret = write(STDOUT_FILENO, rn.buf, rn.loaded);
- PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded);
- if (ret < 0) {
- ret = -E_WRITE_STDOUT;
- goto out;
- }
- if (ret != rn.loaded) {
- PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded);
- memmove(rn.buf, rn.buf + ret, rn.loaded - ret);
- }
- rn.loaded -= ret;
- if (rn.loaded || !eof)
- goto recv;
-out:
- if (r_opened)
- r->close(&rn);
- if (r)
- r->shutdown();
- if (ret < 0)
- PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
- return ret;
-}
-#endif
-
void rn_event_handler(struct task *t)
{
PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
stdout_set_defaults(&sot);
sot.buf = rn.buf;
sot.loaded = &rn.loaded;
- sot.eof = &rn.eof;
+ sot.input_eof = &rn.eof;
register_task(&sot.task);
rn.task.private_data = &rn;
{
sit->bufsize = 16 * 1024,
sit->loaded = 0,
+ sit->eof = 0,
sit->task.flags = 0,
sit->task.pre_select = stdin_pre_select;
sit->task.post_select = stdin_post_select;
t->ret = 1;
if (!sot->check_fd) {
- if (*sot->eof)
+ if (*sot->input_eof)
t->ret = -E_STDOUT_EOF;
return;
}
sot->task.post_select = stdout_post_select;
sot->task.event_handler = stdout_default_event_handler;
sot->task.flags = 0;
+ sot->eof = 0;
sprintf(sot->task.status, "stdout writer");
}
char *buf;
size_t *bufsize;
size_t *loaded;
- int *eof;
+ int *input_eof;
+ int eof;
struct task task;
int check_fd;
};
}
ret = 1;
out:
- if (ret > 0) {
+ if (ret > 0)
return wng;
- }
free(wng);
return NULL;
}
unregister_task(t);
wng->buf = sit.buf;
wng->loaded = &sit.loaded;
- wng->eof = &sit.eof;
- sprintf(wng->task.status, "%s", "writer node group");
+ wng->input_eof = &sit.eof;
ret = wng_open(wng);
if (ret < 0) {
PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
* describes a set of writer nodes that all write the same stream.
*/
struct writer_node_group {
-/** number of nodes belonging to this group */
-unsigned num_writers;
-/** array of pointers to the corresponding writer nodes */
-struct writer_node *writer_nodes;
-/** keeps track of how many bytes have been written by each node */
-int *written;
-/** the maximum of the chunk_bytes values of the writer nodes in this group */
-size_t max_chunk_bytes;
-/** non-zero if end of file was encountered */
-int *eof;
-char *buf;
-size_t *loaded;
-struct task task;
+ /** number of nodes belonging to this group */
+ unsigned num_writers;
+ /** array of pointers to the corresponding writer nodes */
+ struct writer_node *writer_nodes;
+ /** keeps track of how many bytes have been written by each node */
+ int *written;
+ /** the maximum of the chunk_bytes values of the writer nodes in this group */
+ size_t max_chunk_bytes;
+ /** non-zero if end of file was encountered */
+ int *input_eof;
+ int eof;
+ char *buf;
+ size_t *loaded;
+ struct task task;
};
/** loop over each writer node in a writer group */
min_written = PARA_MIN(min_written, t->ret);
}
*g->loaded -= min_written;
- if (!*g->loaded && *g->eof)
+ if (!*g->loaded && *g->input_eof)
t->ret = -E_WNG_EOF;
else
t->ret = 1;
wn->task.private_data = wn;
register_task(&wn->task);
}
+ sprintf(g->task.status, "%s", "writer node group");
+ g->eof = 0;
register_task(&g->task);
out:
return ret;