From 8bf35b38357c3ce59f52ae87f6e84e4b6d183ac7 Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Sat, 25 Jan 2014 19:41:45 +0100 Subject: [PATCH] task_register() conversion: receivers This adds a new public function, task_reap(), to sched.c. It is called by para_audiod and para_play to free the memory occupied by the receiver node after EOF. sched_shutdown() can not be used for this purpose since the scheduler stays active during the life time of these programs (i.e. schedule() never returns) while receiver nodes come and go. The new task_reap() has to face the problem that it is called from another task's ->post_select() method, so removing the task being reaped from the scheduler task list is not possible in task_reap(). Hence this patch adds the new flag "dead" to struct task. It is initially unset and is turned on in task_reap() to indicate that (a) the task has exited (i.e. ->post_select() returned negative) and (b) task_reap() has been called to fetch the exit status. Only if this flag is set, the scheduler removes the task from the task list. --- afh_recv.c | 4 ++-- audiod.c | 20 +++++++++++-------- dccp_recv.c | 4 ++-- http_recv.c | 4 ++-- play.c | 15 +++++++++----- recv.c | 10 ++++++---- recv.h | 2 +- recv_common.c | 2 +- sched.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++-- sched.h | 3 +++ udp_recv.c | 4 ++-- 11 files changed, 94 insertions(+), 29 deletions(-) diff --git a/afh_recv.c b/afh_recv.c index 941ec4e9..e320fdee 100644 --- a/afh_recv.c +++ b/afh_recv.c @@ -152,7 +152,7 @@ static void afh_recv_close(struct receiver_node *rn) static void afh_recv_pre_select(struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); struct private_afh_recv_data *pard = rn->private_data; struct afh_info *afhi = &pard->afhi; struct afh_recv_args_info *conf = rn->conf; @@ -172,7 +172,7 @@ static void afh_recv_pre_select(struct sched *s, struct task *t) static int afh_recv_post_select(__a_unused struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); struct afh_recv_args_info *conf = rn->conf; struct private_afh_recv_data *pard = rn->private_data; struct btr_node *btrn = rn->btrn; diff --git a/audiod.c b/audiod.c index b7b6d2d7..4b88ec5b 100644 --- a/audiod.c +++ b/audiod.c @@ -391,6 +391,7 @@ static void close_receiver(int slot_num) audio_formats[s->format], slot_num); a->receiver->close(s->receiver_node); btr_remove_node(&s->receiver_node->btrn); + task_reap(&s->receiver_node->task); free(s->receiver_node); s->receiver_node = NULL; tv_add(now, &(struct timeval)EMBRACE(0, 200 * 1000), @@ -459,7 +460,7 @@ static void notify_receivers(int error) continue; if (!s->receiver_node) continue; - task_notify(&s->receiver_node->task, error); + task_notify(s->receiver_node->task, error); } } @@ -566,10 +567,12 @@ static int open_receiver(int format) s->receiver_node = rn; PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n", audio_formats[format], r->name, slot_num); - rn->task.pre_select = r->pre_select; - rn->task.post_select = r->post_select; - sprintf(rn->task.status, "%s receiver node", r->name); - register_task(&sched, &rn->task); + rn->task = task_register(&(struct task_info) { + .name = r->name, + .pre_select = r->pre_select, + .post_select = r->post_select, + .context = rn, + }, &sched); return slot_num; } @@ -584,7 +587,7 @@ static bool receiver_running(void) if (!s->receiver_node) continue; - if (s->receiver_node->task.error >= 0) + if (s->receiver_node->task->error >= 0) return true; if (ss1 == ss2) return true; @@ -611,7 +614,7 @@ struct btr_node *audiod_get_btr_root(void) struct timeval rstime; if (!s->receiver_node) continue; - if (s->receiver_node->task.error < 0) + if (s->receiver_node->task->error < 0) continue; btr_get_node_start(s->receiver_node->btrn, &rstime); if (newest_slot >= 0 && tv_diff(&rstime, &newest_rstime, NULL) < 0) @@ -1103,7 +1106,7 @@ static bool must_close_slot(int slot_num) if (s->format < 0) return false; - if (s->receiver_node && s->receiver_node->task.error >= 0) + if (s->receiver_node && s->receiver_node->task->error >= 0) return false; for (i = 0; i < a->num_filters; i++) if (s->fns && s->fns[i].task.error >= 0) @@ -1427,6 +1430,7 @@ int main(int argc, char *argv[]) sched.default_timeout.tv_sec = 2; sched.default_timeout.tv_usec = 999 * 1000; ret = schedule(&sched); + sched_shutdown(&sched); PARA_EMERG_LOG("%s\n", para_strerror(-ret)); return EXIT_FAILURE; diff --git a/dccp_recv.c b/dccp_recv.c index 3d6588ac..1c41fd3c 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -121,7 +121,7 @@ static void *dccp_recv_parse_config(int argc, char **argv) static void dccp_recv_pre_select(struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); if (generic_recv_pre_select(s, t) <= 0) return; @@ -130,7 +130,7 @@ static void dccp_recv_pre_select(struct sched *s, struct task *t) static int dccp_recv_post_select(struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); struct btr_node *btrn = rn->btrn; struct iovec iov[2]; int ret, iovcnt; diff --git a/http_recv.c b/http_recv.c index 1f02e48d..03fca4e1 100644 --- a/http_recv.c +++ b/http_recv.c @@ -62,7 +62,7 @@ static char *make_request_msg(void) static void http_recv_pre_select(struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); struct private_http_recv_data *phd = rn->private_data; if (generic_recv_pre_select(s, t) <= 0) @@ -80,7 +80,7 @@ static void http_recv_pre_select(struct sched *s, struct task *t) */ static int http_recv_post_select(struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); struct private_http_recv_data *phd = rn->private_data; struct btr_node *btrn = rn->btrn; int ret, iovcnt; diff --git a/play.c b/play.c index 5736a2dd..2dcc37ea 100644 --- a/play.c +++ b/play.c @@ -249,7 +249,7 @@ static int get_playback_error(struct play_task *pt) return 0; if (pt->fn.task.error >= 0) return 0; - if (pt->rn.task.error >= 0) + if (pt->rn.task->error >= 0) return 0; if (err == -E_BTR_EOF || err == -E_RECV_EOF || err == -E_EOF || err == -E_WRITE_COMMON_EOF) @@ -277,6 +277,7 @@ static int eof_cleanup(struct play_task *pt) free(pt->fn.conf); memset(&pt->fn, 0, sizeof(struct filter_node)); + task_reap(&pt->rn.task); btr_remove_node(&pt->rn.btrn); /* * On eof (ret > 0), we do not wipe the receiver node struct until a @@ -351,9 +352,6 @@ static int open_new_file(struct play_task *pt) free(tmp); tmp = NULL; } - pt->rn.task.pre_select = afh_recv->pre_select; - pt->rn.task.post_select = afh_recv->post_select; - sprintf(pt->rn.task.status, "%s receiver node", afh_recv->name); return 1; fail: wipe_receiver_node(pt); @@ -405,7 +403,13 @@ static int load_file(struct play_task *pt) pt->wn.task.error = 0; /* success, register tasks */ - register_task(&sched, &pt->rn.task); + pt->rn.task = task_register( + &(struct task_info) { + .name = afh_recv->name, + .pre_select = afh_recv->pre_select, + .post_select = afh_recv->post_select, + .context = &pt->rn + }, &sched); register_task(&sched, &pt->fn.task); register_writer_node(&pt->wn, pt->fn.btrn, &sched); return 1; @@ -1250,6 +1254,7 @@ int main(int argc, char *argv[]) sprintf(pt->task.status, "play task"); register_task(&sched, &pt->task); ret = schedule(&sched); + sched_shutdown(&sched); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); return ret < 0? EXIT_FAILURE : EXIT_SUCCESS; diff --git a/recv.c b/recv.c index 4d8916f9..0d7bb2ce 100644 --- a/recv.c +++ b/recv.c @@ -65,6 +65,7 @@ int main(int argc, char *argv[]) struct receiver_node rn; struct stdout_task sot; static struct sched s; + struct task_info ti; recv_cmdline_parser(argc, argv, &conf); loglevel = get_loglevel_by_name(conf.loglevel_arg); @@ -93,10 +94,11 @@ int main(int argc, char *argv[]) EMBRACE(.parent = rn.btrn, .name = "stdout")); stdout_task_register(&sot, &s); - rn.task.pre_select = r->pre_select; - rn.task.post_select = r->post_select; - sprintf(rn.task.status, "%s", r->name); - register_task(&s, &rn.task); + ti.name = r->name; + ti.pre_select = r->pre_select; + ti.post_select = r->post_select; + ti.context = &rn; + rn.task = task_register(&ti, &s); s.default_timeout.tv_sec = 1; s.default_timeout.tv_usec = 0; diff --git a/recv.h b/recv.h index a590aabd..2b5e36d7 100644 --- a/recv.h +++ b/recv.h @@ -17,7 +17,7 @@ struct receiver_node { /** Pointer to the configuration data for this instance. */ void *conf; /** The task associated with this instance. */ - struct task task; + struct task *task; /** The receiver node is always the root of the buffer tree. */ struct btr_node *btrn; /** Each receiver node maintains a buffer pool for the received data. */ diff --git a/recv_common.c b/recv_common.c index eb5fe57e..921d57ae 100644 --- a/recv_common.c +++ b/recv_common.c @@ -127,7 +127,7 @@ void print_receiver_helps(unsigned flags) */ int generic_recv_pre_select(struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); int ret = btr_node_status(rn->btrn, 0, BTR_NT_ROOT); if (ret < 0) diff --git a/sched.c b/sched.c index 101cc4c0..6b8e0910 100644 --- a/sched.c +++ b/sched.c @@ -75,8 +75,11 @@ static unsigned sched_post_select(struct sched *s) unsigned num_running_tasks = 0; list_for_each_entry_safe(t, tmp, &s->task_list, node) { - if (t->error < 0) + if (t->error < 0) { + if (t->dead) /* task has been reaped */ + unlink_and_free_task(t); continue; + } call_post_select(s, t); t->notification = 0; if (t->error < 0) { @@ -138,6 +141,53 @@ again: goto again; } +/** + * Obtain the error status of a task and deallocate its resources. + * + * \param tptr Identifies the task to reap. + * + * This function is similar to wait(2) in that it returns information about a + * terminated task and allows to release the resources associated with the + * task. Until this function is called, the terminated task remains in a zombie + * state. + * + * \return If \a tptr is \p NULL, or \a *tptr is \p NULL, the function does + * nothing and returns zero. Otherwise, it is checked whether the task + * identified by \a tptr is still running. If it is, the function returns zero + * and again, no action is taken. Otherwise the (negative) error code of the + * terminated task is returned and \a *tptr is set to \p NULL. The task will + * then be removed removed from the scheduler task list. + * + * \sa \ref sched_shutdown(), wait(2). + */ +int task_reap(struct task **tptr) +{ + struct task *t; + + if (!tptr) + return 0; + t = *tptr; + if (!t) + return 0; + if (!t->owned_by_sched) + return 0; + if (t->error >= 0) + return 0; + if (t->dead) /* will be freed in sched_post_select() */ + return 0; + /* + * With list_for_each_entry_safe() it is only safe to remove the + * _current_ list item. Since we are being called from the loop in + * schedule() via some task's ->post_select() function, freeing the + * given task here would result in use-after-free bugs in schedule(). + * So we only set t->dead which tells schedule() to free the task in + * the next iteration of its loop. + */ + t->dead = true; + *tptr = NULL; + return t->error; +} + /** * Deallocate all resources of all tasks of a scheduler instance. * @@ -199,6 +249,7 @@ struct task *task_register(struct task_info *info, struct sched *s) t->status[sizeof(t->status) - 1] = '\0'; t->notification = 0; t->error = 0; + t->dead = false; t->pre_select = info->pre_select; t->post_select = info->post_select; t->context = info->context; @@ -239,7 +290,7 @@ char *get_task_list(struct sched *s) list_for_each_entry_safe(t, tmp, &s->task_list, node) { char *tmp_msg; tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t, - t->error < 0? "zombie" : "running", + t->error < 0? (t->dead? "dead" : "zombie") : "running", t->status); free(msg); msg = tmp_msg; diff --git a/sched.h b/sched.h index 40524845..fd714b61 100644 --- a/sched.h +++ b/sched.h @@ -55,6 +55,8 @@ struct task { int notification; /** Whether the task structure should be freed in sched_shutdown(). */ bool owned_by_sched; + /** True if task is in error state and exit status has been queried. */ + bool dead; /** Usually a pointer to the struct containing this task. */ void *context; }; @@ -104,6 +106,7 @@ char *get_task_list(struct sched *s); void task_notify(struct task *t, int err); void task_notify_all(struct sched *s, int err); int task_get_notification(const struct task *t); +int task_reap(struct task **tptr); void sched_min_delay(struct sched *s); void sched_request_timeout(struct timeval *to, struct sched *s); void sched_request_timeout_ms(long unsigned ms, struct sched *s); diff --git a/udp_recv.c b/udp_recv.c index 15cf73eb..4d4b67f9 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -29,7 +29,7 @@ static void udp_recv_pre_select(struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); if (generic_recv_pre_select(s, t) <= 0) return; @@ -56,7 +56,7 @@ static int udp_check_eof(size_t sz, struct iovec iov[2]) static int udp_recv_post_select(__a_unused struct sched *s, struct task *t) { - struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct receiver_node *rn = task_context(t); struct btr_node *btrn = rn->btrn; size_t num_bytes; struct iovec iov[2]; -- 2.39.5