From: Andre Noll <maan@systemlinux.org>
Date: Sat, 15 Dec 2007 16:01:47 +0000 (+0100)
Subject: Replace eof by error in receivers/filters/writers.
X-Git-Tag: v0.3.0~57
X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=19d9318abf42debb15d833d4e56ab636893285c3;p=paraslash.git

Replace eof by error in receivers/filters/writers.

This way it's possible to tell at a later time why
the receiver/filter/writer terminated.

This allows to increase the delay for reconnecting in case
the receiver failed to connect to para_server: Let the receivers
set the error value to -E_RECV_EOF in case a normal end of file
event occurred and check this value when calculating the restart
barrier.
---

diff --git a/aacdec.c b/aacdec.c
index 2f2a1ad4..258864a0 100644
--- a/aacdec.c
+++ b/aacdec.c
@@ -61,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
 
 	if (fn->loaded > fn->bufsize * 3 / 5)
 		return 0;
-	if (len < 2048 && !*fc->input_eof)
+	if (len < 2048 && !*fc->input_error)
 		return 0;
 
 	if (!padd->initialized) {
diff --git a/alsa_write.c b/alsa_write.c
index faadec1f..c856ef4a 100644
--- a/alsa_write.c
+++ b/alsa_write.c
@@ -176,7 +176,7 @@ static int alsa_write_post_select(__a_unused struct sched *s,
 
 //	PARA_INFO_LOG("%zd frames\n", frames);
 	if (!frames) {
-		if (*wng->input_eof)
+		if (*wng->input_error)
 			wn->written = *wng->loaded;
 		return 1;
 	}
diff --git a/audiod.c b/audiod.c
index dfb61db4..f83943a6 100644
--- a/audiod.c
+++ b/audiod.c
@@ -197,34 +197,34 @@ static void close_receiver(int slot_num)
 	if (s->format < 0 || !s->receiver_node)
 		return;
 	a = &afi[s->format];
-	PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n",
-		audio_formats[s->format] , slot_num, s->receiver_node->eof);
+	PARA_NOTICE_LOG("closing %s receiver in slot %d\n",
+		audio_formats[s->format], slot_num);
 	a->receiver->close(s->receiver_node);
 	free(s->receiver_node);
 	s->receiver_node = NULL;
 }
 
