]> git.tue.mpg.de Git - paraslash.git/commitdiff
Allow addblob commands to create output.
authorAndre Noll <maan@systemlinux.org>
Tue, 24 Jul 2012 06:14:12 +0000 (08:14 +0200)
committerAndre Noll <maan@systemlinux.org>
Wed, 3 Apr 2013 22:37:26 +0000 (22:37 +0000)
Currently all addblob commands remain silent even on fatal errors
because para_client either reads the command output, or sends its
stdin, but never both.

This patch overcomes this shortcoming. It makes para_client create
two buffer trees instead of one. The client states CL_SENT_COMMAND
and CL_RECEIVING are replaced by a single CL_EXECUTING state.

Blob data is now sent as a sideband packet.  The change breaks
compatibility with earlier 0.4.x versions again, but that's OK as
this is 0.5.0 material anyway.

audiod.c
blob.c
client.c
client.h
client_common.c
sideband.h

index 4f2d4151b58cf62e980c4f61c94c8ca1e1923297..b2d5c8db452167f9a265ca2c899bebbff630456e 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -1200,7 +1200,7 @@ static void status_post_select(struct sched *s, struct task *t)
                        close_stat_pipe();
                        goto out;
                }
-               if (st->ct->status != CL_RECEIVING)
+               if (st->ct->status != CL_EXECUTING)
                        goto out;
                ret = btr_node_status(st->btrn, st->min_iqs, BTR_NT_LEAF);
                if (ret <= 0) {
diff --git a/blob.c b/blob.c
index a6ae1acf61237aaa31b18035739acefd80c52fc1..373d8d005cc1ea9e232f47f76e0c3794cd018538 100644 (file)
--- a/blob.c
+++ b/blob.c
@@ -311,12 +311,12 @@ static int com_rmblob(callback_function *f, struct command_context *cc)
                afs_cb_result_handler, cc);
 }
 
