]> git.tue.mpg.de Git - paraslash.git/commitdiff
btr support for para_client.
authorAndre Noll <maan@systemlinux.org>
Fri, 15 Jan 2010 05:46:03 +0000 (06:46 +0100)
committerAndre Noll <maan@systemlinux.org>
Fri, 15 Jan 2010 05:46:03 +0000 (06:46 +0100)
The use of the client code in audiod requires a node to be inserted
as the _parent_ of some existing node, which was not neccessary
before. Therefore a new btrn pointer for the child node is added to
struct btr_node_description and btr_new_node() is adjusted accordingly.

The patch also adds a small helper sched_request_barrier() to
the scheduler.

audiod.c
buffer_tree.c
buffer_tree.h
client.c
client.h
client_common.c
sched.c
sched.h

index 4b2c65f7a48e381ea016f4f8276b15c2e1766f1d..27eba9e3e584cd06294bef568cc4eb19d1ce1f88 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -110,6 +110,8 @@ struct status_task {
        struct timeval clock_diff_barrier;
        /** Number of the audio format as announced by para_server. */
        int current_audio_format_num;
+       /* The status task btrn is the child of the client task. */
+       struct btr_node *btrn;
 };
 
 /** The array of status items sent by para_server. */
@@ -1032,6 +1034,7 @@ static void close_stat_pipe(void)
 {
        if (!stat_task->ct)
                return;
+       btr_free_node(stat_task->ct->btrn);
        client_close(stat_task->ct);
        stat_task->ct = NULL;
        clear_and_dump_items();
@@ -1128,8 +1131,17 @@ static void start_stop_decoders(struct sched *s)
        sched_min_delay(s);
 }
 
-/* restart the client task if necessary */
 static void status_pre_select(struct sched *s, struct task *t)
+{
+       struct status_task *st = container_of(t, struct status_task, task);
+       int ret;
+
+       ret = btr_node_status(st->btrn, 0, BTR_NT_LEAF);
+       sched_request_barrier(&st->restart_barrier, s);
+}
+
+/* restart the client task if necessary */
+static void status_post_select(struct sched *s, struct task *t)
 {
        struct status_task *st = container_of(t, struct status_task, task);
 
@@ -1137,7 +1149,7 @@ static void status_pre_select(struct sched *s, struct task *t)
                if (!st->ct)
                        goto out;
                if (st->ct->task.error >= 0) {
-                       st->ct->task.error = -E_AUDIOD_OFF;
+                       kill_btrn(st->ct->btrn, &st->ct->task, -E_AUDIOD_OFF);
                        goto out;
                }
                if (st->ct->task.error != -E_TASK_UNREGISTERED)
@@ -1147,6 +1159,8 @@ static void status_pre_select(struct sched *s, struct task *t)
                goto out;
        }
        if (st->ct) {
+               char *buf;
+               size_t sz;
                int ret;
                if (st->ct->task.error < 0) {
                        if (st->ct->task.error != -E_TASK_UNREGISTERED)
@@ -1156,21 +1170,25 @@ static void status_pre_select(struct sched *s, struct task *t)
                }
                if (st->ct->status != CL_RECEIVING)
                        goto out;
-               ret = for_each_stat_item(st->ct->buf, st->ct->loaded,
-                       update_item);
+               ret = btr_node_status(st->btrn, 0, BTR_NT_LEAF);
+               if (ret <= 0)
+                       goto out;
+               sz = btr_next_buffer(st->btrn, &buf);
+               ret = for_each_stat_item(buf, sz, update_item);
                if (ret < 0) {
-                       st->ct->task.error = ret;
+                       kill_btrn(st->ct->btrn, &st->ct->task, ret);
                        goto out;
                }
-               if (st->ct->loaded != ret) {
+               if (sz != ret)
                        st->last_status_read = *now;
-                       st->ct->loaded = ret;
-               } else {
+               else {
                        struct timeval diff;
                        tv_diff(now, &st->last_status_read, &diff);
                        if (diff.tv_sec > 61)
-                               st->ct->task.error = -E_STATUS_TIMEOUT;
+                               kill_btrn(st->ct->btrn, &st->ct->task,
+                                       -E_STATUS_TIMEOUT);
                }
+               btr_consume(st->btrn, sz - ret);
                goto out;
        }
        if (tv_diff(now, &st->restart_barrier, NULL) < 0)
@@ -1180,14 +1198,14 @@ static void status_pre_select(struct sched *s, struct task *t)
                int argc = 5;
                PARA_INFO_LOG("clock diff count: %d\n", st->clock_diff_count);
                st->clock_diff_count--;
-               client_open(argc, argv, &st->ct, NULL);
+               client_open(argc, argv, &st->ct, NULL, NULL, st->btrn);
                set_stat_task_restart_barrier(2);
                sched_min_delay(s);
 
        } else {
                char *argv[] = {"audiod", "--", "stat", "-p", NULL};
                int argc = 4;
-               client_open(argc, argv, &st->ct, NULL);
+               client_open(argc, argv, &st->ct, NULL, NULL, st->btrn);
                set_stat_task_restart_barrier(5);
                sched_min_delay(s);
        }