-static void kill_all_decoders(void)
+static void kill_all_decoders(int error)
 {
 	int i;
 
 	FOR_EACH_SLOT(i) {
 		struct slot_info *s = &slot[i];
-		if (s->wng && !s->wng->eof) {
+		if (s->wng && !s->wng->error) {
 			PARA_INFO_LOG("unregistering writer node group in slot %d\n",
 				i);
 			wng_unregister(s->wng);
-			s->wng->eof = 1;
+			s->wng->error = error;
 		}
-		if (s->fc && !s->fc->eof) {
+		if (s->fc && !s->fc->error) {
 			PARA_INFO_LOG("unregistering filter chain in slot %d\n", i);
 			unregister_task(&s->fc->task);
-			s->fc->eof = 1;
+			s->fc->error = error;
 		}
-		if (s->receiver_node && !s->receiver_node->eof) {
+		if (s->receiver_node && !s->receiver_node->error) {
 			PARA_INFO_LOG("unregistering receiver_node in slot %d\n", i);
 			unregister_task(&s->receiver_node->task);
-			s->receiver_node->eof = 1;
+			s->receiver_node->error = error;
 		}
 	}
 }
@@ -266,7 +266,7 @@ static void filter_event_handler(struct task *t)
 {
 	PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
 	struct filter_chain *fc = t->private_data;
-	fc->eof = 1;
+	fc->error = t->ret;
 	unregister_task(t);
 }
 
@@ -285,13 +285,13 @@ static void open_filters(int slot_num)
 	INIT_LIST_HEAD(&s->fc->filters);
 	s->fc->inbuf = s->receiver_node->buf;
 	s->fc->in_loaded = &s->receiver_node->loaded;
-	s->fc->input_eof = &s->receiver_node->eof;
+	s->fc->input_error = &s->receiver_node->error;
 	s->fc->task.pre_select = filter_pre_select;
 	s->fc->task.event_handler = filter_event_handler;
 	s->fc->task.private_data = s->fc;
-	s->fc->eof = 0;
+	s->fc->error = 0;
 
-	s->receiver_node->output_eof = &s->fc->eof;
+	s->receiver_node->output_error = &s->fc->error;
 	sprintf(s->fc->task.status, "filter chain");
 	for (i = 0; i < nf; i++) {
 		struct filter_node *fn = para_calloc(sizeof(struct filter_node));
@@ -315,7 +315,7 @@ static void wng_event_handler(struct task *t)
 	struct writer_node_group *wng = t->private_data;
 
 	PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
-	wng->eof = 1;
+	wng->error = t->ret;
 	wng_unregister(wng);
 }
 
@@ -333,15 +333,15 @@ static void open_writers(int slot_num)
 	if (s->fc) {
 		s->wng->buf = s->fc->outbuf;
 		s->wng->loaded = s->fc->out_loaded;
-		s->wng->input_eof = &s->fc->eof;
+		s->wng->input_error = &s->fc->error;
 		s->wng->channels = &s->fc->channels;
 		s->wng->samplerate = &s->fc->samplerate;
-		s->fc->output_eof = &s->wng->eof;
+		s->fc->output_error = &s->wng->error;
 		PARA_INFO_LOG("samplerate: %d\n", *s->wng->samplerate);
 	} else {
 		s->wng->buf = s->receiver_node->buf;
 		s->wng->loaded = &s->receiver_node->loaded;
-		s->wng->input_eof = &s->receiver_node->eof;
+		s->wng->input_error = &s->receiver_node->error;
 	}
 	s->wng->task.event_handler = wng_event_handler;
 	for (i = 0; i < a->num_writers; i++) {
@@ -360,16 +360,19 @@ static void open_writers(int slot_num)
 static void rn_event_handler(struct task *t)
 {
 	struct receiver_node *rn = t->private_data;
-	const struct timeval restart_delay = {0, 10 * 1000};
 	int i;
 
 	PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
 	unregister_task(t);
-	rn->eof = 1;
+	rn->error = t->ret;
 	/* set restart barrier */
 	FOR_EACH_SLOT(i) {
+		struct timeval restart_delay = {0, 10 * 1000};
 		if (slot[i].receiver_node != rn)
 			continue;
+		if (rn->error != -E_RECV_EOF)
+			/* don't reconnect immediately on errors */
+			restart_delay.tv_sec = 5;
 		tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier);
 	}
 }
@@ -420,7 +423,7 @@ static int receiver_running(int format)
 	FOR_EACH_SLOT(i) {
 		struct slot_info *s = &slot[i];
 		if (s->format == format && s->receiver_node
-				&& !s->receiver_node->eof)
+				&& !s->receiver_node->error)
 			return 1;
 	}
 	return 0;
@@ -558,13 +561,13 @@ static void try_to_close_slot(int slot_num)
 
 	if (s->format < 0)
 		return;
-	if (s->receiver_node && !s->receiver_node->eof)
+	if (s->receiver_node && !s->receiver_node->error)
 		return;
-	if (s->fc && !s->fc->eof)
+	if (s->fc && !s->fc->error)
 		return;
-	if (s->wng && !s->wng->eof)
+	if (s->wng && !s->wng->error)
 		return;
-	PARA_INFO_LOG("closing slot %d \n", slot_num);
+	PARA_INFO_LOG("closing slot %d\n", slot_num);
 	wng_close(s->wng);
 	close_filters(s->fc);
 	free(s->fc);
@@ -585,7 +588,7 @@ static void audiod_pre_select(struct sched *s, __a_unused struct task *t)
 
 	t->ret = 1;
 	if (audiod_status != AUDIOD_ON || !stat_task->playing)
-		return kill_all_decoders();
+		return kill_all_decoders(-E_NOT_PLAYING);
 	if (open_current_receiver(s))
 		s->timeout = min_delay;
 	FOR_EACH_SLOT(i) {
diff --git a/client.c b/client.c
index 1506f656..a37bc7e2 100644
--- a/client.c
+++ b/client.c
@@ -41,13 +41,13 @@ static void client_event_handler(struct task *t)
 		register_task(&sit.task);
 		p->inbuf = sit.buf;
 		p->in_loaded = &sit.loaded;
-		p->in_eof = &sit.eof;
+		p->in_error = &sit.error;
 		return;
 	}
 	stdout_set_defaults(&sot);
 	sot.buf = p->buf;
 	sot.loaded = &p->loaded;
-	sot.input_eof = &p->eof;
+	sot.input_error = &p->eof;
 	register_task(&sot.task);
 }
 
diff --git a/client.h b/client.h
index 06eeda38..199d5c9a 100644
--- a/client.h
+++ b/client.h
@@ -73,8 +73,8 @@ struct private_client_data {
 	char *inbuf;
 	/** number of bytes loaded in \p inbuf */
 	size_t *in_loaded;
-	/** non-zero if input task encountered an eof or an errro condition */
-	int *in_eof;
+	/** Non-zero if input task encountered an eof or an error condition. */
+	int *in_error;
 };
 
 void client_close(struct private_client_data *pcd);
diff --git a/client_common.c b/client_common.c
index 7c31fcdd..3501a5df 100644
--- a/client_common.c
+++ b/client_common.c
@@ -224,8 +224,8 @@ void client_pre_select(struct sched *s, struct task *t)
 			para_fd_set(pcd->fd, &s->wfds, &s->max_fileno);
 			pcd->check_w = 1;
 		} else {
-			if (*pcd->in_eof) {
-				t->ret = -E_INPUT_EOF;
+			if (*pcd->in_error) {
+				t->ret = *pcd->in_error;
 				s->timeout.tv_sec = 0;
 				s->timeout.tv_usec = 1;
 			}
diff --git a/dccp_recv.c b/dccp_recv.c
index 755473c9..eeb7c57a 100644
--- a/dccp_recv.c
+++ b/dccp_recv.c
@@ -99,9 +99,10 @@ static void dccp_recv_post_select(struct sched *s, struct task *t)
 	struct receiver_node *rn = t->private_data;
 	struct private_dccp_recv_data *pdd = rn->private_data;
 
-	t->ret = -E_DCCP_RECV_EOF;
-	if (rn->output_eof && *rn->output_eof)
+	if (rn->output_error && *rn->output_error) {
+		t->ret = *rn->output_error;
 		goto out;
+	}
 	t->ret = 1;
 	if (!s->select_ret || !FD_ISSET(pdd->fd, &s->rfds))
 		goto out; /* nothing to do */
@@ -112,14 +113,14 @@ static void dccp_recv_post_select(struct sched *s, struct task *t)
 		DCCP_BUFSIZE - rn->loaded);
 	if (t->ret <= 0) {
 		if (!t->ret)
-			t->ret = -E_DCCP_RECV_EOF;
+			t->ret = -E_RECV_EOF;
 		goto out;
 	}
 	rn->loaded += t->ret;
 	return;
 out:
 	if (t->ret < 0)
-		rn->eof = 1;
+		rn->error = t->ret;
 }
 
 /**
diff --git a/error.h b/error.h
index 534e7f97..469b85be 100644
--- a/error.h
+++ b/error.h
@@ -24,10 +24,9 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define RINGBUFFER_ERRORS
 #define SCORE_ERRORS
 #define SHA1_ERRORS
-#define RECV_ERRORS
 #define AFH_COMMON_ERRORS
 #define RBTREE_ERRORS
-
+#define RECV_ERRORS
 
 extern const char **para_errlist[];
 
@@ -153,7 +152,6 @@ extern const char **para_errlist[];
 	PARA_ERROR(NO_CONFIG, "config file not found"), \
 	PARA_ERROR(CLIENT_AUTH, "authentication failed"), \
 	PARA_ERROR(SERVER_EOF, "connection closed by para_server"), \
-	PARA_ERROR(INPUT_EOF, "end of input"), \
 	PARA_ERROR(HANDSHAKE_COMPLETE, ""), /* not really an error */ \
 
 