-static void com_addblob_callback(struct osl_table *table, __a_unused int fd,
+static void com_addblob_callback(struct osl_table *table, int fd,
                const struct osl_object *query)
 {
        struct osl_object objs[NUM_BLOB_COLUMNS];
-       char *name = query->data;
-       size_t name_len = strlen(name) + 1;
+       char *name = query->data, *msg;
+       size_t name_len = strlen(name) + 1, msg_len;
        uint32_t id;
        unsigned num_rows;
        int ret;
@@ -344,6 +344,10 @@ static void com_addblob_callback(struct osl_table *table, __a_unused int fd,
                if (ret < 0 && ret != -OSL_ERRNO_TO_PARA_ERROR(E_OSL_RB_KEY_NOT_FOUND))
                        goto out;
                if (ret >= 0) { /* we already have a blob with this name */
+                       ret = osl(osl_get_object(table, row, BLOBCOL_ID, &obj));
+                       if (ret < 0)
+                               goto out;
+                       id = *(uint32_t *)obj.data;
                        obj.data = name + name_len;
                        obj.size = query->size - name_len;
                        ret = osl(osl_update_object(table, row, BLOBCOL_DEF, &obj));
@@ -377,38 +381,48 @@ static void com_addblob_callback(struct osl_table *table, __a_unused int fd,
        afs_event(BLOB_ADD, NULL, table);
 out:
        if (ret < 0)
-               PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+               msg_len = xasprintf(&msg, "could not add %s: %s\n", name,
+                       para_strerror(-ret));
+       else
+               msg_len = xasprintf(&msg, "added %s as id %u\n", name, id);
+       pass_buffer_as_shm(fd, SBD_OUTPUT, msg, msg_len);
+       free(msg);
 }
 
-/*
- * write input from fd to dynamically allocated buffer,
- * but maximal 10M.
- */
+/* Write input from fd to dynamically allocated buffer, but maximal 10M. */
 static int fd2buf(struct stream_cipher_context *scc, struct osl_object *obj)
 {
-       const size_t chunk_size = 1024, max_size = 10 * 1024 * 1024;
-       size_t size = 2048, received = 0;
+       size_t max_size = 10 * 1024 * 1024;
        int ret;
-       char *buf = para_malloc(size);
+       struct iovec iov;
 
-       for (;;) {
-               ret = sc_recv_bin_buffer(scc, buf + received, chunk_size);
-               if (ret <= 0)
-                       break;
-               received += ret;
-               if (received + chunk_size >= size) {
-                       size *= 2;
-                       ret = -E_INPUT_TOO_LARGE;
-                       if (size > max_size)
-                               break;
-                       buf = para_realloc(buf, size);
-               }
+       obj->data = NULL;
+       obj->size = 0;
+again:
+       do {
+               ret = recv_sb(scc, SBD_BLOB_DATA, max_size, &iov);
+       } while (ret == 0);
+
+       if (ret < 0) {
+               free(obj->data);
+               obj->data = NULL;
+               obj->size = 0;
+               return ret;
        }
-       obj->data = buf;
-       obj->size = received;
-       if (ret < 0)
-               free(buf);
-       return ret;
+       if (iov.iov_len == 0) /* end of blob */
+               return 1;
+       if (!obj->data) {
+               obj->data = iov.iov_base;
+               obj->size = iov.iov_len;
+       } else {
+               obj->data = para_realloc(obj->data, obj->size + iov.iov_len);
+               memcpy(obj->data + obj->size, iov.iov_base, iov.iov_len);
+               obj->size += iov.iov_len;
+               free(iov.iov_base);
+               max_size -= iov.iov_len;
+       }
+       goto again;
+       return 1;
 }
 
 /*
@@ -465,7 +479,7 @@ static int com_addblob(callback_function *f, struct command_context *cc)
                return -E_BLOB_SYNTAX;
        arg_obj.size = strlen(cc->argv[1]) + 1;
        arg_obj.data = (char *)cc->argv[1];
-       return stdin_command(cc, &arg_obj, f, NULL, NULL);
+       return stdin_command(cc, &arg_obj, f, afs_cb_result_handler, cc);
 }
 
 /* FIXME: Print output to client, not to log file */
index 715df9c6a3aa8b107ef4bcfdffaad9a0c1abea2f..413d08ccbf9269a62a033886ce8254a74b7e5145 100644 (file)
--- a/client.c
+++ b/client.c
@@ -455,7 +455,7 @@ static int client_i9e_line_handler(char *line)
        ret = client_connect(ct, &sched, NULL, NULL);
        if (ret < 0)
                return ret;
-       i9e_attach_to_stdout(ct->btrn);
+       i9e_attach_to_stdout(ct->btrn[0]);
        return 1;
 }
 
@@ -537,16 +537,13 @@ static void supervisor_post_select(struct sched *s, struct task *t)
                t->error = ct->task.error;
                return;
        }
-       if (ct->status == CL_SENDING) {
+       if (ct->status == CL_EXECUTING) {
                stdin_set_defaults(&sit);
                register_task(s, &sit.task);
-               t->error = -E_TASK_STARTED;
-               return;
-       }
-       if (ct->status == CL_RECEIVING) {
                stdout_set_defaults(&sot);
                register_task(s, &sot.task);
-               t->error = -E_TASK_STARTED; return;
+               t->error = -E_TASK_STARTED;
+               return;
        }
 }
 
@@ -600,7 +597,7 @@ int main(int argc, char *argv[])
        if (ret < 0)
                goto out;
        sot.btrn = btr_new_node(&(struct btr_node_description)
-               EMBRACE(.name = "stdout", .parent = ct->btrn));
+               EMBRACE(.name = "stdout", .parent = ct->btrn[0]));
        register_task(&sched, &svt);
        ret = schedule(&sched);
        if (ret >= 0 && ct->task.error < 0) {
index 0685f33e0a3ac2f06106adbacf151f0773840c3b..6e438f7ed15179da1817aea1c738712e229d7223 100644 (file)
--- a/client.h
+++ b/client.h
@@ -20,12 +20,10 @@ enum {
        CL_SENT_CH_RESPONSE,
        /** Server accepts this authentication. */
        CL_RECEIVED_PROCEED,
-       /** Client sends the command. */
-       CL_SENT_COMMAND,
-       /** Server expects data. */
+       /** Command is executing. */
+       CL_EXECUTING,
+       /** Server is expecting data (addblob commands only). */
        CL_SENDING,
-       /** Client expects data. */
-       CL_RECEIVING,
 };
 
 /** Data specific to a client task. */
@@ -34,8 +32,10 @@ struct client_task {
        int status;
        /** The file descriptor and the session keys. */
        struct stream_cipher_context scc;
-       /** The sideband context. */
-       struct sb_context *sbc;
+       /** The sideband contexts for receiving/sending. */
+       struct sb_context *sbc[2];
+       /** The buffer tree nodes for receiving/sending. */
+       struct btr_node *btrn[2];
        /** The hash value of the decrypted challenge. */
        unsigned char *challenge_hash;
        /** The configuration (including the command). */
@@ -48,8 +48,6 @@ struct client_task {
        char *user;
        /** The client task structure. */
        struct task task;
-       /** The buffer tree node of the client task. */
-       struct btr_node *btrn;
        /** List of features supported by the server. */
        char **features;
 };
index 38e596e3f7f84b4d780ee0da80b08957fd0754ff..3ff43d1b7bffd2c1c2438078d431122ea9f6942d 100644 (file)
@@ -49,7 +49,8 @@ void client_disconnect(struct client_task *ct)
        ct->scc.recv = NULL;
        sc_free(ct->scc.send);
        ct->scc.send = NULL;
-       btr_remove_node(&ct->btrn);
+       btr_remove_node(&ct->btrn[0]);
+       btr_remove_node(&ct->btrn[1]);
 }
 
 /**
@@ -69,7 +70,8 @@ void client_close(struct client_task *ct)
        free(ct->key_file);
        client_cmdline_parser_free(&ct->conf);
        free(ct->challenge_hash);
-       sb_free(ct->sbc);
+       sb_free(ct->sbc[0]);
+       sb_free(ct->sbc[1]);
        free(ct);
 }
 
@@ -91,7 +93,6 @@ static void client_pre_select(struct sched *s, struct task *t)
 {
        int ret;
        struct client_task *ct = container_of(t, struct client_task, task);
-       struct btr_node *btrn = ct->btrn;
 
        if (ct->scc.fd < 0)
                return;
@@ -99,7 +100,6 @@ static void client_pre_select(struct sched *s, struct task *t)
        case CL_CONNECTED:
        case CL_SENT_AUTH:
        case CL_SENT_CH_RESPONSE:
-       case CL_SENT_COMMAND:
                para_fd_set(ct->scc.fd, &s->rfds, &s->max_fileno);
                return;
 
@@ -109,51 +109,49 @@ static void client_pre_select(struct sched *s, struct task *t)
                para_fd_set(ct->scc.fd, &s->wfds, &s->max_fileno);
                return;
 
-       case CL_RECEIVING:
-               ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
-               if (ret != 0) {
+       case CL_SENDING:
+               if (ct->btrn[1]) {
+                       ret = btr_node_status(ct->btrn[1], 0, BTR_NT_LEAF);
                        if (ret < 0)
                                sched_min_delay(s);
-                       else
-                               para_fd_set(ct->scc.fd, &s->rfds,
-                                       &s->max_fileno);
+                       else if (ret > 0)
+                               para_fd_set(ct->scc.fd, &s->wfds, &s->max_fileno);
                }
-               return;
-       case CL_SENDING:
-               ret = btr_node_status(btrn, 0, BTR_NT_LEAF);
-               if (ret != 0) {
+               /* fall though */
+       case CL_EXECUTING:
+               if (ct->btrn[0]) {
+                       ret = btr_node_status(ct->btrn[0], 0, BTR_NT_ROOT);
                        if (ret < 0)
                                sched_min_delay(s);
-                       else
-                               para_fd_set(ct->scc.fd, &s->wfds,
-                                       &s->max_fileno);
+                       else if (ret > 0)
+                               para_fd_set(ct->scc.fd, &s->rfds, &s->max_fileno);
                }
                return;
        }
 }
 
-static int send_sb(struct client_task *ct, void *buf, size_t numbytes,
+static int send_sb(struct client_task *ct, int channel, void *buf, size_t numbytes,
                enum sb_designator band, bool dont_free)
 {
        int ret, fd = ct->scc.fd;
        struct iovec iov[2];
 
-       if (!ct->sbc) {
+       if (!ct->sbc[channel]) {
                struct sb_buffer sbb;
                sb_transformation trafo = ct->status < CL_RECEIVED_PROCEED?
                        NULL : sc_trafo;
                sbb = (typeof(sbb))SBB_INIT(band, buf, numbytes);
-               ct->sbc = sb_new_send(&sbb, dont_free, trafo, ct->scc.send);
+               ct->sbc[channel] = sb_new_send(&sbb, dont_free, trafo, ct->scc.send);
        }
-       ret = sb_get_send_buffers(ct->sbc, iov);
+       ret = sb_get_send_buffers(ct->sbc[channel], iov);
        ret = xwritev(fd, iov, ret);
        if (ret < 0) {
-               sb_free(ct->sbc);
-               ct->sbc = NULL;
+               sb_free(ct->sbc[channel]);
+               ct->sbc[channel] = NULL;
                return ret;
        }
-       if (sb_sent(ct->sbc, ret)) {
-               ct->sbc = NULL;
+       if (sb_sent(ct->sbc[channel], ret)) {
+               ct->sbc[channel] = NULL;
                return 1;
        }
        return 0;
@@ -176,21 +174,21 @@ static int recv_sb(struct client_task *ct, fd_set *rfds,
                trafo = sc_trafo;
                trafo_context = ct->scc.recv;
        }
-       if (!ct->sbc)
-               ct->sbc = sb_new_recv(0, trafo, trafo_context);
+       if (!ct->sbc[0])
+               ct->sbc[0] = sb_new_recv(0, trafo, trafo_context);
 again:
-       sb_get_recv_buffer(ct->sbc, &iov);
+       sb_get_recv_buffer(ct->sbc[0], &iov);
        ret = read_nonblock(ct->scc.fd, iov.iov_base, iov.iov_len, rfds, &n);
        if (ret < 0) {
-               sb_free(ct->sbc);
-               ct->sbc = NULL;
+               sb_free(ct->sbc[0]);
+               ct->sbc[0] = NULL;
                return ret;
        }
        if (n == 0)
                return 0;
-       if (!sb_received(ct->sbc, n, result))
+       if (!sb_received(ct->sbc[0], n, result))
                goto again;
-       ct->sbc = NULL;
+       ct->sbc[0] = NULL;
        return 1;
 }
 
@@ -226,10 +224,14 @@ static int dispatch_sbb(struct client_task *ct, struct sb_buffer *sbb)
                PARA_DEBUG_LOG("band: %s\n", designator[sbb->band]);
 
        switch (sbb->band) {
+       case SBD_AWAITING_DATA:
+               ct->status = CL_SENDING;
+               ret = 1;
+               goto out;
        case SBD_OUTPUT:
                if (iov_valid(&sbb->iov))
                        btr_add_output(sbb->iov.iov_base, sbb->iov.iov_len,
-                               ct->btrn);
+                               ct->btrn[0]);
                ret = 1;
                goto out;
        case SBD_DEBUG_LOG:
@@ -274,8 +276,8 @@ static int send_sb_command(struct client_task *ct)
        char *command, *p;
        size_t len = 0;
 
-       if (ct->sbc)
-               return send_sb(ct, NULL, 0, 0, false);
+       if (ct->sbc[1])
+               return send_sb(ct, 0, NULL, 0, 0, false);
 
        for (i = 0; i < ct->conf.inputs_num; i++)
                len += strlen(ct->conf.inputs[i]) + 1;
@@ -285,7 +287,7 @@ static int send_sb_command(struct client_task *ct)
                p += strlen(ct->conf.inputs[i]) + 1;
        }
        PARA_DEBUG_LOG("--> %s\n", command);
-       return send_sb(ct, command, len, SBD_COMMAND, false);
+       return send_sb(ct, 0, command, len, SBD_COMMAND, false);
 }
 
 /**
@@ -304,7 +306,6 @@ static int send_sb_command(struct client_task *ct)
 static void client_post_select(struct sched *s, struct task *t)
 {
        struct client_task *ct = container_of(t, struct client_task, task);
-       struct btr_node *btrn = ct->btrn;
        int ret = 0;
        size_t n;
        char buf[CLIENT_BUFSIZE];
@@ -373,7 +374,7 @@ static void client_post_select(struct sched *s, struct task *t)
                return;
                }
        case CL_RECEIVED_CHALLENGE:
-               ret = send_sb(ct, ct->challenge_hash, HASH_SIZE,
+               ret = send_sb(ct, 0, ct->challenge_hash, HASH_SIZE,
                        SBD_CHALLENGE_RESPONSE, false);
                if (ret != 0)
                        ct->challenge_hash = NULL;
@@ -401,66 +402,73 @@ static void client_post_select(struct sched *s, struct task *t)
                ret = send_sb_command(ct);
                if (ret <= 0)
                        goto out;
-               ct->status = CL_SENT_COMMAND;
+               ct->status = CL_EXECUTING;
                return;
                }
-       case CL_SENT_COMMAND:
-               {
-               struct sb_buffer sbb;
-               ret = recv_sb(ct, &s->rfds, &sbb);
-               if (ret <= 0)
-                       goto out;
-               if (sbb.band == SBD_AWAITING_DATA) {
-                       ct->status = CL_SENDING;
-                       free(sbb.iov.iov_base);
-                       goto out;
-               }
-               ct->status = CL_RECEIVING;
-               ret = dispatch_sbb(ct, &sbb);
-               goto out;
-               }
        case CL_SENDING:
-               {
-               char *buf2;
-               size_t sz;
-               ret = btr_node_status(btrn, 0, BTR_NT_LEAF);
-               if (ret < 0)
-                       goto out;
-               if (ret == 0)
-                       return;
-               if (!FD_ISSET(ct->scc.fd, &s->wfds))
-                       return;
-               sz = btr_next_buffer(btrn, &buf2);
-               ret = sc_send_bin_buffer(&ct->scc, buf2, sz);
-               if (ret < 0)
-                       goto out;
-               btr_consume(btrn, sz);
-               return;
+               if (ct->btrn[1]) {
+                       char *buf2;
+                       size_t sz;
+                       ret = btr_node_status(ct->btrn[1], 0, BTR_NT_LEAF);
+                       if (ret == -E_BTR_EOF) {
+                               /* empty blob data packet indicates EOF */
+                               PARA_INFO_LOG("blob sent\n");
+                               ret = send_sb(ct, 1, NULL, 0, SBD_BLOB_DATA, true);
+                               if (ret >= 0)
+                                       ret = -E_BTR_EOF;
+                       }
+                       if (ret < 0)
+                               goto close1;
+                       if (ret > 0 && FD_ISSET(ct->scc.fd, &s->wfds)) {
+                               sz = btr_next_buffer(ct->btrn[1], &buf2);
+                               assert(sz);
+                               ret = send_sb(ct, 1, buf2, sz, SBD_BLOB_DATA, true);
+                               if (ret < 0)
+                                       goto close1;
+                               if (ret > 0)
+                                       btr_consume(ct->btrn[1], sz);
+                       }
                }
