#include "afh.h"
#include "afs.h"
#include "net.h"
-#include "vss.h"
#include "fd.h"
#include "ipc.h"
#include "portable_io.h"
* If udp is used to receive this audiod format, add fecdec as
* the first filter.
*/
- if (strcmp(afi[i].receiver->name, "udp") == 0) {
+ if (strcmp(afi[i].receiver->name, "udp") == 0 ||
+ strcmp(afi[i].receiver->name, "dccp") == 0) {
tmp = para_strdup("fecdec");
add_filter(i, tmp);
free(tmp);
#include "para.h"
#include "list.h"
#include "afh.h"
-#include "vss.h"
#include "string.h"
#include "error.h"
#include "afh.h"
#include "afs.h"
#include "server.h"
-#include "vss.h"
#include "list.h"
#include "send.h"
+#include "vss.h"
#include "rc4.h"
#include "net.h"
#include "daemon.h"
#include "server.h"
#include "net.h"
#include "list.h"
-#include "vss.h"
#include "send.h"
+#include "vss.h"
#include "fd.h"
#include "close_on_fork.h"
#include "chunk_queue.h"
#define DCCP_MAX_BYTES_PER_WRITE 1024
static struct sender_status dccp_sender_status, *dss = &dccp_sender_status;
+static struct sender *self;
+
+
+struct dccp_fec_client {
+ struct fec_client_parms fcp;
+ struct fec_client *fc;
+ struct sender_client *sc;
+};
static void dccp_pre_select(int *max_fileno, fd_set *rfds,
__a_unused fd_set *wfds)
return tx_ccid;
}
+static void dccp_shutdown_client(struct sender_client *sc)
+{
+ struct dccp_fec_client *dfc = sc->private_data;
+
+ vss_del_fec_client(dfc->fc);
+ shutdown_client(sc, dss);
+}
+
+static void dccp_shutdown_clients(void)
+{
+ struct sender_client *sc, *tmp;
+
+ list_for_each_entry_safe(sc, tmp, &dss->client_list, node)
+ dccp_shutdown_client(sc);
+}
+
+static int dccp_open(void *client, struct fec_client_parms **fcp)
+{
+ struct dccp_fec_client *dfc = client;
+
+ dfc->fcp.slices_per_group = 4;
+ dfc->fcp.data_slices_per_group = 3;
+ dfc->fcp.max_slice_bytes = 1472;
+ *fcp = &dfc->fcp;
+ return 1;
+}
+
+static int dccp_send_fec(char *buf, size_t len, void *private_data)
+{
+ struct dccp_fec_client *dfc = private_data;
+ int ret = write_nonblock(dfc->sc->fd, buf, len, DCCP_MAX_BYTES_PER_WRITE);
+
+ if (ret < 0)
+ dccp_shutdown_client(dfc->sc);
+ return ret;
+}
+
static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
{
struct sender_client *sc;
+ struct dccp_fec_client *dfc;
int tx_ccid;
sc = accept_sender_client(dss, rfds);
* from the client; by shutting down this unused communication path we can
* reduce processing costs a bit. See analogous comment in dccp_recv.c.
*/
- if (shutdown(sc->fd, SHUT_RD) >= 0)
+ if (shutdown(sc->fd, SHUT_RD) < 0) {
+ PARA_WARNING_LOG("%s\n", strerror(errno));
+ shutdown_client(sc, dss);
return;
- PARA_WARNING_LOG("%s\n", strerror(errno));
- shutdown_client(sc, dss);
-}
-
-static void dccp_send(long unsigned current_chunk,
- __a_unused long unsigned chunks_sent, const char *buf,
- size_t len, const char *header_buf, size_t header_len)
-{
- struct sender_client *sc, *tmp;
-
- list_for_each_entry_safe(sc, tmp, &dss->client_list, node)
- send_chunk(sc, dss, DCCP_MAX_BYTES_PER_WRITE, current_chunk, buf,
- len, header_buf, header_len);
-}
-
-static void dccp_shutdown_clients(void)
-{
- shutdown_clients(dss);
+ }
+ dfc = para_calloc(sizeof(*dfc));
+ sc->private_data = dfc;
+ dfc->sc = sc;
+ vss_add_fec_client(self, dfc, &dfc->fc);
}
static int dccp_com_on(__a_unused struct sender_command_data *scd)
int ret;
s->info = dccp_info;
- s->send = dccp_send;
+ s->send = NULL;
+ s->open = dccp_open;
+ s->send_fec = dccp_send_fec;
s->pre_select = dccp_pre_select;
s->post_select = dccp_post_select;
s->shutdown_clients = dccp_shutdown_clients;
ret = generic_com_on(dss, IPPROTO_DCCP);
if (ret < 0)
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ self = s;
}
#include "afs.h"
#include "server.h"
#include "http.h"
-#include "vss.h"
#include "list.h"
#include "send.h"
+#include "vss.h"
#include "close_on_fork.h"
#include "net.h"
#include "fd.h"
/** The sender subcommands. */
enum {SENDER_ADD, SENDER_DELETE, SENDER_ALLOW, SENDER_DENY, SENDER_ON, SENDER_OFF, NUM_SENDER_CMDS};
+/**
+ * Each paraslash sender may register arbitrary many clients to the virtual
+ * streaming system, possibly with varying fec parameters. In order to do so,
+ * it must allocate a \a fec_client_parms structure and pass it to \ref
+ * add_fec_client.
+ *
+ * Clients are automatically removed from that list by the vss if an error
+ * occurs, or if the sender requests deletion of a client by calling \ref
+ * vss_del_fec_client().
+ */
+struct fec_client;
+
+/** FEC parameters requested by FEC clients. */
+struct fec_client_parms {
+ /** Number of data slices plus redundant slices. */
+ uint8_t slices_per_group;
+ /** Number of slices minus number of redundant slices. */
+ uint8_t data_slices_per_group;
+ /** Maximal number of bytes per slice, initially zero. */
+ uint16_t max_slice_bytes;
+};
+
/**
* Describes one supported sender of para_server.
*
* only differ if the stream was repositioned by the \a ff or \a jmp
* command. Of course, \a buf is a pointer to the chunk of data which
* should be sent, and \a len is the length of this buffer.
- */
+ */
void (*send)(long unsigned current_chunk, long unsigned chunks_sent,
const char *buf, size_t len, const char *header_buf,
size_t header_len);
+
+ /**
+ * Obtain the FEC parameters of a FEC client.
+ *
+ * This is called once by vss.c at the beginning of a stream. Senders
+ * are supposed to set \a fcp to a struct which is suitable for the FEC
+ * client identified by \a private_data.
+ */
+ int (*open)(void *client, struct fec_client_parms **fcp);
+ /**
+ * Send the next slice to a FEC client.
+ *
+ * Called by vss.c when the next slice should be sent to the FEC client
+ * identified by \a private_data, the pointer which was previously
+ * passed to vss_add_fec_target().
+ */
+ int (*send_fec)(char *buf, size_t num_bytes, void *private_data);
/**
* Add file descriptors to fd_sets.
*
ret = -ERRNO_TO_PARA_ERROR(EINVAL);
if (val < 0 || val > 65535)
goto out;
- scd->max_slice_bytes = val;
/* parse data_slices_per_group */
b = e + 1;
e = strchr(b, ':');
goto out;
}
/* use default fec parameters. */
- scd->max_slice_bytes = 1472;
scd->slices_per_group = 16;
scd->data_slices_per_group = 14;
ret = 0;
#include "string.h"
#include "afs.h"
#include "server.h"
+#include "list.h"
+#include "send.h"
#include "vss.h"
#include "config.h"
#include "close_on_fork.h"
-#include "list.h"
-#include "send.h"
#include "net.h"
#include "daemon.h"
#include "ipc.h"
int netmask;
/** The port number for add/remove. */
int port;
- /** Maximal slice size. */
- uint16_t max_slice_bytes;
/** Number of data slices plus redundant slices. */
uint8_t slices_per_group;
/** Number of slices minus number of redundant slices. */
#include "afh.h"
#include "afs.h"
#include "server.h"
-#include "vss.h"
#include "list.h"
#include "send.h"
+#include "vss.h"
#include "portable_io.h"
#include "net.h"
#include "fd.h"
static struct list_head targets;
static int sender_status;
+static struct sender *self;
static void udp_close_target(struct udp_target *ut)
{
/** The maximal size of the per-target chunk queue. */
#define UDP_CQ_BYTES 40000
-static int udp_init_session(struct udp_target *ut)
+static int udp_open(void *client, struct fec_client_parms **fcp)
{
+ struct udp_target *ut = client;
int ret;
char *iface = NULL;
}
add_close_on_fork_list(ut->fd);
ut->cq = cq_new(UDP_CQ_BYTES);
+ ut->fcp.max_slice_bytes = 1472; /* FIXME */
+ *fcp = &ut->fcp;
PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port);
return 1;
}
if (sender_status == SENDER_OFF)
return 0;
- ret = udp_init_session(ut);
- if (ret < 0)
- goto fail;
ret = send_queued_chunks(ut->fd, ut->cq, 0);
if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
ret = 0;
para_list_add(&ut->node, &targets);
ut->fcp.slices_per_group = scd->slices_per_group;
ut->fcp.data_slices_per_group = scd->data_slices_per_group;
- ut->fcp.max_slice_bytes = scd->max_slice_bytes;
- ut->fcp.send = udp_send_fec;
- ut->fcp.private_data = ut;
- vss_add_fec_client(&ut->fcp, &ut->fc);
+ vss_add_fec_client(self, ut, &ut->fc);
}
static int udp_com_add(struct sender_command_data *scd)
s->info = udp_info;
s->help = udp_help;
s->send = NULL;
+ s->send_fec = udp_send_fec;
+ s->open = udp_open;
s->pre_select = NULL;
s->post_select = NULL;
s->shutdown_clients = udp_shutdown_targets;
udp_init_target_list();
if (!conf.udp_no_autostart_given)
sender_status = SENDER_ON;
+ self = s;
PARA_DEBUG_LOG("udp sender init complete\n");
}
#include "net.h"
#include "server.cmdline.h"
#include "list.h"
-#include "vss.h"
#include "send.h"
+#include "vss.h"
#include "ipc.h"
#include "fd.h"
#include "sched.h"
struct fec_client {
/** If negative, this client is temporarily disabled. */
int error;
+ /** UDP or DCCP. */
+ struct sender *sender;
/** Parameters requested by the client. */
struct fec_client_parms *fcp;
/** Used by the core FEC code. */
int num_extra_slices;
/** Contains the FEC-encoded data. */
unsigned char *enc_buf;
+ /** Pointer obtained from sender when the client is added. */
+ void *private_data;
};
/**
*
* \return Standard.
*/
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+int vss_add_fec_client(struct sender *sender, void *private_data,
+ struct fec_client **result)
{
- int ret;
- struct fec_client *fc;
+ struct fec_client *fc = para_calloc(sizeof(*fc));
- if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
- return -ERRNO_TO_PARA_ERROR(EINVAL);
- fc = para_calloc(sizeof(*fc));
- fc->fcp = fcp;
- ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
- &fc->parms);
- if (ret < 0)
- goto err;
- fc->first_stream_chunk = -1; /* stream not yet started */
- fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
- fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->num_extra_slices = 0;
- fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->next_header_time.tv_sec = 0;
+ fc->private_data = private_data;
+ fc->sender = sender;
para_list_add(&fc->node, &fec_client_list);
*result = fc;
return 1;
-err:
- fec_free(fc->parms);
- free(fc);
- *result = NULL;
- return ret;
}
/**
list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
fc->first_stream_chunk = -1;
fc->error = 0;
+ fc->fcp = NULL;
}
mmd->stream_start.tv_sec = 0;
mmd->stream_start.tv_usec = 0;
mmd->new_vss_status_flags = VSS_NEXT;
}
+static int open_fec_client(struct fec_client *fc)
+{
+ int ret;
+ struct fec_client_parms *fcp;
+
+ ret = fc->sender->open(fc->private_data, &fc->fcp);
+ if (ret < 0) {
+ fc->fcp = NULL;
+ return ret;
+ }
+ fcp = fc->fcp;
+ if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+ return -ERRNO_TO_PARA_ERROR(EINVAL);
+ ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+ &fc->parms);
+ if (ret < 0)
+ goto err;
+ fc->first_stream_chunk = -1; /* stream not yet started */
+ fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+ fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
+ fc->num_extra_slices = 0;
+ fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
+ fc->next_header_time.tv_sec = 0;
+ return 1;
+err:
+ fec_free(fc->parms);
+ return ret;
+}
+
/**
* Main sending function.
*
*/
static void vss_send(struct vss_task *vsst)
{
- int i, fec_active = 0;
+ int ret, i, fec_active = 0;
struct timeval due;
struct fec_client *fc, *tmp_fc;
list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
if (fc->error < 0)
continue;
+ if (!fc->fcp) {
+ ret = open_fec_client(fc);
+ if (ret < 0) {
+ PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ continue;
+ }
+ }
if (!next_slice_is_due(fc, NULL)) {
fec_active = 1;
continue;
continue;
PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
fc->current_slice_num, fc->fcp->max_slice_bytes);
- fc->fcp->send((char *)fc->enc_buf,
+ fc->sender->send_fec((char *)fc->enc_buf,
fc->fcp->max_slice_bytes,
- fc->fcp->private_data);
+ fc->private_data);
fc->current_slice_num++;
fec_active = 1;
}
/** Currently playing. */
#define VSS_PLAYING 8
-/**
- * Each paraslash sender may register arbitrary many clients to the virtual
- * streaming system, possibly with varying fec parameters. In order to do so,
- * it must allocate a \a fec_client_parms structure and pass it to \ref
- * add_fec_client.
- *
- * Clients are automatically removed from that list by the vss if an error
- * occurs, or if the sender requests deletion of a client by calling \ref
- * vss_del_fec_client().
- */
-struct fec_client;
-
-/** FEC parameters requested by FEC clients. */
-struct fec_client_parms {
- /** Number of data slices plus redundant slices. */
- uint8_t slices_per_group;
- /** Number of slices minus number of redundant slices. */
- uint8_t data_slices_per_group;
- /** Maximal number of bytes per slice. */
- uint16_t max_slice_bytes;
- /** Called by vss.c when the next slice should be sent. */
- int (*send)(char *buf, size_t num_bytes, void *private_data);
- /** Passed verbatim to \a send(). */
- void *private_data;
-};
-
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result);
+int vss_add_fec_client(struct sender *sender, void *private_data,
+ struct fec_client **result);
void vss_del_fec_client(struct fec_client *fc);
size_t vss_get_fec_eof_packet(const char **buf);