From: Andre Noll Date: Sat, 28 Feb 2009 13:55:46 +0000 (+0100) Subject: Add forward error correction code to the udp sender/receiver. X-Git-Tag: v0.3.4~57^2~16^2 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=625c5cd993d07a63061a0788f174e12fa1c221e0;p=paraslash.git Add forward error correction code to the udp sender/receiver. This patch adds the first draft of a FEC implementation based on code by Luigi Rizzo. On the server side, the FEC encoding is done within the virtual streaming system which also contains the timing routines for sending a FEC-encoded audio stream. Senders my request such an encoded stream by calling vss_add_fec_client() with a fec_client_parms structure that contains the FEC parameters and a callback function which is used to actually send the data. On the receiver side, the new fecdec filter is introduced which must be used to decode a FEC-encoded stream. As the fec parameters are contained in the header of each data slice of the encoded stream, no options to this filter are necessary. ATM, FEC is only used by the udp sender/receiver, but other protocols can be easily changed to use FEC as well. This new code is still experimental, lacks documentation and the fec parameters can currently only be changed by editing the source code. --- diff --git a/configure.ac b/configure.ac index 13ec59ba..dca49203 100644 --- a/configure.ac +++ b/configure.ac @@ -84,22 +84,23 @@ dccp_send fd user_list chunk_queue afs osl aft mood score attribute blob ringbuf playlist sha1 rbtree sched audiod grab_client filter_common wav_filter compress_filter http_recv dccp_recv recv_common write_common file_write audiod_command client_common recv stdout filter stdin audioc write client fsck exec send_common ggo -udp_recv udp_send color" +udp_recv udp_send color fec fecdec_filter" all_executables="server recv filter audioc write client fsck afh" recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline udp_recv.cmdline" recv_errlist_objs="http_recv recv_common recv time string net dccp_recv - fd sched stdout ggo udp_recv" + fd sched stdout ggo udp_recv fec" recv_ldflags="" receivers=" http dccp udp" senders=" http dccp udp" filter_cmdline_objs="filter.cmdline compress_filter.cmdline amp_filter.cmdline" -filter_errlist_objs="filter_common wav_filter compress_filter filter string stdin stdout sched fd amp_filter ggo" +filter_errlist_objs="filter_common wav_filter compress_filter filter string + stdin stdout sched fd amp_filter ggo fecdec_filter fec" filter_ldflags="" -filters=" compress wav amp" +filters=" compress wav amp fecdec" audioc_cmdline_objs="audioc.cmdline" audioc_errlist_objs="audioc string net fd" @@ -110,8 +111,8 @@ audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline audiod_command_list amp_filter.cmdline udp_recv.cmdline" audiod_errlist_objs="audiod signal string daemon stat net time grab_client filter_common wav_filter compress_filter amp_filter http_recv dccp_recv - recv_common fd sched write_common file_write audiod_command crypt - client_common ggo udp_recv color" + recv_common fd sched write_common file_write audiod_command crypt fecdec_filter + client_common ggo udp_recv color fec" audiod_ldflags="" audiod_audio_formats="" @@ -123,7 +124,7 @@ server_cmdline_objs="server.cmdline server_command_list afs_command_list" server_errlist_objs="server afh_common mp3_afh vss command net string signal time daemon stat crypt http_send close_on_fork ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute - blob playlist sha1 rbtree sched acl send_common udp_send color" + blob playlist sha1 rbtree sched acl send_common udp_send color fec" server_ldflags="" server_audio_formats=" mp3" diff --git a/error.h b/error.h index 0a5fb217..c3ac3dca 100644 --- a/error.h +++ b/error.h @@ -31,13 +31,27 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define GGO_ERRORS #define COLOR_ERRORS - extern const char **para_errlist[]; #define COMPRESS_FILTER_ERRORS \ PARA_ERROR(COMPRESS_SYNTAX, "syntax error in compress filter config"), \ +#define FEC_ERRORS \ + PARA_ERROR(FEC_BAD_IDX, "invalid index vector"), \ + PARA_ERROR(FEC_SINGULAR, "unexpected singular matrix"), \ + PARA_ERROR(FEC_PIVOT, "pivot column not found"), \ + PARA_ERROR(FEC_PARMS, "invalid fec parameters"), \ + + +#define FECDEC_FILTER_ERRORS \ + PARA_ERROR(BAD_FEC_HEADER, "invalid fec header"), \ + PARA_ERROR(BAD_SLICE_SIZE, "slice size too large"), \ + PARA_ERROR(BAD_SLICE_NUM, "invalid slice number"), \ + PARA_ERROR(FECDEC_OVERRUN, "fecdec output buffer overrun"), \ + PARA_ERROR(FECDEC_EOF, "received eof packet"), \ + + #define AMP_FILTER_ERRORS \ PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \ diff --git a/fec.c b/fec.c new file mode 100644 index 00000000..e543aedc --- /dev/null +++ b/fec.c @@ -0,0 +1,598 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980624 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#include "para.h" +#include "error.h" +#include "portable_io.h" +#include "string.h" +#include "fec.h" + +#define GF_BITS 8 /* code over GF(256) */ +#define GF_SIZE ((1 << GF_BITS) - 1) + +/* + * To speed up computations, we have tables for logarithm, exponent and inverse + * of a number. We use a table for multiplication as well (it takes 64K, no big + * deal even on a PDA, especially because it can be pre-initialized an put into + * a ROM!). The macro gf_mul(x,y) takes care of multiplications. + */ +static unsigned char gf_exp[2 * GF_SIZE]; /* index->poly form conversion table */ +static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table */ +static unsigned char inverse[GF_SIZE + 1]; /* inverse of field elem. */ +static unsigned char gf_mul_table[GF_SIZE + 1][GF_SIZE + 1]; +/* Multiply two numbers. */ +#define gf_mul(x,y) gf_mul_table[x][y] + +/* Compute x % GF_SIZE without a slow divide. */ +static inline unsigned char modnn(int x) +{ + while (x >= GF_SIZE) { + x -= GF_SIZE; + x = (x >> GF_BITS) + (x & GF_SIZE); + } + return x; +} + +static void init_mul_table(void) +{ + int i, j; + for (i = 0; i < GF_SIZE + 1; i++) + for (j = 0; j < GF_SIZE + 1; j++) + gf_mul_table[i][j] = + gf_exp[modnn(gf_log[i] + gf_log[j])]; + + for (j = 0; j < GF_SIZE + 1; j++) + gf_mul_table[0][j] = gf_mul_table[j][0] = 0; +} + +static unsigned char *alloc_matrix(int rows, int cols) +{ + return para_malloc(rows * cols); +} + +/* + * Initialize the data structures used for computations in GF. + * + * This generates GF(2**GF_BITS) from the irreducible polynomial p(X) in + * p[0]..p[m]. + * + * Lookup tables: + * index->polynomial form gf_exp[] contains j= \alpha^i; + * polynomial form -> index form gf_log[ j = \alpha^i ] = i + * \alpha=x is the primitive element of GF(2^m) + * + * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple + * multiplication of two numbers can be resolved without calling modnn + */ +static void generate_gf(void) +{ + int i; + unsigned char mask = 1; + char *pp = "101110001"; /* The primitive polynomial 1+x^2+x^3+x^4+x^8 */ + gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */ + + /* + * first, generate the (polynomial representation of) powers of \alpha, + * which are stored in gf_exp[i] = \alpha ** i . + * At the same time build gf_log[gf_exp[i]] = i . + * The first GF_BITS powers are simply bits shifted to the left. + */ + for (i = 0; i < GF_BITS; i++, mask <<= 1) { + gf_exp[i] = mask; + gf_log[gf_exp[i]] = i; + /* + * If pp[i] == 1 then \alpha ** i occurs in poly-repr + * gf_exp[GF_BITS] = \alpha ** GF_BITS + */ + if (pp[i] == '1') + gf_exp[GF_BITS] ^= mask; + } + /* + * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can also + * compute its inverse. + */ + gf_log[gf_exp[GF_BITS]] = GF_BITS; + /* + * Poly-repr of \alpha ** (i+1) is given by poly-repr of \alpha ** i + * shifted left one-bit and accounting for any \alpha ** GF_BITS term + * that may occur when poly-repr of \alpha ** i is shifted. + */ + mask = 1 << (GF_BITS - 1); + for (i = GF_BITS + 1; i < GF_SIZE; i++) { + if (gf_exp[i - 1] >= mask) + gf_exp[i] = + gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1); + else + gf_exp[i] = gf_exp[i - 1] << 1; + gf_log[gf_exp[i]] = i; + } + /* + * log(0) is not defined, so use a special value + */ + gf_log[0] = GF_SIZE; + /* set the extended gf_exp values for fast multiply */ + for (i = 0; i < GF_SIZE; i++) + gf_exp[i + GF_SIZE] = gf_exp[i]; + + inverse[0] = 0; /* 0 has no inverse. */ + inverse[1] = 1; + for (i = 2; i <= GF_SIZE; i++) + inverse[i] = gf_exp[GF_SIZE - gf_log[i]]; +} + +/* + * Compute dst[] = dst[] + c * src[] + * + * This is used often, so better optimize it! Currently the loop is unrolled 16 + * times. The case c=0 is also optimized, whereas c=1 is not. + */ +#define UNROLL 16 +static void addmul(unsigned char *dst1, const unsigned char const *src1, + unsigned char c, int sz) +{ + if (c == 0) + return; + unsigned char *dst = dst1, *lim = &dst[sz - UNROLL + 1], + *col = gf_mul_table[c]; + const unsigned char const *src = src1; + + for (; dst < lim; dst += UNROLL, src += UNROLL) { + dst[0] ^= col[src[0]]; + dst[1] ^= col[src[1]]; + dst[2] ^= col[src[2]]; + dst[3] ^= col[src[3]]; + dst[4] ^= col[src[4]]; + dst[5] ^= col[src[5]]; + dst[6] ^= col[src[6]]; + dst[7] ^= col[src[7]]; + dst[8] ^= col[src[8]]; + dst[9] ^= col[src[9]]; + dst[10] ^= col[src[10]]; + dst[11] ^= col[src[11]]; + dst[12] ^= col[src[12]]; + dst[13] ^= col[src[13]]; + dst[14] ^= col[src[14]]; + dst[15] ^= col[src[15]]; + } + lim += UNROLL - 1; + for (; dst < lim; dst++, src++) /* final components */ + *dst ^= col[*src]; +} + +/* + * Compute C = AB where A is n*k, B is k*m, C is n*m + */ +static void matmul(unsigned char *a, unsigned char *b, unsigned char *c, + int n, int k, int m) +{ + int row, col, i; + + for (row = 0; row < n; row++) { + for (col = 0; col < m; col++) { + unsigned char *pa = &a[row * k], *pb = &b[col], acc = 0; + for (i = 0; i < k; i++, pa++, pb += m) + acc ^= gf_mul(*pa, *pb); + c[row * m + col] = acc; + } + } +} + +#define FEC_SWAP(a,b) {typeof(a) tmp = a; a = b; b = tmp;} + +/* + * Compute the inverse of a matrix. + * + * k is the size of the matrix 'src' (Gauss-Jordan, adapted from Numerical + * Recipes in C). Returns -1 if 'src' is singular. + */ +static int invert_mat(unsigned char *src, int k) +{ + int irow, icol, row, col, ix, error; + int *indxc = para_malloc(k * sizeof(int)); + int *indxr = para_malloc(k * sizeof(int)); + int *ipiv = para_malloc(k * sizeof(int)); /* elements used as pivots */ + unsigned char c, *p, *id_row = alloc_matrix(1, k), + *temp_row = alloc_matrix(1, k); + + memset(id_row, 0, k); + memset(ipiv, 0, k * sizeof(int)); + + for (col = 0; col < k; col++) { + unsigned char *pivot_row; + /* + * Zeroing column 'col', look for a non-zero element. + * First try on the diagonal, if it fails, look elsewhere. + */ + irow = icol = -1; + if (ipiv[col] != 1 && src[col * k + col] != 0) { + irow = col; + icol = col; + goto found_piv; + } + for (row = 0; row < k; row++) { + if (ipiv[row] != 1) { + for (ix = 0; ix < k; ix++) { + if (ipiv[ix] == 0) { + if (src[row * k + ix] != 0) { + irow = row; + icol = ix; + goto found_piv; + } + } else if (ipiv[ix] > 1) { + error = -E_FEC_PIVOT; + goto fail; + } + } + } + } + error = -E_FEC_PIVOT; + if (icol == -1) + goto fail; +found_piv: + ++(ipiv[icol]); + /* + * swap rows irow and icol, so afterwards the diagonal element + * will be correct. Rarely done, not worth optimizing. + */ + if (irow != icol) + for (ix = 0; ix < k; ix++) + FEC_SWAP(src[irow * k + ix], src[icol * k + ix]); + indxr[col] = irow; + indxc[col] = icol; + pivot_row = &src[icol * k]; + error = -E_FEC_SINGULAR; + c = pivot_row[icol]; + if (c == 0) + goto fail; + if (c != 1) { /* otherwise this is a NOP */ + /* + * this is done often , but optimizing is not so + * fruitful, at least in the obvious ways (unrolling) + */ + c = inverse[c]; + pivot_row[icol] = 1; + for (ix = 0; ix < k; ix++) + pivot_row[ix] = gf_mul(c, pivot_row[ix]); + } + /* + * from all rows, remove multiples of the selected row to zero + * the relevant entry (in fact, the entry is not zero because + * we know it must be zero). (Here, if we know that the + * pivot_row is the identity, we can optimize the addmul). + */ + id_row[icol] = 1; + if (memcmp(pivot_row, id_row, k) != 0) { + for (p = src, ix = 0; ix < k; ix++, p += k) { + if (ix != icol) { + c = p[icol]; + p[icol] = 0; + addmul(p, pivot_row, c, k); + } + } + } + id_row[icol] = 0; + } + for (col = k - 1; col >= 0; col--) { + if (indxr[col] < 0 || indxr[col] >= k) + PARA_CRIT_LOG("AARGH, indxr[col] %d\n", indxr[col]); + else if (indxc[col] < 0 || indxc[col] >= k) + PARA_CRIT_LOG("AARGH, indxc[col] %d\n", indxc[col]); + else if (indxr[col] != indxc[col]) { + for (row = 0; row < k; row++) { + FEC_SWAP(src[row * k + indxr[col]], + src[row * k + indxc[col]]); + } + } + } + error = 0; +fail: + free(indxc); + free(indxr); + free(ipiv); + free(id_row); + free(temp_row); + return error; +} + +/* + * Invert a Vandermonde matrix. + * + * It assumes that the matrix is not singular and _IS_ a Vandermonde matrix. + * Only uses the second column of the matrix, containing the p_i's. + * + * Algorithm borrowed from "Numerical recipes in C" -- sec.2.8, but largely + * revised for GF purposes. + */ +static void invert_vdm(unsigned char *src, int k) +{ + int i, j, row, col; + unsigned char *b, *c, *p, t, xx; + + if (k == 1) /* degenerate */ + return; + /* + * c holds the coefficient of P(x) = Prod (x - p_i), i=0..k-1 + * b holds the coefficient for the matrix inversion + */ + c = para_malloc(k); + b = para_malloc(k); + p = para_malloc(k); + + for (j = 1, i = 0; i < k; i++, j += k) { + c[i] = 0; + p[i] = src[j]; + } + /* + * construct coeffs recursively. We know c[k] = 1 (implicit) and start + * P_0 = x - p_0, then at each stage multiply by x - p_i generating P_i + * = x P_{i-1} - p_i P_{i-1} After k steps we are done. + */ + c[k - 1] = p[0]; /* really -p(0), but x = -x in GF(2^m) */ + for (i = 1; i < k; i++) { + unsigned char p_i = p[i]; + for (j = k - 1 - (i - 1); j < k - 1; j++) + c[j] ^= gf_mul(p_i, c[j + 1]); + c[k - 1] ^= p_i; + } + + for (row = 0; row < k; row++) { + /* + * synthetic division etc. + */ + xx = p[row]; + t = 1; + b[k - 1] = 1; /* this is in fact c[k] */ + for (i = k - 2; i >= 0; i--) { + b[i] = c[i + 1] ^ gf_mul(xx, b[i + 1]); + t = gf_mul(xx, t) ^ b[i]; + } + for (col = 0; col < k; col++) + src[col * k + row] = gf_mul(inverse[t], b[col]); + } + free(c); + free(b); + free(p); +} + +static int fec_initialized; + +static void init_fec(void) +{ + generate_gf(); + init_mul_table(); + fec_initialized = 1; +} + +struct fec_parms { + int k, n; /* parameters of the code */ + unsigned char *enc_matrix; +}; + +/** + * Deallocate a fec params structure. + * + * \param p The structure to free. + */ +void fec_free(struct fec_parms *p) +{ + if (!p) + return; + free(p->enc_matrix); + free(p); +} + +/** + * Create a new encoder and return an opaque descriptor to it. + * + * \param k Number of input slices. + * \param n Number of output slices. + * \param result On success the Fec descriptor is returned here. + * + * \return Standard. + * + * This creates the k*n encoding matrix. It is computed starting with a + * Vandermonde matrix, and then transformed into a systematic matrix. + */ +int fec_new(int k, int n, struct fec_parms **result) +{ + int row, col; + unsigned char *p, *tmp_m; + struct fec_parms *parms; + + if (!fec_initialized) + init_fec(); + + if (k < 1 || k > GF_SIZE + 1 || n > GF_SIZE + 1 || k > n) + return -E_FEC_PARMS; + parms = para_malloc(sizeof(struct fec_parms)); + parms->k = k; + parms->n = n; + parms->enc_matrix = alloc_matrix(n, k); + tmp_m = alloc_matrix(n, k); + /* + * fill the matrix with powers of field elements, starting from 0. + * The first row is special, cannot be computed with exp. table. + */ + tmp_m[0] = 1; + for (col = 1; col < k; col++) + tmp_m[col] = 0; + for (p = tmp_m + k, row = 0; row < n - 1; row++, p += k) { + for (col = 0; col < k; col++) + p[col] = gf_exp[modnn(row * col)]; + } + + /* + * quick code to build systematic matrix: invert the top + * k*k vandermonde matrix, multiply right the bottom n-k rows + * by the inverse, and construct the identity matrix at the top. + */ + invert_vdm(tmp_m, k); /* much faster than invert_mat */ + matmul(tmp_m + k * k, tmp_m, parms->enc_matrix + k * k, n - k, k, k); + /* + * the upper matrix is I so do not bother with a slow multiply + */ + memset(parms->enc_matrix, 0, k * k); + for (p = parms->enc_matrix, col = 0; col < k; col++, p += k + 1) + *p = 1; + free(tmp_m); + *result = parms; + return 0; +} + +/** + * Compute one encoded slice of the given input. + * + * \param parms The fec parameters returned earlier by fec_new(). + * \param src The \a k data slices to encode. + * \param dst Result pointer. + * \param idx The index of the slice to compute. + * \param sz The size of the input data packets. + * + * Encode the \a k slices of size \a sz given by \a src and store the output + * slice number \a idx in \dst. + */ +void fec_encode(struct fec_parms *parms, const unsigned char * const *src, + unsigned char *dst, int idx, int sz) +{ + int i, k = parms->k; + unsigned char *p; + + assert(idx <= parms->n); + + if (idx < k) { + memcpy(dst, src[idx], sz); + return; + } + p = &(parms->enc_matrix[idx * k]); + memset(dst, 0, sz); + for (i = 0; i < k; i++) + addmul(dst, src[i], p[i], sz); +} + +/* Move src packets in their position. */ +static int shuffle(unsigned char **data, int *idx, int k) +{ + int i; + + for (i = 0; i < k;) { + if (idx[i] >= k || idx[i] == i) + i++; + else { /* put index and data at the right position */ + int c = idx[i]; + + if (idx[c] == c) /* conflict */ + return -E_FEC_BAD_IDX; + FEC_SWAP(idx[i], idx[c]); + FEC_SWAP(data[i], data[c]); + } + } + return 0; +} + +/* + * Construct the decoding matrix given the indices. The encoding matrix must + * already be allocated. + */ +static int build_decode_matrix(struct fec_parms *parms, int *idx, + unsigned char **result) +{ + int ret = -E_FEC_BAD_IDX, i, k = parms->k; + unsigned char *p, *matrix = alloc_matrix(k, k); + + for (i = 0, p = matrix; i < k; i++, p += k) { + if (idx[i] >= parms->n) /* invalid index */ + goto err; + if (idx[i] < k) { + memset(p, 0, k); + p[i] = 1; + } else + memcpy(p, &(parms->enc_matrix[idx[i] * k]), k); + } + ret = invert_mat(matrix, k); + if (ret < 0) + goto err; + *result = matrix; + return 0; +err: + free(matrix); + *result = NULL; + return ret; +} + +/** + * Decode one slice from the group of received slices. + * + * \param code Pointer to fec params structure. + * \param data Pointers to received packets. + * \param idx Pointer to packet indices (gets modified). + * \param sz Size of each packet. + * + * \return Zero on success, -1 on errors. + * + * The \a data vector of received slices and the indices of slices are used to + * produce the correct output slice. The data slices are modified in-place. + */ +int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx, + int sz) +{ + unsigned char *m_dec, **slice; + int ret, row, col, k = parms->k; + + ret = shuffle(data, idx, k); + if (ret < 0) + return ret; + ret = build_decode_matrix(parms, idx, &m_dec); + if (ret < 0) + return ret; + /* do the actual decoding */ + slice = para_malloc(k * sizeof(unsigned char *)); + for (row = 0; row < k; row++) { + if (idx[row] >= k) { + slice[row] = para_calloc(sz); + for (col = 0; col < k; col++) + addmul(slice[row], data[col], + m_dec[row * k + col], sz); + } + } + /* move slices to their final destination */ + for (row = 0; row < k; row++) { + if (idx[row] >= k) { + memcpy(data[row], slice[row], sz); + free(slice[row]); + } + } + free(slice); + free(m_dec); + return 0; +} diff --git a/fec.h b/fec.h new file mode 100644 index 00000000..241cf9a4 --- /dev/null +++ b/fec.h @@ -0,0 +1,47 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980614 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#define FEC_MAGIC 0xFECC0DEC +#define FEC_HEADER_SIZE 32 + +struct fec_parms; + +int fec_new(int k, int n, struct fec_parms **parms); +void fec_free(struct fec_parms *p); +void fec_encode(struct fec_parms *parms, const unsigned char * const *src, + unsigned char *dst, int idx, int sz); +int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx, + int sz); + + diff --git a/fecdec_filter.c b/fecdec_filter.c new file mode 100644 index 00000000..721fc094 --- /dev/null +++ b/fecdec_filter.c @@ -0,0 +1,330 @@ +/* + * Copyright (C) 2009 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file fecdev_filter.c A filter fec-decodes an audio stream. */ + +#include +#include "para.h" +#include "error.h" +#include "list.h" +#include "sched.h" +#include "ggo.h" +#include "filter.h" +#include "string.h" +#include "portable_io.h" +#include "fec.h" +#include "fd.h" + +#define NUM_FEC_GROUPS 3 +#define INPUT_BUFFER_SIZE 16384 + +/** size of the output buffer */ +#define FECDEC_OUTBUF_SIZE 81920 + +struct fec_header { + uint8_t slices_per_group; + uint8_t data_slices_per_group; + uint32_t audio_header_size; + + uint32_t group_num; + uint32_t group_bytes; + + uint8_t slice_num; + uint16_t slice_bytes; +}; + +struct fec_group { + struct fec_header h; + int num_received_slices; + int num_slices; + int *idx; + unsigned char **data; +}; + +struct private_fecdec_data { + struct fec_parms *fec; + struct fec_group groups[NUM_FEC_GROUPS]; +}; + +#define FOR_EACH_FEC_GROUP(g, d) for (g = (d)->groups; \ + (g) - (d)->groups < NUM_FEC_GROUPS; (g)++) + +#define UNUSED_GROUP_NUM 0xffffffff + +static int group_complete(struct fec_group *fg) +{ + if (fg->h.group_num == UNUSED_GROUP_NUM) + return 0; + //PARA_INFO_LOG("received slices: %u, slices per group: %u\n", fg->num_received_slices, fg->h.data_slices_per_group); + return fg->num_received_slices >= fg->h.data_slices_per_group; +} + +static int group_empty(struct fec_group *fg) +{ + return fg->num_received_slices == 0; +} + +static void clear_group(struct fec_group *fg) +{ + int i; + + if (!group_complete(fg) && !group_empty(fg)) + PARA_WARNING_LOG("Clearing incomplete group %d " + "(contains %d slices)\n", fg->h.group_num, + fg->num_received_slices); + for (i = 0; i < fg->num_slices; i++) { + free(fg->data[i]); + fg->data[i] = NULL; + fg->idx[i] = -1; + } + free(fg->data); + free(fg->idx); + fg->num_slices = 0; + memset(&fg->h, 0, sizeof(struct fec_header)); + fg->num_received_slices = 0; + fg->h.group_num = UNUSED_GROUP_NUM; +} + +static int find_group(struct fec_header *h, + struct private_fecdec_data *pfd, struct fec_group **result) +{ + struct fec_group *fg; + + FOR_EACH_FEC_GROUP(fg, pfd) { + if (fg->h.group_num != h->group_num) + continue; + *result = fg; + return 1; + } + return 0; +} + +static struct fec_group *find_unused_group(struct private_fecdec_data *pfd) +{ + struct fec_group *fg; + + FOR_EACH_FEC_GROUP(fg, pfd) { + if (fg->num_received_slices == 0) + return fg; + } + return NULL; +} + +static struct fec_group *try_to_free_group(struct private_fecdec_data *pfd) +{ + struct fec_group *fg; + + FOR_EACH_FEC_GROUP(fg, pfd) { + if (!group_complete(fg)) + continue; + clear_group(fg); + return fg; + } + return NULL; +} + +static struct fec_group *free_oldest_group(struct private_fecdec_data *pfd) +{ + struct fec_group *fg, *oldest = NULL; + + FOR_EACH_FEC_GROUP(fg, pfd) { + if (!oldest || oldest->h.group_num > fg->h.group_num) + oldest = fg; + } + clear_group(oldest); + return oldest; +} + +static int get_group(struct fec_header *h, struct private_fecdec_data *pfd, + struct fec_group **result) +{ + struct fec_group *fg; + int ret = find_group(h, pfd, &fg); + + if (ret < 0) + return ret; + if (ret > 0) /* found group */ + goto success; + /* group not found */ + fg = find_unused_group(pfd); + if (fg) + goto update_header; + fg = try_to_free_group(pfd); + if (fg) + goto update_header; + fg = free_oldest_group(pfd); +update_header: + fg->h = *h; +success: + *result = fg; + return 1; +} + +static int add_slice(char *buf, struct fec_group *fg) +{ + int r, slice_num; + + if (group_complete(fg)) + return 0; + slice_num = fg->h.slice_num; + if (fg->num_slices == 0) { + fg->num_slices = fg->h.slices_per_group; + fg->idx = malloc(fg->num_slices * sizeof(int)); + fg->data = malloc(fg->num_slices * sizeof(unsigned char *)); + memset(fg->data, 0, fg->num_slices * sizeof(unsigned char *)); + } + r = fg->num_received_slices; + fg->idx[r] = slice_num; + fg->data[r] = malloc(fg->h.slice_bytes); + memcpy(fg->data[r], buf, fg->h.slice_bytes); + fg->num_received_slices++; + return 1; +} + +static int decode_group(struct fec_group *fg, struct filter_node *fn) +{ + int i, ret, sb = fg->h.slice_bytes; + size_t written = 0; + struct private_fecdec_data *pfd = fn->private_data; + + ret = fec_decode(pfd->fec, fg->data, fg->idx, sb); + if (ret < 0) + return ret; + PARA_DEBUG_LOG("writing group %d (%d/%d decoded data bytes)\n", + fg->h.group_num, fg->h.group_bytes, + fg->h.data_slices_per_group * sb); + for (i = 0; i < fg->h.data_slices_per_group; i++) { + size_t n = sb; + if (n + written > fg->h.group_bytes) + n = fg->h.group_bytes - written; + if (fn->loaded + n > fn->bufsize) + return -E_FECDEC_OVERRUN; + memcpy(fn->buf + fn->loaded, fg->data[i], n); + fn->loaded += n; + written += n; + } + return 0; +} + +/** + * Read a fec header from a buffer. + * + * \param buf The buffer to write to. + * \param h The fec header to write. + */ +static int read_fec_header(char *buf, size_t len, struct fec_header *h) +{ + uint32_t magic; + + if (len < FEC_HEADER_SIZE) + return 0; + magic = read_u32(buf); + if (magic != FEC_MAGIC) + return -E_BAD_FEC_HEADER; + h->slices_per_group = read_u8(buf + 4); + h->data_slices_per_group = read_u8(buf + 5); + h->audio_header_size = read_u32(buf + 6); + + h->group_num = read_u32(buf + 10); + h->group_bytes = read_u32(buf + 14); + + h->slice_num = read_u8(buf + 18); + h->slice_bytes = read_u16(buf + 20); + if (!h->group_bytes && & h->slice_bytes) + return -E_FECDEC_EOF; +// PARA_DEBUG_LOG("group %u, slize %u, slices per group: %u\n", +// h->group_num, h->slice_num, h->slices_per_group); + return 1; +} + +static int dispatch_slice(char *buf, size_t len, struct fec_header *h, + struct filter_node *fn) +{ + struct fec_group *fg; + int ret; + struct private_fecdec_data *pfd = fn->private_data; + + if (h->slice_bytes > len) /* can not use the thing, try to read more */ + return 0; + ret = get_group(h, pfd, &fg); + if (ret < 0) + return ret; + if (group_complete(fg)) { + PARA_DEBUG_LOG("group complete, ignoring slice %d\n", + h->slice_num); + return 1; + } + fg->h = *h; + ret = add_slice(buf, fg); + if (ret < 0) + return ret; + if (group_complete(fg)) { + if (!pfd->fec) { + int k = h->data_slices_per_group, n = h->slices_per_group; + PARA_NOTICE_LOG("init fec (%d, %d)\n", k, n); + ret = fec_new(k, n, &pfd->fec); + if (ret < 0) + return ret; + } + ret = decode_group(fg, fn); + if (ret < 0) + return ret; + } + return 1; +} + +static int fecdec(char *buf, size_t len, struct filter_node *fn) +{ + int ret; + struct fec_header h; + + ret = read_fec_header(buf, len, &h); + if (ret <= 0) + return ret; + if (h.slice_bytes > INPUT_BUFFER_SIZE) + return -E_BAD_SLICE_SIZE; + if (h.slice_num > h.slices_per_group) + return -E_BAD_SLICE_NUM; + ret = dispatch_slice(buf + FEC_HEADER_SIZE, len - FEC_HEADER_SIZE, + &h, fn); + //PARA_INFO_LOG("ret: %d, len: %d, slice_bytes: %d\n", ret, len, h.slice_bytes); + if (ret <= 0) + return ret; + return FEC_HEADER_SIZE + h.slice_bytes; +} + +static void fecdec_close(struct filter_node *fn) +{ + struct private_fecdec_data *pfd = fn->private_data; + struct fec_group *fg; + + FOR_EACH_FEC_GROUP(fg, pfd) + clear_group(fg); + free(fn->buf); + fn->buf = NULL; + free(fn->private_data); + fn->private_data = NULL; +} + +static void fecdec_open(struct filter_node *fn) +{ + fn->bufsize = FECDEC_OUTBUF_SIZE; + fn->buf = para_malloc(fn->bufsize); + fn->private_data = para_calloc(sizeof(struct private_fecdec_data)); + fn->loaded = 0; +} + +/** + * The init function of the fecdec filter. + * + * \param f struct to initialize. + */ +void fecdec_filter_init(struct filter *f) +{ + f->convert = fecdec; + f->close = fecdec_close; + f->open = fecdec_open; +} diff --git a/udp_header.h b/udp_header.h deleted file mode 100644 index 7e94b584..00000000 --- a/udp_header.h +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (C) 2006-2009 Andre Noll - * - * Licensed under the GPL v2. For licencing details see COPYING. - */ - -/** \file udp_header.h some macros used by udp_send.c and udp_recv.c. */ -#include - -/** - * Number of bytes of the paraslash udp header. - * - * The udp sender prepends a header at the beginning of each data chunk. Within - * this header, the type of the current audio stream and the * type of this - * data chunk is coded. - */ -#define UDP_AUDIO_HEADER_LEN 16 - -/** The possible stream types. */ -enum udp_stream_type { - /** Used for mp3 and aac streams. */ - UDP_PLAIN_STREAM, - /** Ogg vorbis streams. */ - UDP_HEADER_STREAM, - /** stream type not yet known. */ - UDP_UNKNOWN_STREAM -}; - -/** The possible packet types. */ -enum udp_audio_packet_type { - /** Beginning of file. */ - UDP_BOF_PACKET, - /** End of file. */ - UDP_EOF_PACKET, - /** Combined header/data packet (ogg only). */ - UDP_HEADER_PACKET, - /** Packet contains only audio file data. */ - UDP_DATA_PACKET, - /** Invalid packet type. */ - UDP_UNKNOWN_PACKET -}; - -/** The contents of an udp audio header. */ -struct udp_audio_header { - /** see \ref udp_stream_type. */ - uint8_t stream_type; - /** see \ref udp_audio_packet_type. */ - uint8_t packet_type; - /** Non-zero only for header packets. */ - uint16_t header_len; - /** Length of header plus audio file data. */ - uint16_t payload_len; -}; - -/** - * Write a struct udp_audio_header to a buffer. - * - * \param buf The buffer to write to. - * \param h The audio header to write. - * - * Used by the udp sender. - * - */ -_static_inline_ void write_udp_audio_header(char *buf, struct udp_audio_header *h) -{ - memcpy(buf, "UDPM", 4); - write_u8(buf + 4, h->stream_type); - write_u8(buf + 5, h->packet_type); - write_u16(buf + 6, h->header_len); - write_u16(buf + 8, h->payload_len); - memset(buf + 10, 0, 6); -} - -/** - * Used by the udp receiver to read a struct udp_audio_header from a buffer. - * - * \param buf The buffer to read from. - * \param len The length of \a buf. - * \param h Result pointer. - * - * \return 1 if \a buf contains a valid udp audio header, -1 else. - */ -_static_inline_ int read_udp_audio_header(char *buf, size_t len, - struct udp_audio_header *h) -{ - if (len < 4) - goto err; - if (memcmp(buf, "UDPM", 4)) - goto err; - h->stream_type = read_u8(buf + 4); - h->packet_type = read_u8(buf + 5); - h->header_len = read_u16(buf + 6); - h->payload_len = read_u16(buf + 8); - return 1; -err: - h->stream_type = UDP_UNKNOWN_STREAM; - h->packet_type = UDP_UNKNOWN_PACKET; - h->header_len = h->payload_len = 0; - return -1; -} diff --git a/udp_recv.c b/udp_recv.c index 9ea35d8d..759caa3d 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -6,11 +6,11 @@ /** \file udp_recv.c Paraslash's udp receiver */ #include +#include #include "para.h" #include "error.h" #include "portable_io.h" -#include "udp_header.h" #include "list.h" #include "sched.h" #include "ggo.h" @@ -23,30 +23,14 @@ /** The size of the receiver node buffer. */ #define UDP_RECV_CHUNK_SIZE (128 * 1024) - /** * Data specific to the udp receiver. * * \sa \ref receiver, \ref receiver_node. */ struct private_udp_recv_data { - /** - * Whether a header was received. - * - * A flag indicating whether this receiver already received a packet - * which contains the audio file header. - * - * This flag has no effect if the audio stream indicates that no extra - * headers will be sent (mp3, aac). Otherwise, all data packets are - * dropped until the header is received. - */ - int have_header; /** The socket file descriptor. */ int fd; - /** Non-zero on short reads. */ - uint16_t need_more; - /** Copied from the first audio header received. */ - uint16_t stream_type; }; static void udp_recv_pre_select(struct sched *s, struct task *t) @@ -62,53 +46,6 @@ static int enough_space(size_t nbytes, size_t loaded) return nbytes + loaded < UDP_RECV_CHUNK_SIZE; } -/* - * Perform some sanity checks on an udp audio file header. - * - * return: negative on error, 0: discard data, 1: use data - */ -static int examine_audio_header(struct private_udp_recv_data *purd, - struct udp_audio_header *uah, size_t packet_size) -{ - /* payload_len includes header */ - if (uah->payload_len < uah->header_len) - return -E_UDP_BAD_HEADER; - switch (uah->packet_type) { - case UDP_EOF_PACKET: - return -E_RECV_EOF; - case UDP_BOF_PACKET: - purd->have_header = 1; - /* fall through */ - case UDP_DATA_PACKET: - if (uah->header_len) /* header in no-header packet */ - return -E_UDP_BAD_HEADER; - break; - case UDP_HEADER_PACKET: - if (!uah->header_len) /** no header in header packet */ - return -E_UDP_BAD_HEADER; - break; - default: /* bad packet type */ - return -E_UDP_BAD_HEADER; - } - /* check stream type */ - if (uah->stream_type != UDP_PLAIN_STREAM && - uah->stream_type != UDP_HEADER_STREAM) - return -E_UDP_BAD_STREAM_TYPE; - if (purd->stream_type == UDP_UNKNOWN_STREAM) - purd->stream_type = uah->stream_type; - /* stream type must not change */ - if (uah->stream_type != purd->stream_type) - return -E_UDP_BAD_STREAM_TYPE; - if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM) - /* can't use the data, wait for header packet */ - return 0; - if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN) - /* we read only a part of the package */ - purd->need_more = uah->payload_len - + UDP_AUDIO_HEADER_LEN - packet_size; - return 1; -} - static int add_rn_output(struct receiver_node *rn, char *buf, size_t len) { if (!len) @@ -126,10 +63,7 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) struct private_udp_recv_data *purd = rn->private_data; int ret; char tmpbuf[UDP_RECV_CHUNK_SIZE]; - uint16_t data_len; - char *data_buf; size_t packet_size; - struct udp_audio_header uah; if (rn->output_error && *rn->output_error < 0) { t->error = *rn->output_error; @@ -148,43 +82,9 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) if (!ret) return; packet_size = ret; - for (;;) { - uint16_t num; - - if (!purd->need_more) { - ret = read_udp_audio_header(tmpbuf, packet_size, &uah); - if (ret >= 0) - break; - goto success; /* drop data */ - } - num = PARA_MIN(purd->need_more, (uint16_t)packet_size); - assert(num > 0); - t->error = add_rn_output(rn, tmpbuf, num); - if (t->error < 0) - return; - purd->need_more -= num; - if (packet_size <= num) - goto success; - packet_size -= num; - memmove(tmpbuf, tmpbuf + num, packet_size); - } - assert(!purd->need_more); - t->error = examine_audio_header(purd, &uah, packet_size); - if (t->error <= 0) + t->error = add_rn_output(rn, tmpbuf, packet_size); + if (t->error < 0) return; - data_len = uah.payload_len; - data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN; - if (uah.packet_type == UDP_HEADER_PACKET) { - if (purd->have_header) { /* skip header */ - data_buf += uah.header_len; - data_len -= uah.header_len; - } else { /* only use the header */ - purd->have_header = 1; - data_len = uah.header_len; - } - } - t->error = add_rn_output(rn, data_buf, data_len); - return; success: t->error = 1; } @@ -303,7 +203,6 @@ static int udp_recv_open(struct receiver_node *rn) ret = mark_fd_nonblocking(purd->fd); if (ret < 0) goto err; - purd->stream_type = UDP_UNKNOWN_STREAM; PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, c->port_arg, purd->fd); return purd->fd; diff --git a/udp_send.c b/udp_send.c index ca4d8cf2..c17fc313 100644 --- a/udp_send.c +++ b/udp_send.c @@ -9,6 +9,7 @@ #include #include +#include #include "server.cmdline.h" #include "para.h" @@ -21,7 +22,6 @@ #include "list.h" #include "send.h" #include "portable_io.h" -#include "udp_header.h" #include "net.h" #include "fd.h" #include "sched.h" @@ -40,6 +40,8 @@ struct udp_target { int fd; /** The list of queued chunks for this fd. */ struct chunk_queue *cq; + struct fec_client *fc; + struct fec_client_parms fcp; }; static struct list_head targets; @@ -61,6 +63,7 @@ static void udp_delete_target(struct udp_target *ut, const char *msg) PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host, ut->port, msg); udp_close_target(ut); + vss_del_fec_client(ut->fc); list_del(&ut->node); free(ut); } @@ -177,117 +180,22 @@ static int udp_init_session(struct udp_target *ut) return 1; } -static void udp_send_buf(char *buf, size_t len) -{ - struct udp_target *ut, *tmp; - int ret; - - list_for_each_entry_safe(ut, tmp, &targets, node) { - ret = udp_init_session(ut); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - ret = send_queued_chunks(ut->fd, ut->cq, 0); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - if (!len) - continue; - if (!ret) { /* still data left in the queue */ - ret = cq_enqueue(ut->cq, buf, len); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - } - ret = write_nonblock(ut->fd, buf, len, 0); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - if (ret != len) { - ret = cq_enqueue(ut->cq, buf + ret, len - ret); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - } - } -} - static void udp_shutdown_targets(void) { - char buf[UDP_AUDIO_HEADER_LEN]; struct udp_target *ut, *tmp; - struct udp_audio_header uah = { - .stream_type = UDP_UNKNOWN_STREAM, - .packet_type = UDP_EOF_PACKET, - }; + const char *buf = NULL; + size_t len = 0; /* STFU, gcc */ - write_udp_audio_header(buf, &uah); list_for_each_entry_safe(ut, tmp, &targets, node) { if (ut->fd < 0) continue; - write(ut->fd, buf, UDP_AUDIO_HEADER_LEN); + if (!buf) + len = vss_get_fec_eof_packet(&buf); + write(ut->fd, buf, len); udp_close_target(ut); } } -static int need_extra_header(long unsigned current_chunk) -{ - static struct timeval last_header; - struct timeval diff; - - if (!current_chunk) - return 0; - tv_diff(now, &last_header, &diff); - if (tv2ms(&diff) < conf.udp_header_interval_arg) - return 0; - last_header = *now; - return 1; -} - -static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent, - const char *buf, size_t len, const char *header_buf, - size_t header_len) -{ - char *sendbuf; - size_t sendbuf_len; - struct timeval *chunk_tv; - struct udp_audio_header uah; - -// PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len); - if (sender_status != SENDER_ON) - return; - - /* we might not yet know the chunk time */ - chunk_tv = vss_chunk_time(); - if (!chunk_tv) - return; - if (list_empty(&targets)) - return; - uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM; - uah.header_len = need_extra_header(current_chunk)? header_len : 0; - if (!current_chunk) - uah.packet_type = UDP_BOF_PACKET; - else if (uah.header_len) - uah.packet_type = UDP_HEADER_PACKET; - else - uah.packet_type = UDP_DATA_PACKET; - uah.payload_len = uah.header_len + len; - sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len; - sendbuf = para_malloc(sendbuf_len); - write_udp_audio_header(sendbuf, &uah); - if (uah.header_len) - memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf, - uah.header_len); - memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len); - udp_send_buf(sendbuf, sendbuf_len); - free(sendbuf); -} - static int udp_com_on(__a_unused struct sender_command_data *scd) { sender_status = SENDER_ON; @@ -316,6 +224,37 @@ static int udp_com_delete(struct sender_command_data *scd) return 1; } +static int udp_send_fec(char *buf, size_t len, void *private_data) +{ + struct udp_target *ut = private_data; + int ret = udp_init_session(ut); + + if (ret < 0) + goto fail; + ret = send_queued_chunks(ut->fd, ut->cq, 0); + if (ret < 0) + goto fail; + if (!len) + return 0; + if (!ret) { /* still data left in the queue */ + ret = cq_enqueue(ut->cq, buf, len); + if (ret < 0) + goto fail; + } + ret = write_nonblock(ut->fd, buf, len, 0); + if (ret < 0) + goto fail; + if (ret != len) { + ret = cq_enqueue(ut->cq, buf + ret, len - ret); + if (ret < 0) + goto fail; + } + return 1; +fail: + udp_delete_target(ut, para_strerror(-ret)); + return ret; +} + static void udp_add_target(const char *host, int port) { struct udp_target *ut = para_calloc(sizeof(struct udp_target)); @@ -326,6 +265,12 @@ static void udp_add_target(const char *host, int port) PARA_INFO_LOG("adding to target list (%s#%d)\n", ut->host, ut->port); para_list_add(&ut->node, &targets); + ut->fcp.slices_per_group = 16; + ut->fcp.data_slices_per_group = 14; + ut->fcp.max_slice_bytes = 1400; + ut->fcp.send = udp_send_fec; + ut->fcp.private_data = ut; + vss_add_fec_client(&ut->fcp, &ut->fc); } static int udp_com_add(struct sender_command_data *scd) @@ -401,7 +346,7 @@ void udp_send_init(struct sender *s) INIT_LIST_HEAD(&targets); s->info = udp_info; s->help = udp_help; - s->send = udp_send; + s->send = NULL; s->pre_select = NULL; s->post_select = NULL; s->shutdown_clients = udp_shutdown_targets; diff --git a/vss.c b/vss.c index d2053493..0801eeb0 100644 --- a/vss.c +++ b/vss.c @@ -15,14 +15,16 @@ #include "para.h" #include "error.h" +#include "portable_io.h" +#include "fec.h" #include "string.h" #include "afh.h" #include "afs.h" #include "server.h" #include "net.h" #include "server.cmdline.h" -#include "vss.h" #include "list.h" +#include "vss.h" #include "send.h" #include "ipc.h" #include "fd.h" @@ -87,6 +89,268 @@ struct vss_task { size_t header_len; }; +static struct list_head fec_client_list; + +struct fec_slice { + uint8_t num; + uint16_t bytes; +}; + +struct fec_group { + uint32_t num; + uint32_t bytes; + uint32_t first_chunk; + uint32_t num_chunks; + struct timeval duration; + struct timeval start; + struct timeval slice_duration; +}; + +struct fec_client { + struct fec_client_parms *fcp; + struct fec_parms *parms; + struct list_head node; + struct timeval stream_start; + int first_stream_chunk; + struct fec_group group; + struct fec_slice slice; + const unsigned char **src_data; + unsigned char *extra_src_buf; + size_t extra_src_buf_size; + unsigned char *enc_buf; + size_t enc_buf_size; +}; + +/** + * Get the chunk time of the current audio file. + * + * \return A pointer to a struct containing the chunk time, or NULL, + * if currently no audio file is selected. + */ +struct timeval *vss_chunk_time(void) +{ + if (mmd->afd.afhi.chunk_tv.tv_sec == 0 && + mmd->afd.afhi.chunk_tv.tv_usec == 0) + return NULL; + return &mmd->afd.afhi.chunk_tv; +} + +static void setup_fec_group(struct fec_client *fc, struct vss_task *vsst) +{ + uint32_t num_bytes = 0, chunk_num, max_group_size, last_payload_size; + int i, k = fc->fcp->data_slices_per_group; + const unsigned char *start_buf = NULL; + struct timeval tmp, *chunk_tv = vss_chunk_time(); + + assert(chunk_tv); + max_group_size = (fc->fcp->max_slice_bytes - FEC_HEADER_SIZE) * k; + chunk_num = fc->group.first_chunk; + for (;;) { + const unsigned char *buf; + size_t len; + + if (chunk_num >= mmd->afd.afhi.chunks_total) + break; + afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, (const char **)&buf, &len); + if (!start_buf) + start_buf = buf; + if (num_bytes + len > max_group_size) + break; + chunk_num++; + num_bytes += len; + } + assert(start_buf); + fc->group.num_chunks = chunk_num - fc->group.first_chunk; + fc->group.num++; + fc->group.bytes = num_bytes; + fc->slice.num = 0; + fc->slice.bytes = ROUND_UP(num_bytes, k) / k; + + /* The last slice will not be fully used */ + last_payload_size = num_bytes % fc->slice.bytes; + if (!last_payload_size) + last_payload_size = fc->slice.bytes; + + tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration); + tv_scale(fc->group.first_chunk - fc->first_stream_chunk, chunk_tv, + &tmp); + tv_add(&fc->stream_start, &tmp, &fc->group.start); + tv_divide(fc->fcp->slices_per_group, &fc->group.duration, + &fc->group.slice_duration); + + for (i = 0; i < k; i++) + fc->src_data[i] = start_buf + i * fc->slice.bytes; + + if ((char *)start_buf + k * fc->slice.bytes > vsst->map + mmd->size) { + /* can not use last slice as it goes beyond the map */ + if (fc->extra_src_buf_size < fc->slice.bytes) + fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->slice.bytes); + memcpy(fc->extra_src_buf, start_buf + (k - 1) * fc->slice.bytes, + last_payload_size); + memset(fc->extra_src_buf + last_payload_size, 0, + fc->slice.bytes - last_payload_size); + fc->src_data[k - 1] = fc->extra_src_buf; + } + +} + +/** + * Write a fec header to a buffer. + * + * \param buf The buffer to write to. + * \param h The fec header to write. + */ +static void write_fec_header(struct fec_client *fc) +{ + char *buf = (char *)fc->enc_buf; + + write_u32(buf, FEC_MAGIC); + + write_u8(buf + 4, fc->fcp->slices_per_group); + write_u8(buf + 5, fc->fcp->data_slices_per_group); + write_u32(buf + 6, (uint32_t)0); /* audio header len */ + + write_u32(buf + 10, fc->group.num); + write_u32(buf + 14, fc->group.bytes); + + write_u8(buf + 18, fc->slice.num); + write_u16(buf + 20, fc->slice.bytes); + memset(buf + 22, 0, 11); +} + +/** + * Return a buffer that marks the end of the stream. + * + * \return The length of the eof buffer. + * + * This is used for (multicast) udp streaming where closing the socket on the + * sender might not give rise to an eof condition at the peer. + */ +size_t vss_get_fec_eof_packet(const char **buf) +{ + static const char fec_eof_packet[FEC_HEADER_SIZE] = + "\xec\x0d\xcc\xfe\0\0\0\0" + "\0\0\0\0\0\0\0\0" + "\0\0\0\0\0\0\0\0" + "\0\0\0\0\0\0\0\0"; + *buf = fec_eof_packet; + return FEC_HEADER_SIZE; +} + +static void compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) +{ + if (fc->first_stream_chunk < 0) { + fc->stream_start = *now; + fc->first_stream_chunk = mmd->current_chunk; + fc->group.first_chunk = mmd->current_chunk; + fc->group.num = 0; + setup_fec_group(fc, vsst); + } else if (fc->slice.num == fc->fcp->slices_per_group) { + fc->group.first_chunk += fc->group.num_chunks; + setup_fec_group(fc, vsst); + + } + if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) { + fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE; + fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size); + } + write_fec_header(fc); + fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, + fc->slice.num, fc->slice.bytes); +} + +/** + * Add one entry to the list of active fec clients. + * + * \param fcp Describes the fec parameters to be used for this client. + * \param result An opaque pointer that must be used by remove the client later. + * + * \return Standard. + */ +int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result) +{ + int ret; + struct fec_client *fc; + + if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) + return -ERRNO_TO_PARA_ERROR(EINVAL); + fc = para_calloc(sizeof(*fc)); + fc->fcp = fcp; + ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, + &fc->parms); + if (ret < 0) + goto err; + fc->first_stream_chunk = -1; /* stream not yet started */ + fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); + para_list_add(&fc->node, &fec_client_list); + *result = fc; + return 1; +err: + fec_free(fc->parms); + free(fc); + *result = NULL; + return ret; +} + +/** + * Remove one entry from the list of active fec clients. + * + * \param result The client to be removed. + */ +void vss_del_fec_client(struct fec_client *fc) +{ + list_del(&fc->node); + free(fc->src_data); + free(fc->enc_buf); + free(fc->extra_src_buf); + fec_free(fc->parms); + free(fc); +} + +/* + * Compute if/when next slice is due. If it isn't due yet and \a diff is + * not \p Null, compute the time difference next - now, where + * + * next = stream_start + (first_group_chunk - first_stream_chunk) + * * chunk_time + slice_num * slice_time + */ +static int next_slice_is_due(struct fec_client *fc, struct timeval *diff) +{ + struct timeval tmp, next; + int ret; + + if (fc->first_stream_chunk < 0) + return 1; + tv_scale(fc->slice.num, &fc->group.slice_duration, &tmp); + tv_add(&tmp, &fc->group.start, &next); + ret = tv_diff(&next, now, diff); + return ret < 0? 1 : 0; +} + +static void compute_slice_timeout(struct timeval *timeout) +{ + struct fec_client *fc; + + assert(vss_playing()); + list_for_each_entry(fc, &fec_client_list, node) { + struct timeval diff; + int ret = next_slice_is_due(fc, &diff); + + // PARA_NOTICE_LOG("diff: %lu, ret: %d\n", tv2ms(&diff), ret); + if (ret) { + timeout->tv_sec = 0; + timeout->tv_usec = 0; + goto out; + } + /* timeout = min(timeout, diff) */ + if (tv_diff(&diff, timeout, NULL) < 0) + *timeout = diff; + } +out: + return; + PARA_NOTICE_LOG("slice timeout: %lu:%lu\n", (long unsigned)timeout->tv_sec, (long unsigned)timeout->tv_usec); +} + /** * Check if vss status flag \a P (playing) is set. * @@ -184,16 +448,20 @@ static struct timeval *vss_compute_timeout(struct vss_task *vsst) return NULL; compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, &mmd->stream_start, &next_chunk); - if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) < 0) + if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) >= 0) { + /* chunk is due or bof */ + the_timeout.tv_sec = 0; + the_timeout.tv_usec = 0; return &the_timeout; - /* chunk is due or bof */ - the_timeout.tv_sec = 0; - the_timeout.tv_usec = 0; + } + /* compute min of current timeout and next slice time */ + compute_slice_timeout(&the_timeout); return &the_timeout; } static void vss_eof(struct vss_task *vsst) { + mmd->stream_start = *now; if (!vsst->map) return; @@ -230,20 +498,6 @@ const char *supported_audio_formats(void) return SUPPORTED_AUDIO_FORMATS; } -/** - * Get the chunk time of the current audio file. - * - * \return A pointer to a struct containing the chunk time, or NULL, - * if currently no audio file is selected. - */ -struct timeval *vss_chunk_time(void) -{ - if (mmd->afd.afhi.chunk_tv.tv_sec == 0 && - mmd->afd.afhi.chunk_tv.tv_usec == 0) - return NULL; - return &mmd->afd.afhi.chunk_tv; -} - static int need_to_request_new_audio_file(struct vss_task *vsst) { struct timeval diff; @@ -262,6 +516,8 @@ static int need_to_request_new_audio_file(struct vss_task *vsst) return 1; } + + /** * Compute the timeout for para_server's main select-loop. * @@ -285,9 +541,14 @@ static void vss_pre_select(struct sched *s, struct task *t) struct timeval *tv, diff; struct vss_task *vsst = container_of(t, struct vss_task, task); - if (!vsst->map || vss_next() || vss_paused() || vss_repos()) + if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { + struct fec_client *fc, *tmp; for (i = 0; senders[i].name; i++) - senders[i].shutdown_clients(); + if (senders[i].shutdown_clients) + senders[i].shutdown_clients(); + list_for_each_entry_safe(fc, tmp, &fec_client_list, node) + fc->first_stream_chunk = -1; + } if (vss_next()) vss_eof(vsst); else if (vss_paused()) { @@ -407,25 +668,20 @@ err: /** * Main sending function. * - * This function gets called from para_server as soon as the next chunk of data - * should be pushed out. It obtains a pointer to the data to be sent out as - * well as its length from mmd->afd.afhi. This information is then passed to - * each supported sender's send() function which is supposed to send out the data - * to all connected clients. + * This function gets called from vss_post_select(). It checks whether the next + * chunk of data should be pushed out. It obtains a pointer to the data to be + * sent out as well as its length from mmd->afd.afhi. This information is then + * passed to each supported sender's send() function as well as to the send() + * functions of each registered fec client. */ -static void vss_send_chunk(struct vss_task *vsst) +static void vss_send(struct vss_task *vsst) { int i; struct timeval due; - const char *buf; - size_t len; + struct fec_client *fc, *tmp_fc; if (!vsst->map || !vss_playing()) return; - compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, - &mmd->stream_start, &due); - if (tv_diff(&due, now, NULL) > 0) - return; if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0) return; if (chk_barrier("data send", &vsst->data_send_barrier, @@ -435,10 +691,6 @@ static void vss_send_chunk(struct vss_task *vsst) mmd->new_vss_status_flags |= VSS_NEXT; return; } - /* - * We call the send function also in case of empty chunks as they - * might have still some data queued which can be sent in this case. - */ if (!mmd->chunks_sent) { struct timeval tmp; mmd->stream_start = *now; @@ -446,10 +698,36 @@ static void vss_send_chunk(struct vss_task *vsst) mmd->offset = tv2ms(&tmp); mmd->events++; } - afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, &buf, &len); - for (i = 0; senders[i].name; i++) - senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len, - vsst->header_buf, vsst->header_len); + compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, + &mmd->stream_start, &due); + if (tv_diff(&due, now, NULL) <= 0) { + const char *buf; + size_t len; + /* + * We call the send function also in case of empty chunks as + * they might have still some data queued which can be sent in + * this case. + */ + afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, + &buf, &len); + for (i = 0; senders[i].name; i++) { + if (!senders[i].send) + continue; + senders[i].send(mmd->current_chunk, mmd->chunks_sent, + buf, len, vsst->header_buf, vsst->header_len); + } + } + list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { + if (!next_slice_is_due(fc, NULL)) + continue; + compute_next_fec_slice(fc, vsst); + PARA_DEBUG_LOG("sending %d:%d (%zu bytes)\n", fc->group.num, + fc->slice.num, fc->slice.bytes); + fc->fcp->send((char *)fc->enc_buf, + fc->slice.bytes + FEC_HEADER_SIZE, + fc->fcp->private_data); + fc->slice.num++; + } mmd->new_vss_status_flags |= VSS_PLAYING; mmd->chunks_sent++; mmd->current_chunk++; @@ -484,7 +762,7 @@ static void vss_post_select(struct sched *s, struct task *t) if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier); - vss_send_chunk(vsst); + vss_send(vsst); } /** @@ -527,5 +805,6 @@ void init_vss_task(int afs_socket) tv_add(&vsst->autoplay_barrier, &vsst->announce_tv, &vsst->data_send_barrier); } + INIT_LIST_HEAD(&fec_client_list); register_task(&vsst->task); } diff --git a/vss.h b/vss.h index 5c917e6d..efabdf36 100644 --- a/vss.h +++ b/vss.h @@ -25,3 +25,27 @@ const char *supported_audio_formats(void); #define VSS_PLAYING 8 /** A client requested to change the audio file selector. */ #define VSS_CHANGE 16 + +/** + * Each paraslash sender may register arbitrary many clients to the virtual + * streaming system, possibly with varying fec parameters. In order to do so, + * it must allocate a \a fec_client_parms structure and pass it to \ref + * add_fec_client. + * + * Clients are automatically removed from that list by the vss if an error + * occurs, or if the sender requests deletion of a client by calling \ref + * vss_del_fec_client(). + */ +struct fec_client; + +struct fec_client_parms { + uint8_t slices_per_group; + uint8_t data_slices_per_group; + uint16_t max_slice_bytes; + int (*send)(char *buf, size_t num_bytes, void *private_data); + void *private_data; +}; + +int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result); +void vss_del_fec_client(struct fec_client *fc); +size_t vss_get_fec_eof_packet(const char **buf);