@@ -170,7 +168,6 @@ extern const char **para_errlist[];
 
 #define STDOUT_ERRORS \
 	PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
-	PARA_ERROR(STDOUT_EOF, "end of file"), \
 
 
 #define NET_ERRORS \
@@ -194,12 +191,14 @@ extern const char **para_errlist[];
 
 
 #define HTTP_RECV_ERRORS \
-	PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \
 	PARA_ERROR(HTTP_RECV_OVERRUN, "http_recv: output buffer overrun"), \
 
 
 #define RECV_COMMON_ERRORS \
 	PARA_ERROR(RECV_SYNTAX, "recv syntax error"), \
+	PARA_ERROR(RECV_EOF, "end of file"), \
+	PARA_ERROR(RECV_CONNECT, "connection failed"), \
+
 
 
 #define AUDIOD_ERRORS \
@@ -207,6 +206,7 @@ extern const char **para_errlist[];
 	PARA_ERROR(MISSING_COLON, "syntax error: missing colon"), \
 	PARA_ERROR(UNSUPPORTED_AUDIO_FORMAT, "given audio format not supported"), \
 	PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \
+	PARA_ERROR(NOT_PLAYING, "not playing"), \
 
 
 #define AUDIOD_COMMAND_ERRORS \
@@ -356,7 +356,6 @@ extern const char **para_errlist[];
 
 #define DCCP_RECV_ERRORS \
 	PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \
