From b0cad48a864fe3f621138e717ff025060c396fad Mon Sep 17 00:00:00 2001 From: Andre Date: Thu, 25 May 2006 15:53:00 +0200 Subject: [PATCH] introduce input_eof and ouput_eof para_filter/para_audiod needs this. For example, it is pointless to convert more audiod data if the writing application is no longer active. --- aacdec.c | 2 +- alsa_writer.c | 4 +-- audiod.c | 6 ++-- filter.c | 18 +++++++----- filter.h | 4 ++- filter_chain.c | 13 ++++++--- oggdec.c | 6 ++-- recv.c | 75 +------------------------------------------------- stdin.c | 1 + stdout.c | 3 +- stdout.h | 3 +- write.c | 6 ++-- write.h | 27 +++++++++--------- write_common.c | 4 ++- 14 files changed, 56 insertions(+), 116 deletions(-) diff --git a/aacdec.c b/aacdec.c index 25facd53..fe046b70 100644 --- a/aacdec.c +++ b/aacdec.c @@ -61,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn) if (fn->loaded > fn->bufsize * 4 / 5) return 0; - if (len < 1000 && !*fc->reader_eof) + if (len < 1000 && !*fc->input_eof) return 0; if (!padd->initialized) { diff --git a/alsa_writer.c b/alsa_writer.c index ad65c92f..f05dbbd0 100644 --- a/alsa_writer.c +++ b/alsa_writer.c @@ -140,7 +140,7 @@ static void alsa_write_pre_select(struct sched *s, struct task *t) struct timeval diff; t->ret = 0; - if (*wng->eof && *wng->loaded < pad->bytes_per_frame) + if (*wng->input_eof && *wng->loaded < pad->bytes_per_frame) return; t->ret = 1; if (*wng->loaded < pad->bytes_per_frame) @@ -165,7 +165,7 @@ static void alsa_write_post_select(struct sched *s, struct task *t) t->ret = 0; if (!frames) { - if (*wng->eof) + if (*wng->input_eof) t->ret = *wng->loaded; return; } diff --git a/audiod.c b/audiod.c index 1d2d69c1..db4d478c 100644 --- a/audiod.c +++ b/audiod.c @@ -451,7 +451,6 @@ static void kill_stream_writer(int slot_num) s->wpid, audio_formats[s->format], slot_num); kill(s->wpid, SIGTERM); s->wkilled = 1; - s->fci->error = 1; } static void set_restart_barrier(int format, struct timeval *now) @@ -928,16 +927,15 @@ static void close_decoder_if_idle(int slot_num) return; if (!s->fci) return; - if (!rn->eof && !s->fci->error && s->wpid > 0) + if (!rn->eof && !s->fc->eof && s->wpid > 0) return; - if (!s->fci->error && s->wpid > 0) { /* eof */ + if (!s->fci->eof && s->wpid > 0) { /* eof */ if (filter_io(s->fci) > 0) return; if (get_loaded_bytes(slot_num)) return; } if (s->write_fd > 0) { - PARA_INFO_LOG("err: %d\n", s->fci->error); PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num, s->write_fd); close(s->write_fd); diff --git a/filter.c b/filter.c index 79ae4dcb..cfa2e94f 100644 --- a/filter.c +++ b/filter.c @@ -55,7 +55,7 @@ __printf_2_3 void para_log(int ll, const char* fmt,...) void filter_event_handler(struct task *t) { - PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); unregister_task(t); } @@ -81,7 +81,13 @@ static int init_filter_chain(void) fc->inbuf = sit->buf; fc->in_loaded = &sit->loaded; - fc->reader_eof = &sit->eof; + fc->input_eof = &sit->eof; + fc->eof = 0; + fc->output_eof = &sot->eof; + fc->task.private_data = fc; + fc->task.pre_select = filter_pre_select; + fc->task.event_handler = filter_event_handler; + sprintf(fc->task.status, "filter chain"); for (i = 0; i < conf.filter_given; i++) { char *fa = conf.filter_arg[i]; @@ -99,10 +105,6 @@ static int init_filter_chain(void) } if (list_empty(&fc->filters)) return -E_NO_FILTERS; - fc->task.private_data = fc; - fc->task.pre_select = filter_pre_select; - fc->task.event_handler = filter_event_handler; - sprintf(fc->task.status, "filter chain"); open_filters(); return 1; } @@ -153,15 +155,17 @@ int main(int argc, char *argv[]) goto out; stdout_set_defaults(sot); + PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof); sot->buf = fc->outbuf; sot->loaded = fc->out_loaded; - sot->eof = &fc->eof; + sot->input_eof = &fc->eof; register_task(&sot->task); register_task(&fc->task); register_task(&sit->task); s.default_timeout.tv_sec = 1; s.default_timeout.tv_usec = 0; + PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof); ret = sched(&s); out: free(sit->buf); diff --git a/filter.h b/filter.h index 10947b9e..45ade8d3 100644 --- a/filter.h +++ b/filter.h @@ -78,7 +78,9 @@ struct filter_chain { /** non-zero if this filter wont' produce any more output */ int eof; /** pointer to the eof flag of the receiving application */ - int *reader_eof; + int *input_eof; + /** pointer to the eof flag of the writing application */ + int *output_eof; /** the task associated with the filter chain */ struct task task; }; diff --git a/filter_chain.c b/filter_chain.c index 2510e49d..5b87b087 100644 --- a/filter_chain.c +++ b/filter_chain.c @@ -117,6 +117,10 @@ void filter_pre_select(__a_unused struct sched *s, struct task *t) char *ib; size_t *loaded; int conv, conv_total = 0; + + t->ret = -E_FC_EOF; + if (*fc->output_eof) + goto err_out; again: ib = fc->inbuf; loaded = fc->in_loaded; @@ -128,7 +132,7 @@ again: fc, *loaded, fn->filter->name); t->ret = fn->filter->convert(ib, *loaded, fn); if (t->ret < 0) - return; + goto err_out; call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded, fn->loaded - old_fn_loaded); *loaded -= t->ret; @@ -143,18 +147,19 @@ again: loaded = &fn->loaded; } conv_total += conv; - PARA_DEBUG_LOG("reader eof: %d, eof: %d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->reader_eof, - fc->eof, *fc->out_loaded, conv, conv_total); + PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof, + *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total); if (conv) goto again; t->ret = 1; - if (!*fc->reader_eof) + if (!*fc->input_eof) return; if (*fc->out_loaded) return; if (*fc->in_loaded && conv_total) return; t->ret = -E_FC_EOF; +err_out: fc->eof = 1; } diff --git a/oggdec.c b/oggdec.c index 481e42ac..e1c4517e 100644 --- a/oggdec.c +++ b/oggdec.c @@ -57,7 +57,7 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource) // PARA_DEBUG_LOG("pod = %p\n", pod); // PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have); if (pod->inbuf_len < size) { - if (*fn->fc->reader_eof) + if (*fn->fc->input_eof) return 0; errno = EAGAIN; return -1; @@ -134,7 +134,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn) if (!pod->vf) { int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */ - if (len fc->reader_eof) { + if (len fc->input_eof) { PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n", len, ib); return 0; @@ -168,7 +168,7 @@ again: if (ret < 0) return -E_OGGDEC_BADLINK; fn->loaded += ret; - if (!*fn->fc->reader_eof && fn->loaded < fn->bufsize) + if (!*fn->fc->input_eof && fn->loaded < fn->bufsize) goto again; return pod->converted; } diff --git a/recv.c b/recv.c index f872fec8..cf178d20 100644 --- a/recv.c +++ b/recv.c @@ -58,79 +58,6 @@ static void *parse_config(int argc, char *argv[], int *receiver_num) return check_receiver_arg(conf.receiver_arg, receiver_num); } -#if 0 -int main(int argc, char *argv[]) -{ - int ret, eof = 0, max, r_opened = 0, receiver_num; - struct timeval timeout; - struct receiver *r = NULL; - fd_set rfds, wfds; - struct receiver_node rn; - - memset(&rn, 0, sizeof(struct receiver_node)); - for (ret = 0; receivers[ret].name; ret++) - receivers[ret].init(&receivers[ret]); - ret = -E_RECV_SYNTAX; - rn.conf = parse_config(argc, argv, &receiver_num); - if (!rn.conf) { - PARA_EMERG_LOG("%s", "parse failed\n"); - goto out; - } - r = &receivers[receiver_num]; - rn.receiver = r; - ret = r->open(&rn); - if (ret < 0) - goto out; - r_opened = 1; -recv: - FD_ZERO(&rfds); - FD_ZERO(&wfds); - timeout.tv_sec = 0; - timeout.tv_usec = 999 * 1000; - max = -1; - ret = r->pre_select(&rn, &rfds, &wfds, &timeout); - max = PARA_MAX(max, ret); - - PARA_DEBUG_LOG("timeout: %lums, max: %d\n", tv2ms(&timeout), max); - ret = para_select(max + 1, &rfds, &wfds, &timeout); - if (ret < 0) { - ret = -E_RECV_SELECT; - goto out; - } - ret = r->post_select(&rn, ret, &rfds, &wfds); - if (ret < 0) - goto out; - if (!ret) - eof = 1; - if (!rn.loaded) { - if (eof) - goto out; - goto recv; - } - ret = write(STDOUT_FILENO, rn.buf, rn.loaded); - PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded); - if (ret < 0) { - ret = -E_WRITE_STDOUT; - goto out; - } - if (ret != rn.loaded) { - PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded); - memmove(rn.buf, rn.buf + ret, rn.loaded - ret); - } - rn.loaded -= ret; - if (rn.loaded || !eof) - goto recv; -out: - if (r_opened) - r->close(&rn); - if (r) - r->shutdown(); - if (ret < 0) - PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); - return ret; -} -#endif - void rn_event_handler(struct task *t) { PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); @@ -168,7 +95,7 @@ int main(int argc, char *argv[]) stdout_set_defaults(&sot); sot.buf = rn.buf; sot.loaded = &rn.loaded; - sot.eof = &rn.eof; + sot.input_eof = &rn.eof; register_task(&sot.task); rn.task.private_data = &rn; diff --git a/stdin.c b/stdin.c index ebfa6ccf..22f1dd97 100644 --- a/stdin.c +++ b/stdin.c @@ -46,6 +46,7 @@ void stdin_set_defaults(struct stdin_task *sit) { sit->bufsize = 16 * 1024, sit->loaded = 0, + sit->eof = 0, sit->task.flags = 0, sit->task.pre_select = stdin_pre_select; sit->task.post_select = stdin_post_select; diff --git a/stdout.c b/stdout.c index af598070..1c60669a 100644 --- a/stdout.c +++ b/stdout.c @@ -25,7 +25,7 @@ void stdout_post_select(struct sched *s, struct task *t) t->ret = 1; if (!sot->check_fd) { - if (*sot->eof) + if (*sot->input_eof) t->ret = -E_STDOUT_EOF; return; } @@ -53,5 +53,6 @@ void stdout_set_defaults(struct stdout_task *sot) sot->task.post_select = stdout_post_select; sot->task.event_handler = stdout_default_event_handler; sot->task.flags = 0; + sot->eof = 0; sprintf(sot->task.status, "stdout writer"); } diff --git a/stdout.h b/stdout.h index e467f41d..5e45ace8 100644 --- a/stdout.h +++ b/stdout.h @@ -2,7 +2,8 @@ struct stdout_task { char *buf; size_t *bufsize; size_t *loaded; - int *eof; + int *input_eof; + int eof; struct task task; int check_fd; }; diff --git a/write.c b/write.c index 13b15a1d..a3cd4b5d 100644 --- a/write.c +++ b/write.c @@ -150,9 +150,8 @@ static struct writer_node_group *check_args(void) } ret = 1; out: - if (ret > 0) { + if (ret > 0) return wng; - } free(wng); return NULL; } @@ -165,8 +164,7 @@ static void idt_event_handler(struct task *t) unregister_task(t); wng->buf = sit.buf; wng->loaded = &sit.loaded; - wng->eof = &sit.eof; - sprintf(wng->task.status, "%s", "writer node group"); + wng->input_eof = &sit.eof; ret = wng_open(wng); if (ret < 0) { PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); diff --git a/write.h b/write.h index e6e0a762..04ad09d5 100644 --- a/write.h +++ b/write.h @@ -85,19 +85,20 @@ void (*shutdown)(struct writer_node *); * describes a set of writer nodes that all write the same stream. */ struct writer_node_group { -/** number of nodes belonging to this group */ -unsigned num_writers; -/** array of pointers to the corresponding writer nodes */ -struct writer_node *writer_nodes; -/** keeps track of how many bytes have been written by each node */ -int *written; -/** the maximum of the chunk_bytes values of the writer nodes in this group */ -size_t max_chunk_bytes; -/** non-zero if end of file was encountered */ -int *eof; -char *buf; -size_t *loaded; -struct task task; + /** number of nodes belonging to this group */ + unsigned num_writers; + /** array of pointers to the corresponding writer nodes */ + struct writer_node *writer_nodes; + /** keeps track of how many bytes have been written by each node */ + int *written; + /** the maximum of the chunk_bytes values of the writer nodes in this group */ + size_t max_chunk_bytes; + /** non-zero if end of file was encountered */ + int *input_eof; + int eof; + char *buf; + size_t *loaded; + struct task task; }; /** loop over each writer node in a writer group */ diff --git a/write_common.c b/write_common.c index 0b2772ae..6c0faa7c 100644 --- a/write_common.c +++ b/write_common.c @@ -45,7 +45,7 @@ static void wng_post_select(struct sched *s, struct task *t) min_written = PARA_MIN(min_written, t->ret); } *g->loaded -= min_written; - if (!*g->loaded && *g->eof) + if (!*g->loaded && *g->input_eof) t->ret = -E_WNG_EOF; else t->ret = 1; @@ -72,6 +72,8 @@ int wng_open(struct writer_node_group *g) wn->task.private_data = wn; register_task(&wn->task); } + sprintf(g->task.status, "%s", "writer node group"); + g->eof = 0; register_task(&g->task); out: return ret; -- 2.39.5