if (ct->task.error < 0)
return ct->task.error;
if (!svt->stdout_task_started && ct->status == CL_EXECUTING) {
- stdout_set_defaults(&sot);
- register_task(s, &sot.task);
+ stdout_task_register(&sot, s);
svt->stdout_task_started = true;
return 1;
}
default: ret = -E_SERVER_CMD_FAILURE;
}
}
+ sched_shutdown(&sched);
out:
if (ret < 0)
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
}
sot->btrn = btr_new_node(&(struct btr_node_description)
EMBRACE(.name = "stdout", .parent = parent));
- stdout_set_defaults(sot);
- register_task(&s, &sot->task);
+ stdout_task_register(sot, &s);
s.default_timeout.tv_sec = 1;
s.default_timeout.tv_usec = 0;
btr_log_tree(sit->btrn, LL_INFO);
ret = schedule(&s);
+ sched_shutdown(&s);
out_cleanup:
for (i--; i >= 0; i--) {
struct filter_node *fn = fns[i];
goto out;
r_opened = 1;
- memset(&sot, 0, sizeof(struct stdout_task));
sot.btrn = btr_new_node(&(struct btr_node_description)
EMBRACE(.parent = rn.btrn, .name = "stdout"));
- stdout_set_defaults(&sot);
- register_task(&s, &sot.task);
+ stdout_task_register(&sot, &s);
rn.task.pre_select = r->pre_select;
rn.task.post_select = r->post_select;
s.default_timeout.tv_sec = 1;
s.default_timeout.tv_usec = 0;
ret = schedule(&s);
+ sched_shutdown(&s);
out:
if (r_opened)
r->close(&rn);
}
}
+static void unlink_and_free_task(struct task *t)
+{
+ PARA_INFO_LOG("freeing task %s\n", t->status);
+ list_del(&t->node);
+ if (t->owned_by_sched)
+ free(t);
+}
+
//#define SCHED_DEBUG 1
static inline void call_post_select(struct sched *s, struct task *t)
{
#endif
}
-static void sched_post_select(struct sched *s)
+static unsigned sched_post_select(struct sched *s)
{
struct task *t, *tmp;
+ unsigned num_running_tasks = 0;
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
+ if (t->error < 0)
+ continue;
call_post_select(s, t);
t->notification = 0;
- if (t->error < 0)
- list_del(&t->node);
+ if (t->error < 0) {
+ if (!t->owned_by_sched)
+ list_del(&t->node);
+ } else
+ num_running_tasks++;
}
+ return num_running_tasks;
}
/**
int schedule(struct sched *s)
{
int ret;
+ unsigned num_running_tasks;
if (!s->select_function)
s->select_function = para_select;
FD_ZERO(&s->wfds);
}
clock_get_realtime(now);
- sched_post_select(s);
- if (list_empty(&s->task_list))
+ num_running_tasks = sched_post_select(s);
+ if (num_running_tasks == 0)
return 0;
goto again;
}
/**
- * Add a task to the scheduler.
+ * Deallocate all resources of all tasks of a scheduler instance.
+ *
+ * \param s The scheduler instance.
+ *
+ * This should only be called after \ref schedule() has returned.
+ */
+void sched_shutdown(struct sched *s)
+{
+ struct task *t, *tmp;
+
+ list_for_each_entry_safe(t, tmp, &s->task_list, node) {
+ if (t->error >= 0)
+ /* The task list should contain only terminated tasks. */
+ PARA_WARNING_LOG("shutting down running task %s\n",
+ t->status);
+ unlink_and_free_task(t);
+ }
+}
+
+/**
+ * Add a task to the scheduler. Deprecated.
*
* \param t The task to add.
* \param s The scheduler instance to add the task to.
PARA_INFO_LOG("registering %s (%p)\n", t->status, t);
assert(t->post_select);
t->notification = 0;
+ t->owned_by_sched = false;
+ if (!s->task_list.next)
+ INIT_LIST_HEAD(&s->task_list);
+ list_add_tail(&t->node, &s->task_list);
+}
+
+/**
+ * Add a task to the scheduler task list.
+ *
+ * \param info Task information supplied by the caller.
+ * \param s The scheduler instance.
+ *
+ * \return A pointer to a newly allocated task structure. It will be
+ * freed by sched_shutdown().
+ */
+struct task *task_register(struct task_info *info, struct sched *s)
+{
+ struct task *t = para_malloc(sizeof(*t));
+
+ assert(info->post_select);
+
if (!s->task_list.next)
INIT_LIST_HEAD(&s->task_list);
+
+ snprintf(t->status, sizeof(t->status) - 1, "%s", info->name);
+ t->status[sizeof(t->status) - 1] = '\0';
+ t->notification = 0;
+ t->error = 0;
+ t->pre_select = info->pre_select;
+ t->post_select = info->post_select;
+ t->context = info->context;
+ t->owned_by_sched = true;
list_add_tail(&t->node, &s->task_list);
+ return t;
+}
+
+/**
+ * Obtain the context pointer of a task.
+ *
+ * \param t Return this task's context pointer.
+ *
+ * \return A pointer to the memory location specified previously as \a
+ * task_info->context when the task was registered with \ref task_register().
+ */
+void *task_context(struct task *t)
+{
+ assert(t->owned_by_sched);
+ return t->context;
}
/**
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
char *tmp_msg;
- tmp_msg = make_message("%s%p\t%s\n", msg? msg : "", t, t->status);
+ tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
+ t->error < 0? "zombie" : "running",
+ t->status);
free(msg);
msg = tmp_msg;
}
/**
* Paraslash's task structure.
*
- * Before registering a task to the scheduler, the task structure must be
- * filled in properly by the caller.
+ * This is considered an internal structure and will eventually be made private.
*
* \sa \ref sched.
*/
struct task {
+ /** Copied from the task_info struct during task_register(). */
+ void (*pre_select)(struct sched *s, struct task *t);
+ /** Copied from the task_info struct during task_register(). */
+ int (*post_select)(struct sched *s, struct task *t);
+ /** Whether this task is active (>=0) or in error state (<0). */
+ int error;
+ /** Position of the task in the task list of the scheduler. */
+ struct list_head node;
+ /** The task name supplied when the task was registered(). */
+ char status[255];
+ /** If less than zero, the task was notified by another task. */
+ int notification;
+ /** Whether the task structure should be freed in sched_shutdown(). */
+ bool owned_by_sched;
+ /** Usually a pointer to the struct containing this task. */
+ void *context;
+};
+
+/** Information that must be supplied by callers of \ref task_register(). */
+struct task_info {
+ /** Used for log messages and by \ref get_task_list(). */
+ const char *name;
/**
- * The optional pre select hook of \a t.
+ * The optional pre select method.
*
* Its purpose is to add file descriptors to the fd sets of the
* scheduler and to decrease the select timeout if necessary.
*/
void (*pre_select)(struct sched *s, struct task *t);
/**
- * The mandatory post select hook of \a t.
+ * The mandatory post select method.
*
* Its purpose is to evaluate and act upon the results of the previous
* select call. If this function returns a negative value, the
* scheduler unregisters the task.
*/
int (*post_select)(struct sched *s, struct task *t);
- /** Whether this task is active (>=0) or in error state (<0). */
- int error;
- /** Position of the task in the task list of the scheduler. */
- struct list_head node;
- /** Descriptive text and current status of the task. */
- char status[255];
- /** If less than zero, the task was notified by another task. */
- int notification;
+ /**
+ * This pointer is saved when the task is register(ed). It may be
+ * queried from ->pre_select() and ->post_select() via \ref
+ * task_context(). Usually this is a pointer to the struct owned by the
+ * caller which contains the task pointer as one member.
+ */
+ void *context;
};
/**
*/
extern struct timeval *now;
+struct task *task_register(struct task_info *info, struct sched *s);
+void *task_context(struct task *t);
void register_task(struct sched *s, struct task *t);
int schedule(struct sched *s);
+void sched_shutdown(struct sched *s);
char *get_task_list(struct sched *s);
void task_notify(struct task *t, int err);
void task_notify_all(struct sched *s, int err);
*/
static void stdout_pre_select(struct sched *s, struct task *t)
{
- struct stdout_task *sot = container_of(t, struct stdout_task, task);
+ struct stdout_task *sot = task_context(t);
int ret;
ret = btr_node_status(sot->btrn, 0, BTR_NT_LEAF);
*/
static int stdout_post_select(struct sched *s, struct task *t)
{
- struct stdout_task *sot = container_of(t, struct stdout_task, task);
+ struct stdout_task *sot = task_context(t);
struct btr_node *btrn = sot->btrn;
int ret;
char *buf;
}
return ret;
}
+
/**
- * Initialize a stdout task structure with default values.
+ * Register a stdout task structure.
*
- * \param sot The stdout task structure.
+ * \param sot The stdout task structure to register.
+ * \param s The task will be added to this scheduler's task list.
*
- * This fills in the pre/post select function pointers of the task structure
- * given by \a sot.
+ * This sets up \a sot and registers a task with \a sot as context pointer.
*/
-void stdout_set_defaults(struct stdout_task *sot)
+void stdout_task_register(struct stdout_task *sot, struct sched *s)
{
int ret;
-
- sot->task.pre_select = stdout_pre_select;
- sot->task.post_select = stdout_post_select;
- sprintf(sot->task.status, "stdout");
+ struct task_info ti = {
+ .pre_select = stdout_pre_select,
+ .post_select = stdout_post_select,
+ .context = sot,
+ .name = "stdout",
+ };
/* See stdin.c for details. */
ret = fcntl(STDOUT_FILENO, F_GETFL);
}
sot->fd_flags = ret;
sot->must_set_nonblock_flag = (sot->fd_flags & O_NONBLOCK) == 0;
+ sot->task = task_register(&ti, s);
}
*/
struct stdout_task {
/** The task structure used by the scheduler. */
- struct task task;
+ struct task *task;
/** Stdout is always a leaf node in the buffer tree. */
struct btr_node *btrn;
/** The descriptor flags of STDOUT at startup. */
bool must_set_nonblock_flag;
};
-void stdout_set_defaults(struct stdout_task *sot);
+void stdout_task_register(struct stdout_task *sot, struct sched *s);