-	PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \
 
 
 #define DCCP_SEND_ERRORS \
@@ -410,7 +409,6 @@ extern const char **para_errlist[];
 
 #define WRITE_COMMON_ERRORS \
 	PARA_ERROR(WRITE_COMMON_SYNTAX, "syntax error in write option"), \
-	PARA_ERROR(WNG_EOF, "wng: end of file"), \
 
 
 #define AACDEC_ERRORS \
diff --git a/filter.c b/filter.c
index 52a1ce5b..27bbe63e 100644
--- a/filter.c
+++ b/filter.c
@@ -66,9 +66,9 @@ static int init_filter_chain(void)
 
 	fc->inbuf = sit->buf;
 	fc->in_loaded = &sit->loaded;
-	fc->input_eof = &sit->eof;
-	fc->eof = 0;
-	fc->output_eof = &sot->eof;
+	fc->input_error = &sit->error;
+	fc->error = 0;
+	fc->output_error = &sot->error;
 	fc->task.private_data = fc;
 	fc->task.pre_select = filter_pre_select;
 	fc->task.event_handler = filter_event_handler;
@@ -160,7 +160,7 @@ int main(int argc, char *argv[])
 	stdout_set_defaults(sot);
 	sot->buf = fc->outbuf;
 	sot->loaded = fc->out_loaded;
-	sot->input_eof = &fc->eof;
+	sot->input_error = &fc->error;
 
 	register_task(&sit->task);
 	register_task(&fc->task);
