]> git.tue.mpg.de Git - paraslash.git/commitdiff
Basic infrastructure changes for FEC/DCCP support.
authorAndre Noll <maan@systemlinux.org>
Tue, 27 Apr 2010 19:43:19 +0000 (21:43 +0200)
committerAndre Noll <maan@systemlinux.org>
Mon, 7 Jun 2010 21:24:33 +0000 (23:24 +0200)
This patch allows to add a FEC client without specifying the FEC
parameters. Instead, these parameters are set at stream start time
via the new ->open() method of the senders.

It is desirable to defer ssetting the FEC parameters to allow dynamic
determination of the slice size. We need an established connection
to find out the best value for the slice size, but the UDP sender
currently has to fix the slice size at target add time, which is
too early.

The slice size for the UDP and the DCCP sender are currently hardcoded
values; this will be fixed by subsequent patches.

13 files changed:
aft.c
audiod.c
chunk_queue.c
command.c
dccp_send.c
http_send.c
send.h
send_common.c
server.c
server.h
udp_send.c
vss.c
vss.h

diff --git a/aft.c b/aft.c
index 25058e7c079d75be59d1d93cf495718740f5df20..eb0f7b57911ed49fdb19a6bee1638d84ef1288a2 100644 (file)
--- a/aft.c
+++ b/aft.c
@@ -21,7 +21,6 @@
 #include "afh.h"
 #include "afs.h"
 #include "net.h"
-#include "vss.h"
 #include "fd.h"
 #include "ipc.h"
 #include "portable_io.h"
index 778318cebc2b8958e26b378987a4842c615a237d..5c17e6eab1337d8b23584faa67f443cd4f11a200 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -863,7 +863,8 @@ static int init_default_filters(void)
                 * If udp is used to receive this audiod format, add fecdec as
                 * the first filter.
                 */
