#include "para.h"
#include "fd.h"
#include "string.h"
+#include "list.h"
+#include "sched.h"
#include "write.h"
#include <alsa/asoundlib.h>
snd_pcm_t *handle;
/** determined and set by alsa_open() */
size_t bytes_per_frame;
+struct timeval next_chunk;
};
/*
unsigned buffer_time = 0;
int err;
snd_pcm_info_t *info;
- snd_output_t *log;
snd_pcm_uframes_t period_size;
- struct private_alsa_data *pad = para_malloc(sizeof(struct private_alsa_data));
+ struct private_alsa_data *pad = para_calloc(sizeof(struct private_alsa_data));
w->private_data = pad;
snd_pcm_info_alloca(&info);
- if (snd_output_stdio_attach(&log, stderr, 0) < 0)
- return -E_ALSA_LOG;
err = snd_pcm_open(&pad->handle, conf.device_arg,
SND_PCM_STREAM_PLAYBACK, 0);
if (err < 0)
// PARA_ERROR_LOG("%s\n", "failed to set nonblock mode");
return period_size * pad->bytes_per_frame;
}
+static void alsa_write_pre_select(struct sched *s, struct task *t)
+{
+ struct writer_node *wn = t->private_data;
+ struct private_alsa_data *pad = wn->private_data;
+ struct writer_node_group *wng = wn->wng;
+ struct timeval diff;
-/**
- * push out pcm frames
- * \param data pointer do data to be written
- * \param nbytes number of bytes (not frames)
- *
- * \return Number of bytes written, -E_ALSA_WRITE on errors.
- */
-static int alsa_write(char *data, size_t nbytes, struct writer_node *wn)
+ t->ret = 0;
+ if (wng->eof && *wng->loaded < pad->bytes_per_frame)
+ return;
+ t->ret = 1;
+ if (*wng->loaded < pad->bytes_per_frame)
+ return;
+ if (tv_diff(&s->now, &pad->next_chunk, &diff) < 0) {
+ if (tv_diff(&s->timeout, &diff, NULL) < 0)
+ s->timeout = diff;
+ } else {
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 0;
+ }
+}
+
+static void alsa_write_post_select(struct sched *s, struct task *t)
{
+ struct writer_node *wn = t->private_data;
struct private_alsa_data *pad = wn->private_data;
- size_t frames = nbytes / pad->bytes_per_frame;
- unsigned char *d = (unsigned char*)data;
- snd_pcm_sframes_t r, result = 0;
-
- while (frames > 0) {
- /* write interleaved frames */
- r = snd_pcm_writei(pad->handle, d, frames);
- if (r < 0)
- PARA_ERROR_LOG("write error: %s\n", snd_strerror(r));
- if (r == -EAGAIN || (r >= 0 && r < frames))
- snd_pcm_wait(pad->handle, 1);
- else if (r == -EPIPE)
+ struct writer_node_group *wng = wn->wng;
+ size_t frames = *wng->loaded / pad->bytes_per_frame;
+ snd_pcm_sframes_t ret, result = 0;
+ unsigned char *data = (unsigned char*)wng->buf;
+
+ t->ret = 0;
+ if (!frames)
+ return;
+ if (tv_diff(&s->now, &pad->next_chunk, NULL) < 0)
+ return;
+// PARA_INFO_LOG("%zd frames\n", frames);
+// while (frames > 0) {
+ ret = snd_pcm_writei(pad->handle, data, frames);
+ if (ret == -EAGAIN || (ret >= 0 && ret < frames)) {
+ struct timeval tv = {0, 1000 * 10};
+ PARA_INFO_LOG("EAGAIN. frames: %d, ret: %lu\n", frames, ret);
+ tv_add(&s->now, &tv, &pad->next_chunk);
+// snd_pcm_wait(pad->handle, 1);
+ } else if (ret == -EPIPE) {
+ PARA_INFO_LOG("%s", "EPIPE\n");
snd_pcm_prepare(pad->handle);
- else if (r < 0)
- return -E_ALSA_WRITE;
- if (r > 0) {
- result += r;
- frames -= r;
- d += r * pad->bytes_per_frame;
+ } else if (ret < 0) {
+ PARA_INFO_LOG("ALSA ERR %d\n", frames);
+ t->ret = -E_ALSA_WRITE;
+ return;
}
- }
- return result * pad->bytes_per_frame;
+ if (ret >= 0) {
+ result += ret;
+ frames -= ret;
+ data += ret * pad->bytes_per_frame;
+ }
+// if (ret == -EAGAIN)
+// break;
+// }
+ t->ret = result * pad->bytes_per_frame;
+// PARA_INFO_LOG("ret: %d, frames: %zd\n", t->ret, frames);
}
static void alsa_close(struct writer_node *wn)
void alsa_writer_init(struct writer *w)
{
w->open = alsa_open;
- w->write = alsa_write;
w->close = alsa_close;
+ w->pre_select = alsa_write_pre_select;
+ w->post_select = alsa_write_post_select;
w->shutdown = NULL; /* nothing to do */
}
audio_formats[s->format], slot_num);
if (s->fci)
s->fci->error = 42;
+ kill_stream_writer(slot_num);
}
}
}
server_ldflags=""
write_cmdline_objs="write.cmdline"
-write_errlist_objs="write write_common file_writer time fd string"
+write_errlist_objs="write write_common file_writer time fd string sched stdin"
write_ldflags=""
write_writers="file"
SS_ORTP_RECV,
SS_AUDIOD,
SS_EXEC,
+ SS_SCHED,
+ SS_STDIN,
SS_SIGNAL,
SS_STRING,
SS_STAT,
extern const char **para_errlist[];
/** \endcond */
+#define STDIN_ERRORS \
+ PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
+
+
+#define SCHED_ERRORS \
+ PARA_ERROR(PRE_EOF, "pre_select returned zero"), \
+ PARA_ERROR(POST_EOF, "post_select returned zero"), \
+
+
+
+
#define NET_ERRORS \
PARA_ERROR(SEND, "send error"), \
PARA_ERROR(RECV, "receive error"), \
PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \
PARA_ERROR(DCCP_CONNECT, "dccp connect error"), \
+
#define DCCP_SEND_ERRORS \
PARA_ERROR(DCCP_BIND, "dccp bind error"), \
PARA_ERROR(DCCP_LISTEN, "dccp listen error"), \
PARA_ERROR(DCCP_WRITE, "dccp write error"), \
+
#define FD_ERRORS \
PARA_ERROR(F_GETFL, "failed to get fd flags"), \
PARA_ERROR(F_SETFL, "failed to set fd flags"), \
PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \
PARA_ERROR(WRITE_OVERRUN, "buffer overrun"), \
PARA_ERROR(PREMATURE_END, "premature end of audio file"), \
+ PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \
+ PARA_ERROR(NO_DELAY, "no initial delay"), \
#define ALSA_WRITER_ERRORS \
#define FILE_WRITER_ERRORS \
PARA_ERROR(FW_WRITE, "file writer write error"), \
PARA_ERROR(FW_OPEN, "file writer: can not open output file"), \
+ PARA_ERROR(FW_NO_FILE, "task started without open file"), \
#define WRITE_COMMON_ERRORS \
PARA_ERROR(AACDEC_INIT, "failed to init aac decoder"), \
PARA_ERROR(AAC_DECODE, "aac decode error"), \
+
/**
* the subsystem shift
*
/** \cond popcorn time */
SS_ENUM(GUI);
+SS_ENUM(SCHED);
+SS_ENUM(STDIN);
SS_ENUM(WAV);
SS_ENUM(COMPRESS);
SS_ENUM(TIME);
/** \file file_writer.c simple output plugin for testing purposes */
#include "para.h"
+#include "list.h"
+#include "sched.h"
#include "write.h"
#include "string.h"
+#include "fd.h"
#include "error.h"
/** data specific to the file writer */
struct private_file_writer_data {
/** the file descriptor of the output file */
int fd;
+int check_fd;
};
-static int file_writer_open(struct writer_node *w)
+static int file_writer_open(struct writer_node *wn)
{
struct private_file_writer_data *pfwd = para_calloc(
sizeof(struct private_file_writer_data));
free(home);
free(tmp);
- w->private_data = pfwd;
+ wn->private_data = pfwd;
pfwd->fd = open(filename, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
free(filename);
if (pfwd->fd >= 0)
return ret;
}
+static void file_writer_pre_select(struct sched *s, struct task *t)
+{
+ struct writer_node *wn = t->private_data;
+ struct private_file_writer_data *pfwd = wn->private_data;
+ struct writer_node_group *wng = wn->wng;
+
+// PARA_INFO_LOG("task %p check_fd: %d\n", t, pfwd->check_fd);
+ pfwd->check_fd = 0;
+ t->ret = -E_FW_NO_FILE;
+ if (pfwd->fd <= 0)
+ return;
+ t->ret = 0;
+ if (!*wng->loaded)
+ return;
+ t->ret = 1;
+ para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
+ pfwd->check_fd = 1;
+}
+
+static void file_writer_post_select(struct sched *s, struct task *t)
+{
+ struct writer_node *wn = t->private_data;
+ struct private_file_writer_data *pfwd = wn->private_data;
+ struct writer_node_group *wng = wn->wng;
+
+ t->ret = 0;
+ if (!pfwd->check_fd)
+ return;
+ if (!*wng->loaded)
+ return;
+ if (!FD_ISSET(pfwd->fd, &s->wfds))
+ return;
+// PARA_INFO_LOG("writing %zd\n", *wng->loaded);
+ t->ret = write(pfwd->fd, wng->buf, *wng->loaded);
+ if (t->ret < 0)
+ t->ret = -E_FW_WRITE;
+}
+
static void file_writer_close(struct writer_node *wn)
{
struct private_file_writer_data *pfwd = wn->private_data;
{
w->open = file_writer_open;
w->write = file_writer_write;
+ w->pre_select = file_writer_pre_select;
+ w->post_select = file_writer_post_select;
w->close = file_writer_close;
w->shutdown = NULL; /* nothing to do */
}
n = list_entry(pos->member.next, typeof(*pos), member); \
&pos->member != (head); \
pos = n, n = list_entry(n->member.next, typeof(*n), member))
+/**
+ * list_for_each_entry_safe_reverse - iterate backwards over list of given type safe against
+ * removal of list entry
+ * @pos: the type * to use as a loop counter.
+ * @n: another type * to use as temporary storage
+ * @head: the head for your list.
+ * @member: the name of the list_struct within the struct.
+ */
+#define list_for_each_entry_safe_reverse(pos, n, head, member) \
+ for (pos = list_entry((head)->prev, typeof(*pos), member), \
+ n = list_entry(pos->member.prev, typeof(*pos), member); \
+ &pos->member != (head); \
+ pos = n, n = list_entry(n->member.prev, typeof(*n), member))
+
#endif /* _LIST_H */
--- /dev/null
+#include <sys/time.h>
+#include "para.h"
+#include "ipc.h"
+#include "fd.h"
+#include "list.h"
+#include "sched.h"
+#include "string.h"
+#include "error.h"
+
+struct list_head pre_select_list;
+struct list_head post_select_list;
+
+static void sched_preselect(struct sched *s)
+{
+ struct task *t, *tmp;
+again:
+ list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) {
+ t->pre_select(s, t);
+ if (t->ret > 0 || !t->error_handler)
+ continue;
+ if (t->ret < 0) {
+ t->error_handler(t);
+ goto again;
+ }
+ if (!(t->flags & PRE_EOF_IS_ERROR))
+ continue;
+ t->ret = -E_PRE_EOF;
+ t->error_handler(t);
+ goto again;
+ }
+}
+
+static void sched_post_select(struct sched *s)
+{
+ struct task *t, *tmp;
+
+ list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
+ t->post_select(s, t);
+ if (t->ret > 0 || !t->error_handler)
+ continue;
+ if (t->ret < 0) {
+ t->error_handler(t);
+ continue;
+ }
+ if (!(t->flags & POST_EOF_IS_ERROR))
+ continue;
+ t->ret = -E_POST_EOF;
+ t->error_handler(t);
+ }
+}
+
+int sched(struct sched *s)
+{
+
+ gettimeofday(&s->now, NULL);
+again:
+ FD_ZERO(&s->rfds);
+ FD_ZERO(&s->wfds);
+ s->timeout = s->default_timeout;
+ s->max_fileno = -1;
+ sched_preselect(s);
+ s->select_ret = para_select(s->max_fileno + 1, &s->rfds,
+ &s->wfds, &s->timeout);
+ if (s->select_ret < 0)
+ return s->select_ret;
+ gettimeofday(&s->now, NULL);
+ sched_post_select(s);
+ if (list_empty(&pre_select_list) && list_empty(&post_select_list))
+ return 0;
+ goto again;
+}
+
+void *register_task(struct task *t)
+{
+ PARA_INFO_LOG("registering task %p\n", t);
+ if (t->pre_select) {
+ PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
+ if (t->flags & PRE_ADD_TAIL)
+ list_add_tail(&t->pre_select_node, &pre_select_list);
+ else
+ list_add(&t->pre_select_node, &pre_select_list);
+ }
+ if (t->post_select) {
+ PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select);
+ if (t->flags & POST_ADD_TAIL)
+ list_add_tail(&t->post_select_node, &post_select_list);
+ else
+ list_add(&t->post_select_node, &post_select_list);
+ }
+ return t;
+}
+
+void unregister_task(struct task *t)
+{
+ PARA_INFO_LOG("unregistering task %p\n", t);
+ if (t->pre_select)
+ list_del(&t->pre_select_node);
+ if (t->post_select)
+ list_del(&t->post_select_node);
+};
+
+void init_sched(void)
+{
+ INIT_LIST_HEAD(&pre_select_list);
+ INIT_LIST_HEAD(&post_select_list);
+};
+
+void sched_shutdown(void)
+{
+ struct task *t, *tmp;
+
+ list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node)
+ unregister_task(t);
+ /* remove tasks which do not have a pre_select hook */
+ list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node)
+ unregister_task(t);
+};
+
+
+//char *get_tast_list();
--- /dev/null
+struct sched {
+ struct timeval now, timeout;
+ int max_fileno;
+ fd_set rfds, wfds;
+ int select_ret;
+ struct timeval default_timeout;
+};
+
+struct task {
+ void *private_data;
+ unsigned flags;
+ int ret;
+ void (*pre_select)(struct sched *s, struct task *t);
+ void (*post_select)(struct sched *s, struct task *t);
+ void (*error_handler)(struct task *t);
+ struct list_head pre_select_node;
+ struct list_head post_select_node;
+ char status[MAXLINE];
+};
+
+enum task_flags {
+ PRE_ADD_TAIL = 1,
+ POST_ADD_TAIL = 2,
+ PRE_EOF_IS_ERROR = 4,
+ POST_EOF_IS_ERROR = 8,
+};
+
+void *register_task(struct task *t);
+void unregister_task(struct task *t);
+int sched(struct sched *s);
+void init_sched(void);
--- /dev/null
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdin.h"
+
+void stdin_pre_select(struct sched *s, struct task *t)
+{
+ struct stdin_task *sit = t->private_data;
+ if (sit->loaded < sit->bufsize)
+ para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+ t->ret = 1; /* success */
+}
+
+void stdin_post_select(struct sched *s, struct task *t)
+{
+ struct stdin_task *sit = t->private_data;
+ ssize_t ret;
+
+ t->ret = 1;
+ if (sit->loaded >= sit->bufsize)
+ return;
+ if (!FD_ISSET(STDIN_FILENO, &s->rfds))
+ return;
+ ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded);
+ if (ret < 0)
+ t->ret = -E_STDIN_READ;
+ else {
+ sit->loaded += ret;
+ t->ret = ret;
+ }
+ sprintf(t->status,
+ "%p stdin reader: loaded = %d, ret = %d",
+ sit, sit->loaded, t->ret);
+}
+
+#if 0
+void stdin_init(struct stdin_task *sit)
+{
+ sit->task.private_data = sit;
+ sit->task.pre_select = stdin_pre_select;
+ sit->task.post_select = stdin_post_select;
+ sit->task.flags = POST_EOF_IS_ERROR;
+ sprintf(sit->task.status, "%p stdin reader: initialized", &sit->task);
+}
+#endif
--- /dev/null
+struct stdin_task {
+ char *buf;
+ size_t bufsize;
+ size_t loaded;
+ struct task task;
+};
+
+void stdin_pre_select(struct sched *s, struct task *t);
+void stdin_post_select(struct sched *s, struct task *t);
#include "para.h"
#include "string.h"
#include "write.cmdline.h"
+#include "list.h"
+#include "sched.h"
+#include "stdin.h"
#include "write.h"
#include "write_common.h"
#include "fd.h"
-
-#include <sys/time.h> /* gettimeofday */
-
#include "error.h"
-#define WAV_HEADER_LEN 44
+INIT_WRITE_ERRLISTS;
-static char *audiobuf;
-static struct timeval *start_time;
-struct gengetopt_args_info conf;
+struct check_wav_task {
+ char *buf;
+ size_t *loaded;
+ unsigned channels;
+ unsigned sample_rate;
+ struct task task;
+};
-INIT_WRITE_ERRLISTS;
+struct initial_delay_task {
+ struct timeval start_time;
+ struct task task;
+};
-void para_log(int ll, const char* fmt,...)
-{
- va_list argp;
+struct gengetopt_args_info conf;
+struct stdin_task sit;
+struct check_wav_task cwt;
+struct initial_delay_task idt;
+static struct writer_node_group *wng;
- if (ll < conf.loglevel_arg)
- return;
- va_start(argp, fmt);
- vfprintf(stderr, fmt, argp);
- va_end(argp);
-}
+#define WAV_HEADER_LEN 44
/**
- * read WAV_HEADER_LEN bytes from stdin to audio buffer
+ * test if audio buffer contains a valid wave header
*
- * \return -E_READ_HDR on errors and on eof before WAV_HEADER_LEN could be
- * read. A positive return value indicates success.
+ * \return If not, return 0, otherwise, store number of channels and sample rate
+ * in struct conf and return WAV_HEADER_LEN.
*/
-static int read_wav_header(void)
+static void check_wav_pre_select(struct sched *s, struct task *t)
{
- ssize_t ret, count = 0;
+ struct check_wav_task *cwt = t->private_data;
+ unsigned char *a;
- while (count < WAV_HEADER_LEN) {
- ret = read(STDIN_FILENO, audiobuf + count, WAV_HEADER_LEN - count);
- if (ret <= 0)
- return -E_READ_HDR;
- count += ret;
+ if (*cwt->loaded < WAV_HEADER_LEN) {
+ t->ret = 1;
+ return;
}
- return 1;
+ a = (unsigned char*)cwt->buf;
+ t->ret = -E_NO_WAV_HEADER;
+ if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
+ return;
+ cwt->channels = (unsigned) a[22];
+ cwt->sample_rate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
+ *cwt->loaded -= WAV_HEADER_LEN;
+ memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded);
+ t->ret = 0;
+ PARA_INFO_LOG("channels: %d, sample_rate: %d\n", cwt->channels, cwt->sample_rate);
}
-/**
- * check if current time is later than start_time
- * \param diff pointer to write remaining time to
- *
- * If start_time was not given, or current time is later than given
- * start_time, return 0. Otherwise, return 1 and write the time
- * difference between current time and start_time to diff. diff may be
- * NULL.
- *
- */
-static int start_time_in_future(struct timeval *diff)
+static void initial_delay_pre_select(struct sched *s, struct task *t)
{
- struct timeval now;
-
- if (!conf.start_time_given)
- return 0;
- gettimeofday(&now, NULL);
- return tv_diff(start_time, &now, diff) > 0? 1 : 0;
-}
+ struct initial_delay_task *idt = t->private_data;
+ struct timeval diff;
-/**
- * sleep until time given at command line
- *
- * This is called if the initial buffer is filled. It returns
- * immediately if no start_time was given at the command line
- * or if the given start time is in the past.
- *
- */
-static void do_initial_delay(struct timeval *delay)
-{
- do
- para_select(1, NULL, NULL, delay);
- while (start_time_in_future(delay));
+ PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret);
+ t->ret = -E_NO_DELAY;
+ if (!idt->start_time.tv_sec && !idt->start_time.tv_usec)
+ return;
+ t->ret = 0; /* timeout */
+ if (tv_diff(&s->now, &idt->start_time, &diff) > 0)
+ return;
+ t->ret = 1;
+ if (tv_diff(&s->timeout , &diff, NULL) > 0)
+ s->timeout = diff;
}
-static int read_stdin(char *buf, size_t bytes_to_load, size_t *loaded)
-{
- ssize_t ret;
-
- while (*loaded < bytes_to_load) {
- ret = read(STDIN_FILENO, buf + *loaded, bytes_to_load - *loaded);
- if (ret <= 0) {
- if (ret < 0)
- ret = -E_READ_STDIN;
- return ret;
- }
- *loaded += ret;
- }
- return 1;
-}
-/**
- * play raw pcm data
- * \param loaded number of bytes already loaded
- *
- * If start_time was given, prebuffer data until buffer is full or
- * start_time is reached. In any case, do not start playing before
- * start_time.
- *
- * \return positive on success, negative on errors.
- */
-static int pcm_write(struct writer_node_group *wng, size_t loaded)
+void para_log(int ll, const char* fmt,...)
{
- size_t bufsize, prebuf_size, bytes_to_load;
- struct timeval delay;
- int ret, not_yet_started = 1;
+ va_list argp;
- ret = wng_open(wng);
- if (ret < 0)
- goto out;
- PARA_INFO_LOG("max chunk_bytes: %zd\n", wng->max_chunk_bytes);
- bufsize = (conf.bufsize_arg * 1024 / wng->max_chunk_bytes)
- * wng->max_chunk_bytes;
- audiobuf = para_realloc(audiobuf, bufsize);
- prebuf_size = conf.prebuffer_arg * bufsize / 100;
- bytes_to_load = PARA_MIN(prebuf_size, wng->max_chunk_bytes);
- ret = read_stdin(audiobuf, bytes_to_load, &loaded);
- if (ret <= 0 || loaded < bytes_to_load) {
- if (ret >= 0)
- ret = -E_PREMATURE_END;
- goto out;
- }
- if (not_yet_started && start_time && start_time_in_future(&delay))
- do_initial_delay(&delay);
- not_yet_started = 0;
-again:
- ret = wng_write(wng, audiobuf, &loaded);
- if (ret <= 0)
- goto out;
- ret = -E_WRITE_OVERRUN;
- if (loaded >= bufsize)
- goto out;
- bytes_to_load = PARA_MIN(wng->max_chunk_bytes, bufsize);
- ret = read_stdin(audiobuf, bytes_to_load, &loaded);
- if (ret < 0)
- goto out;
- if (!ret)
- wng->eof = 1;
- goto again;
-out:
- wng_close(wng);
- return ret;
+ if (ll < conf.loglevel_arg)
+ return;
+ va_start(argp, fmt);
+ vfprintf(stderr, fmt, argp);
+ va_end(argp);
}
static struct writer_node_group *check_args(void)
{
int i, ret = -E_WRITE_SYNTAX;
- static struct timeval tv;
struct writer_node_group *wng = NULL;
if (conf.list_writers_given) {
if (sscanf(conf.start_time_arg, "%lu:%lu",
&sec, &usec) != 2)
goto out;
- tv.tv_sec = sec;
- tv.tv_usec = usec;
- start_time = &tv;
+ idt.start_time.tv_sec = sec;
+ idt.start_time.tv_usec = usec;
}
if (!conf.writer_given) {
wng = setup_default_wng();
return NULL;
}
-/**
- * test if audio buffer contains a valid wave header
- *
- * \return If not, return 0, otherwise, store number of channels and sample rate
- * in struct conf and return WAV_HEADER_LEN.
- */
-static size_t check_wave(void)
+static void idt_error_handler(struct task *t)
{
- unsigned char *a = (unsigned char*)audiobuf;
- if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
- return WAV_HEADER_LEN;
- conf.channels_arg = (unsigned) a[22];
- conf.sample_rate_arg = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
- return 0;
+ PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret);
+ int ret;
+ unregister_task(t);
+ wng->buf = sit.buf;
+ wng->loaded = &sit.loaded;
+ ret = wng_open(wng);
+ if (ret < 0) {
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+ exit(EXIT_FAILURE);
+ }
+}
+
+static void cwt_error_handler(struct task *t)
+{
+ PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret);
+ if (t->ret < 0) {
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_PRE_EOF)
+ exit(EXIT_FAILURE);
+ if (t->ret == -E_PRE_EOF) {
+ conf.channels_arg = cwt.channels;
+ conf.sample_rate_arg = cwt.sample_rate;
+ }
+ }
+ unregister_task(t);
+ idt.task.pre_select = initial_delay_pre_select;
+ idt.task.private_data = &idt;
+ idt.task.error_handler = idt_error_handler;
+ idt.task.flags = PRE_EOF_IS_ERROR;
+ register_task(&idt.task);
+}
+
+static void stdin_error_handler(struct task *t)
+{
+ unregister_task(t);
+ PARA_INFO_LOG("task %p, ret: %d\n", t, t->ret);
+ if (t->ret < 0)
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ wng->eof = 1;
}
int main(int argc, char *argv[])
{
int ret = -E_WRITE_SYNTAX;
- struct writer_node_group *wng = NULL;
+ struct sched s;
cmdline_parser(argc, argv, &conf);
wng = check_args();
if (!wng)
goto out;
init_supported_writers();
- audiobuf = para_malloc(WAV_HEADER_LEN);
- ret = read_wav_header();
- if (ret < 0)
- goto out;
- ret = pcm_write(wng, check_wave());
+ init_sched();
+
+ sit.bufsize = 16 * 1024,
+ sit.buf = para_malloc(16 * 1024),
+ sit.loaded = 0,
+ sit.task.pre_select = stdin_pre_select;
+ sit.task.post_select = stdin_post_select;
+ sit.task.error_handler = stdin_error_handler;
+ sit.task.flags = POST_EOF_IS_ERROR;
+ sit.task.private_data = &sit;
+ register_task(&sit.task);
+
+ cwt.task.pre_select = check_wav_pre_select;
+ cwt.task.private_data = &cwt;
+ cwt.task.error_handler = cwt_error_handler;
+ cwt.buf = sit.buf;
+ cwt.loaded = &sit.loaded;
+ cwt.task.flags = PRE_EOF_IS_ERROR;
+ register_task(&cwt.task);
+
+ s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_usec = 0;
+ ret = sched(&s);
+
out:
- wng_destroy(wng);
- free(audiobuf);
- if (ret < 0)
+ if (ret < 0) {
PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+ ret = EXIT_FAILURE;
+ } else
+ ret = EXIT_SUCCESS;
return ret;
}
void *private_data;
/** send that many bytes in one go */
int chunk_bytes;
+ struct task task;
+ struct writer_node_group *wng;
};
/** describes one supported writer */
*
*/
int (*write)(char *data, size_t nbytes, struct writer_node *);
+void (*pre_select)(struct sched *s, struct task *t);
+void (*post_select)(struct sched *s, struct task *t);
/**
* close one instance of the writer
*
size_t max_chunk_bytes;
/** non-zero if end of file was encountered */
int eof;
+char *buf;
+size_t *loaded;
+struct task task;
};
/** loop over each writer node in a writer group */
#include "para.h"
#include "string.h"
+#include "list.h"
+#include "sched.h"
#include "write.h"
#include "error.h"
return ret;
}
+static void wng_post_select(struct sched *s, struct task *t)
+{
+ struct writer_node_group *g = t->private_data;
+ int i;
+ size_t min_written = 0;
+
+ FOR_EACH_WRITER_NODE(i, g) {
+ struct writer_node *wn = &g->writer_nodes[i];
+ t->ret = wn->task.ret;
+ if (t->ret < 0)
+ return;
+ if (!i)
+ min_written = t->ret;
+ else
+ min_written = PARA_MIN(min_written, t->ret);
+ }
+ *g->loaded -= min_written;
+ if (!*g->loaded && g->eof)
+ t->ret = 0;
+ else
+ t->ret = 1;
+ if (*g->loaded && min_written)
+ memmove(g->buf, g->buf + min_written, *g->loaded);
+}
+
int wng_open(struct writer_node_group *g)
{
int i, ret = 1;
goto out;
wn->chunk_bytes = ret;
g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
+ wn->wng = g;
+ PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select);
+ PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select);
+ wn->task.pre_select = wn->writer->pre_select;
+ wn->task.post_select = wn->writer->post_select;
+ wn->task.private_data = wn;
+ register_task(&wn->task);
}
+ register_task(&g->task);
out:
return ret;
}
+void wng_destroy(struct writer_node_group *g)
+{
+ if (!g)
+ return;
+ free(g->written);
+ free(g->writer_nodes);
+ free(g);
+}
+
void wng_close(struct writer_node_group *g)
{
int i;
FOR_EACH_WRITER_NODE(i, g) {
struct writer_node *wn = &g->writer_nodes[i];
+ unregister_task(&wn->task);
wn->writer->close(wn);
}
}
+static void wng_error_handler(struct task *t)
+{
+ struct writer_node_group *g = t->private_data;
+
+ PARA_INFO_LOG("%p: ret = %d\n", t, t->ret);
+ unregister_task(t);
+ wng_close(g);
+ wng_destroy(g);
+}
+
struct writer_node_group *wng_new(unsigned num_writers)
{
struct writer_node_group *g = para_calloc(sizeof(struct writer_node_group));
g->writer_nodes = para_calloc(num_writers
* sizeof(struct writer_node));
g->written = para_calloc(num_writers * sizeof(size_t));
+ g->task.private_data = g;
+ g->task.post_select = wng_post_select;
+ g->task.error_handler = wng_error_handler;
+ g->task.flags = POST_ADD_TAIL | POST_EOF_IS_ERROR;
return g;
}
-void wng_destroy(struct writer_node_group *g)
-{
- if (!g)
- return;
- free(g->written);
- free(g->writer_nodes);
- free(g);
-}
-
void init_supported_writers(void)
{
int i;