+/*
+ * Copyright (C) 2011-2012 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file afh_recv.c Receiver for streaming local files. */
+
#include <regex.h>
#include <sys/types.h>
#include <stdbool.h>
#include "afh_recv.cmdline.h"
#include "string.h"
#include "fd.h"
+#include "afh.h"
+
+struct private_afh_recv_data {
+ int fd;
+ void *map;
+ size_t map_size;
+ struct afh_info afhi;
+ int audio_format_num;
+ long unsigned first_chunk;
+ long unsigned last_chunk;
+ struct timeval stream_start;
+ uint32_t current_chunk;
+};
+
+static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
+{
+ struct receiver_node *rn = btr_context(btrn);
+ struct private_afh_recv_data *pard = rn->private_data;
+
+ *result = NULL;
+ if (!strcmp(cmd, "seconds_total")) {
+ *result = make_message("%lu", pard->afhi.seconds_total);
+ return 1;
+ }
+ if (!strcmp(cmd, "chunks_total")) {
+ *result = make_message("%lu", pard->afhi.chunks_total);
+ return 1;
+ }
+ if (!strcmp(cmd, "afhi")) {
+ afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
+ return 1;
+ }
+ if (!strncmp(cmd, "repos", 5)) {
+ int32_t x;
+ int ret = para_atoi32(cmd + 5, &x);
+ if (ret < 0)
+ return ret;
+ if (x >= pard->afhi.chunks_total)
+ return -ERRNO_TO_PARA_ERROR(EINVAL);
+ pard->first_chunk = pard->current_chunk = x;
+ rn->task.error = 0;
+ return 1;
+ }
+ return -E_BTR_NAVAIL;
+}
+
+static void *afh_recv_parse_config(int argc, char **argv)
+{
+ struct afh_recv_args_info *tmp = para_calloc(sizeof(*tmp));
+
+ if (!afh_recv_cmdline_parser(argc, argv, tmp))
+ return tmp;
+ free(tmp);
+ return NULL;
+}
+
+static void afh_recv_free_config(void *conf)
+{
+ if (!conf)
+ return;
+ afh_recv_cmdline_parser_free(conf);
+ free(conf);
+}
+
+static int afh_recv_open(struct receiver_node *rn)
+{
+ struct afh_recv_args_info *conf = rn->conf;
+ struct private_afh_recv_data *pard;
+ struct afh_info *afhi;
+ char *filename = conf->filename_arg;
+
+ int ret;
+
+ if (!filename || *filename == '\0')
+ return -E_AFH_RECV_BAD_FILENAME;
+ rn->private_data = pard = para_calloc(sizeof(*pard));
+ afhi = &pard->afhi;
+ ret = mmap_full_file(filename, O_RDONLY, &pard->map,
+ &pard->map_size, &pard->fd);
+ if (ret < 0)
+ goto out;
+ ret = compute_afhi(filename, pard->map, pard->map_size,
+ pard->fd, afhi);
+ if (ret < 0)
+ goto out_unmap;
+ pard->audio_format_num = ret;
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ if (afhi->chunks_total == 0)
+ goto out_clear_afhi;
+ if (PARA_ABS(conf->begin_chunk_arg) >= afhi->chunks_total)
+ goto out_clear_afhi;
+ if (conf->begin_chunk_arg >= 0)
+ pard->first_chunk = conf->begin_chunk_arg;
+ else
+ pard->first_chunk = afhi->chunks_total + conf->begin_chunk_arg;
+ if (conf->end_chunk_given) {
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ if (PARA_ABS(conf->end_chunk_arg) > afhi->chunks_total)
+ goto out_clear_afhi;
+ if (conf->end_chunk_arg >= 0)
+ pard->last_chunk = conf->end_chunk_arg;
+ else
+ pard->last_chunk = afhi->chunks_total + conf->end_chunk_arg;
+ } else
+ pard->last_chunk = afhi->chunks_total - 1;
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ if (pard->first_chunk >= pard->last_chunk)
+ goto out_clear_afhi;
+ pard->current_chunk = pard->first_chunk;
+ return pard->audio_format_num;
+out_clear_afhi:
+ clear_afhi(afhi);
+out_unmap:
+ para_munmap(pard->map, pard->map_size);
+ close(pard->fd);
+out:
+ freep(&rn->private_data);
+ return ret;
+}
+
+static void afh_recv_close(struct receiver_node *rn)
+{
+ struct private_afh_recv_data *pard;
+
+ if (!rn || !rn->private_data)
+ return;
+ pard = rn->private_data;
+ clear_afhi(&pard->afhi);
+ para_munmap(pard->map, pard->map_size);
+ close(pard->fd);
+ freep(&rn->private_data);
+}
+
+static void afh_recv_pre_select(struct sched *s, struct task *t)
+{
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct private_afh_recv_data *pard = rn->private_data;
+ struct afh_info *afhi = &pard->afhi;
+ struct afh_recv_args_info *conf = rn->conf;
+ struct timeval chunk_time;
+ int state = generic_recv_pre_select(s, t);
+
+ if (state <= 0)
+ return;
+ if (!conf->just_in_time_given) {
+ sched_min_delay(s);
+ return;
+ }
+ compute_chunk_time(pard->current_chunk - pard->first_chunk,
+ &afhi->chunk_tv, &pard->stream_start, &chunk_time);
+ sched_request_barrier_or_min_delay(&chunk_time, s);
+}
+
+static void afh_recv_post_select(__a_unused struct sched *s, struct task *t)
+{
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct afh_recv_args_info *conf = rn->conf;
+ struct private_afh_recv_data *pard = rn->private_data;
+ struct btr_node *btrn = rn->btrn;
+ struct afh_info *afhi = &pard->afhi;
+ int ret;
+ char *buf;
+ const char *start, *end;
+ size_t size;
+ struct timeval chunk_time;
+
+ ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
+ if (ret <= 0)
+ goto out;
+ if (pard->first_chunk > 0 && !conf->no_header_given) {
+ char *header;
+ afh_get_header(afhi, pard->audio_format_num, pard->map,
+ pard->map_size, &header, &size);
+ if (size > 0) {
+ PARA_INFO_LOG("writing header (%zu bytes)\n", size);
+ buf = para_malloc(size);
+ memcpy(buf, header, size);
+ btr_add_output(buf, size, btrn);
+ afh_free_header(header, pard->audio_format_num);
+ }
+ }
+ if (!conf->just_in_time_given) {
+ afh_get_chunk(pard->first_chunk, afhi, pard->map, &start, &size);
+ afh_get_chunk(pard->last_chunk, afhi, pard->map, &end, &size);
+ end += size;
+ PARA_INFO_LOG("adding %zu bytes\n", end - start);
+ btr_add_output_dont_free(start, end - start, btrn);
+ ret = -E_RECV_EOF;
+ goto out;
+ }
+ if (pard->current_chunk == pard->first_chunk)
+ pard->stream_start = *now;
+ else {
+ compute_chunk_time(pard->current_chunk - pard->first_chunk,
+ &afhi->chunk_tv, &pard->stream_start, &chunk_time);
+ ret = tv_diff(&chunk_time, now, NULL);
+ if (ret > 0)
+ goto out;
+ }
+ afh_get_chunk(pard->current_chunk, afhi, pard->map, &start, &size);
+ PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
+ btr_add_output_dont_free(start, size, btrn);
+ if (pard->current_chunk >= pard->last_chunk) {
+ ret = -E_RECV_EOF;
+ goto out;
+ }
+ pard->current_chunk++;
+ ret = 1;
+out:
+ if (ret < 0) {
+ btr_remove_node(&rn->btrn);
+ pard->current_chunk = pard->first_chunk;
+ }
+ t->error = ret;
+}
/**
* The init function of the afh receiver.
{
struct afh_recv_args_info dummy;
+ afh_init();
afh_recv_cmdline_parser_init(&dummy);
-#if 0
r->open = afh_recv_open;
r->close = afh_recv_close;
r->pre_select = afh_recv_pre_select;
r->post_select = afh_recv_post_select;
r->parse_config = afh_recv_parse_config;
r->free_config = afh_recv_free_config;
-#endif
+ r->execute = afh_execute;
r->help = (struct ggo_help) {
.short_help = afh_recv_args_info_help,
.detailed_help = afh_recv_args_info_detailed_help