diff --git a/filter.h b/filter.h
index 9e197f6c..09e45913 100644
--- a/filter.h
+++ b/filter.h
@@ -63,12 +63,12 @@ struct filter_chain {
 	 * pointer to variable containing the number of bytes loaded in the output buffer
 	 */
 		size_t *out_loaded;
-	/** non-zero if this filter wont' produce any more output */
-	int eof;
-	/** pointer to the eof flag of the receiving application */
-	int *input_eof;
-	/** pointer to the eof flag of the writing application */
-	int *output_eof;
+	/** Non-zero if this filter wont' produce any more output. */
+	int error;
+	/** Pointer to the error variable of the receiving application. */
+	int *input_error;
+	/** Pointer to the eof flag of the writing application. */
+	int *output_error;
 	/** the task associated with the filter chain */
 	struct task task;
 };
diff --git a/filter_chain.c b/filter_chain.c
index 7852220a..295f2d06 100644
--- a/filter_chain.c
+++ b/filter_chain.c
@@ -115,9 +115,10 @@ void filter_pre_select(__a_unused struct sched *s, struct task *t)
 	size_t *loaded;
 	int conv, conv_total = 0;
 
-	t->ret = -E_FC_EOF;
-	if (fc->output_eof && *fc->output_eof)
+	if (fc->output_error && *fc->output_error) {
+		t->ret =  *fc->output_error;
 		goto err_out;
+	}
 again:
 	ib = fc->inbuf;
 	loaded = fc->in_loaded;
@@ -153,7 +154,7 @@ again:
 	if (conv)
 		goto again;
 	t->ret = 1;
-	if (!*fc->input_eof)
+	if (!*fc->input_error)
 		return;
 	if (*fc->out_loaded)
 		return;
@@ -161,7 +162,7 @@ again:
 		return;
 	t->ret = -E_FC_EOF;
 err_out:
-	fc->eof = 1;
+	fc->error = t->ret;
 }
 
 /**
diff --git a/http_recv.c b/http_recv.c
index 4283db4b..8c19d6d7 100644
--- a/http_recv.c
+++ b/http_recv.c
@@ -100,9 +100,10 @@ static void http_recv_post_select(struct sched *s, struct task *t)
 	struct receiver_node *rn = t->private_data;
 	struct private_http_recv_data *phd = rn->private_data;
 
-	t->ret = -E_HTTP_RECV_EOF;
-	if (rn->output_eof && *rn->output_eof)
+	if (rn->output_error && *rn->output_error) {
+		t->ret = *rn->output_error;
 		goto out;
+	}
 	t->ret = 1;
 	if (!s->select_ret)
 		goto out;
@@ -136,13 +137,13 @@ static void http_recv_post_select(struct sched *s, struct task *t)
 		BUFSIZE - rn->loaded);
 	if (t->ret <= 0) {
 		if (!t->ret)
-			t->ret = -E_HTTP_RECV_EOF;
+			t->ret = -E_RECV_EOF;
 		goto out;
 	}
 	rn->loaded += t->ret;
 out:
 	if (t->ret < 0)
-		rn->eof = 1;
+		rn->error = t->ret;
 }
 
 static void http_recv_close(struct receiver_node *rn)
diff --git a/oggdec.c b/oggdec.c
index 3dced8a0..4c8aae5f 100644
--- a/oggdec.c
+++ b/oggdec.c
@@ -46,7 +46,7 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource)
 //	PARA_DEBUG_LOG("pod = %p\n", pod);
 //	PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
 	if (pod->inbuf_len < size) {
-		if (*fn->fc->input_eof)
+		if (*fn->fc->input_error)
 			return 0;
 		errno = EAGAIN;
 		return (size_t)-1;
@@ -123,7 +123,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
 
 	if (!pod->vf) {
 		int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
-		if (len <ib && !*fn->fc->input_eof) {
+		if (len <ib && !*fn->fc->input_error) {
 			PARA_DEBUG_LOG("initial input buffer %zd/%d, "
 				"waiting for more data\n", len, ib);
 			return 0;
@@ -149,7 +149,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
 		PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fc->channels,
 			fn->fc->samplerate);
 	}
-	while (!*fn->fc->input_eof && fn->loaded < fn->bufsize) {
+	while (!*fn->fc->input_error && fn->loaded < fn->bufsize) {
 		int length = fn->bufsize - fn->loaded;
 		long read_ret = ov_read(pod->vf, fn->buf + fn->loaded, length,
 			ENDIAN, 2 /* 16 bit */, 1 /* signed */, NULL);
