playlist sha1 rbtree sched audiod grab_client filter_common wav_filter compress_filter
http_recv dccp_recv recv_common write_common file_write audiod_command
client_common recv stdout filter stdin audioc write client fsck exec send_common ggo
-udp_recv udp_send color"
+udp_recv udp_send color fec fecdec_filter"
all_executables="server recv filter audioc write client fsck afh"
recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline udp_recv.cmdline"
recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
- fd sched stdout ggo udp_recv"
+ fd sched stdout ggo udp_recv fec"
recv_ldflags=""
receivers=" http dccp udp"
senders=" http dccp udp"
filter_cmdline_objs="filter.cmdline compress_filter.cmdline amp_filter.cmdline"
-filter_errlist_objs="filter_common wav_filter compress_filter filter string stdin stdout sched fd amp_filter ggo"
+filter_errlist_objs="filter_common wav_filter compress_filter filter string
+ stdin stdout sched fd amp_filter ggo fecdec_filter fec"
filter_ldflags=""
-filters=" compress wav amp"
+filters=" compress wav amp fecdec"
audioc_cmdline_objs="audioc.cmdline"
audioc_errlist_objs="audioc string net fd"
audiod_command_list amp_filter.cmdline udp_recv.cmdline"
audiod_errlist_objs="audiod signal string daemon stat net
time grab_client filter_common wav_filter compress_filter amp_filter http_recv dccp_recv
- recv_common fd sched write_common file_write audiod_command crypt
- client_common ggo udp_recv color"
+ recv_common fd sched write_common file_write audiod_command crypt fecdec_filter
+ client_common ggo udp_recv color fec"
audiod_ldflags=""
audiod_audio_formats=""
server_errlist_objs="server afh_common mp3_afh vss command net string signal
time daemon stat crypt http_send close_on_fork
ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute
- blob playlist sha1 rbtree sched acl send_common udp_send color"
+ blob playlist sha1 rbtree sched acl send_common udp_send color fec"
server_ldflags=""
server_audio_formats=" mp3"
#define GGO_ERRORS
#define COLOR_ERRORS
-
extern const char **para_errlist[];
#define COMPRESS_FILTER_ERRORS \
PARA_ERROR(COMPRESS_SYNTAX, "syntax error in compress filter config"), \
+#define FEC_ERRORS \
+ PARA_ERROR(FEC_BAD_IDX, "invalid index vector"), \
+ PARA_ERROR(FEC_SINGULAR, "unexpected singular matrix"), \
+ PARA_ERROR(FEC_PIVOT, "pivot column not found"), \
+ PARA_ERROR(FEC_PARMS, "invalid fec parameters"), \
+
+
+#define FECDEC_FILTER_ERRORS \
+ PARA_ERROR(BAD_FEC_HEADER, "invalid fec header"), \
+ PARA_ERROR(BAD_SLICE_SIZE, "slice size too large"), \
+ PARA_ERROR(BAD_SLICE_NUM, "invalid slice number"), \
+ PARA_ERROR(FECDEC_OVERRUN, "fecdec output buffer overrun"), \
+ PARA_ERROR(FECDEC_EOF, "received eof packet"), \
+
+
#define AMP_FILTER_ERRORS \
PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \
--- /dev/null
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980624
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#include "para.h"
+#include "error.h"
+#include "portable_io.h"
+#include "string.h"
+#include "fec.h"
+
+#define GF_BITS 8 /* code over GF(256) */
+#define GF_SIZE ((1 << GF_BITS) - 1)
+
+/*
+ * To speed up computations, we have tables for logarithm, exponent and inverse
+ * of a number. We use a table for multiplication as well (it takes 64K, no big
+ * deal even on a PDA, especially because it can be pre-initialized an put into
+ * a ROM!). The macro gf_mul(x,y) takes care of multiplications.
+ */
+static unsigned char gf_exp[2 * GF_SIZE]; /* index->poly form conversion table */
+static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table */
+static unsigned char inverse[GF_SIZE + 1]; /* inverse of field elem. */
+static unsigned char gf_mul_table[GF_SIZE + 1][GF_SIZE + 1];
+/* Multiply two numbers. */
+#define gf_mul(x,y) gf_mul_table[x][y]
+
+/* Compute x % GF_SIZE without a slow divide. */
+static inline unsigned char modnn(int x)
+{
+ while (x >= GF_SIZE) {
+ x -= GF_SIZE;
+ x = (x >> GF_BITS) + (x & GF_SIZE);
+ }
+ return x;
+}
+
+static void init_mul_table(void)
+{
+ int i, j;
+ for (i = 0; i < GF_SIZE + 1; i++)
+ for (j = 0; j < GF_SIZE + 1; j++)
+ gf_mul_table[i][j] =
+ gf_exp[modnn(gf_log[i] + gf_log[j])];
+
+ for (j = 0; j < GF_SIZE + 1; j++)
+ gf_mul_table[0][j] = gf_mul_table[j][0] = 0;
+}
+
+static unsigned char *alloc_matrix(int rows, int cols)
+{
+ return para_malloc(rows * cols);
+}
+
+/*
+ * Initialize the data structures used for computations in GF.
+ *
+ * This generates GF(2**GF_BITS) from the irreducible polynomial p(X) in
+ * p[0]..p[m].
+ *
+ * Lookup tables:
+ * index->polynomial form gf_exp[] contains j= \alpha^i;
+ * polynomial form -> index form gf_log[ j = \alpha^i ] = i
+ * \alpha=x is the primitive element of GF(2^m)
+ *
+ * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple
+ * multiplication of two numbers can be resolved without calling modnn
+ */
+static void generate_gf(void)
+{
+ int i;
+ unsigned char mask = 1;
+ char *pp = "101110001"; /* The primitive polynomial 1+x^2+x^3+x^4+x^8 */
+ gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */
+
+ /*
+ * first, generate the (polynomial representation of) powers of \alpha,
+ * which are stored in gf_exp[i] = \alpha ** i .
+ * At the same time build gf_log[gf_exp[i]] = i .
+ * The first GF_BITS powers are simply bits shifted to the left.
+ */
+ for (i = 0; i < GF_BITS; i++, mask <<= 1) {
+ gf_exp[i] = mask;
+ gf_log[gf_exp[i]] = i;
+ /*
+ * If pp[i] == 1 then \alpha ** i occurs in poly-repr
+ * gf_exp[GF_BITS] = \alpha ** GF_BITS
+ */
+ if (pp[i] == '1')
+ gf_exp[GF_BITS] ^= mask;
+ }
+ /*
+ * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can also
+ * compute its inverse.
+ */
+ gf_log[gf_exp[GF_BITS]] = GF_BITS;
+ /*
+ * Poly-repr of \alpha ** (i+1) is given by poly-repr of \alpha ** i
+ * shifted left one-bit and accounting for any \alpha ** GF_BITS term
+ * that may occur when poly-repr of \alpha ** i is shifted.
+ */
+ mask = 1 << (GF_BITS - 1);
+ for (i = GF_BITS + 1; i < GF_SIZE; i++) {
+ if (gf_exp[i - 1] >= mask)
+ gf_exp[i] =
+ gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1);
+ else
+ gf_exp[i] = gf_exp[i - 1] << 1;
+ gf_log[gf_exp[i]] = i;
+ }
+ /*
+ * log(0) is not defined, so use a special value
+ */
+ gf_log[0] = GF_SIZE;
+ /* set the extended gf_exp values for fast multiply */
+ for (i = 0; i < GF_SIZE; i++)
+ gf_exp[i + GF_SIZE] = gf_exp[i];
+
+ inverse[0] = 0; /* 0 has no inverse. */
+ inverse[1] = 1;
+ for (i = 2; i <= GF_SIZE; i++)
+ inverse[i] = gf_exp[GF_SIZE - gf_log[i]];
+}
+
+/*
+ * Compute dst[] = dst[] + c * src[]
+ *
+ * This is used often, so better optimize it! Currently the loop is unrolled 16
+ * times. The case c=0 is also optimized, whereas c=1 is not.
+ */
+#define UNROLL 16
+static void addmul(unsigned char *dst1, const unsigned char const *src1,
+ unsigned char c, int sz)
+{
+ if (c == 0)
+ return;
+ unsigned char *dst = dst1, *lim = &dst[sz - UNROLL + 1],
+ *col = gf_mul_table[c];
+ const unsigned char const *src = src1;
+
+ for (; dst < lim; dst += UNROLL, src += UNROLL) {
+ dst[0] ^= col[src[0]];
+ dst[1] ^= col[src[1]];
+ dst[2] ^= col[src[2]];
+ dst[3] ^= col[src[3]];
+ dst[4] ^= col[src[4]];
+ dst[5] ^= col[src[5]];
+ dst[6] ^= col[src[6]];
+ dst[7] ^= col[src[7]];
+ dst[8] ^= col[src[8]];
+ dst[9] ^= col[src[9]];
+ dst[10] ^= col[src[10]];
+ dst[11] ^= col[src[11]];
+ dst[12] ^= col[src[12]];
+ dst[13] ^= col[src[13]];
+ dst[14] ^= col[src[14]];
+ dst[15] ^= col[src[15]];
+ }
+ lim += UNROLL - 1;
+ for (; dst < lim; dst++, src++) /* final components */
+ *dst ^= col[*src];
+}
+
+/*
+ * Compute C = AB where A is n*k, B is k*m, C is n*m
+ */
+static void matmul(unsigned char *a, unsigned char *b, unsigned char *c,
+ int n, int k, int m)
+{
+ int row, col, i;
+
+ for (row = 0; row < n; row++) {
+ for (col = 0; col < m; col++) {
+ unsigned char *pa = &a[row * k], *pb = &b[col], acc = 0;
+ for (i = 0; i < k; i++, pa++, pb += m)
+ acc ^= gf_mul(*pa, *pb);
+ c[row * m + col] = acc;
+ }
+ }
+}
+
+#define FEC_SWAP(a,b) {typeof(a) tmp = a; a = b; b = tmp;}
+
+/*
+ * Compute the inverse of a matrix.
+ *
+ * k is the size of the matrix 'src' (Gauss-Jordan, adapted from Numerical
+ * Recipes in C). Returns -1 if 'src' is singular.
+ */
+static int invert_mat(unsigned char *src, int k)
+{
+ int irow, icol, row, col, ix, error;
+ int *indxc = para_malloc(k * sizeof(int));
+ int *indxr = para_malloc(k * sizeof(int));
+ int *ipiv = para_malloc(k * sizeof(int)); /* elements used as pivots */
+ unsigned char c, *p, *id_row = alloc_matrix(1, k),
+ *temp_row = alloc_matrix(1, k);
+
+ memset(id_row, 0, k);
+ memset(ipiv, 0, k * sizeof(int));
+
+ for (col = 0; col < k; col++) {
+ unsigned char *pivot_row;
+ /*
+ * Zeroing column 'col', look for a non-zero element.
+ * First try on the diagonal, if it fails, look elsewhere.
+ */
+ irow = icol = -1;
+ if (ipiv[col] != 1 && src[col * k + col] != 0) {
+ irow = col;
+ icol = col;
+ goto found_piv;
+ }
+ for (row = 0; row < k; row++) {
+ if (ipiv[row] != 1) {
+ for (ix = 0; ix < k; ix++) {
+ if (ipiv[ix] == 0) {
+ if (src[row * k + ix] != 0) {
+ irow = row;
+ icol = ix;
+ goto found_piv;
+ }
+ } else if (ipiv[ix] > 1) {
+ error = -E_FEC_PIVOT;
+ goto fail;
+ }
+ }
+ }
+ }
+ error = -E_FEC_PIVOT;
+ if (icol == -1)
+ goto fail;
+found_piv:
+ ++(ipiv[icol]);
+ /*
+ * swap rows irow and icol, so afterwards the diagonal element
+ * will be correct. Rarely done, not worth optimizing.
+ */
+ if (irow != icol)
+ for (ix = 0; ix < k; ix++)
+ FEC_SWAP(src[irow * k + ix], src[icol * k + ix]);
+ indxr[col] = irow;
+ indxc[col] = icol;
+ pivot_row = &src[icol * k];
+ error = -E_FEC_SINGULAR;
+ c = pivot_row[icol];
+ if (c == 0)
+ goto fail;
+ if (c != 1) { /* otherwise this is a NOP */
+ /*
+ * this is done often , but optimizing is not so
+ * fruitful, at least in the obvious ways (unrolling)
+ */
+ c = inverse[c];
+ pivot_row[icol] = 1;
+ for (ix = 0; ix < k; ix++)
+ pivot_row[ix] = gf_mul(c, pivot_row[ix]);
+ }
+ /*
+ * from all rows, remove multiples of the selected row to zero
+ * the relevant entry (in fact, the entry is not zero because
+ * we know it must be zero). (Here, if we know that the
+ * pivot_row is the identity, we can optimize the addmul).
+ */
+ id_row[icol] = 1;
+ if (memcmp(pivot_row, id_row, k) != 0) {
+ for (p = src, ix = 0; ix < k; ix++, p += k) {
+ if (ix != icol) {
+ c = p[icol];
+ p[icol] = 0;
+ addmul(p, pivot_row, c, k);
+ }
+ }
+ }
+ id_row[icol] = 0;
+ }
+ for (col = k - 1; col >= 0; col--) {
+ if (indxr[col] < 0 || indxr[col] >= k)
+ PARA_CRIT_LOG("AARGH, indxr[col] %d\n", indxr[col]);
+ else if (indxc[col] < 0 || indxc[col] >= k)
+ PARA_CRIT_LOG("AARGH, indxc[col] %d\n", indxc[col]);
+ else if (indxr[col] != indxc[col]) {
+ for (row = 0; row < k; row++) {
+ FEC_SWAP(src[row * k + indxr[col]],
+ src[row * k + indxc[col]]);
+ }
+ }
+ }
+ error = 0;
+fail:
+ free(indxc);
+ free(indxr);
+ free(ipiv);
+ free(id_row);
+ free(temp_row);
+ return error;
+}
+
+/*
+ * Invert a Vandermonde matrix.
+ *
+ * It assumes that the matrix is not singular and _IS_ a Vandermonde matrix.
+ * Only uses the second column of the matrix, containing the p_i's.
+ *
+ * Algorithm borrowed from "Numerical recipes in C" -- sec.2.8, but largely
+ * revised for GF purposes.
+ */
+static void invert_vdm(unsigned char *src, int k)
+{
+ int i, j, row, col;
+ unsigned char *b, *c, *p, t, xx;
+
+ if (k == 1) /* degenerate */
+ return;
+ /*
+ * c holds the coefficient of P(x) = Prod (x - p_i), i=0..k-1
+ * b holds the coefficient for the matrix inversion
+ */
+ c = para_malloc(k);
+ b = para_malloc(k);
+ p = para_malloc(k);
+
+ for (j = 1, i = 0; i < k; i++, j += k) {
+ c[i] = 0;
+ p[i] = src[j];
+ }
+ /*
+ * construct coeffs recursively. We know c[k] = 1 (implicit) and start
+ * P_0 = x - p_0, then at each stage multiply by x - p_i generating P_i
+ * = x P_{i-1} - p_i P_{i-1} After k steps we are done.
+ */
+ c[k - 1] = p[0]; /* really -p(0), but x = -x in GF(2^m) */
+ for (i = 1; i < k; i++) {
+ unsigned char p_i = p[i];
+ for (j = k - 1 - (i - 1); j < k - 1; j++)
+ c[j] ^= gf_mul(p_i, c[j + 1]);
+ c[k - 1] ^= p_i;
+ }
+
+ for (row = 0; row < k; row++) {
+ /*
+ * synthetic division etc.
+ */
+ xx = p[row];
+ t = 1;
+ b[k - 1] = 1; /* this is in fact c[k] */
+ for (i = k - 2; i >= 0; i--) {
+ b[i] = c[i + 1] ^ gf_mul(xx, b[i + 1]);
+ t = gf_mul(xx, t) ^ b[i];
+ }
+ for (col = 0; col < k; col++)
+ src[col * k + row] = gf_mul(inverse[t], b[col]);
+ }
+ free(c);
+ free(b);
+ free(p);
+}
+
+static int fec_initialized;
+
+static void init_fec(void)
+{
+ generate_gf();
+ init_mul_table();
+ fec_initialized = 1;
+}
+
+struct fec_parms {
+ int k, n; /* parameters of the code */
+ unsigned char *enc_matrix;
+};
+
+/**
+ * Deallocate a fec params structure.
+ *
+ * \param p The structure to free.
+ */
+void fec_free(struct fec_parms *p)
+{
+ if (!p)
+ return;
+ free(p->enc_matrix);
+ free(p);
+}
+
+/**
+ * Create a new encoder and return an opaque descriptor to it.
+ *
+ * \param k Number of input slices.
+ * \param n Number of output slices.
+ * \param result On success the Fec descriptor is returned here.
+ *
+ * \return Standard.
+ *
+ * This creates the k*n encoding matrix. It is computed starting with a
+ * Vandermonde matrix, and then transformed into a systematic matrix.
+ */
+int fec_new(int k, int n, struct fec_parms **result)
+{
+ int row, col;
+ unsigned char *p, *tmp_m;
+ struct fec_parms *parms;
+
+ if (!fec_initialized)
+ init_fec();
+
+ if (k < 1 || k > GF_SIZE + 1 || n > GF_SIZE + 1 || k > n)
+ return -E_FEC_PARMS;
+ parms = para_malloc(sizeof(struct fec_parms));
+ parms->k = k;
+ parms->n = n;
+ parms->enc_matrix = alloc_matrix(n, k);
+ tmp_m = alloc_matrix(n, k);
+ /*
+ * fill the matrix with powers of field elements, starting from 0.
+ * The first row is special, cannot be computed with exp. table.
+ */
+ tmp_m[0] = 1;
+ for (col = 1; col < k; col++)
+ tmp_m[col] = 0;
+ for (p = tmp_m + k, row = 0; row < n - 1; row++, p += k) {
+ for (col = 0; col < k; col++)
+ p[col] = gf_exp[modnn(row * col)];
+ }
+
+ /*
+ * quick code to build systematic matrix: invert the top
+ * k*k vandermonde matrix, multiply right the bottom n-k rows
+ * by the inverse, and construct the identity matrix at the top.
+ */
+ invert_vdm(tmp_m, k); /* much faster than invert_mat */
+ matmul(tmp_m + k * k, tmp_m, parms->enc_matrix + k * k, n - k, k, k);
+ /*
+ * the upper matrix is I so do not bother with a slow multiply
+ */
+ memset(parms->enc_matrix, 0, k * k);
+ for (p = parms->enc_matrix, col = 0; col < k; col++, p += k + 1)
+ *p = 1;
+ free(tmp_m);
+ *result = parms;
+ return 0;
+}
+
+/**
+ * Compute one encoded slice of the given input.
+ *
+ * \param parms The fec parameters returned earlier by fec_new().
+ * \param src The \a k data slices to encode.
+ * \param dst Result pointer.
+ * \param idx The index of the slice to compute.
+ * \param sz The size of the input data packets.
+ *
+ * Encode the \a k slices of size \a sz given by \a src and store the output
+ * slice number \a idx in \dst.
+ */
+void fec_encode(struct fec_parms *parms, const unsigned char * const *src,
+ unsigned char *dst, int idx, int sz)
+{
+ int i, k = parms->k;
+ unsigned char *p;
+
+ assert(idx <= parms->n);
+
+ if (idx < k) {
+ memcpy(dst, src[idx], sz);
+ return;
+ }
+ p = &(parms->enc_matrix[idx * k]);
+ memset(dst, 0, sz);
+ for (i = 0; i < k; i++)
+ addmul(dst, src[i], p[i], sz);
+}
+
+/* Move src packets in their position. */
+static int shuffle(unsigned char **data, int *idx, int k)
+{
+ int i;
+
+ for (i = 0; i < k;) {
+ if (idx[i] >= k || idx[i] == i)
+ i++;
+ else { /* put index and data at the right position */
+ int c = idx[i];
+
+ if (idx[c] == c) /* conflict */
+ return -E_FEC_BAD_IDX;
+ FEC_SWAP(idx[i], idx[c]);
+ FEC_SWAP(data[i], data[c]);
+ }
+ }
+ return 0;
+}
+
+/*
+ * Construct the decoding matrix given the indices. The encoding matrix must
+ * already be allocated.
+ */
+static int build_decode_matrix(struct fec_parms *parms, int *idx,
+ unsigned char **result)
+{
+ int ret = -E_FEC_BAD_IDX, i, k = parms->k;
+ unsigned char *p, *matrix = alloc_matrix(k, k);
+
+ for (i = 0, p = matrix; i < k; i++, p += k) {
+ if (idx[i] >= parms->n) /* invalid index */
+ goto err;
+ if (idx[i] < k) {
+ memset(p, 0, k);
+ p[i] = 1;
+ } else
+ memcpy(p, &(parms->enc_matrix[idx[i] * k]), k);
+ }
+ ret = invert_mat(matrix, k);
+ if (ret < 0)
+ goto err;
+ *result = matrix;
+ return 0;
+err:
+ free(matrix);
+ *result = NULL;
+ return ret;
+}
+
+/**
+ * Decode one slice from the group of received slices.
+ *
+ * \param code Pointer to fec params structure.
+ * \param data Pointers to received packets.
+ * \param idx Pointer to packet indices (gets modified).
+ * \param sz Size of each packet.
+ *
+ * \return Zero on success, -1 on errors.
+ *
+ * The \a data vector of received slices and the indices of slices are used to
+ * produce the correct output slice. The data slices are modified in-place.
+ */
+int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx,
+ int sz)
+{
+ unsigned char *m_dec, **slice;
+ int ret, row, col, k = parms->k;
+
+ ret = shuffle(data, idx, k);
+ if (ret < 0)
+ return ret;
+ ret = build_decode_matrix(parms, idx, &m_dec);
+ if (ret < 0)
+ return ret;
+ /* do the actual decoding */
+ slice = para_malloc(k * sizeof(unsigned char *));
+ for (row = 0; row < k; row++) {
+ if (idx[row] >= k) {
+ slice[row] = para_calloc(sz);
+ for (col = 0; col < k; col++)
+ addmul(slice[row], data[col],
+ m_dec[row * k + col], sz);
+ }
+ }
+ /* move slices to their final destination */
+ for (row = 0; row < k; row++) {
+ if (idx[row] >= k) {
+ memcpy(data[row], slice[row], sz);
+ free(slice[row]);
+ }
+ }
+ free(slice);
+ free(m_dec);
+ return 0;
+}
--- /dev/null
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980614
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#define FEC_MAGIC 0xFECC0DEC
+#define FEC_HEADER_SIZE 32
+
+struct fec_parms;
+
+int fec_new(int k, int n, struct fec_parms **parms);
+void fec_free(struct fec_parms *p);
+void fec_encode(struct fec_parms *parms, const unsigned char * const *src,
+ unsigned char *dst, int idx, int sz);
+int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx,
+ int sz);
+
+
--- /dev/null
+/*
+ * Copyright (C) 2009 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file fecdev_filter.c A filter fec-decodes an audio stream. */
+
+#include <dirent.h>
+#include "para.h"
+#include "error.h"
+#include "list.h"
+#include "sched.h"
+#include "ggo.h"
+#include "filter.h"
+#include "string.h"
+#include "portable_io.h"
+#include "fec.h"
+#include "fd.h"
+
+#define NUM_FEC_GROUPS 3
+#define INPUT_BUFFER_SIZE 16384
+
+/** size of the output buffer */
+#define FECDEC_OUTBUF_SIZE 81920
+
+struct fec_header {
+ uint8_t slices_per_group;
+ uint8_t data_slices_per_group;
+ uint32_t audio_header_size;
+
+ uint32_t group_num;
+ uint32_t group_bytes;
+
+ uint8_t slice_num;
+ uint16_t slice_bytes;
+};
+
+struct fec_group {
+ struct fec_header h;
+ int num_received_slices;
+ int num_slices;
+ int *idx;
+ unsigned char **data;
+};
+
+struct private_fecdec_data {
+ struct fec_parms *fec;
+ struct fec_group groups[NUM_FEC_GROUPS];
+};
+
+#define FOR_EACH_FEC_GROUP(g, d) for (g = (d)->groups; \
+ (g) - (d)->groups < NUM_FEC_GROUPS; (g)++)
+
+#define UNUSED_GROUP_NUM 0xffffffff
+
+static int group_complete(struct fec_group *fg)
+{
+ if (fg->h.group_num == UNUSED_GROUP_NUM)
+ return 0;
+ //PARA_INFO_LOG("received slices: %u, slices per group: %u\n", fg->num_received_slices, fg->h.data_slices_per_group);
+ return fg->num_received_slices >= fg->h.data_slices_per_group;
+}
+
+static int group_empty(struct fec_group *fg)
+{
+ return fg->num_received_slices == 0;
+}
+
+static void clear_group(struct fec_group *fg)
+{
+ int i;
+
+ if (!group_complete(fg) && !group_empty(fg))
+ PARA_WARNING_LOG("Clearing incomplete group %d "
+ "(contains %d slices)\n", fg->h.group_num,
+ fg->num_received_slices);
+ for (i = 0; i < fg->num_slices; i++) {
+ free(fg->data[i]);
+ fg->data[i] = NULL;
+ fg->idx[i] = -1;
+ }
+ free(fg->data);
+ free(fg->idx);
+ fg->num_slices = 0;
+ memset(&fg->h, 0, sizeof(struct fec_header));
+ fg->num_received_slices = 0;
+ fg->h.group_num = UNUSED_GROUP_NUM;
+}
+
+static int find_group(struct fec_header *h,
+ struct private_fecdec_data *pfd, struct fec_group **result)
+{
+ struct fec_group *fg;
+
+ FOR_EACH_FEC_GROUP(fg, pfd) {
+ if (fg->h.group_num != h->group_num)
+ continue;
+ *result = fg;
+ return 1;
+ }
+ return 0;
+}
+
+static struct fec_group *find_unused_group(struct private_fecdec_data *pfd)
+{
+ struct fec_group *fg;
+
+ FOR_EACH_FEC_GROUP(fg, pfd) {
+ if (fg->num_received_slices == 0)
+ return fg;
+ }
+ return NULL;
+}
+
+static struct fec_group *try_to_free_group(struct private_fecdec_data *pfd)
+{
+ struct fec_group *fg;
+
+ FOR_EACH_FEC_GROUP(fg, pfd) {
+ if (!group_complete(fg))
+ continue;
+ clear_group(fg);
+ return fg;
+ }
+ return NULL;
+}
+
+static struct fec_group *free_oldest_group(struct private_fecdec_data *pfd)
+{
+ struct fec_group *fg, *oldest = NULL;
+
+ FOR_EACH_FEC_GROUP(fg, pfd) {
+ if (!oldest || oldest->h.group_num > fg->h.group_num)
+ oldest = fg;
+ }
+ clear_group(oldest);
+ return oldest;
+}
+
+static int get_group(struct fec_header *h, struct private_fecdec_data *pfd,
+ struct fec_group **result)
+{
+ struct fec_group *fg;
+ int ret = find_group(h, pfd, &fg);
+
+ if (ret < 0)
+ return ret;
+ if (ret > 0) /* found group */
+ goto success;
+ /* group not found */
+ fg = find_unused_group(pfd);
+ if (fg)
+ goto update_header;
+ fg = try_to_free_group(pfd);
+ if (fg)
+ goto update_header;
+ fg = free_oldest_group(pfd);
+update_header:
+ fg->h = *h;
+success:
+ *result = fg;
+ return 1;
+}
+
+static int add_slice(char *buf, struct fec_group *fg)
+{
+ int r, slice_num;
+
+ if (group_complete(fg))
+ return 0;
+ slice_num = fg->h.slice_num;
+ if (fg->num_slices == 0) {
+ fg->num_slices = fg->h.slices_per_group;
+ fg->idx = malloc(fg->num_slices * sizeof(int));
+ fg->data = malloc(fg->num_slices * sizeof(unsigned char *));
+ memset(fg->data, 0, fg->num_slices * sizeof(unsigned char *));
+ }
+ r = fg->num_received_slices;
+ fg->idx[r] = slice_num;
+ fg->data[r] = malloc(fg->h.slice_bytes);
+ memcpy(fg->data[r], buf, fg->h.slice_bytes);
+ fg->num_received_slices++;
+ return 1;
+}
+
+static int decode_group(struct fec_group *fg, struct filter_node *fn)
+{
+ int i, ret, sb = fg->h.slice_bytes;
+ size_t written = 0;
+ struct private_fecdec_data *pfd = fn->private_data;
+
+ ret = fec_decode(pfd->fec, fg->data, fg->idx, sb);
+ if (ret < 0)
+ return ret;
+ PARA_DEBUG_LOG("writing group %d (%d/%d decoded data bytes)\n",
+ fg->h.group_num, fg->h.group_bytes,
+ fg->h.data_slices_per_group * sb);
+ for (i = 0; i < fg->h.data_slices_per_group; i++) {
+ size_t n = sb;
+ if (n + written > fg->h.group_bytes)
+ n = fg->h.group_bytes - written;
+ if (fn->loaded + n > fn->bufsize)
+ return -E_FECDEC_OVERRUN;
+ memcpy(fn->buf + fn->loaded, fg->data[i], n);
+ fn->loaded += n;
+ written += n;
+ }
+ return 0;
+}
+
+/**
+ * Read a fec header from a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The fec header to write.
+ */
+static int read_fec_header(char *buf, size_t len, struct fec_header *h)
+{
+ uint32_t magic;
+
+ if (len < FEC_HEADER_SIZE)
+ return 0;
+ magic = read_u32(buf);
+ if (magic != FEC_MAGIC)
+ return -E_BAD_FEC_HEADER;
+ h->slices_per_group = read_u8(buf + 4);
+ h->data_slices_per_group = read_u8(buf + 5);
+ h->audio_header_size = read_u32(buf + 6);
+
+ h->group_num = read_u32(buf + 10);
+ h->group_bytes = read_u32(buf + 14);
+
+ h->slice_num = read_u8(buf + 18);
+ h->slice_bytes = read_u16(buf + 20);
+ if (!h->group_bytes && & h->slice_bytes)
+ return -E_FECDEC_EOF;
+// PARA_DEBUG_LOG("group %u, slize %u, slices per group: %u\n",
+// h->group_num, h->slice_num, h->slices_per_group);
+ return 1;
+}
+
+static int dispatch_slice(char *buf, size_t len, struct fec_header *h,
+ struct filter_node *fn)
+{
+ struct fec_group *fg;
+ int ret;
+ struct private_fecdec_data *pfd = fn->private_data;
+
+ if (h->slice_bytes > len) /* can not use the thing, try to read more */
+ return 0;
+ ret = get_group(h, pfd, &fg);
+ if (ret < 0)
+ return ret;
+ if (group_complete(fg)) {
+ PARA_DEBUG_LOG("group complete, ignoring slice %d\n",
+ h->slice_num);
+ return 1;
+ }
+ fg->h = *h;
+ ret = add_slice(buf, fg);
+ if (ret < 0)
+ return ret;
+ if (group_complete(fg)) {
+ if (!pfd->fec) {
+ int k = h->data_slices_per_group, n = h->slices_per_group;
+ PARA_NOTICE_LOG("init fec (%d, %d)\n", k, n);
+ ret = fec_new(k, n, &pfd->fec);
+ if (ret < 0)
+ return ret;
+ }
+ ret = decode_group(fg, fn);
+ if (ret < 0)
+ return ret;
+ }
+ return 1;
+}
+
+static int fecdec(char *buf, size_t len, struct filter_node *fn)
+{
+ int ret;
+ struct fec_header h;
+
+ ret = read_fec_header(buf, len, &h);
+ if (ret <= 0)
+ return ret;
+ if (h.slice_bytes > INPUT_BUFFER_SIZE)
+ return -E_BAD_SLICE_SIZE;
+ if (h.slice_num > h.slices_per_group)
+ return -E_BAD_SLICE_NUM;
+ ret = dispatch_slice(buf + FEC_HEADER_SIZE, len - FEC_HEADER_SIZE,
+ &h, fn);
+ //PARA_INFO_LOG("ret: %d, len: %d, slice_bytes: %d\n", ret, len, h.slice_bytes);
+ if (ret <= 0)
+ return ret;
+ return FEC_HEADER_SIZE + h.slice_bytes;
+}
+
+static void fecdec_close(struct filter_node *fn)
+{
+ struct private_fecdec_data *pfd = fn->private_data;
+ struct fec_group *fg;
+
+ FOR_EACH_FEC_GROUP(fg, pfd)
+ clear_group(fg);
+ free(fn->buf);
+ fn->buf = NULL;
+ free(fn->private_data);
+ fn->private_data = NULL;
+}
+
+static void fecdec_open(struct filter_node *fn)
+{
+ fn->bufsize = FECDEC_OUTBUF_SIZE;
+ fn->buf = para_malloc(fn->bufsize);
+ fn->private_data = para_calloc(sizeof(struct private_fecdec_data));
+ fn->loaded = 0;
+}
+
+/**
+ * The init function of the fecdec filter.
+ *
+ * \param f struct to initialize.
+ */
+void fecdec_filter_init(struct filter *f)
+{
+ f->convert = fecdec;
+ f->close = fecdec_close;
+ f->open = fecdec_open;
+}
+++ /dev/null
-/*
- * Copyright (C) 2006-2009 Andre Noll <maan@systemlinux.org>
- *
- * Licensed under the GPL v2. For licencing details see COPYING.
- */
-
-/** \file udp_header.h some macros used by udp_send.c and udp_recv.c. */
-#include <net/if.h>
-
-/**
- * Number of bytes of the paraslash udp header.
- *
- * The udp sender prepends a header at the beginning of each data chunk. Within
- * this header, the type of the current audio stream and the * type of this
- * data chunk is coded.
- */
-#define UDP_AUDIO_HEADER_LEN 16
-
-/** The possible stream types. */
-enum udp_stream_type {
- /** Used for mp3 and aac streams. */
- UDP_PLAIN_STREAM,
- /** Ogg vorbis streams. */
- UDP_HEADER_STREAM,
- /** stream type not yet known. */
- UDP_UNKNOWN_STREAM
-};
-
-/** The possible packet types. */
-enum udp_audio_packet_type {
- /** Beginning of file. */
- UDP_BOF_PACKET,
- /** End of file. */
- UDP_EOF_PACKET,
- /** Combined header/data packet (ogg only). */
- UDP_HEADER_PACKET,
- /** Packet contains only audio file data. */
- UDP_DATA_PACKET,
- /** Invalid packet type. */
- UDP_UNKNOWN_PACKET
-};
-
-/** The contents of an udp audio header. */
-struct udp_audio_header {
- /** see \ref udp_stream_type. */
- uint8_t stream_type;
- /** see \ref udp_audio_packet_type. */
- uint8_t packet_type;
- /** Non-zero only for header packets. */
- uint16_t header_len;
- /** Length of header plus audio file data. */
- uint16_t payload_len;
-};
-
-/**
- * Write a struct udp_audio_header to a buffer.
- *
- * \param buf The buffer to write to.
- * \param h The audio header to write.
- *
- * Used by the udp sender.
- *
- */
-_static_inline_ void write_udp_audio_header(char *buf, struct udp_audio_header *h)
-{
- memcpy(buf, "UDPM", 4);
- write_u8(buf + 4, h->stream_type);
- write_u8(buf + 5, h->packet_type);
- write_u16(buf + 6, h->header_len);
- write_u16(buf + 8, h->payload_len);
- memset(buf + 10, 0, 6);
-}
-
-/**
- * Used by the udp receiver to read a struct udp_audio_header from a buffer.
- *
- * \param buf The buffer to read from.
- * \param len The length of \a buf.
- * \param h Result pointer.
- *
- * \return 1 if \a buf contains a valid udp audio header, -1 else.
- */
-_static_inline_ int read_udp_audio_header(char *buf, size_t len,
- struct udp_audio_header *h)
-{
- if (len < 4)
- goto err;
- if (memcmp(buf, "UDPM", 4))
- goto err;
- h->stream_type = read_u8(buf + 4);
- h->packet_type = read_u8(buf + 5);
- h->header_len = read_u16(buf + 6);
- h->payload_len = read_u16(buf + 8);
- return 1;
-err:
- h->stream_type = UDP_UNKNOWN_STREAM;
- h->packet_type = UDP_UNKNOWN_PACKET;
- h->header_len = h->payload_len = 0;
- return -1;
-}
/** \file udp_recv.c Paraslash's udp receiver */
#include <dirent.h>
+#include <net/if.h>
#include "para.h"
#include "error.h"
#include "portable_io.h"
-#include "udp_header.h"
#include "list.h"
#include "sched.h"
#include "ggo.h"
/** The size of the receiver node buffer. */
#define UDP_RECV_CHUNK_SIZE (128 * 1024)
-
/**
* Data specific to the udp receiver.
*
* \sa \ref receiver, \ref receiver_node.
*/
struct private_udp_recv_data {
- /**
- * Whether a header was received.
- *
- * A flag indicating whether this receiver already received a packet
- * which contains the audio file header.
- *
- * This flag has no effect if the audio stream indicates that no extra
- * headers will be sent (mp3, aac). Otherwise, all data packets are
- * dropped until the header is received.
- */
- int have_header;
/** The socket file descriptor. */
int fd;
- /** Non-zero on short reads. */
- uint16_t need_more;
- /** Copied from the first audio header received. */
- uint16_t stream_type;
};
static void udp_recv_pre_select(struct sched *s, struct task *t)
return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
}
-/*
- * Perform some sanity checks on an udp audio file header.
- *
- * return: negative on error, 0: discard data, 1: use data
- */
-static int examine_audio_header(struct private_udp_recv_data *purd,
- struct udp_audio_header *uah, size_t packet_size)
-{
- /* payload_len includes header */
- if (uah->payload_len < uah->header_len)
- return -E_UDP_BAD_HEADER;
- switch (uah->packet_type) {
- case UDP_EOF_PACKET:
- return -E_RECV_EOF;
- case UDP_BOF_PACKET:
- purd->have_header = 1;
- /* fall through */
- case UDP_DATA_PACKET:
- if (uah->header_len) /* header in no-header packet */
- return -E_UDP_BAD_HEADER;
- break;
- case UDP_HEADER_PACKET:
- if (!uah->header_len) /** no header in header packet */
- return -E_UDP_BAD_HEADER;
- break;
- default: /* bad packet type */
- return -E_UDP_BAD_HEADER;
- }
- /* check stream type */
- if (uah->stream_type != UDP_PLAIN_STREAM &&
- uah->stream_type != UDP_HEADER_STREAM)
- return -E_UDP_BAD_STREAM_TYPE;
- if (purd->stream_type == UDP_UNKNOWN_STREAM)
- purd->stream_type = uah->stream_type;
- /* stream type must not change */
- if (uah->stream_type != purd->stream_type)
- return -E_UDP_BAD_STREAM_TYPE;
- if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
- /* can't use the data, wait for header packet */
- return 0;
- if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
- /* we read only a part of the package */
- purd->need_more = uah->payload_len
- + UDP_AUDIO_HEADER_LEN - packet_size;
- return 1;
-}
-
static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
{
if (!len)
struct private_udp_recv_data *purd = rn->private_data;
int ret;
char tmpbuf[UDP_RECV_CHUNK_SIZE];
- uint16_t data_len;
- char *data_buf;
size_t packet_size;
- struct udp_audio_header uah;
if (rn->output_error && *rn->output_error < 0) {
t->error = *rn->output_error;
if (!ret)
return;
packet_size = ret;
- for (;;) {
- uint16_t num;
-
- if (!purd->need_more) {
- ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
- if (ret >= 0)
- break;
- goto success; /* drop data */
- }
- num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
- assert(num > 0);
- t->error = add_rn_output(rn, tmpbuf, num);
- if (t->error < 0)
- return;
- purd->need_more -= num;
- if (packet_size <= num)
- goto success;
- packet_size -= num;
- memmove(tmpbuf, tmpbuf + num, packet_size);
- }
- assert(!purd->need_more);
- t->error = examine_audio_header(purd, &uah, packet_size);
- if (t->error <= 0)
+ t->error = add_rn_output(rn, tmpbuf, packet_size);
+ if (t->error < 0)
return;
- data_len = uah.payload_len;
- data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
- if (uah.packet_type == UDP_HEADER_PACKET) {
- if (purd->have_header) { /* skip header */
- data_buf += uah.header_len;
- data_len -= uah.header_len;
- } else { /* only use the header */
- purd->have_header = 1;
- data_len = uah.header_len;
- }
- }
- t->error = add_rn_output(rn, data_buf, data_len);
- return;
success:
t->error = 1;
}
ret = mark_fd_nonblocking(purd->fd);
if (ret < 0)
goto err;
- purd->stream_type = UDP_UNKNOWN_STREAM;
PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
c->port_arg, purd->fd);
return purd->fd;
#include <sys/time.h>
#include <dirent.h>
+#include <net/if.h>
#include "server.cmdline.h"
#include "para.h"
#include "list.h"
#include "send.h"
#include "portable_io.h"
-#include "udp_header.h"
#include "net.h"
#include "fd.h"
#include "sched.h"
int fd;
/** The list of queued chunks for this fd. */
struct chunk_queue *cq;
+ struct fec_client *fc;
+ struct fec_client_parms fcp;
};
static struct list_head targets;
PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host,
ut->port, msg);
udp_close_target(ut);
+ vss_del_fec_client(ut->fc);
list_del(&ut->node);
free(ut);
}
return 1;
}
-static void udp_send_buf(char *buf, size_t len)
-{
- struct udp_target *ut, *tmp;
- int ret;
-
- list_for_each_entry_safe(ut, tmp, &targets, node) {
- ret = udp_init_session(ut);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- ret = send_queued_chunks(ut->fd, ut->cq, 0);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- if (!len)
- continue;
- if (!ret) { /* still data left in the queue */
- ret = cq_enqueue(ut->cq, buf, len);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- }
- ret = write_nonblock(ut->fd, buf, len, 0);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- if (ret != len) {
- ret = cq_enqueue(ut->cq, buf + ret, len - ret);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- }
- }
-}
-
static void udp_shutdown_targets(void)
{
- char buf[UDP_AUDIO_HEADER_LEN];
struct udp_target *ut, *tmp;
- struct udp_audio_header uah = {
- .stream_type = UDP_UNKNOWN_STREAM,
- .packet_type = UDP_EOF_PACKET,
- };
+ const char *buf = NULL;
+ size_t len = 0; /* STFU, gcc */
- write_udp_audio_header(buf, &uah);
list_for_each_entry_safe(ut, tmp, &targets, node) {
if (ut->fd < 0)
continue;
- write(ut->fd, buf, UDP_AUDIO_HEADER_LEN);
+ if (!buf)
+ len = vss_get_fec_eof_packet(&buf);
+ write(ut->fd, buf, len);
udp_close_target(ut);
}
}
-static int need_extra_header(long unsigned current_chunk)
-{
- static struct timeval last_header;
- struct timeval diff;
-
- if (!current_chunk)
- return 0;
- tv_diff(now, &last_header, &diff);
- if (tv2ms(&diff) < conf.udp_header_interval_arg)
- return 0;
- last_header = *now;
- return 1;
-}
-
-static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent,
- const char *buf, size_t len, const char *header_buf,
- size_t header_len)
-{
- char *sendbuf;
- size_t sendbuf_len;
- struct timeval *chunk_tv;
- struct udp_audio_header uah;
-
-// PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len);
- if (sender_status != SENDER_ON)
- return;
-
- /* we might not yet know the chunk time */
- chunk_tv = vss_chunk_time();
- if (!chunk_tv)
- return;
- if (list_empty(&targets))
- return;
- uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM;
- uah.header_len = need_extra_header(current_chunk)? header_len : 0;
- if (!current_chunk)
- uah.packet_type = UDP_BOF_PACKET;
- else if (uah.header_len)
- uah.packet_type = UDP_HEADER_PACKET;
- else
- uah.packet_type = UDP_DATA_PACKET;
- uah.payload_len = uah.header_len + len;
- sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len;
- sendbuf = para_malloc(sendbuf_len);
- write_udp_audio_header(sendbuf, &uah);
- if (uah.header_len)
- memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf,
- uah.header_len);
- memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len);
- udp_send_buf(sendbuf, sendbuf_len);
- free(sendbuf);
-}
-
static int udp_com_on(__a_unused struct sender_command_data *scd)
{
sender_status = SENDER_ON;
return 1;
}
+static int udp_send_fec(char *buf, size_t len, void *private_data)
+{
+ struct udp_target *ut = private_data;
+ int ret = udp_init_session(ut);
+
+ if (ret < 0)
+ goto fail;
+ ret = send_queued_chunks(ut->fd, ut->cq, 0);
+ if (ret < 0)
+ goto fail;
+ if (!len)
+ return 0;
+ if (!ret) { /* still data left in the queue */
+ ret = cq_enqueue(ut->cq, buf, len);
+ if (ret < 0)
+ goto fail;
+ }
+ ret = write_nonblock(ut->fd, buf, len, 0);
+ if (ret < 0)
+ goto fail;
+ if (ret != len) {
+ ret = cq_enqueue(ut->cq, buf + ret, len - ret);
+ if (ret < 0)
+ goto fail;
+ }
+ return 1;
+fail:
+ udp_delete_target(ut, para_strerror(-ret));
+ return ret;
+}
+
static void udp_add_target(const char *host, int port)
{
struct udp_target *ut = para_calloc(sizeof(struct udp_target));
PARA_INFO_LOG("adding to target list (%s#%d)\n",
ut->host, ut->port);
para_list_add(&ut->node, &targets);
+ ut->fcp.slices_per_group = 16;
+ ut->fcp.data_slices_per_group = 14;
+ ut->fcp.max_slice_bytes = 1400;
+ ut->fcp.send = udp_send_fec;
+ ut->fcp.private_data = ut;
+ vss_add_fec_client(&ut->fcp, &ut->fc);
}
static int udp_com_add(struct sender_command_data *scd)
INIT_LIST_HEAD(&targets);
s->info = udp_info;
s->help = udp_help;
- s->send = udp_send;
+ s->send = NULL;
s->pre_select = NULL;
s->post_select = NULL;
s->shutdown_clients = udp_shutdown_targets;
#include "para.h"
#include "error.h"
+#include "portable_io.h"
+#include "fec.h"
#include "string.h"
#include "afh.h"
#include "afs.h"
#include "server.h"
#include "net.h"
#include "server.cmdline.h"
-#include "vss.h"
#include "list.h"
+#include "vss.h"
#include "send.h"
#include "ipc.h"
#include "fd.h"
size_t header_len;
};
+static struct list_head fec_client_list;
+
+struct fec_slice {
+ uint8_t num;
+ uint16_t bytes;
+};
+
+struct fec_group {
+ uint32_t num;
+ uint32_t bytes;
+ uint32_t first_chunk;
+ uint32_t num_chunks;
+ struct timeval duration;
+ struct timeval start;
+ struct timeval slice_duration;
+};
+
+struct fec_client {
+ struct fec_client_parms *fcp;
+ struct fec_parms *parms;
+ struct list_head node;
+ struct timeval stream_start;
+ int first_stream_chunk;
+ struct fec_group group;
+ struct fec_slice slice;
+ const unsigned char **src_data;
+ unsigned char *extra_src_buf;
+ size_t extra_src_buf_size;
+ unsigned char *enc_buf;
+ size_t enc_buf_size;
+};
+
+/**
+ * Get the chunk time of the current audio file.
+ *
+ * \return A pointer to a struct containing the chunk time, or NULL,
+ * if currently no audio file is selected.
+ */
+struct timeval *vss_chunk_time(void)
+{
+ if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
+ mmd->afd.afhi.chunk_tv.tv_usec == 0)
+ return NULL;
+ return &mmd->afd.afhi.chunk_tv;
+}
+
+static void setup_fec_group(struct fec_client *fc, struct vss_task *vsst)
+{
+ uint32_t num_bytes = 0, chunk_num, max_group_size, last_payload_size;
+ int i, k = fc->fcp->data_slices_per_group;
+ const unsigned char *start_buf = NULL;
+ struct timeval tmp, *chunk_tv = vss_chunk_time();
+
+ assert(chunk_tv);
+ max_group_size = (fc->fcp->max_slice_bytes - FEC_HEADER_SIZE) * k;
+ chunk_num = fc->group.first_chunk;
+ for (;;) {
+ const unsigned char *buf;
+ size_t len;
+
+ if (chunk_num >= mmd->afd.afhi.chunks_total)
+ break;
+ afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, (const char **)&buf, &len);
+ if (!start_buf)
+ start_buf = buf;
+ if (num_bytes + len > max_group_size)
+ break;
+ chunk_num++;
+ num_bytes += len;
+ }
+ assert(start_buf);
+ fc->group.num_chunks = chunk_num - fc->group.first_chunk;
+ fc->group.num++;
+ fc->group.bytes = num_bytes;
+ fc->slice.num = 0;
+ fc->slice.bytes = ROUND_UP(num_bytes, k) / k;
+
+ /* The last slice will not be fully used */
+ last_payload_size = num_bytes % fc->slice.bytes;
+ if (!last_payload_size)
+ last_payload_size = fc->slice.bytes;
+
+ tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration);
+ tv_scale(fc->group.first_chunk - fc->first_stream_chunk, chunk_tv,
+ &tmp);
+ tv_add(&fc->stream_start, &tmp, &fc->group.start);
+ tv_divide(fc->fcp->slices_per_group, &fc->group.duration,
+ &fc->group.slice_duration);
+
+ for (i = 0; i < k; i++)
+ fc->src_data[i] = start_buf + i * fc->slice.bytes;
+
+ if ((char *)start_buf + k * fc->slice.bytes > vsst->map + mmd->size) {
+ /* can not use last slice as it goes beyond the map */
+ if (fc->extra_src_buf_size < fc->slice.bytes)
+ fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->slice.bytes);
+ memcpy(fc->extra_src_buf, start_buf + (k - 1) * fc->slice.bytes,
+ last_payload_size);
+ memset(fc->extra_src_buf + last_payload_size, 0,
+ fc->slice.bytes - last_payload_size);
+ fc->src_data[k - 1] = fc->extra_src_buf;
+ }
+
+}
+
+/**
+ * Write a fec header to a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The fec header to write.
+ */
+static void write_fec_header(struct fec_client *fc)
+{
+ char *buf = (char *)fc->enc_buf;
+
+ write_u32(buf, FEC_MAGIC);
+
+ write_u8(buf + 4, fc->fcp->slices_per_group);
+ write_u8(buf + 5, fc->fcp->data_slices_per_group);
+ write_u32(buf + 6, (uint32_t)0); /* audio header len */
+
+ write_u32(buf + 10, fc->group.num);
+ write_u32(buf + 14, fc->group.bytes);
+
+ write_u8(buf + 18, fc->slice.num);
+ write_u16(buf + 20, fc->slice.bytes);
+ memset(buf + 22, 0, 11);
+}
+
+/**
+ * Return a buffer that marks the end of the stream.
+ *
+ * \return The length of the eof buffer.
+ *
+ * This is used for (multicast) udp streaming where closing the socket on the
+ * sender might not give rise to an eof condition at the peer.
+ */
+size_t vss_get_fec_eof_packet(const char **buf)
+{
+ static const char fec_eof_packet[FEC_HEADER_SIZE] =
+ "\xec\x0d\xcc\xfe\0\0\0\0"
+ "\0\0\0\0\0\0\0\0"
+ "\0\0\0\0\0\0\0\0"
+ "\0\0\0\0\0\0\0\0";
+ *buf = fec_eof_packet;
+ return FEC_HEADER_SIZE;
+}
+
+static void compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
+{
+ if (fc->first_stream_chunk < 0) {
+ fc->stream_start = *now;
+ fc->first_stream_chunk = mmd->current_chunk;
+ fc->group.first_chunk = mmd->current_chunk;
+ fc->group.num = 0;
+ setup_fec_group(fc, vsst);
+ } else if (fc->slice.num == fc->fcp->slices_per_group) {
+ fc->group.first_chunk += fc->group.num_chunks;
+ setup_fec_group(fc, vsst);
+
+ }
+ if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) {
+ fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE;
+ fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size);
+ }
+ write_fec_header(fc);
+ fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE,
+ fc->slice.num, fc->slice.bytes);
+}
+
+/**
+ * Add one entry to the list of active fec clients.
+ *
+ * \param fcp Describes the fec parameters to be used for this client.
+ * \param result An opaque pointer that must be used by remove the client later.
+ *
+ * \return Standard.
+ */
+int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+{
+ int ret;
+ struct fec_client *fc;
+
+ if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+ return -ERRNO_TO_PARA_ERROR(EINVAL);
+ fc = para_calloc(sizeof(*fc));
+ fc->fcp = fcp;
+ ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+ &fc->parms);
+ if (ret < 0)
+ goto err;
+ fc->first_stream_chunk = -1; /* stream not yet started */
+ fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+ para_list_add(&fc->node, &fec_client_list);
+ *result = fc;
+ return 1;
+err:
+ fec_free(fc->parms);
+ free(fc);
+ *result = NULL;
+ return ret;
+}
+
+/**
+ * Remove one entry from the list of active fec clients.
+ *
+ * \param result The client to be removed.
+ */
+void vss_del_fec_client(struct fec_client *fc)
+{
+ list_del(&fc->node);
+ free(fc->src_data);
+ free(fc->enc_buf);
+ free(fc->extra_src_buf);
+ fec_free(fc->parms);
+ free(fc);
+}
+
+/*
+ * Compute if/when next slice is due. If it isn't due yet and \a diff is
+ * not \p Null, compute the time difference next - now, where
+ *
+ * next = stream_start + (first_group_chunk - first_stream_chunk)
+ * * chunk_time + slice_num * slice_time
+ */
+static int next_slice_is_due(struct fec_client *fc, struct timeval *diff)
+{
+ struct timeval tmp, next;
+ int ret;
+
+ if (fc->first_stream_chunk < 0)
+ return 1;
+ tv_scale(fc->slice.num, &fc->group.slice_duration, &tmp);
+ tv_add(&tmp, &fc->group.start, &next);
+ ret = tv_diff(&next, now, diff);
+ return ret < 0? 1 : 0;
+}
+
+static void compute_slice_timeout(struct timeval *timeout)
+{
+ struct fec_client *fc;
+
+ assert(vss_playing());
+ list_for_each_entry(fc, &fec_client_list, node) {
+ struct timeval diff;
+ int ret = next_slice_is_due(fc, &diff);
+
+ // PARA_NOTICE_LOG("diff: %lu, ret: %d\n", tv2ms(&diff), ret);
+ if (ret) {
+ timeout->tv_sec = 0;
+ timeout->tv_usec = 0;
+ goto out;
+ }
+ /* timeout = min(timeout, diff) */
+ if (tv_diff(&diff, timeout, NULL) < 0)
+ *timeout = diff;
+ }
+out:
+ return;
+ PARA_NOTICE_LOG("slice timeout: %lu:%lu\n", (long unsigned)timeout->tv_sec, (long unsigned)timeout->tv_usec);
+}
+
/**
* Check if vss status flag \a P (playing) is set.
*
return NULL;
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
&mmd->stream_start, &next_chunk);
- if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) < 0)
+ if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) >= 0) {
+ /* chunk is due or bof */
+ the_timeout.tv_sec = 0;
+ the_timeout.tv_usec = 0;
return &the_timeout;
- /* chunk is due or bof */
- the_timeout.tv_sec = 0;
- the_timeout.tv_usec = 0;
+ }
+ /* compute min of current timeout and next slice time */
+ compute_slice_timeout(&the_timeout);
return &the_timeout;
}
static void vss_eof(struct vss_task *vsst)
{
+
mmd->stream_start = *now;
if (!vsst->map)
return;
return SUPPORTED_AUDIO_FORMATS;
}
-/**
- * Get the chunk time of the current audio file.
- *
- * \return A pointer to a struct containing the chunk time, or NULL,
- * if currently no audio file is selected.
- */
-struct timeval *vss_chunk_time(void)
-{
- if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
- mmd->afd.afhi.chunk_tv.tv_usec == 0)
- return NULL;
- return &mmd->afd.afhi.chunk_tv;
-}
-
static int need_to_request_new_audio_file(struct vss_task *vsst)
{
struct timeval diff;
return 1;
}
+
+
/**
* Compute the timeout for para_server's main select-loop.
*
struct timeval *tv, diff;
struct vss_task *vsst = container_of(t, struct vss_task, task);
- if (!vsst->map || vss_next() || vss_paused() || vss_repos())
+ if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
+ struct fec_client *fc, *tmp;
for (i = 0; senders[i].name; i++)
- senders[i].shutdown_clients();
+ if (senders[i].shutdown_clients)
+ senders[i].shutdown_clients();
+ list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
+ fc->first_stream_chunk = -1;
+ }
if (vss_next())
vss_eof(vsst);
else if (vss_paused()) {
/**
* Main sending function.
*
- * This function gets called from para_server as soon as the next chunk of data
- * should be pushed out. It obtains a pointer to the data to be sent out as
- * well as its length from mmd->afd.afhi. This information is then passed to
- * each supported sender's send() function which is supposed to send out the data
- * to all connected clients.
+ * This function gets called from vss_post_select(). It checks whether the next
+ * chunk of data should be pushed out. It obtains a pointer to the data to be
+ * sent out as well as its length from mmd->afd.afhi. This information is then
+ * passed to each supported sender's send() function as well as to the send()
+ * functions of each registered fec client.
*/
-static void vss_send_chunk(struct vss_task *vsst)
+static void vss_send(struct vss_task *vsst)
{
int i;
struct timeval due;
- const char *buf;
- size_t len;
+ struct fec_client *fc, *tmp_fc;
if (!vsst->map || !vss_playing())
return;
- compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
- &mmd->stream_start, &due);
- if (tv_diff(&due, now, NULL) > 0)
- return;
if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0)
return;
if (chk_barrier("data send", &vsst->data_send_barrier,
mmd->new_vss_status_flags |= VSS_NEXT;
return;
}
- /*
- * We call the send function also in case of empty chunks as they
- * might have still some data queued which can be sent in this case.
- */
if (!mmd->chunks_sent) {
struct timeval tmp;
mmd->stream_start = *now;
mmd->offset = tv2ms(&tmp);
mmd->events++;
}
- afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, &buf, &len);
- for (i = 0; senders[i].name; i++)
- senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len,
- vsst->header_buf, vsst->header_len);
+ compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
+ &mmd->stream_start, &due);
+ if (tv_diff(&due, now, NULL) <= 0) {
+ const char *buf;
+ size_t len;
+ /*
+ * We call the send function also in case of empty chunks as
+ * they might have still some data queued which can be sent in
+ * this case.
+ */
+ afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map,
+ &buf, &len);
+ for (i = 0; senders[i].name; i++) {
+ if (!senders[i].send)
+ continue;
+ senders[i].send(mmd->current_chunk, mmd->chunks_sent,
+ buf, len, vsst->header_buf, vsst->header_len);
+ }
+ }
+ list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
+ if (!next_slice_is_due(fc, NULL))
+ continue;
+ compute_next_fec_slice(fc, vsst);
+ PARA_DEBUG_LOG("sending %d:%d (%zu bytes)\n", fc->group.num,
+ fc->slice.num, fc->slice.bytes);
+ fc->fcp->send((char *)fc->enc_buf,
+ fc->slice.bytes + FEC_HEADER_SIZE,
+ fc->fcp->private_data);
+ fc->slice.num++;
+ }
mmd->new_vss_status_flags |= VSS_PLAYING;
mmd->chunks_sent++;
mmd->current_chunk++;
if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
(vss_next() && vss_playing()))
tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
- vss_send_chunk(vsst);
+ vss_send(vsst);
}
/**
tv_add(&vsst->autoplay_barrier, &vsst->announce_tv,
&vsst->data_send_barrier);
}
+ INIT_LIST_HEAD(&fec_client_list);
register_task(&vsst->task);
}
#define VSS_PLAYING 8
/** A client requested to change the audio file selector. */
#define VSS_CHANGE 16
+
+/**
+ * Each paraslash sender may register arbitrary many clients to the virtual
+ * streaming system, possibly with varying fec parameters. In order to do so,
+ * it must allocate a \a fec_client_parms structure and pass it to \ref
+ * add_fec_client.
+ *
+ * Clients are automatically removed from that list by the vss if an error
+ * occurs, or if the sender requests deletion of a client by calling \ref
+ * vss_del_fec_client().
+ */
+struct fec_client;
+
+struct fec_client_parms {
+ uint8_t slices_per_group;
+ uint8_t data_slices_per_group;
+ uint16_t max_slice_bytes;
+ int (*send)(char *buf, size_t num_bytes, void *private_data);
+ void *private_data;
+};
+
+int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result);
+void vss_del_fec_client(struct fec_client *fc);
+size_t vss_get_fec_eof_packet(const char **buf);