-       case CL_RECEIVING:
-               {
-               struct sb_buffer sbb;
-               ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
-               if (ret < 0)
-                       goto out;
-               if (ret == 0)
-                       return;
-               /*
-                * The FD_ISSET() is not strictly necessary, but is allows us
-                * to skip the malloc below if there is nothing to read anyway.
-                */
-               if (!FD_ISSET(ct->scc.fd, &s->rfds))
-                       return;
-               ret = recv_sb(ct, &s->rfds, &sbb);
-               if (ret > 0)
-                       ret = dispatch_sbb(ct, &sbb);
-               goto out;
+               /* fall though */
+       case CL_EXECUTING:
+               if (ct->btrn[0]) {
+                       ret = btr_node_status(ct->btrn[0], 0, BTR_NT_ROOT);
+                       if (ret < 0)
+                               goto close0;
+                       if (ret > 0 && FD_ISSET(ct->scc.fd, &s->rfds)) {
+                               struct sb_buffer sbb;
+                               ret = recv_sb(ct, &s->rfds, &sbb);
+                               if (ret < 0)
+                                       goto close0;
+                               if (ret > 0) {
+                                       ret = dispatch_sbb(ct, &sbb);
+                                       if (ret < 0)
+                                               goto close0;
+                               }
+                       }
                }
+               ret = 0;
+               goto out;
        }