-               if (strcmp(afi[i].receiver->name, "udp") == 0) {
+               if (strcmp(afi[i].receiver->name, "udp") == 0 ||
+                               strcmp(afi[i].receiver->name, "dccp") == 0) {
                        tmp = para_strdup("fecdec");
                        add_filter(i, tmp);
                        free(tmp);
index 3f5ac1d929262c116f6c7822f67f0ceb8cf1e7f6..468b1dc2dd4828796d53e5804d6f28611b88c462 100644 (file)
@@ -11,7 +11,6 @@
 #include "para.h"
 #include "list.h"
 #include "afh.h"
-#include "vss.h"
 #include "string.h"
 #include "error.h"
 
index b7795a002a20f36349bd450522b99f746f3256ba..42fa3e7584e7f622b7137730edcbfb5f07cbfd0f 100644 (file)
--- a/command.c
+++ b/command.c
@@ -23,9 +23,9 @@
 #include "afh.h"
 #include "afs.h"
 #include "server.h"
-#include "vss.h"
 #include "list.h"
 #include "send.h"
+#include "vss.h"
 #include "rc4.h"
 #include "net.h"
 #include "daemon.h"
index 6248ae80ef784d5739298b61b9494e76d2e769e5..0e69a969055f1a8466eae06770f91d7ddcfef6db 100644 (file)
@@ -24,8 +24,8 @@
 #include "server.h"
 #include "net.h"
 #include "list.h"
-#include "vss.h"
 #include "send.h"
+#include "vss.h"
 #include "fd.h"
 #include "close_on_fork.h"
 #include "chunk_queue.h"
 #define DCCP_MAX_BYTES_PER_WRITE 1024
 
 static struct sender_status dccp_sender_status, *dss = &dccp_sender_status;
+static struct sender *self;
+
+
+struct dccp_fec_client {
+       struct fec_client_parms fcp;
+       struct fec_client *fc;
+       struct sender_client *sc;
+};
 
 static void dccp_pre_select(int *max_fileno, fd_set *rfds,
                __a_unused fd_set *wfds)
@@ -65,9 +73,47 @@ static int dccp_get_tx_ccid(int sockfd)
        return tx_ccid;
 }
 
+static void dccp_shutdown_client(struct sender_client *sc)
+{
+       struct dccp_fec_client *dfc = sc->private_data;
+
+       vss_del_fec_client(dfc->fc);
+       shutdown_client(sc, dss);
+}
+
+static void dccp_shutdown_clients(void)
+{
+       struct sender_client *sc, *tmp;
+
+       list_for_each_entry_safe(sc, tmp, &dss->client_list, node)
+               dccp_shutdown_client(sc);
+}
+
+static int dccp_open(void *client, struct fec_client_parms **fcp)
+{
+       struct dccp_fec_client *dfc = client;
+
+       dfc->fcp.slices_per_group = 4;
+       dfc->fcp.data_slices_per_group = 3;
+       dfc->fcp.max_slice_bytes = 1472;
+       *fcp = &dfc->fcp;
+       return 1;
+}
+
+static int dccp_send_fec(char *buf, size_t len, void *private_data)
+{
+       struct dccp_fec_client *dfc = private_data;
+       int ret = write_nonblock(dfc->sc->fd, buf, len, DCCP_MAX_BYTES_PER_WRITE);
+
+       if (ret < 0)
+               dccp_shutdown_client(dfc->sc);
+       return ret;
+}
+
 static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
 {
        struct sender_client *sc;
+       struct dccp_fec_client *dfc;
        int tx_ccid;
 
        sc = accept_sender_client(dss, rfds);
@@ -87,26 +133,15 @@ static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
         * from the client; by shutting down this unused communication path we can
         * reduce processing costs a bit. See analogous comment in dccp_recv.c.
         */
-       if (shutdown(sc->fd, SHUT_RD) >= 0)
+       if (shutdown(sc->fd, SHUT_RD) < 0) {
+               PARA_WARNING_LOG("%s\n", strerror(errno));
+               shutdown_client(sc, dss);
                return;
-       PARA_WARNING_LOG("%s\n", strerror(errno));
-       shutdown_client(sc, dss);
-}
-
-static void dccp_send(long unsigned current_chunk,
-               __a_unused long unsigned chunks_sent, const char *buf,
-               size_t len, const char *header_buf, size_t header_len)
-{
-       struct sender_client *sc, *tmp;
-
-       list_for_each_entry_safe(sc, tmp, &dss->client_list, node)
-               send_chunk(sc, dss, DCCP_MAX_BYTES_PER_WRITE, current_chunk, buf,
-                       len, header_buf, header_len);
-}
-
-static void dccp_shutdown_clients(void)
-{
-       shutdown_clients(dss);
+       }
+       dfc = para_calloc(sizeof(*dfc));
+       sc->private_data = dfc;
+       dfc->sc = sc;
+       vss_add_fec_client(self, dfc, &dfc->fc);
 }
 
 static int dccp_com_on(__a_unused struct sender_command_data *scd)
@@ -176,7 +211,9 @@ void dccp_send_init(struct sender *s)
        int ret;
 
        s->info = dccp_info;
-       s->send = dccp_send;
+       s->send = NULL;
+       s->open = dccp_open;
+       s->send_fec = dccp_send_fec;
        s->pre_select = dccp_pre_select;
        s->post_select = dccp_post_select;
        s->shutdown_clients = dccp_shutdown_clients;
@@ -194,4 +231,5 @@ void dccp_send_init(struct sender *s)
        ret = generic_com_on(dss, IPPROTO_DCCP);
        if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+       self = s;
 }
index 424d63b2948c941b2ac28775ec4910b7ec9ff73c..2c918fc8a53ef56c85dd7ae77b04a1ed63953f64 100644 (file)
@@ -19,9 +19,9 @@
 #include "afs.h"
 #include "server.h"
 #include "http.h"
-#include "vss.h"
 #include "list.h"
 #include "send.h"
+#include "vss.h"
 #include "close_on_fork.h"
 #include "net.h"
 #include "fd.h"
diff --git a/send.h b/send.h
index acf62db4f80ca6e82b9c5db3d77e942211b20d4f..72c7a6c6c6313e15d8cd64ceb0997624ef0be52e 100644 (file)
--- a/send.h
+++ b/send.h
@@ -9,6 +9,28 @@
 /** The sender subcommands. */
 enum {SENDER_ADD, SENDER_DELETE, SENDER_ALLOW, SENDER_DENY, SENDER_ON, SENDER_OFF, NUM_SENDER_CMDS};
 