diff --git a/ortp_recv.c b/ortp_recv.c
index fca470ed..b28d084a 100644
--- a/ortp_recv.c
+++ b/ortp_recv.c
@@ -123,9 +123,9 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
 	size_t packet_size;
 
 //	PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
-	t->ret = -E_ORTP_RECV_EOF;
-	if (rn->output_eof && *rn->output_eof) {
-		rn->eof = 1;
+	if (rn->output_error && *rn->output_error) {
+		rn->error = *rn->output_error;
+		t->ret = rn->error;
 		return;
 	}
 	t->ret = 1;
@@ -150,11 +150,11 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
 		pord->start = *now;
 	t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
 	if (t->ret < ORTP_AUDIO_HEADER_LEN) {
-		rn->eof = 1;
 		if (t->ret < 0)
 			t->ret = -E_MSG_TO_BUF;
 		else
 			t->ret = -E_ORTP_RECV_EOF;
+		rn->error = t->ret;
 		goto err_out;
 	}
 	packet_size = t->ret;
@@ -168,8 +168,8 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
 	switch (packet_type) {
 	unsigned header_len, payload_len;
 	case ORTP_EOF:
-		rn->eof = 1;
-		t->ret = -E_ORTP_RECV_EOF;
+		t->ret = -E_RECV_EOF;
+		rn->error = t->ret;
 		goto err_out;
 	case ORTP_BOF:
 		PARA_INFO_LOG("bof (%zu)\n", packet_size);
@@ -222,7 +222,7 @@ success:
 	compute_next_chunk(chunk_time, pord);
 	return;
 err_out:
-	rn->eof = 1;
+	rn->error = t->ret;
 	freemsg(mp);
 }
 
diff --git a/recv.c b/recv.c
index 0fe46411..d1909f25 100644
--- a/recv.c
+++ b/recv.c
@@ -49,7 +49,7 @@ static void rn_event_handler(struct task *t)
 {
 	struct receiver_node *rn = t->private_data;
 	PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
-	rn->eof = 1;
+	rn->error = t->ret;
 	unregister_task(t);
 }
 
@@ -94,7 +94,7 @@ int main(int argc, char *argv[])
 	stdout_set_defaults(&sot);
 	sot.buf = rn.buf;
 	sot.loaded = &rn.loaded;
-	sot.input_eof = &rn.eof;
+	sot.input_error = &rn.error;
 	register_task(&sot.task);
 
 	rn.task.private_data = &rn;
diff --git a/recv.h b/recv.h
index e1ef4e21..6d318d6a 100644
--- a/recv.h
+++ b/recv.h
@@ -18,10 +18,10 @@ struct receiver_node {
 	size_t loaded;
 	/** receiver-specific data */
 	void *private_data;
-	/** set to 1 if end of file is reached */
-	int eof;
-	/** pointer to the eof member of the consumer */
-	int *output_eof;
+	/** Set to non-zero error value on errors or on end of file. */
+	int error;
+	/** Pointer to the error member of the consumer. */
+	int *output_error;
 	/** pointer to the configuration data for this instance */
 	void *conf;
 	/** the task associated with this instance */
diff --git a/stdin.c b/stdin.c
index 649d464b..25eca2ef 100644
--- a/stdin.c
+++ b/stdin.c
@@ -75,7 +75,7 @@ static void stdin_post_select(struct sched *s, struct task *t)
 	} else
 		t->ret = -E_STDIN_EOF;
 	if (t->ret < 0)
-		sit->eof = 1;
+		sit->error = t->ret;
 }
 
 /**
@@ -92,7 +92,7 @@ void stdin_set_defaults(struct stdin_task *sit)
 {
 	sit->bufsize = 16 * 1024,
 	sit->loaded = 0,
-	sit->eof = 0,
+	sit->error = 0,
 	sit->task.pre_select = stdin_pre_select;
 	sit->task.post_select = stdin_post_select;
 	sit->task.event_handler = stdin_default_event_handler;
diff --git a/stdin.h b/stdin.h
index 4f5a6112..515c4d26 100644
--- a/stdin.h
+++ b/stdin.h
@@ -19,7 +19,7 @@ struct stdin_task {
 	/** The task structure. */
 	struct task task;
 	/** Non-zero on read error, or if a read from stdin returned zero. */