@@ -1204,10 +1222,13 @@ static void init_status_task(struct status_task *st)
 {
        memset(st, 0, sizeof(struct status_task));
        st->task.pre_select = status_pre_select;
+       st->task.post_select = status_post_select;
        st->sa_time_diff_sign = 1;
        st->clock_diff_count = conf.clock_diff_count_arg;
        st->current_audio_format_num = -1;
-       sprintf(st->task.status, "status task");
+       sprintf(st->task.status, "stat");
+       st->btrn = btr_new_node(&(struct btr_node_description)
+               EMBRACE(.name = "stat"));
 }
 
 static void set_initial_status(void)
index 7180149ee49ade68b6310bb2f596562cb8ff08a8..1ea68917cd5438cffc4d64a5d3292d43fcd7da3a 100644 (file)
@@ -171,9 +171,9 @@ static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
 /*
        (parent, child):
        (NULL, NULL): new, isolated node.
-       (NULL, c): new node becomes root, c must be old root
        (p, NULL): new leaf node
-       (p, c): new internal node, ch must be child of p
+       (NULL, c): new node becomes root, c must be old root
+       (p, c): new internal node, ch must be child of p, not yet implemented
 
 */
 struct btr_node *btr_new_node(struct btr_node_description *bnd)
@@ -186,14 +186,31 @@ struct btr_node *btr_new_node(struct btr_node_description *bnd)
        btrn->context = bnd->context;
        btrn->start.tv_sec = 0;
        btrn->start.tv_usec = 0;
-       if (bnd->parent)
-               list_add_tail(&btrn->node, &bnd->parent->children);
        INIT_LIST_HEAD(&btrn->children);
        INIT_LIST_HEAD(&btrn->input_queue);
-       if (bnd->parent)
-               PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
-       else
-               PARA_INFO_LOG("added %s as btr root\n", bnd->name);
+       if (!bnd->child) {
+               if (bnd->parent) {
+                       list_add_tail(&btrn->node, &bnd->parent->children);
+                       PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
+                               bnd->name, bnd->parent->name);
+               } else
+                       PARA_INFO_LOG("added %s as btr root\n", bnd->name);
+               goto out;
+       }
+       if (!bnd->parent) {
+               assert(!bnd->child->parent);
+               PARA_INFO_LOG("new root: %s (was %s)\n",
+                       bnd->name, bnd->child->name);
+               btrn->parent = NULL;
+               list_add_tail(&bnd->child->node, &btrn->children);
+               /* link it in */
+               bnd->child->parent = btrn;
+               goto out;
+       }
+       PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
+       exit(EXIT_FAILURE);
+       assert(bnd->child->parent == bnd->parent);
+out:
        return btrn;
 }
 