+/**
+ * Each paraslash sender may register arbitrary many clients to the virtual
+ * streaming system, possibly with varying fec parameters. In order to do so,
+ * it must allocate a \a fec_client_parms structure and pass it to \ref
+ * add_fec_client.
+ *
+ * Clients are automatically removed from that list by the vss if an error
+ * occurs, or if the sender requests deletion of a client by calling \ref
+ * vss_del_fec_client().
+ */
+struct fec_client;
+
+/** FEC parameters requested by FEC clients. */
+struct fec_client_parms {
+       /** Number of data slices plus redundant slices. */
+       uint8_t slices_per_group;
+       /** Number of slices minus number of redundant slices. */
+       uint8_t data_slices_per_group;
+       /** Maximal number of bytes per slice, initially zero. */
+       uint16_t max_slice_bytes;
+};
+
 /**
  * Describes one supported sender of para_server.
  *
@@ -47,10 +69,27 @@ struct sender {
         * only differ if the stream was repositioned by the \a ff or \a jmp
         * command. Of course, \a buf is a pointer to the chunk of data which
         * should be sent, and \a len is the length of this buffer.
-       */
+        */
        void (*send)(long unsigned current_chunk, long unsigned chunks_sent,
                const char *buf, size_t len, const char *header_buf,
                size_t header_len);
+
+       /**
+        * Obtain the FEC parameters of a FEC client.
+        *
+        * This is called once by vss.c at the beginning of a stream. Senders
+        * are supposed to set \a fcp to a struct which is suitable for the FEC
+        * client identified by \a private_data.
+        */
+       int (*open)(void *client, struct fec_client_parms **fcp);
+       /**
+        * Send the next slice to a FEC client.
+        *
+        * Called by vss.c when the next slice should be sent to the FEC client
+        * identified by \a private_data, the pointer which was previously
+        * passed to vss_add_fec_target().
+        */
+       int (*send_fec)(char *buf, size_t num_bytes, void *private_data);
        /**
         * Add file descriptors to fd_sets.
         *
index b44c8133d821ef2611650eee0ea40039af18bbc3..ed140c8487ee7fb806d247f7d874855e804efcff 100644 (file)
@@ -417,7 +417,6 @@ static int parse_fec_parms(const char *arg, struct sender_command_data *scd)
        ret = -ERRNO_TO_PARA_ERROR(EINVAL);
        if (val < 0 || val > 65535)
                goto out;
-       scd->max_slice_bytes = val;
        /* parse data_slices_per_group */
        b = e + 1;
        e = strchr(b, ':');
@@ -480,7 +479,6 @@ int parse_fec_url(const char *arg, struct sender_command_data *scd)
                goto out;
        }
        /* use default fec parameters. */
-       scd->max_slice_bytes = 1472;
        scd->slices_per_group = 16;
        scd->data_slices_per_group = 14;
        ret = 0;
index bc143039563b830b07164efd96ac76a4f6d3e267..0e256053e81f531a2cb178fd57f45ff935b437e6 100644 (file)
--- a/server.c
+++ b/server.c
 #include "string.h"
 #include "afs.h"
 #include "server.h"
+#include "list.h"
+#include "send.h"
 #include "vss.h"
 #include "config.h"
 #include "close_on_fork.h"
-#include "list.h"
-#include "send.h"
 #include "net.h"
 #include "daemon.h"
 #include "ipc.h"