-	int eof;
+	int error;
 };
 
 void stdin_set_defaults(struct stdin_task *sit);
diff --git a/stdout.c b/stdout.c
index fe188c9a..babed752 100644
--- a/stdout.c
+++ b/stdout.c
@@ -33,8 +33,8 @@ static void stdout_pre_select(struct sched *s, struct task *t)
 	t->ret = 1;
 	sot->check_fd = 0;
 	if (!*sot->loaded) {
-		if (*sot->input_eof) {
-			t->ret = -E_STDOUT_EOF;
+		if (*sot->input_error) {
+			t->ret = *sot->input_error;
 			s->timeout.tv_sec = 0;
 			s->timeout.tv_usec = 1;
 		}
@@ -62,8 +62,8 @@ static void stdout_post_select(struct sched *s, struct task *t)
 
 	t->ret = 1;
 	if (!sot->check_fd) {
-		if (*sot->input_eof)
-			t->ret = -E_STDOUT_EOF;
+		if (*sot->input_error)
+			t->ret = *sot->input_error;
 		return;
 	}
 	if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
@@ -99,7 +99,7 @@ void stdout_set_defaults(struct stdout_task *sot)
 	sot->task.pre_select = stdout_pre_select;
 	sot->task.post_select = stdout_post_select;
 	sot->task.event_handler = stdout_default_event_handler;
-	sot->eof = 0;
+	sot->error = 0;
 	mark_fd_nonblocking(STDOUT_FILENO);
 	sprintf(sot->task.status, "stdout writer");
 }
diff --git a/stdout.h b/stdout.h
index ee4f35ff..92e1cf49 100644
--- a/stdout.h
+++ b/stdout.h
@@ -14,10 +14,10 @@ struct stdout_task {
 	char *buf;
 	/** Number of bytes loaded in \a buf. */
 	size_t *loaded;
-	/** Üointer to the eof flag of the feeding task. */
-	int *input_eof;
-	/** Non-zero if a write error occured. */
-	int eof;
+	/** Pointer to the error variable of the feeding task. */
+	int *input_error;
+	/** Non-zero if a write error occurred. */
+	int error;
 	/** The task structure. */
 	struct task task;
 	/** Whether \p STDOUT_FILENO was included in the write fd set. */
diff --git a/write.c b/write.c
index 3b87335f..125df1c1 100644
--- a/write.c
+++ b/write.c
@@ -28,11 +28,11 @@ struct check_wav_task {
 	char *buf;
 	/** Number of bytes loaded in \a buf. */
 	size_t *loaded;
-	/** Non-zero if end of file was reached. */
-	int *eof;
+	/** Non-zero if an error occurred or end of file was reached. */
+	int *error;
 	/** Number of channels specified in wav header given by \a buf. */
 	unsigned channels;
-	/** Samplerate specified in wav header given by \a buf. */
+	/** Sample rate specified in wav header given by \a buf. */
 	unsigned samplerate;
 	/** The task structure for this task. */
 	struct task task;
@@ -67,7 +67,7 @@ static void check_wav_pre_select(__a_unused struct sched *s, struct task *t)
 	unsigned char *a;
 
 	if (*wt->loaded < WAV_HEADER_LEN) {
-		t->ret = *wt->eof? -E_PREMATURE_END : 1;
+		t->ret = *wt->error? -E_PREMATURE_END : 1;
 		return;
 	}
 	wt->channels = 2;
@@ -170,7 +170,7 @@ static void idt_event_handler(struct task *t)
 	unregister_task(t);
 	wng->buf = sit.buf;
 	wng->loaded = &sit.loaded;
-	wng->input_eof = &sit.eof;
+	wng->input_error = &sit.error;
 	wng->task.event_handler = wng_event_handler;
 	wng->channels = &cwt.channels;
 	wng->samplerate = &cwt.samplerate;
@@ -234,7 +234,7 @@ int main(int argc, char *argv[])
 	cwt.task.event_handler = cwt_event_handler;
 	cwt.buf = sit.buf;
 	cwt.loaded = &sit.loaded;
-	cwt.eof = &sit.eof;
+	cwt.error = &sit.error;
 	sprintf(cwt.task.status, "check wav");
 	register_task(&cwt.task);
 
diff --git a/write.h b/write.h
index 2eb4e3db..71c666d6 100644
--- a/write.h
+++ b/write.h
@@ -101,10 +101,10 @@ struct writer_node_group {
 	struct writer_node *writer_nodes;
 	/** the maximum of the chunk_bytes values of the writer nodes in this group */
 	size_t max_chunk_bytes;
-	/** non-zero if end of file was encountered by the feeding task */
-	int *input_eof;
-	/** non-zero if end of file was encountered */
-	int eof;
+	/** Non-zero if an error or end of file was encountered by the feeding task. */
+	int *input_error;
+	/** Non-zero if an error occurred or end of file was encountered. */
+	int error;
 	/** current output buffer */
 	char *buf;
 	/** number of bytes loaded in the output buffer */
diff --git a/write_common.c b/write_common.c
index b35773f7..a8d21149 100644
--- a/write_common.c
+++ b/write_common.c
@@ -28,7 +28,7 @@ static void wng_pre_select(__a_unused struct sched *s, struct task *t)
 		struct writer_node *wn = &g->writer_nodes[i];
 		t->ret = wn->writer->pre_select(s, wn);
 		if (t->ret < 0) {
-			g->eof = 1;
+			g->error = t->ret;
 			return;
 		}
 	}
@@ -44,7 +44,7 @@ static void wng_post_select(struct sched *s, struct task *t)
 		struct writer_node *wn = &g->writer_nodes[i];
 		t->ret = wn->writer->post_select(s, wn);
 		if (t->ret < 0) {
-			g->eof = 1;
+			g->error = t->ret;
 			return;
 		}
 		if (!i)
@@ -58,9 +58,9 @@ static void wng_post_select(struct sched *s, struct task *t)
 		FOR_EACH_WRITER_NODE(i, g)
 			g->writer_nodes[i].written -= min_written;
 	}
-	if (!*g->loaded && *g->input_eof) {
-		g->eof = 1;
-		t->ret = -E_WNG_EOF;
+	if (!*g->loaded && *g->input_error) {
+		g->error = *g->input_error;
+		t->ret = g->error;
 		return;
 	}
 	t->ret = 1;
@@ -96,7 +96,7 @@ int wng_open(struct writer_node_group *g)
 		g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
 	}
 	sprintf(g->task.status, "%s", "writer node group");
-	g->eof = 0;
+	g->error = 0;
 	return 1;
 err_out:
 	PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
@@ -106,7 +106,7 @@ err_out:
 		wn->writer->close(wn);
 	}
 	g->num_writers = 0;
-	g->eof = 1;
+	g->error = ret;
 	return ret;
 }
 
@@ -119,7 +119,6 @@ err_out:
 void wng_unregister(struct writer_node_group *g)
 {
 	unregister_task(&g->task);
-	g->eof = 1;
 }
 
 /**