index d91d43648e6e69ad6e4d54f98c6d68eecbc68ec2..abba7472d6407959ac2f09b59e8b43ba19b0dcda 100644 (file)
@@ -113,6 +113,7 @@ enum btr_node_type {
 struct btr_node_description {
        const char *name;
        struct btr_node *parent;
+       struct btr_node *child;
        btr_command_handler handler;
        void *context;
 };
index 6ea5e7e1b1d2601cb84c3c532fab85cd6f32481b..8cae657ba60ecbb344bdfd6156ed7c1567cac3ba 100644 (file)
--- a/client.c
+++ b/client.c
@@ -19,6 +19,7 @@
 #include "stdin.h"
 #include "stdout.h"
 #include "client.h"
+#include "buffer_tree.h"
 #include "error.h"
 
 INIT_CLIENT_ERRLISTS;
@@ -35,19 +36,12 @@ static void supervisor_post_select(__a_unused struct sched *s, struct task *t)
        }
        if (ct->status == CL_SENDING) {
                stdin_set_defaults(&sit);
-               sit.buf = para_malloc(sit.bufsize),
                register_task(&sit.task);
-               ct->inbuf = sit.buf;
-               ct->in_loaded = &sit.loaded;
-               ct->in_error = &sit.task.error;
                t->error = -E_TASK_STARTED;
                return;
        }
        if (ct->status == CL_RECEIVING) {
                stdout_set_defaults(&sot);
-               sot.bufp = &ct->buf;
-               sot.loaded = &ct->loaded;
-               sot.input_error = &ct->task.error;
                register_task(&sot.task);
                t->error = -E_TASK_STARTED;
                return;
@@ -59,10 +53,9 @@ static struct task svt = {
        .status = "supervisor task"
 };
 
-static int client_loglevel; /* loglevel */
+static int client_loglevel = LL_ERROR; /* loglevel */
 INIT_STDERR_LOGGING(client_loglevel);
 
-
 /**
  * The client program to connect to para_server.
  *
@@ -90,13 +83,28 @@ int main(int argc, char *argv[])
        init_random_seed_or_die();
        s.default_timeout.tv_sec = 1;
        s.default_timeout.tv_usec = 0;
-       ret = client_open(argc, argv, &ct, &client_loglevel);
-       if (ret < 0) /* can not use PARA_LOG here because ct is NULL */
-               exit(EXIT_FAILURE);
+       /*
+        * We add buffer tree nodes for stdin and stdout even though
+        * only one of them will be needed. This simplifies the code
+        * a bit wrt. to the buffer tree setup.
+        */
+       sit.btrn = btr_new_node(&(struct btr_node_description)
+               EMBRACE(.name = "stdin"));
+       ret = client_open(argc, argv, &ct, &client_loglevel, sit.btrn, NULL);
+       if (ret < 0)
+               goto out;
+       sot.btrn = btr_new_node(&(struct btr_node_description)
+               EMBRACE(.name = "stdout", .parent = ct->btrn));
        register_task(&svt);
        ret = schedule(&s);
-       if (ret < 0)
-               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+out:
        client_close(ct);
-       return ret >= 0? EXIT_SUCCESS: EXIT_FAILURE;
+       btr_free_node(sit.btrn);
+       btr_free_node(sot.btrn);
+       if (ret < 0) {
+               /* can not use PARA_LOG here because ct is NULL */
+               fprintf(stderr, "%s\n", para_strerror(-ret));
+               return EXIT_FAILURE;
+       }
+       return EXIT_SUCCESS;
 }
