From: Andre Noll Date: Sun, 31 Mar 2013 12:32:07 +0000 (+0000) Subject: sched: Provide alternative post_select variant. X-Git-Tag: v0.4.13~39^2~41 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=9c00a79ed817c854ed10ee6bd1b908df8dc13a2f;p=paraslash.git sched: Provide alternative post_select variant. Code which accesses the ->error field of another task has been a source of bugs in the past. This patch is a first step to make ->error private to sched.c. Currently the ->post_select() methods of all tasks are supposed to set ->error to a negative value to indicate that the task should be terminated, i.e. t->error being negative instructs the scheduler to not call t->pre_select() or t->post_select() any more in subsequent iterations. An equivalent way to achieve the same is to let ->post_select() return an error code instead. Benefits include * It reduces the use of ->error outside of sched.c * It's impossible to miss setting the error code * It simplifies code slightly Therefore we'd like to change the prototype of the ->post_select() methods from void (*post_select)(struct sched *s, struct task *t); to int (*post_select)(struct sched *s, struct task *t); Changes to struct sched affects many parts of the code base, hence converting all users at once would result in a very large patch. Therefore we temporarily introduce an additional ->new_post_select() method according to the second declaration above. The scheduler calls the new method if it is defined and falls back to the old variant otherwise. This approach allows to switch over one task after another without breaking things. This patch introduces the infrastructure just described and switches over the http receiver. Subsequent commits will change all other tasks in the same way. After all tasks have been switched over we can get rid of the old ->post_select variant and rename ->new_post_select() back to ->post_select(). --- diff --git a/audiod.c b/audiod.c index 6da2ccc7..a94a6aca 100644 --- a/audiod.c +++ b/audiod.c @@ -495,8 +495,13 @@ static void open_filters(struct slot_info *s) fn->filter_num = a->filter_nums[i]; fn->conf = a->filter_conf[i]; fn->task.pre_select = f->pre_select; - fn->task.post_select = f->post_select; - + if (f->new_post_select) { + fn->task.new_post_select = f->new_post_select; + fn->task.post_select = NULL; + } else { + fn->task.new_post_select = NULL; + fn->task.post_select = f->post_select; + } fn->btrn = btr_new_node(&(struct btr_node_description) EMBRACE(.name = f->name, .parent = parent, .handler = f->execute, .context = fn)); @@ -560,7 +565,13 @@ static int open_receiver(int format) PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n", audio_formats[format], r->name, slot_num); rn->task.pre_select = r->pre_select; - rn->task.post_select = r->post_select; + if (r->new_post_select) { + rn->task.new_post_select = r->new_post_select; + rn->task.post_select = NULL; + } else { + rn->task.new_post_select = NULL; + rn->task.post_select = r->post_select; + } sprintf(rn->task.status, "%s receiver node", r->name); register_task(&sched, &rn->task); return slot_num; @@ -1042,6 +1053,7 @@ static void init_command_task(struct command_task *ct) { ct->task.pre_select = command_pre_select; ct->task.post_select = command_post_select; + ct->task.new_post_select = NULL; ct->task.error = 0; ct->fd = audiod_get_socket(); /* doesn't return on errors */ sprintf(ct->task.status, "command task"); diff --git a/filter.c b/filter.c index 40e3779e..07b78aed 100644 --- a/filter.c +++ b/filter.c @@ -140,7 +140,13 @@ int main(int argc, char *argv[]) EMBRACE(.name = f->name, .parent = parent, .handler = f->execute, .context = fn)); fn->task.pre_select = f->pre_select; - fn->task.post_select = f->post_select; + if (f->new_post_select) { + fn->task.new_post_select = f->new_post_select; + fn->task.post_select = NULL; + } else { + fn->task.new_post_select = NULL; + fn->task.post_select = f->post_select; + } f->open(fn); register_task(&s, &fn->task); parent = fn->btrn; diff --git a/filter.h b/filter.h index eefa6f69..f7376452 100644 --- a/filter.h +++ b/filter.h @@ -110,6 +110,8 @@ struct filter { * error code. */ void (*post_select)(struct sched *s, struct task *t); + /** New variant, see sched.h. */ + int (*new_post_select)(struct sched *s, struct task *t); /** * Answer a buffer tree query. * diff --git a/http_recv.c b/http_recv.c index a913d3a2..80f0b168 100644 --- a/http_recv.c +++ b/http_recv.c @@ -60,7 +60,6 @@ static void http_recv_pre_select(struct sched *s, struct task *t) struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_http_recv_data *phd = rn->private_data; - t->error = 0; if (generic_recv_pre_select(s, t) <= 0) return; if (phd->status == HTTP_CONNECTED) @@ -74,7 +73,7 @@ static void http_recv_pre_select(struct sched *s, struct task *t) * area with data read from the socket. In any case, update the state of the * connection if necessary. */ -static void http_recv_post_select(struct sched *s, struct task *t) +static int http_recv_post_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_http_recv_data *phd = rn->private_data; @@ -90,11 +89,11 @@ static void http_recv_post_select(struct sched *s, struct task *t) if (ret < 0) goto out; if (ret == 0) - return; + return 0; if (phd->status == HTTP_CONNECTED) { char *rq; if (!FD_ISSET(rn->fd, &s->wfds)) - return; + return 0; rq = make_request_msg(); PARA_INFO_LOG("sending http request\n"); ret = write_va_buffer(rn->fd, "%s", rq); @@ -102,17 +101,17 @@ static void http_recv_post_select(struct sched *s, struct task *t) if (ret < 0) goto out; phd->status = HTTP_SENT_GET_REQUEST; - return; + return 0; } if (phd->status == HTTP_SENT_GET_REQUEST) { ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds); if (ret < 0) goto out; if (ret == 0) - return; + return 0; PARA_INFO_LOG("received ok msg, streaming\n"); phd->status = HTTP_STREAMING; - return; + return 0; } ret = -E_HTTP_RECV_OVERRUN; iovcnt = btr_pool_get_buffers(rn->btrp, iov); @@ -128,10 +127,9 @@ static void http_recv_post_select(struct sched *s, struct task *t) btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn); } out: - if (ret >= 0) - return; - btr_remove_node(&rn->btrn); - t->error = ret; + if (ret < 0) + btr_remove_node(&rn->btrn); + return ret; } static void http_recv_close(struct receiver_node *rn) @@ -194,7 +192,8 @@ void http_recv_init(struct receiver *r) r->open = http_recv_open; r->close = http_recv_close; r->pre_select = http_recv_pre_select; - r->post_select = http_recv_post_select; + r->post_select = NULL; + r->new_post_select = http_recv_post_select; r->parse_config = http_recv_parse_config; r->free_config = http_recv_free_config; r->help = (struct ggo_help) { diff --git a/play.c b/play.c index fb72bae4..5794b952 100644 --- a/play.c +++ b/play.c @@ -368,7 +368,13 @@ static int open_new_file(struct play_task *pt) tmp = NULL; } pt->rn.task.pre_select = afh_recv->pre_select; - pt->rn.task.post_select = afh_recv->post_select; + if (afh_recv->new_post_select) { + pt->rn.task.new_post_select = afh_recv->new_post_select; + pt->rn.task.post_select = NULL; + } else { + pt->rn.task.post_select = NULL; + pt->rn.task.new_post_select = afh_recv->new_post_select; + } sprintf(pt->rn.task.status, "%s receiver node", afh_recv->name); return 1; fail: @@ -409,7 +415,13 @@ static int load_file(struct play_task *pt) pt->fn.filter_num = ret; decoder = filters + ret; pt->fn.task.pre_select = decoder->pre_select; - pt->fn.task.post_select = decoder->post_select; + if (decoder->new_post_select) { + pt->fn.task.new_post_select = decoder->new_post_select; + pt->fn.task.post_select = NULL; + } else { + pt->fn.task.new_post_select = NULL; + pt->fn.task.post_select = decoder->post_select; + } sprintf(pt->fn.task.status, "%s decoder", af); pt->fn.btrn = btr_new_node(&(struct btr_node_description) EMBRACE(.name = decoder->name, .parent = pt->rn.btrn, diff --git a/recv.c b/recv.c index 1fb5e25e..4458021c 100644 --- a/recv.c +++ b/recv.c @@ -109,7 +109,13 @@ int main(int argc, char *argv[]) register_task(&s, &sot.task); rn.task.pre_select = r->pre_select; - rn.task.post_select = r->post_select; + if (r->new_post_select) { + rn.task.new_post_select = r->new_post_select; + rn.task.post_select = NULL; + } else { + rn.task.new_post_select = NULL; + rn.task.post_select = r->post_select;; + } sprintf(rn.task.status, "%s", r->name); register_task(&s, &rn.task); diff --git a/recv.h b/recv.h index f70cbbe8..c48a6b36 100644 --- a/recv.h +++ b/recv.h @@ -116,6 +116,8 @@ struct receiver { * \sa select(2), struct receiver. */ void (*post_select)(struct sched *s, struct task *t); + /** New variant, see sched.h. */ + int (*new_post_select)(struct sched *s, struct task *t); /** The two help texts of this receiver. */ struct ggo_help help; diff --git a/sched.c b/sched.c index 95a07d29..2501b50c 100644 --- a/sched.c +++ b/sched.c @@ -36,7 +36,7 @@ static void unregister_task(struct task *t) para_strerror(-t->error)); if (t->pre_select) list_del(&t->pre_select_node); - if (t->post_select) + if (t->new_post_select || t->post_select) list_del(&t->post_select_node); } @@ -62,13 +62,20 @@ static void sched_preselect(struct sched *s) static inline void call_post_select(struct sched *s, struct task *t) { #ifndef SCHED_DEBUG - t->post_select(s, t); + if (t->new_post_select) { + t->error = t->new_post_select(s, t); + return; + } + return t->post_select(s, t); #else struct timeval t1, t2, diff; unsigned long pst; clock_get_realtime(&t1); - t->post_select(s, t); + if (t->new_post_select) + t->error = t->new_post_select(s, t); + else + t->post_select(s, t); clock_get_realtime(&t2); tv_diff(&t1, &t2, &diff); pst = tv2ms(&diff); @@ -165,7 +172,10 @@ void register_task(struct sched *s, struct task *t) PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select); list_add_tail(&t->pre_select_node, &s->pre_select_list); } - if (t->post_select) { + if (t->new_post_select) { + PARA_DEBUG_LOG("post_select: %p\n", &t->new_post_select); + list_add_tail(&t->post_select_node, &s->post_select_list); + } else if ((t->post_select)) { PARA_DEBUG_LOG("post_select: %p\n", &t->post_select); list_add_tail(&t->post_select_node, &s->post_select_list); } diff --git a/sched.h b/sched.h index 021474a2..907e5fb7 100644 --- a/sched.h +++ b/sched.h @@ -60,6 +60,13 @@ struct task { * Evaluate and act upon the results of the previous select call. */ void (*post_select)(struct sched *s, struct task *t); + /** + * The newer variant of the post select hook of \a t. + * + * This hook must return the error code rather than store it in + * t->error. + */ + int (*new_post_select)(struct sched *s, struct task *t); /** Whether this task is in error state. */ int error; /** Position of the task in the pre_select list of the scheduler. */ diff --git a/write.h b/write.h index 2573ba5c..bac94431 100644 --- a/write.h +++ b/write.h @@ -66,6 +66,8 @@ struct writer { * Called from the post_select function of the writer node's task. */ void (*post_select)(struct sched *s, struct task *t); + /** New variant, see sched.h. */ + int (*new_post_select)(struct sched *s, struct task *t); /** * Close one instance of the writer. * diff --git a/write_common.c b/write_common.c index a4f908ac..a1aac65e 100644 --- a/write_common.c +++ b/write_common.c @@ -113,8 +113,14 @@ void register_writer_node(struct writer_node *wn, struct btr_node *parent, .handler = w->execute, .context = wn)); strcpy(wn->task.status, name); free(name); - wn->task.post_select = w->post_select; wn->task.pre_select = w->pre_select; + if (w->new_post_select) { + wn->task.new_post_select = w->new_post_select; + wn->task.post_select = NULL; + } else { + wn->task.new_post_select = NULL; + wn->task.post_select = w->post_select; + } register_task(s, &wn->task); }