index 43ba841d28681fc06b92c06d7ccfce95a3a6423f..822e80e161425ff36fa55b46400f5d580a433c64 100644 (file)
--- a/server.h
+++ b/server.h
@@ -25,8 +25,6 @@ struct sender_command_data{
        int netmask;
        /** The port number for add/remove. */
        int port;
-       /** Maximal slice size. */
-       uint16_t max_slice_bytes;
        /** Number of data slices plus redundant slices. */
        uint8_t slices_per_group;
        /** Number of slices minus number of redundant slices. */
index e80670d0f3866dcde32114c0459a506b8cb78cff..29deafb0e85c209fad5954fe14faf4b432d984e9 100644 (file)
@@ -21,9 +21,9 @@
 #include "afh.h"
 #include "afs.h"
 #include "server.h"
-#include "vss.h"
 #include "list.h"
 #include "send.h"
+#include "vss.h"
 #include "portable_io.h"
 #include "net.h"
 #include "fd.h"
@@ -51,6 +51,7 @@ struct udp_target {
 
 static struct list_head targets;
 static int sender_status;
+static struct sender *self;
 
 static void udp_close_target(struct udp_target *ut)
 {
@@ -152,8 +153,9 @@ err:
 /** The maximal size of the per-target chunk queue. */
 #define UDP_CQ_BYTES 40000
 
-static int udp_init_session(struct udp_target *ut)
+static int udp_open(void *client, struct fec_client_parms **fcp)
 {
+       struct udp_target *ut = client;
        int ret;
        char *iface = NULL;
 
@@ -181,6 +183,8 @@ static int udp_init_session(struct udp_target *ut)
        }
        add_close_on_fork_list(ut->fd);
        ut->cq = cq_new(UDP_CQ_BYTES);
+       ut->fcp.max_slice_bytes = 1472; /* FIXME */
+       *fcp = &ut->fcp;
        PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port);
        return 1;
 }
@@ -238,9 +242,6 @@ static int udp_send_fec(char *buf, size_t len, void *private_data)
 
        if (sender_status == SENDER_OFF)
                return 0;
-       ret = udp_init_session(ut);
-       if (ret < 0)
-               goto fail;
        ret = send_queued_chunks(ut->fd, ut->cq, 0);
        if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
                ret = 0;
@@ -278,10 +279,7 @@ static void udp_add_target(struct sender_command_data *scd)
        para_list_add(&ut->node, &targets);
        ut->fcp.slices_per_group = scd->slices_per_group;
        ut->fcp.data_slices_per_group = scd->data_slices_per_group;
-       ut->fcp.max_slice_bytes = scd->max_slice_bytes;
-       ut->fcp.send = udp_send_fec;
-       ut->fcp.private_data = ut;
-       vss_add_fec_client(&ut->fcp, &ut->fc);
+       vss_add_fec_client(self, ut, &ut->fc);
 }
 
 static int udp_com_add(struct sender_command_data *scd)
@@ -366,6 +364,8 @@ void udp_send_init(struct sender *s)
        s->info = udp_info;
        s->help = udp_help;
        s->send = NULL;
+       s->send_fec = udp_send_fec;
+       s->open = udp_open;
        s->pre_select = NULL;
        s->post_select = NULL;
        s->shutdown_clients = udp_shutdown_targets;
@@ -379,5 +379,6 @@ void udp_send_init(struct sender *s)
        udp_init_target_list();
        if (!conf.udp_no_autostart_given)
                sender_status = SENDER_ON;
+       self = s;
        PARA_DEBUG_LOG("udp sender init complete\n");
 }
diff --git a/vss.c b/vss.c
index 898180c013a4845481bbfd48782fa9168f8e9048..afde0dcb829b42422310eaf33361b990bb98c2ac 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -26,8 +26,8 @@
 #include "net.h"
 #include "server.cmdline.h"
 #include "list.h"
-#include "vss.h"
 #include "send.h"
+#include "vss.h"
 #include "ipc.h"
 #include "fd.h"
 #include "sched.h"
