From: Andre Noll Date: Fri, 12 Jun 2020 01:34:32 +0000 (+0200) Subject: vss: Rework fec client setup. X-Git-Tag: v0.6.3~2^2~1 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=f1164756a9ce8f61b12060a48b84977f24d9e2b3;p=paraslash.git vss: Rework fec client setup. The current fec code assumes that the chunks of the audio file form a contigous buffer. At least for aac/m4a this is not true, which is why streaming m4a files over udp never worked well. This patch should be a big improvement in this regard. We now copy the chunks to preallocated buffers, which also makes the code easier to follow because we can get rid of the two extra buffers in struct fec_client. --- diff --git a/vss.c b/vss.c index 2a24a500..737b77ac 100644 --- a/vss.c +++ b/vss.c @@ -148,14 +148,10 @@ struct fec_client { struct fec_group group; /** The current slice. */ uint8_t current_slice_num; - /** The data to be FEC-encoded (point to a region within the mapped audio file). */ - const unsigned char **src_data; + /** The data to be FEC-encoded. */ + unsigned char **src_data; /** Last time an audio header was sent. */ struct timeval next_header_time; - /** Used for the last source pointer of an audio file. */ - unsigned char *extra_src_buf; - /** Needed for the last slice of the audio file header. */ - unsigned char *extra_header_buf; /** Extra slices needed to store largest chunk + header. */ int num_extra_slices; /** Contains the FEC-encoded data. */ @@ -240,6 +236,21 @@ static bool need_data_slices(struct fec_client *fc, struct vss_task *vsst) return false; } +static int fc_num_data_slices(const struct fec_client *fc) +{ + return fc->fcp->data_slices_per_group + fc->num_extra_slices; +} + +static int fc_num_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group + fc->num_extra_slices; +} + +static int fc_num_redundant_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; +} + static int num_slices(long unsigned bytes, int max_payload, int rs) { int ret; @@ -270,7 +281,7 @@ static void set_group_timing(struct fec_client *fc, struct vss_task *vsst) static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) { - int k, n, ret; + int i, k, n, ret; int hs, ds, rs; /* header/data/redundant slices */ struct fec_client_parms *fcp = fc->fcp; @@ -290,7 +301,17 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fc->mps <= FEC_HEADER_SIZE) return -ERRNO_TO_PARA_ERROR(EINVAL); - rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; + /* free previous buffers, if any */ + if (fc->src_data) { + k = fc_num_data_slices(fc); + for (i = 0; i < k; i++) + free(fc->src_data[i]); + free(fc->src_data); + fc->src_data = NULL; + } + free(fc->enc_buf); + + rs = fc_num_redundant_slices(fc); ret = num_slices(vsst->header_len, fc->mps - FEC_HEADER_SIZE, rs); if (ret < 0) return ret; @@ -306,17 +327,18 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (k < fc->fcp->data_slices_per_group) k = fc->fcp->data_slices_per_group; fc->num_extra_slices = k - fc->fcp->data_slices_per_group; - n = k + rs; + n = fc_num_slices(fc); + PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", + fc->mps, k, n, fc->num_extra_slices); + fec_free(fc->parms); ret = fec_new(k, n, &fc->parms); if (ret < 0) return ret; - PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", - fc->mps, k, n, fc->num_extra_slices); - fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); - fc->enc_buf = para_realloc(fc->enc_buf, fc->mps); - fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->mps); - fc->extra_header_buf = para_realloc(fc->extra_header_buf, fc->mps); + fc->src_data = para_malloc(k * sizeof(char *)); + for (i = 0; i < k; i++) + fc->src_data[i] = para_malloc(fc->mps); + fc->enc_buf = para_malloc(fc->mps); fc->state = FEC_STATE_READY_TO_RUN; fc->next_header_time.tv_sec = 0; @@ -395,6 +417,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, g->num_chunks++; } assert(g->num_chunks); + PARA_DEBUG_LOG("group #%u: %u chunks, %u bytes total\n", g->num, + g->num_chunks, g->bytes); return 1; } @@ -448,8 +472,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) { struct fec_group *g = &fc->group; - int k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - int n = fc->fcp->slices_per_group + fc->num_extra_slices; + int k = fc_num_data_slices(fc); + int n = fc_num_slices(fc); int ret, k1, k2, h, d, min, max, sum; int max_slice_bytes = fc->mps - FEC_HEADER_SIZE; int max_group_bytes; @@ -512,9 +536,8 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { - int ret, i, k, n, data_slices; - size_t len; - char *buf, *p; + int ret, i, c; + size_t copy, src_copied, slice_copied; struct fec_group *g = &fc->group; if (fc->state == FEC_STATE_NONE) { @@ -538,86 +561,63 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) g->first_chunk += g->num_chunks; g->num++; } - k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - n = fc->fcp->slices_per_group + fc->num_extra_slices; - compute_slice_size(fc, vsst); assert(g->slice_bytes > 0); - ret = num_slices(g->bytes, g->slice_bytes, n - k); - if (ret < 0) - return ret; - data_slices = ret; - assert(g->num_header_slices + data_slices <= k); fc->current_slice_num = 0; if (g->num == 0) set_group_timing(fc, vsst); /* setup header slices */ - buf = vsst->header_buf; - for (i = 0; i < g->num_header_slices; i++) { - uint32_t payload_size; - if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) { - fc->src_data[i] = (const unsigned char *)buf; - buf += g->slice_bytes; - continue; - } - /* - * Can not use vss->header_buf for this slice as it - * goes beyond the buffer. This slice will not be fully - * used. - */ - payload_size = vsst->header_buf + vsst->header_len - buf; - memcpy(fc->extra_header_buf, buf, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_header_buf + payload_size, 0, - g->slice_bytes - payload_size); - /* - * There might be more than one header slice to fill although - * only the first one will be used. Set all header slices to - * our extra buffer. - */ - while (i < g->num_header_slices) - fc->src_data[i++] = fc->extra_header_buf; - break; /* we don't want i to be increased. */ + for (i = 0, src_copied = 0; i < g->num_header_slices; i++) { + copy = PARA_MIN((size_t)g->slice_bytes, vsst->header_len - src_copied); + if (copy == 0) + break; + memcpy(fc->src_data[i], vsst->header_buf + src_copied, copy); + if (copy < g->slice_bytes) + memset(fc->src_data[i] + copy, 0, g->slice_bytes - copy); + src_copied += copy; } - /* - * Setup data slices. Note that for ogg streams chunk 0 points to a - * buffer on the heap rather than to the mapped audio file. + * There might be more than one header slice to fill although only the + * first one will be used. Zero out any remaining header slices. */ - ret = vss_get_chunk(g->first_chunk, vsst, &buf, &len); - if (ret < 0) - return ret; - for (p = buf; i < g->num_header_slices + data_slices; i++) { - if (p + g->slice_bytes > buf + g->bytes) { - /* - * We must make a copy for this slice since using p - * directly would exceed the buffer. - */ - uint32_t payload_size = buf + g->bytes - p; - assert(payload_size + FEC_HEADER_SIZE <= fc->mps); - memcpy(fc->extra_src_buf, p, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_src_buf + payload_size, 0, - g->slice_bytes - payload_size); - fc->src_data[i] = fc->extra_src_buf; - i++; - break; + while (i < g->num_header_slices) + memset(fc->src_data[i++], 0, g->slice_bytes); + + slice_copied = 0; + for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) { + char *buf; + size_t src_len; + ret = vss_get_chunk(c, vsst, &buf, &src_len); + if (ret < 0) + return ret; + if (src_len == 0) + continue; + src_copied = 0; + while (src_copied < src_len) { + copy = PARA_MIN((size_t)g->slice_bytes - slice_copied, + src_len - src_copied); + memcpy(fc->src_data[i] + slice_copied, + buf + src_copied, copy); + src_copied += copy; + slice_copied += copy; + if (slice_copied == g->slice_bytes) { + i++; + slice_copied = 0; + } } - fc->src_data[i] = (const unsigned char *)p; - p += g->slice_bytes; - } - if (i < k) { - /* use arbitrary data for all remaining slices */ - buf = vsst->map; - for (; i < k; i++) - fc->src_data[i] = (const unsigned char *)buf; } + if (i < fc_num_data_slices(fc) && slice_copied < g->slice_bytes) + memset(fc->src_data[i] + slice_copied, 0, + g->slice_bytes - slice_copied); + /* zero out remaining slices, if any */ + while (++i < fc_num_data_slices(fc)) + memset(fc->src_data[i], 0, g->slice_bytes); PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n", g->num, g->num_chunks, g->first_chunk, g->first_chunk + g->num_chunks - 1, g->bytes ); PARA_DEBUG_LOG("slice_bytes: %d, %d header slices, %d data slices\n", - g->slice_bytes, g->num_header_slices, data_slices + g->slice_bytes, g->num_header_slices, fc_num_data_slices(fc) ); return 1; } @@ -637,8 +637,9 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) } } write_fec_header(fc, vsst); - fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, - fc->current_slice_num, fc->group.slice_bytes); + fec_encode(fc->parms, (const unsigned char * const*)fc->src_data, + fc->enc_buf + FEC_HEADER_SIZE, fc->current_slice_num, + fc->group.slice_bytes); return 1; } @@ -684,11 +685,13 @@ struct fec_client *vss_add_fec_client(struct sender_client *sc, */ void vss_del_fec_client(struct fec_client *fc) { + int i; + list_del(&fc->node); - free(fc->src_data); free(fc->enc_buf); - free(fc->extra_src_buf); - free(fc->extra_header_buf); + for (i = 0; i < fc_num_data_slices(fc); i++) + free(fc->src_data[i]); + free(fc->src_data); fec_free(fc->parms); free(fc); }