From: Andre Noll Date: Sun, 8 Mar 2009 20:41:27 +0000 (+0100) Subject: Make FEC work with oggvorbis streams. X-Git-Tag: v0.3.4~47 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=1beeb3913ee5d0e03805e55300720d267a6a51e0;p=paraslash.git Make FEC work with oggvorbis streams. Unfortunately, this required to rewrite large parts of the fec code in vss.c. The new code behaves better if the FEC parameters are not suitable for the current audio file, for example if the audio file header does not fit into the desired number of slices. It detects this situation and adjusts the given FEC parameters accordingly. --- diff --git a/fecdec_filter.c b/fecdec_filter.c index d5593f25..794add49 100644 --- a/fecdec_filter.c +++ b/fecdec_filter.c @@ -28,7 +28,7 @@ #define NUM_FEC_GROUPS 3 /** Size of the output buffer of the fecdec filter. */ -#define FECDEC_OUTBUF_SIZE (128 * 1024) +#define FECDEC_OUTBUF_SIZE (1024 * 1024) /* FIXME: This has to depend on the fec params */ /** Data read from the header of a slice. */ struct fec_header { @@ -46,6 +46,10 @@ struct fec_header { uint8_t slice_num; /** Used data bytes of this slice. */ uint16_t slice_bytes; + /** Non-zero if this group is the beginning of the stream. */ + uint8_t bos; + /** Non-zero if this stream embedds audio headers into fec groups. */ + uint8_t header_stream; }; /** @@ -72,6 +76,7 @@ struct private_fecdec_data { struct fec_parms *fec; /** Keeps track of what was received so far. */ struct fecdec_group groups[NUM_FEC_GROUPS]; + int have_header; }; /** Iterate over all fecdec groups. */ @@ -218,19 +223,58 @@ static int add_slice(char *buf, struct fecdec_group *fg) return 1; } +enum fec_group_usability { + FEC_GROUP_UNUSABLE, + FEC_GROUP_USABLE, + FEC_GROUP_USABLE_SKIP_HEADER, +}; + +static enum fec_group_usability group_is_usable(struct fecdec_group *fg, + struct private_fecdec_data *pfd) +{ + struct fec_header *h = &fg->h; + + if (!h->header_stream) + return FEC_GROUP_USABLE; + if (pfd->have_header) { + if (h->audio_header_size) + return FEC_GROUP_USABLE_SKIP_HEADER; + return FEC_GROUP_USABLE; + } + if (fg->h.bos) + return FEC_GROUP_USABLE; + if (fg->h.audio_header_size) + return FEC_GROUP_USABLE; + return FEC_GROUP_UNUSABLE; +} + static int decode_group(struct fecdec_group *fg, struct filter_node *fn) { int i, ret, sb = fg->h.slice_bytes; size_t written = 0; struct private_fecdec_data *pfd = fn->private_data; + enum fec_group_usability u = group_is_usable(fg, pfd); + if (u == FEC_GROUP_UNUSABLE) { + PARA_INFO_LOG("dropping unusable group %d\n", fg->h.group_num); + return 0; + } + PARA_DEBUG_LOG("decoding group %d %d slices\n", fg->h.group_num, + fg->h.data_slices_per_group); ret = fec_decode(pfd->fec, fg->data, fg->idx, sb); if (ret < 0) return ret; + pfd->have_header = 1; + i = 0; + if (u == FEC_GROUP_USABLE_SKIP_HEADER) { + i = ROUND_UP(fg->h.audio_header_size, fg->h.slice_bytes) + / fg->h.slice_bytes; + PARA_DEBUG_LOG("skipping %d header slices\n", i); + } PARA_DEBUG_LOG("writing group %d (%d/%d decoded data bytes)\n", fg->h.group_num, fg->h.group_bytes, fg->h.data_slices_per_group * sb); - for (i = 0; i < fg->h.data_slices_per_group; i++) { + for (; i < fg->h.data_slices_per_group; i++) { size_t n = sb; if (n + written > fg->h.group_bytes) n = fg->h.group_bytes - written; @@ -267,6 +311,8 @@ static int read_fec_header(char *buf, size_t len, struct fec_header *h) h->slice_num = read_u8(buf + 18); h->slice_bytes = read_u16(buf + 20); + h->bos = read_u8(buf + 22); + h->header_stream = read_u8(buf + 23); if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) return -E_FECDEC_EOF; // PARA_DEBUG_LOG("group %u, slize %u, slices per group: %u\n", diff --git a/vss.c b/vss.c index d0e1cf53..1dd24664 100644 --- a/vss.c +++ b/vss.c @@ -87,6 +87,8 @@ struct vss_task { const char *header_buf; /** Length of the audio file header. */ size_t header_len; + /** Time between audio file headers are sent. */ + struct timeval header_interval; }; /** @@ -96,27 +98,17 @@ struct vss_task { */ static struct list_head fec_client_list; -/** - * Describes one slice of a FEC group. - * - * FEC slices directly correspond to the data packages sent by the paraslash - * senders that use FEC. Each slice is identified by its group number and its - * number within the group. All slices have the same size, but the last slice - * of the group may not be filled entirely. - */ -struct fec_slice { - /** The slice number within the FEC group. */ - uint8_t num; - /** The number of used bytes in this slice. */ - uint16_t bytes; -}; - /** * Data associated with one FEC group. * * A FEC group consists of a fixed number of slices and this number is given by * the \a slices_per_group parameter of struct \ref fec_client_parms. Each FEC * group contains a number of chunks of the current audio file. + * + * FEC slices directly correspond to the data packages sent by the paraslash + * senders that use FEC. Each slice is identified by its group number and its + * number within the group. All slices have the same size, but the last slice + * of the group may not be filled entirely. */ struct fec_group { /** The number of the FEC group. */ @@ -127,12 +119,12 @@ struct fec_group { uint32_t first_chunk; /** The number of chunks contained in this group. */ uint32_t num_chunks; - /** The time needed to play all chunks of the group. */ - struct timeval duration; /** When the first chunk was sent. */ struct timeval start; - /** \a The group duration divided by \a slices_per_group. */ + /** The group duration divided by the number of slices. */ struct timeval slice_duration; + /** Group contains the audio file header that occupies that many slices. */ + unsigned num_header_slices; }; /** @@ -151,18 +143,18 @@ struct fec_client { int first_stream_chunk; /** Describes the current group. */ struct fec_group group; - /** Describes the current slice. */ - struct fec_slice slice; + /** The current slice. */ + uint8_t current_slice_num; /** The data to be FEC-encoded (point to a region within the mapped audio file). */ const unsigned char **src_data; - /** Used for the last source pointer of the last group. */ + /** Last time an audio header was sent. */ + struct timeval next_header_time; + /** Used for the last source pointer of an audio file. */ unsigned char *extra_src_buf; - /** The size of the buffer for the extra source pointer. */ - size_t extra_src_buf_size; - /** Contains FEC-encoded data. */ + /** Extra slices needed to store largest chunk + header. */ + int num_extra_slices; + /** Contains the FEC-encoded data. */ unsigned char *enc_buf; - /** Size of \a enc_buf. */ - size_t enc_buf_size; }; /** @@ -185,114 +177,176 @@ struct timeval *vss_chunk_time(void) * \param buf The buffer to write to. * \param h The fec header to write. */ -static void write_fec_header(struct fec_client *fc) +static void write_fec_header(struct fec_client *fc, struct vss_task *vsst) { char *buf = (char *)fc->enc_buf; + struct fec_group *g = &fc->group; + struct fec_client_parms *p = fc->fcp; write_u32(buf, FEC_MAGIC); - write_u8(buf + 4, fc->fcp->slices_per_group); - write_u8(buf + 5, fc->fcp->data_slices_per_group); - write_u32(buf + 6, (uint32_t)0); /* audio header len */ + write_u8(buf + 4, p->slices_per_group + fc->num_extra_slices); + write_u8(buf + 5, p->data_slices_per_group + fc->num_extra_slices); + write_u32(buf + 6, g->num_header_slices? vsst->header_len : 0); + + write_u32(buf + 10, g->num); + write_u32(buf + 14, g->bytes); + + write_u8(buf + 18, fc->current_slice_num); + write_u16(buf + 20, p->max_slice_bytes - FEC_HEADER_SIZE); + write_u8(buf + 22, g->first_chunk? 0 : 1); + write_u8(buf + 23, vsst->header_len? 1 : 0); + memset(buf + 24, 0, 7); +} - write_u32(buf + 10, fc->group.num); - write_u32(buf + 14, fc->group.bytes); +static int need_audio_header(struct fec_client *fc, struct vss_task *vsst) +{ + if (!mmd->current_chunk) { + tv_add(now, &vsst->header_interval, &fc->next_header_time); + return 0; + } + if (!vsst->header_buf) + return 0; + if (!vsst->header_len) + return 0; + if (fc->group.num && tv_diff(&fc->next_header_time, now, NULL) < 0) + return 0; + tv_add(now, &vsst->header_interval, &fc->next_header_time); + return 1; +} - write_u8(buf + 18, fc->slice.num); - write_u16(buf + 20, fc->slice.bytes); - memset(buf + 22, 0, 11); +static uint8_t num_slices(long unsigned bytes, struct fec_client *fc) +{ + uint16_t m = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE; + return (bytes + m - 1) / m; } static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { - uint32_t max_group_size, last_payload_size; - int i, k = fc->fcp->data_slices_per_group; + int i, k, data_slices; size_t len; - const char *start_buf; + const char *buf, *start_buf; struct timeval tmp, *chunk_tv = vss_chunk_time(); + struct fec_group *g = &fc->group; + unsigned slice_bytes = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE; + uint32_t max_data_size; assert(chunk_tv); + k = fc->fcp->data_slices_per_group + fc->num_extra_slices; if (fc->first_stream_chunk < 0) { + uint32_t largest = afh_get_largest_chunk_size(&mmd->afd.afhi) + + vsst->header_len; + uint8_t needed = num_slices(largest, fc), want; + if (needed > fc->fcp->data_slices_per_group) + PARA_WARNING_LOG("fec parms insufficient for this audio file\n"); + want = PARA_MAX(needed, fc->fcp->data_slices_per_group); + if (want != k) { + int ret; + fec_free(fc->parms); + fc->src_data = para_realloc(fc->src_data, want * sizeof(char *)); + ret = fec_new(want, want + fc->fcp->slices_per_group + - fc->fcp->data_slices_per_group, &fc->parms); + if (ret < 0) + return ret; + k = want; + fc->num_extra_slices = 0; + if (k > fc->fcp->data_slices_per_group) { + fc->num_extra_slices = k - fc->fcp->data_slices_per_group; + PARA_NOTICE_LOG("using %d extra slices\n", + fc->num_extra_slices); + } + } fc->stream_start = *now; fc->first_stream_chunk = mmd->current_chunk; - fc->group.first_chunk = mmd->current_chunk; - fc->group.num = 0; + g->first_chunk = mmd->current_chunk; + g->num = 0; } else { - fc->group.first_chunk += fc->group.num_chunks; - fc->group.num++; + g->first_chunk += g->num_chunks; + g->num++; } - if (fc->group.first_chunk >= mmd->afd.afhi.chunks_total) + if (g->first_chunk >= mmd->afd.afhi.chunks_total) return 0; - max_group_size = (fc->fcp->max_slice_bytes - FEC_HEADER_SIZE) * k; - afh_get_chunk(fc->group.first_chunk, &mmd->afd.afhi, vsst->map, - &start_buf, &len); - for (i = fc->group.first_chunk, fc->group.bytes = 0; - i < mmd->afd.afhi.chunks_total; i++) { - const char *buf; - + if (need_audio_header(fc, vsst)) + g->num_header_slices = num_slices(vsst->header_len, fc); + else + g->num_header_slices = 0; + afh_get_chunk(g->first_chunk, &mmd->afd.afhi, vsst->map, &start_buf, + &len); + data_slices = k - g->num_header_slices; + assert(data_slices); + max_data_size = slice_bytes * data_slices; + g->bytes = 0; + for (i = g->first_chunk; i < mmd->afd.afhi.chunks_total; i++) { afh_get_chunk(i, &mmd->afd.afhi, vsst->map, &buf, &len); - if (fc->group.bytes + len > max_group_size) + if (g->bytes + len > max_data_size) break; - fc->group.bytes += len; + g->bytes += len; + } + g->num_chunks = i - g->first_chunk; + assert(g->num_chunks); + fc->current_slice_num = 0; + + /* setup header slices */ + buf = vsst->header_buf; + for (i = 0; i < g->num_header_slices; i++) { + fc->src_data[i] = (const unsigned char *)buf; + buf += slice_bytes; } - fc->group.num_chunks = i - fc->group.first_chunk; - fc->slice.num = 0; - fc->slice.bytes = ROUND_UP(fc->group.bytes, k) / k; - /* FIXME: Avoid division by zero in a better way */ - if (!fc->slice.bytes) - return 0; - /* The last slice will not be fully used */ - last_payload_size = fc->group.bytes % fc->slice.bytes; - if (!last_payload_size) - last_payload_size = fc->slice.bytes; - - tv_scale(fc->group.first_chunk - fc->first_stream_chunk, chunk_tv, - &tmp); - tv_add(&fc->stream_start, &tmp, &fc->group.start); - if (fc->group.num) /* quick hack to avoid buffer underruns */ - fc->group.start.tv_sec--; - tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration); - tv_divide(fc->fcp->slices_per_group, &fc->group.duration, - &fc->group.slice_duration); - - for (i = 0; i < k; i++) - fc->src_data[i] = (const unsigned char *)start_buf - + i * fc->slice.bytes; - - if (start_buf + k * fc->slice.bytes > vsst->map + mmd->size) { - /* can not use last slice as it goes beyond the map */ - if (fc->extra_src_buf_size < fc->slice.bytes) { - fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->slice.bytes); - fc->extra_src_buf_size = fc->slice.bytes; - } - memcpy(fc->extra_src_buf, start_buf + (k - 1) * fc->slice.bytes, - last_payload_size); - memset(fc->extra_src_buf + last_payload_size, 0, - fc->slice.bytes - last_payload_size); - fc->src_data[k - 1] = fc->extra_src_buf; + /* setup data slices */ + buf = start_buf; + for (i = g->num_header_slices; i < k; i++) { + if (buf + slice_bytes > vsst->map + mmd->size) + /* + * Can not use the memory mapped audio file for this + * slice as it goes beyond the map. This slice will not + * be fully used. + */ + break; + fc->src_data[i] = (const unsigned char *)buf; + buf += slice_bytes; } - if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) { - fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE; - fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size); + if (i < k) { + uint32_t payload_size = vsst->map + mmd->size - buf; + memcpy(fc->extra_src_buf, buf, payload_size); + fc->src_data[i] = fc->extra_src_buf; + i++; + /* use arbitrary data for all remaining slices */ + buf = vsst->map; + for (; i < k; i++) + fc->src_data[i] = (const unsigned char *)buf; } - PARA_INFO_LOG("FEC group %d: %d chunks (%d - %d), duration: %lums\n", - fc->group.num, fc->group.num_chunks, fc->group.first_chunk, - fc->group.first_chunk + fc->group.num_chunks - 1, - tv2ms(&fc->group.duration)); + + /* setup group timing */ + tv_scale(g->first_chunk - fc->first_stream_chunk, chunk_tv, &tmp); + tv_add(&fc->stream_start, &tmp, &g->start); + if (g->num) /* quick hack to avoid buffer underruns */ + g->start.tv_sec--; + tv_scale(g->num_chunks, chunk_tv, &tmp); /* group duration */ + tv_divide(fc->fcp->slices_per_group + fc->num_extra_slices, + &tmp, &g->slice_duration); + + PARA_DEBUG_LOG("FEC group %d: %d chunks (%d - %d), %d header slices, %d data slices\n", + g->num, g->num_chunks, g->first_chunk, + g->first_chunk + g->num_chunks - 1, + g->num_header_slices, data_slices + ); + PARA_DEBUG_LOG("durations (group/chunk/slice): %lu/%lu/%lu\n", + tv2ms(&tmp), tv2ms(chunk_tv), tv2ms(&g->slice_duration)); return 1; } static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) { - if (fc->first_stream_chunk < 0 || fc->slice.num - == fc->fcp->slices_per_group) { + if (fc->first_stream_chunk < 0 || fc->current_slice_num + == fc->fcp->slices_per_group + fc->num_extra_slices) { if (!setup_next_fec_group(fc, vsst)) return 0; } - write_fec_header(fc); + write_fec_header(fc, vsst); fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, - fc->slice.num, fc->slice.bytes); + fc->current_slice_num, + fc->fcp->max_slice_bytes - FEC_HEADER_SIZE); return 1; } @@ -335,6 +389,10 @@ int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result) goto err; fc->first_stream_chunk = -1; /* stream not yet started */ fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); + fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->num_extra_slices = 0; + fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->next_header_time.tv_sec = 0; para_list_add(&fc->node, &fec_client_list); *result = fc; return 1; @@ -374,7 +432,7 @@ static int next_slice_is_due(struct fec_client *fc, struct timeval *diff) if (fc->first_stream_chunk < 0) return 1; - tv_scale(fc->slice.num, &fc->group.slice_duration, &tmp); + tv_scale(fc->current_slice_num, &fc->group.slice_duration, &tmp); tv_add(&tmp, &fc->group.start, &next); ret = tv_diff(&next, now, diff); return ret < 0? 1 : 0; @@ -564,8 +622,6 @@ static int need_to_request_new_audio_file(struct vss_task *vsst) return 1; } - - /** * Compute the timeout for para_server's main select-loop. * @@ -739,18 +795,19 @@ static void vss_send(struct vss_task *vsst) mmd->new_vss_status_flags |= VSS_NEXT; return; } - if (!mmd->chunks_sent) { - struct timeval tmp; - mmd->stream_start = *now; - tv_scale(mmd->current_chunk, &mmd->afd.afhi.chunk_tv, &tmp); - mmd->offset = tv2ms(&tmp); - mmd->events++; - } compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, &mmd->stream_start, &due); if (tv_diff(&due, now, NULL) <= 0) { const char *buf; size_t len; + + if (!mmd->chunks_sent) { + struct timeval tmp; + mmd->stream_start = *now; + tv_scale(mmd->current_chunk, &mmd->afd.afhi.chunk_tv, &tmp); + mmd->offset = tv2ms(&tmp); + mmd->events++; + } /* * We call the send function also in case of empty chunks as * they might have still some data queued which can be sent in @@ -764,6 +821,8 @@ static void vss_send(struct vss_task *vsst) senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len, vsst->header_buf, vsst->header_len); } + mmd->chunks_sent++; + mmd->current_chunk++; } list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { if (!next_slice_is_due(fc, NULL)) @@ -771,15 +830,12 @@ static void vss_send(struct vss_task *vsst) if (!compute_next_fec_slice(fc, vsst)) continue; PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num, - fc->slice.num, fc->slice.bytes); + fc->current_slice_num, fc->fcp->max_slice_bytes); fc->fcp->send((char *)fc->enc_buf, - fc->slice.bytes + FEC_HEADER_SIZE, + fc->fcp->max_slice_bytes, fc->fcp->private_data); - fc->slice.num++; + fc->current_slice_num++; } - mmd->new_vss_status_flags |= VSS_PLAYING; - mmd->chunks_sent++; - mmd->current_chunk++; } static void vss_post_select(struct sched *s, struct task *t) @@ -787,6 +843,7 @@ static void vss_post_select(struct sched *s, struct task *t) int ret, i; struct vss_task *vsst = container_of(t, struct vss_task, task); + if (mmd->sender_cmd_data.cmd_num >= 0) { int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; @@ -831,7 +888,7 @@ void init_vss_task(int afs_socket) conf.announce_time_arg : 300, autoplay_delay = conf.autoplay_delay_arg > 0? conf.autoplay_delay_arg : 0; - + vsst->header_interval.tv_sec = 5; /* should this be configurable? */ vsst->afs_socket = afs_socket; vsst->task.pre_select = vss_pre_select; vsst->task.post_select = vss_post_select;