s->fc->inbuf = s->receiver_node->buf;
s->fc->in_loaded = &s->receiver_node->loaded;
s->fc->input_eof = &s->receiver_node->eof;
-
s->fc->task.pre_select = filter_pre_select;
s->fc->task.event_handler = filter_event_handler;
s->fc->task.private_data = s->fc;
s->fc->task.flags = 0;
s->fc->eof = 0;
+
+ s->receiver_node->output_eof = &s->fc->eof;
sprintf(s->fc->task.status, "filter chain");
for (i = 0; i < nf; i++) {
struct filter_node *fn = para_calloc(sizeof(struct filter_node));
static void wng_event_handler(struct task *t)
{
- PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
- unregister_task(t);
+ struct writer_node_group *wng = t->private_data;
+ int i;
+
+ wng_unregister(wng);
+ FOR_EACH_SLOT(i) {
+ struct slot_info *s = &slot[i];
+ if (s->wng != wng)
+ continue;
+// if (s->fc)
+// s->fc->eof = 1;
+// if (s->receiver_node)
+// s->receiver_node->eof = 1;
+ PARA_INFO_LOG("slot %d: %s\n", i, PARA_STRERROR(-t->ret));
+ }
}
static void open_writers(int slot_num)
return;
PARA_INFO_LOG("closing slot %d \n", slot_num);
wng_close(s->wng);
- wng_destroy(s->wng);
close_filters(s->fc);
free(s->fc);
close_receiver(slot_num);
struct receiver_node *rn = t->private_data;
struct private_dccp_recv_data *pdd = rn->private_data;
+ t->ret = -E_DCCP_RECV_EOF;
+ if (rn->output_eof && *rn->output_eof) {
+ rn->eof = 1;
+ return;
+ }
t->ret = 1;
if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds))
return; /* nothing to do */
*loaded -= t->ret;
conv += t->ret;
if (*loaded && t->ret) {
- PARA_INFO_LOG("moving %zd bytes in input buffer for %s filter\n",
+ PARA_DEBUG_LOG("moving %zd bytes in input "
+ "buffer for %s filter\n",
*loaded, fn->filter->name);
memmove(ib, ib + t->ret, *loaded);
}
loaded = &fn->loaded;
}
conv_total += conv;
-// PARA_INFO_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
-// *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
+ PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, "
+ "conv: %d, conv_total: %d\n", *fc->input_eof,
+ fc->output_eof? *fc->output_eof : -42,
+ fc->eof, *fc->out_loaded, conv, conv_total);
if (conv)
goto again;
t->ret = 1;
struct receiver_node *rn = t->private_data;
struct private_http_recv_data *phd = rn->private_data;
+ t->ret = -E_HTTP_RECV_EOF;
+ if (rn->output_eof && *rn->output_eof) {
+ rn->eof = 1;
+ return;
+ }
t->ret = 1;
if (!s->select_ret) /* we're not interested in timeouts */
return;
unsigned chunk_time;
// PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
+ t->ret = -E_ORTP_RECV_EOF;
+ if (rn->output_eof && *rn->output_eof) {
+ rn->eof = 1;
+ return;
+ }
t->ret = 1;
if (pord->start.tv_sec)
if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0)
void *private_data;
/** set to 1 if end of file is reached */
int eof;
+ int *output_eof;
/** pointer to the configuration data for this instance */
void *conf;
/** the task associated with this instance */
list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
t->post_select(s, t);
-// PARA_INFO_LOG("%s \n", t->status);
+// PARA_INFO_LOG("%s: %d\n", t->status, t->ret);
if (t->ret > 0 || !t->event_handler)
continue;
t->event_handler(t);
PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
unregister_task(t);
wng_close(g);
- wng_destroy(g);
}
FOR_EACH_WRITER_NODE(i, g) {
struct writer_node *wn = &g->writer_nodes[i];
t->ret = wn->task.ret;
- if (t->ret < 0)
+ if (t->ret < 0) {
+ g->eof = 1;
return;
+ }
if (!i)
min_written = t->ret;
else
goto out;
wn->chunk_bytes = ret;
g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
- PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select);
- PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select);
wn->task.pre_select = wn->writer->pre_select;
wn->task.post_select = wn->writer->post_select;
wn->task.private_data = wn;
return ret;
}
-void wng_destroy(struct writer_node_group *g)
+void wng_unregister(struct writer_node_group *g)
{
- if (!g)
- return;
- free(g->written);
- free(g->writer_nodes);
- free(g);
+ int i;
+
+ FOR_EACH_WRITER_NODE(i, g) {
+ struct writer_node *wn = &g->writer_nodes[i];
+ unregister_task(&wn->task);
+ }
+ unregister_task(&g->task);
}
void wng_close(struct writer_node_group *g)
PARA_NOTICE_LOG("closing wng with %d writer(s)\n", g->num_writers);
FOR_EACH_WRITER_NODE(i, g) {
struct writer_node *wn = &g->writer_nodes[i];
- unregister_task(&wn->task);
wn->writer->close(wn);
}
+ free(g->written);
+ free(g->writer_nodes);
+ free(g);
}
struct writer_node_group *wng_new(unsigned num_writers)
int wng_open(struct writer_node_group *g);
void wng_close(struct writer_node_group *g);
struct writer_node_group *wng_new(unsigned num_writers);
-void wng_destroy(struct writer_node_group *g);
+void wng_unregister(struct writer_node_group *g);
void init_supported_writers(void);
void *check_writer_arg(char *wa, int *writer_num);
struct writer_node_group *setup_default_wng(void);