@@ -137,6 +137,8 @@ struct fec_group {
 struct fec_client {
        /** If negative, this client is temporarily disabled. */
        int error;
+       /** UDP or DCCP. */
+       struct sender *sender;
        /** Parameters requested by the client. */
        struct fec_client_parms *fcp;
        /** Used by the core FEC code. */
@@ -161,6 +163,8 @@ struct fec_client {
        int num_extra_slices;
        /** Contains the FEC-encoded data. */
        unsigned char *enc_buf;
+       /** Pointer obtained from sender when the client is added. */
+       void *private_data;
 };
 
 /**
@@ -413,33 +417,16 @@ size_t vss_get_fec_eof_packet(const char **buf)
  *
  * \return Standard.
  */
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+int vss_add_fec_client(struct sender *sender, void *private_data,
+               struct fec_client **result)
 {
-       int ret;
-       struct fec_client *fc;
+       struct fec_client *fc = para_calloc(sizeof(*fc));
 
-       if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
-               return -ERRNO_TO_PARA_ERROR(EINVAL);
-       fc = para_calloc(sizeof(*fc));
-       fc->fcp = fcp;
-       ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
-               &fc->parms);
-       if (ret < 0)
-               goto err;
-       fc->first_stream_chunk = -1; /* stream not yet started */
-       fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
-       fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
-       fc->num_extra_slices = 0;
-       fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
-       fc->next_header_time.tv_sec = 0;
+       fc->private_data = private_data;
+       fc->sender = sender;
        para_list_add(&fc->node, &fec_client_list);
        *result = fc;
        return 1;
-err:
-       fec_free(fc->parms);
-       free(fc);
-       *result = NULL;
-       return ret;
 }
 
 /**
@@ -713,6 +700,7 @@ static void vss_pre_select(struct sched *s, struct task *t)
                list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
                        fc->first_stream_chunk = -1;
                        fc->error = 0;
+                       fc->fcp = NULL;
                }
                mmd->stream_start.tv_sec = 0;
                mmd->stream_start.tv_usec = 0;
@@ -837,6 +825,35 @@ err:
        mmd->new_vss_status_flags = VSS_NEXT;
 }
 
+static int open_fec_client(struct fec_client *fc)
+{
+       int ret;
+       struct fec_client_parms *fcp;
+
+       ret = fc->sender->open(fc->private_data, &fc->fcp);
+       if (ret < 0) {
+               fc->fcp = NULL;
+               return ret;
+       }
+       fcp = fc->fcp;
+       if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+               return -ERRNO_TO_PARA_ERROR(EINVAL);
+       ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+               &fc->parms);
+       if (ret < 0)
+               goto err;
+       fc->first_stream_chunk = -1; /* stream not yet started */
+       fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+       fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
+       fc->num_extra_slices = 0;
+       fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
+       fc->next_header_time.tv_sec = 0;
+       return 1;
+err:
+       fec_free(fc->parms);
+       return ret;
+}
+
 /**
  * Main sending function.
  *
@@ -848,7 +865,7 @@ err:
  */
 static void vss_send(struct vss_task *vsst)
 {
-       int i, fec_active = 0;
+       int ret, i, fec_active = 0;
        struct timeval due;
        struct fec_client *fc, *tmp_fc;
 
@@ -862,6 +879,13 @@ static void vss_send(struct vss_task *vsst)
        list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
                if (fc->error < 0)
                        continue;
+               if (!fc->fcp) {
+                       ret = open_fec_client(fc);
+                       if (ret < 0) {
+                               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+                               continue;
+                       }
+               }
                if (!next_slice_is_due(fc, NULL)) {
                        fec_active = 1;
                        continue;
@@ -870,9 +894,9 @@ static void vss_send(struct vss_task *vsst)
                        continue;
                PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
                        fc->current_slice_num, fc->fcp->max_slice_bytes);
-               fc->fcp->send((char *)fc->enc_buf,
+               fc->sender->send_fec((char *)fc->enc_buf,
                        fc->fcp->max_slice_bytes,
-                       fc->fcp->private_data);
+                       fc->private_data);
                fc->current_slice_num++;
                fec_active = 1;
        }
diff --git a/vss.h b/vss.h
index ed2cd7852f92d505e04ec68dbf4608f277e3cfde..3d80274b62fea1f68306eb9b585a411dc90fdcea 100644 (file)
--- a/vss.h
+++ b/vss.h
@@ -24,32 +24,7 @@ const char *supported_audio_formats(void);
 /** Currently playing. */
 #define VSS_PLAYING 8
 
-/**
- * Each paraslash sender may register arbitrary many clients to the virtual
- * streaming system, possibly with varying fec parameters. In order to do so,
- * it must allocate a \a fec_client_parms structure and pass it to \ref
- * add_fec_client.
- *
- * Clients are automatically removed from that list by the vss if an error
- * occurs, or if the sender requests deletion of a client by calling \ref
- * vss_del_fec_client().
- */
-struct fec_client;
-
-/** FEC parameters requested by FEC clients. */
-struct fec_client_parms {
-       /** Number of data slices plus redundant slices. */
-       uint8_t slices_per_group;
-       /** Number of slices minus number of redundant slices. */
-       uint8_t data_slices_per_group;
-       /** Maximal number of bytes per slice. */
-       uint16_t max_slice_bytes;
-       /** Called by vss.c when the next slice should be sent. */
-       int (*send)(char *buf, size_t num_bytes, void *private_data);
-       /** Passed verbatim to \a send(). */
-       void *private_data;
-};
-
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result);
+int vss_add_fec_client(struct sender *sender, void *private_data,
+               struct fec_client **result);
 void vss_del_fec_client(struct fec_client *fc);
 size_t vss_get_fec_eof_packet(const char **buf);