index 98dc32db5fdd547db412836e2f0925436064c3e7..8b0447aedecfee02b28b554d1592c89fd38ffb1d 100644 (file)
--- a/client.h
+++ b/client.h
@@ -16,8 +16,6 @@ enum {
        CL_RECEIVED_WELCOME,
        /** Client sends the authentification request. */
        CL_SENT_AUTH,
-       /** Server sends a challenge. */
-       CL_RECEIVED_CHALLENGE,
        /** Client solves the challenge and sends the result. */
        CL_SENT_CH_RESPONSE,
        /** Server accepts this authentication. */
@@ -30,9 +28,6 @@ enum {
        CL_RECEIVING,
 };
 
-/** The size of the receiving buffer. */
-#define CLIENT_BUFSIZE 8192
-
 /** Data specific to a client task. */
 struct client_task {
        /** The state of the connection. */
@@ -49,22 +44,9 @@ struct client_task {
        char *user;
        /** The client task structure. */
        struct task task;
-       /** The buffer used for handshake and receiving. */
-       char *buf;
-       /** Number of bytes loaded in \a buf. */
-       size_t loaded;
-       /** Non-zero if the pre_select hook added \a fd to the read fd set. */
-       int check_r;
-       /** Non-zero if the pre_select hook added \a fd to the write fd set. */
-       int check_w;
-       /** Pointer to the data to be sent to para_server. */
-       char *inbuf;
-       /** Number of bytes loaded in \a inbuf. */
-       size_t *in_loaded;
-       /** Non-zero if input task encountered an eof or an error condition. */
-       int *in_error;
+       struct btr_node *btrn;
 };
 
 void client_close(struct client_task *ct);
-int client_open(int argc, char *argv[], struct client_task **ct,
-               int *loglevel);
+int client_open(int argc, char *argv[], struct client_task **ct_ptr,
+               int *loglevel, struct btr_node *parent, struct btr_node *child);
index b6bfde9a37f81143f9b2dec7e357907aa705c15f..2a93d488ed1141fcbac9be8265491002f3fc9455 100644 (file)
 #include "client.cmdline.h"
 #include "client.h"
 #include "hash.h"
+#include "buffer_tree.h"
+
+/** The size of the receiving buffer. */
+#define CLIENT_BUFSIZE 4000
 
 /**
  * Close the connection to para_server and free all resources.
@@ -38,7 +42,6 @@ void client_close(struct client_task *ct)
                return;
        if (ct->rc4c.fd >= 0)
                close(ct->rc4c.fd);
-       free(ct->buf);
        free(ct->user);
        free(ct->config_file);
        free(ct->key_file);
@@ -62,10 +65,10 @@ void client_close(struct client_task *ct)
  */
 static void client_pre_select(struct sched *s, struct task *t)
 {
+       int ret;
        struct client_task *ct = container_of(t, struct client_task, task);
+       struct btr_node *btrn = ct->btrn;
 
-       ct->check_r = 0;
-       ct->check_w = 0;
        if (ct->rc4c.fd < 0)
                return;
        switch (ct->status) {
@@ -74,54 +77,46 @@ static void client_pre_select(struct sched *s, struct task *t)
        case CL_SENT_CH_RESPONSE:
        case CL_SENT_COMMAND:
                para_fd_set(ct->rc4c.fd, &s->rfds, &s->max_fileno);
-               ct->check_r = 1;
                return;
 
        case CL_RECEIVED_WELCOME:
-       case CL_RECEIVED_CHALLENGE:
        case CL_RECEIVED_PROCEED:
                para_fd_set(ct->rc4c.fd, &s->wfds, &s->max_fileno);
-               ct->check_w = 1;
                return;
 
        case CL_RECEIVING:
-               if (ct->loaded < CLIENT_BUFSIZE - 1) {
-                       para_fd_set(ct->rc4c.fd, &s->rfds, &s->max_fileno);
-                       ct->check_r = 1;
+               ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
+               if (ret != 0) {
+                       if (ret < 0)
+                               sched_min_delay(s);
+                       else
+                               para_fd_set(ct->rc4c.fd, &s->rfds,
+                                       &s->max_fileno);
                }
                return;
        case CL_SENDING:
-               if (!ct->in_loaded) /* stdin task not yet started */
-                       return;
-               if (*ct->in_loaded) {
-                       PARA_INFO_LOG("loaded: %zd\n", *ct->in_loaded);
-                       para_fd_set(ct->rc4c.fd, &s->wfds, &s->max_fileno);
-                       ct->check_w = 1;
-               } else {
-                       if (*ct->in_error) {
-                               t->error = *ct->in_error;
-                               s->timeout.tv_sec = 0;
-                               s->timeout.tv_usec = 1;
-                       }
+               ret = btr_node_status(btrn, 0, BTR_NT_LEAF);
+               if (ret != 0) {
+                       if (ret < 0)
+                               sched_min_delay(s);
+                       else
+                               para_fd_set(ct->rc4c.fd, &s->wfds,
+                                       &s->max_fileno);
                }
                return;
        }
 }
 
-static ssize_t client_recv_buffer(struct client_task *ct)
+static ssize_t client_recv_buffer(struct client_task *ct, char *buf, size_t len)
 {
        ssize_t ret;
 
        if (ct->status < CL_SENT_CH_RESPONSE)
-               ret = recv_buffer(ct->rc4c.fd, ct->buf + ct->loaded,
-                       CLIENT_BUFSIZE - ct->loaded);
+               ret = recv_buffer(ct->rc4c.fd, buf, len);
        else
-               ret = rc4_recv_buffer(&ct->rc4c, ct->buf + ct->loaded,
-                       CLIENT_BUFSIZE - ct->loaded);
-       if (!ret)
+               ret = rc4_recv_buffer(&ct->rc4c, buf, len);
+       if (ret == 0)
                return -E_SERVER_EOF;
-       if (ret > 0)
-               ct->loaded += ret;
        return ret;
 }
 
@@ -141,61 +136,63 @@ static ssize_t client_recv_buffer(struct client_task *ct)
 static void client_post_select(struct sched *s, struct task *t)
 {
        struct client_task *ct = container_of(t, struct client_task, task);
+       struct btr_node *btrn = ct->btrn;
+       int ret = 0;
+       char buf[CLIENT_BUFSIZE];
 
        t->error = 0;
        if (ct->rc4c.fd < 0)
                return;
-       if (!ct->check_r && !ct->check_w)
-               return;
-       if (ct->check_r && !FD_ISSET(ct->rc4c.fd, &s->rfds))
-               return;
-       if (ct->check_w && !FD_ISSET(ct->rc4c.fd, &s->wfds))
-               return;
        switch (ct->status) {
        case CL_CONNECTED: /* receive welcome message */
-               t->error = client_recv_buffer(ct);
-               if (t->error < 0)
+               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
+                       return;
+               ret = client_recv_buffer(ct, buf, sizeof(buf));
+               if (ret < 0)
                        goto err;
                ct->status = CL_RECEIVED_WELCOME;
                return;
        case CL_RECEIVED_WELCOME: /* send auth command */
-               sprintf(ct->buf, AUTH_REQUEST_MSG "%s", ct->user);
-               PARA_INFO_LOG("--> %s\n", ct->buf);
-               t->error = send_buffer(ct->rc4c.fd, ct->buf);
-               if (t->error < 0)
+               sprintf(buf, AUTH_REQUEST_MSG "%s", ct->user);
+               PARA_INFO_LOG("--> %s\n", buf);
+               if (!FD_ISSET(ct->rc4c.fd, &s->wfds))
+                       return;
+               ret = send_buffer(ct->rc4c.fd, buf);
+               if (ret < 0)
                        goto err;
                ct->status = CL_SENT_AUTH;
                return;
-       case CL_SENT_AUTH: /* receive challenge and rc4 keys */
-               ct->loaded = 0;
-               t->error = client_recv_buffer(ct);
-               if (t->error < 0)
-                       goto err;
-               ct->loaded = t->error;
-               PARA_INFO_LOG("<-- [challenge] (%zu bytes)\n", ct->loaded);
-               ct->status = CL_RECEIVED_CHALLENGE;
-               return;
-       case CL_RECEIVED_CHALLENGE:
+       case CL_SENT_AUTH:
+               /*
+                * Receive challenge and rc4 keys, decrypt the challenge and
+                * send back the hash of the decrypted challenge.
+                */
                {
                /* decrypted challenge/rc4 buffer */
                unsigned char crypt_buf[1024];
                /* the SHA1 of the decrypted challenge */
                unsigned char challenge_sha1[HASH_SIZE];
 
-               t->error = para_decrypt_buffer(ct->key_file, crypt_buf,
-                       (unsigned char *)ct->buf, ct->loaded);
-               if (t->error < 0)
+               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
+                       return;
+               ret = client_recv_buffer(ct, buf, sizeof(buf));
+               if (ret < 0)
+                       goto err;
+               PARA_INFO_LOG("<-- [challenge] (%zu bytes)\n", ret);
+               ret = para_decrypt_buffer(ct->key_file, crypt_buf,
+                       (unsigned char *)buf, ret);
+               if (ret < 0)
                        goto err;
                sha1_hash((char *)crypt_buf, CHALLENGE_SIZE, challenge_sha1);
                RC4_set_key(&ct->rc4c.send_key, RC4_KEY_LEN,
                        crypt_buf + CHALLENGE_SIZE);
                RC4_set_key(&ct->rc4c.recv_key, RC4_KEY_LEN,
                        crypt_buf + CHALLENGE_SIZE + RC4_KEY_LEN);
-               hash_to_asc(challenge_sha1, ct->buf);
-               PARA_INFO_LOG("--> %s\n", ct->buf);
-               t->error = send_bin_buffer(ct->rc4c.fd, (char *)challenge_sha1,
+               hash_to_asc(challenge_sha1, buf);
+               PARA_INFO_LOG("--> %s\n", buf);
+               ret = send_bin_buffer(ct->rc4c.fd, (char *)challenge_sha1,
                        HASH_SIZE);
-               if (t->error < 0)
+               if (ret < 0)
                        goto err;
                ct->status = CL_SENT_CH_RESPONSE;
                return;
@@ -203,25 +200,27 @@ static void client_post_select(struct sched *s, struct task *t)
        case CL_SENT_CH_RESPONSE: /* read server response */
                {
                size_t bytes_received;
-               ct->loaded = 0;
-               t->error = client_recv_buffer(ct);
-               if (t->error < 0)
+               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
+                       return;
+               ret = client_recv_buffer(ct, buf, sizeof(buf));
+               if (ret < 0)
                        goto err;
-               bytes_received = t->error;
+               bytes_received = ret;
                /* check if server has sent "Proceed" message */
-               t->error = -E_CLIENT_AUTH;
+               ret = -E_CLIENT_AUTH;
                if (bytes_received < PROCEED_MSG_LEN)
                        goto err;
-               if (!strstr(ct->buf, PROCEED_MSG))
+               if (!strstr(buf, PROCEED_MSG))
                        goto err;
                ct->status = CL_RECEIVED_PROCEED;
-               t->error = 0;
                return;
                }
        case CL_RECEIVED_PROCEED: /* concat args and send command */
                {
                int i;
                char *command = NULL;
+               if (!FD_ISSET(ct->rc4c.fd, &s->wfds))
+                       return;
                for (i = 0; i < ct->conf.inputs_num; i++) {
                        char *tmp = command;
                        command = make_message("%s\n%s", command?
@@ -230,40 +229,80 @@ static void client_post_select(struct sched *s, struct task *t)
                }
                command = para_strcat(command, EOC_MSG "\n");
                PARA_DEBUG_LOG("--> %s\n", command);
-               t->error = rc4_send_buffer(&ct->rc4c, command);
+               ret = rc4_send_buffer(&ct->rc4c, command);
                free(command);
-               if (t->error < 0)
+               if (ret < 0)
                        goto err;
                ct->status = CL_SENT_COMMAND;
                return;
                }
        case CL_SENT_COMMAND:
-               ct->loaded = 0;
-               t->error = client_recv_buffer(ct);
-               if (t->error < 0)
+               {
+               char *buf2;
+               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
+                       return;
+               /* can not use "buf" here because we need a malloced buffer */
+               buf2 = para_malloc(CLIENT_BUFSIZE);
+               ret = client_recv_buffer(ct, buf2, CLIENT_BUFSIZE);
+               if (ret < 0) {
+                       free(buf2);
                        goto err;
-               if (strstr(ct->buf, AWAITING_DATA_MSG))
+               }
+               if (strstr(buf2, AWAITING_DATA_MSG)) {
+                       free(buf2);
                        ct->status = CL_SENDING;
-               else
-                       ct->status = CL_RECEIVING;
+                       return;
+               }
+               ct->status = CL_RECEIVING;
+               btr_add_output(buf2, ret, btrn);
                return;
+               }
        case CL_SENDING:
-               PARA_INFO_LOG("loaded: %zd\n", *ct->in_loaded);
-               t->error = rc4_send_bin_buffer(&ct->rc4c, ct->inbuf,
-                       *ct->in_loaded);
-               if (t->error < 0)
+               {
+               char *buf2;
+               size_t sz;
+               ret = btr_node_status(btrn, 0, BTR_NT_LEAF);
+               if (ret < 0)
                        goto err;
-               *ct->in_loaded = 0;
+               if (ret == 0)
+                       return;
+               if (!FD_ISSET(ct->rc4c.fd, &s->wfds))
+                       return;
+               sz = btr_next_buffer(btrn, &buf2);
+               ret = rc4_send_bin_buffer(&ct->rc4c, buf2, sz);
+               if (ret < 0)
+                       goto err;
+               btr_consume(btrn, sz);
                return;
+               }
        case CL_RECEIVING:
-               t->error = client_recv_buffer(ct);
-               if (t->error < 0)
+               {
+               char *buf2;
+               buf2 = para_malloc(CLIENT_BUFSIZE);
+               ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
+               if (ret < 0)
+                       goto err;
+               if (ret == 0)
+                       return;
+               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
+                       return;
+               ret = client_recv_buffer(ct, buf2, CLIENT_BUFSIZE);
+               if (ret < 0) {
+                       free(buf2);
                        goto err;
+               }
+               buf2 = para_realloc(buf2, ret);
+               btr_add_output(buf2, ret, btrn);
                return;
+               }
        }
 err:
-       if (t->error != -E_SERVER_EOF)
-               PARA_ERROR_LOG("%s\n", para_strerror(-t->error));
+       t->error = ret;
+       if (ret < 0) {
+               if (ret != -E_SERVER_EOF && ret != -E_BTR_EOF)
+                       PARA_ERROR_LOG("%s\n", para_strerror(-t->error));
+               btr_remove_node(btrn);
+       }
 }
 
 /* connect to para_server and register the client task */
@@ -308,13 +347,14 @@ err_out:
  * \return Standard.
  */
 int client_open(int argc, char *argv[], struct client_task **ct_ptr,
-               int *loglevel)
+               int *loglevel, struct btr_node *parent, struct btr_node *child)
 {
        char *home = para_homedir();
        int ret;
        struct client_task *ct = para_calloc(sizeof(struct client_task));
 
-       ct->buf = para_malloc(CLIENT_BUFSIZE);
+       ct->btrn = btr_new_node(&(struct btr_node_description)
+               EMBRACE(.name = "client", .parent = parent, .child = child));
        *ct_ptr = ct;
        ct->rc4c.fd = -1;
        ret = -E_CLIENT_SYNTAX;
@@ -364,6 +404,7 @@ out:
        free(home);
        if (ret < 0) {
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+               btr_free_node(ct->btrn);
                client_close(ct);
                *ct_ptr = NULL;
        }
diff --git a/sched.c b/sched.c
index 14346ec0e1c411ab08da2ddbdc85073d6945615b..e58483e9385e5092c094ac0f2b30b06c905e6627 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -269,3 +269,14 @@ void sched_request_timeout_ms(long unsigned ms, struct sched *s)
        ms2tv(ms, &tv);
        sched_request_timeout(&tv, s);
 }
+
+void sched_request_barrier(struct timeval *barrier, struct sched *s)
+{
+       struct timeval diff;
+
+       if (tv_diff(now, barrier, &diff) > 0)
+               return;
+       sched_request_timeout(&diff, s);
+}
+
+
diff --git a/sched.h b/sched.h
index f119025c4c73f2d6b811f8220e6cec4c2449cd0d..27c723919c21c429e327f0a6eb7effa45ea0a219 100644 (file)
--- a/sched.h
+++ b/sched.h
@@ -82,3 +82,4 @@ void sched_shutdown(void);
 void sched_min_delay(struct sched *s);
 void sched_request_timeout(struct timeval *timeout, struct sched *s);
 void sched_request_timeout_ms(long unsigned ms, struct sched *s);
+void sched_request_barrier(struct timeval *barrier, struct sched *s);