+close1:
+       PARA_INFO_LOG("channel 1: %s\n", para_strerror(-ret));
+       btr_remove_node(&ct->btrn[1]);
+       if (ct->btrn[0])
+               return;
+       goto out;
+close0:
+       PARA_INFO_LOG("channel 0: %s\n", para_strerror(-ret));
+       btr_remove_node(&ct->btrn[0]);
+       if (ct->btrn[1] && ct->status == CL_SENDING)
+               return;
 out:
+       if (ret >= 0)
+               return;
+       btr_remove_node(&ct->btrn[0]);
+       btr_remove_node(&ct->btrn[1]);
+       if (ret != -E_SERVER_CMD_SUCCESS && ret != -E_SERVER_CMD_FAILURE)
+               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
        t->error = ret;
-       if (ret < 0)
-               btr_remove_node(&ct->btrn);
 }
 
 /**
@@ -493,8 +501,10 @@ int client_connect(struct client_task *ct, struct sched *s,
        if (ret < 0)
                goto err_out;
        ct->status = CL_CONNECTED;
-       ct->btrn = btr_new_node(&(struct btr_node_description)
-               EMBRACE(.name = "client", .parent = parent, .child = child));
+       ct->btrn[0] = btr_new_node(&(struct btr_node_description)
+               EMBRACE(.name = "client recv", .parent = NULL, .child = child));
+       ct->btrn[1] = btr_new_node(&(struct btr_node_description)
+               EMBRACE(.name = "client send", .parent = parent, .child = NULL));
        ct->task.pre_select = client_pre_select;
        ct->task.post_select = client_post_select;
        ct->task.error = 0;
index 50daaa608f9b7cb507f07ea394cc522d4ed7a6f6..20e195b364b3d8bffcce3df4cd2c48f9e6afe4a8 100644 (file)
@@ -58,6 +58,8 @@
        DESIGNATOR(EXIT__SUCCESS), \
        /* Command failed. */ \
        DESIGNATOR(EXIT__FAILURE), \
+       /* The next chunk of the blob (addblob commands only) */ \
+       DESIGNATOR(BLOB_DATA), \
 
 /** Just prefix with \p SBD_. */
 #define DESIGNATOR(x) SBD_ ## x