#include "audiod.cmdline.h"
#include "list.h"
#include "close_on_fork.h"
+#include "sched.h"
#include "recv.h"
#include "filter.h"
#include "grab_client.cmdline.h"
recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline"
recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
- dccp fd"
+ dccp fd sched stdout"
recv_ldflags=""
filter_cmdline_objs="filter.cmdline compress_filter.cmdline"
#include "para.h"
#include "error.h"
#include "dccp.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "string.h"
#include "net.h"
+#include "fd.h"
#include "dccp_recv.cmdline.h"
return NULL;
}
-static int dccp_recv_pre_select(struct receiver_node *rn, fd_set *rfds,
- __a_unused fd_set *wfds, __a_unused struct timeval *timeout)
+static void dccp_recv_pre_select(struct sched *s, struct task *t)
{
- struct private_dccp_recv_data *pdd = rn->private_data;
+ struct private_dccp_recv_data *pdd = t->private_data;
if (!pdd)
- return -1;
- FD_SET(pdd->fd, rfds);
- return pdd->fd;
+ return ;
+ para_fd_set(pdd->fd, &s->rfds, &s->max_fileno);
}
-static int dccp_recv_post_select(struct receiver_node *rn, int select_ret,
- fd_set *rfds, __a_unused fd_set *wfds)
+static void dccp_recv_post_select(struct sched *s, struct task *t)
{
- int ret;
+ struct receiver_node *rn = t->private_data;
struct private_dccp_recv_data *pdd = rn->private_data;
- if (!select_ret || !pdd || !FD_ISSET(pdd->fd, rfds))
- return 1; /* nothing to do */
+ t->ret = 1;
+ if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds))
+ return; /* nothing to do */
+ t->ret = -E_DCCP_OVERRUN;
if (rn->loaded >= DCCP_BUFSIZE)
- return -E_DCCP_OVERRUN;
- ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
+ return;
+ t->ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
DCCP_BUFSIZE - rn->loaded);
- if (ret <= 0) {
- PARA_INFO_LOG("%s\n", ret? PARA_STRERROR(-ret) : "eof");
- return ret;
+ if (t->ret <= 0) {
+ rn->eof = 1;
+ if (!t->ret)
+ t->ret = -E_DCCP_RECV_EOF;
+ return;
}
- rn->loaded += ret;
- return 1;
+ rn->loaded += t->ret;
}
/**
SS_AUDIOD,
SS_EXEC,
SS_STDIN,
+ SS_STDOUT,
SS_SIGNAL,
SS_STRING,
SS_STAT,
PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
PARA_ERROR(STDIN_EOF, "end of file"), \
+
+#define STDOUT_ERRORS \
+ PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
+ PARA_ERROR(STDOUT_EOF, "end of file"), \
+
+
#define NET_ERRORS \
PARA_ERROR(SEND, "send error"), \
PARA_ERROR(RECV, "receive error"), \
PARA_ERROR(TOO_MANY_BAD_CHUNKS, "too many consecutive bad chunks"), \
PARA_ERROR(INVALID_HEADER, "invalid header packet"), \
PARA_ERROR(OVERRUN, "outout buffer overrun"), \
+ PARA_ERROR(ORTP_RECV_EOF, "ortp_recv: end of file"), \
#define HTTP_RECV_ERRORS \
PARA_ERROR(SEND_HTTP_REQUEST, "failed to send http request"), \
PARA_ERROR(MISSING_OK, "did not receive OK message from peer"), \
- PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer")
+ PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer"), \
+ PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \
#define RECV_ERRORS \
PARA_ERROR(DCCP_SOCKET, "can not create dccp socket"), \
PARA_ERROR(DCCP_PACKET_SIZE, "failed to set dccp packet size"), \
PARA_ERROR(DCCP_SERVICE, "could not get service code"), \
+ PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \
#define DCCP_RECV_ERRORS \
SS_ENUM(GUI);
SS_ENUM(SCHED);
SS_ENUM(STDIN);
+SS_ENUM(STDOUT);
SS_ENUM(WAV);
SS_ENUM(COMPRESS);
SS_ENUM(TIME);
#include "para.h"
#include "http.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "http_recv.cmdline.h"
#include "error.h"
#include "net.h"
#include "string.h"
+#include "fd.h"
/** the output buffer size of the http receiver */
#define BUFSIZE (32 * 1024)
return ret;
}
-static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds,
- __a_unused struct timeval *timeout)
+static void http_recv_pre_select(struct sched *s, struct task *t)
{
+ struct receiver_node *rn = t->private_data;
struct private_http_recv_data *phd = rn->private_data;
+ t->ret = 1;
if (phd->status == HTTP_CONNECTED)
- FD_SET(phd->fd, wfds);
+ para_fd_set(phd->fd, &s->wfds, &s->max_fileno);
else
- FD_SET(phd->fd, rfds);
- return phd->fd;
+ para_fd_set(phd->fd, &s->rfds, &s->max_fileno);
}
-static int http_post_select(struct receiver_node *rn, int select_ret,
- fd_set *rfds, fd_set *wfds)
+
+static void http_recv_post_select(struct sched *s, struct task *t)
{
- int ret;
+ struct receiver_node *rn = t->private_data;
struct private_http_recv_data *phd = rn->private_data;
- if (!select_ret) /* we're not interested in timeouts */
- return 1;
+ t->ret = 1;
+ if (!s->select_ret) /* we're not interested in timeouts */
+ return;
if (phd->status == HTTP_CONNECTED) {
char *rq;
- if (!FD_ISSET(phd->fd, wfds))
- return 1; /* nothing to do */
+ if (!FD_ISSET(phd->fd, &s->wfds))
+ return; /* nothing to do */
rq = make_request_msg();
PARA_NOTICE_LOG("%s", "sending http request\n");
- ret = send_va_buffer(phd->fd, "%s", rq);
+ t->ret = send_va_buffer(phd->fd, "%s", rq);
free(rq);
- if (ret < 0)
- return E_SEND_HTTP_REQUEST;
- phd->status = HTTP_SENT_GET_REQUEST;
- return 1;
+ if (t->ret > 0)
+ phd->status = HTTP_SENT_GET_REQUEST;
+ return;
}
- if (!FD_ISSET(phd->fd, rfds))
- return 1; /* nothing to do */
+ if (!FD_ISSET(phd->fd, &s->rfds))
+ return; /* nothing to do */
if (phd->status == HTTP_SENT_GET_REQUEST) {
- ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
- if (ret < 0)
- return -E_MISSING_OK;
+ t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
+ if (t->ret < 0)
+ return;
PARA_NOTICE_LOG("%s", "received ok msg, streaming\n");
phd->status = HTTP_STREAMING;
- return 1;
+ return;
}
+ t->ret = -E_OVERRUN;
/* already streaming */
- if (rn->loaded >= BUFSIZE) {
- PARA_ERROR_LOG("%s", "buffer overrun\n");
- return -E_OVERRUN;
+ if (rn->loaded >= BUFSIZE)
+ return;
+ t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
+ BUFSIZE - rn->loaded);
+ if (t->ret <= 0) {
+ rn->eof = 1;
+ if (!t->ret)
+ t->ret = -E_HTTP_RECV_EOF;
+ return;
}
- ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded);
- if (ret <= 0) {
- PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded);
- return ret < 0? -E_HTTP_RECV_BUF : 0;
- }
- rn->loaded += ret;
- return 1;
+ rn->loaded += t->ret;
}
static void http_recv_close(struct receiver_node *rn)
{
r->open = http_recv_open;
r->close = http_recv_close;
- r->pre_select = http_pre_select;
- r->post_select = http_post_select;
+ r->pre_select = http_recv_pre_select;
+ r->post_select = http_recv_post_select;
r->shutdown = http_shutdown;
r->parse_config = http_recv_parse_config;
}
#include "para.h"
#include "ortp.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "ortp_recv.cmdline.h"
uint32_t chunk_ts;
};
-static int ortp_recv_pre_select(struct receiver_node *rn,
- __a_unused fd_set *rfds, __a_unused fd_set *wfds,
- struct timeval *timeout)
+static void ortp_recv_pre_select(struct sched *s, struct task *t)
{
+ struct receiver_node *rn = t->private_data;
struct private_ortp_recv_data *pord = rn->private_data;
- struct timeval now, tmp;
+ struct timeval tmp;
- gettimeofday(&now, NULL);
- if (tv_diff(&now, &pord->next_chunk, &tmp) >= 0) {
+ if (tv_diff(&s->now, &pord->next_chunk, &tmp) >= 0) {
tmp.tv_sec = 0;
tmp.tv_usec = 1000;
}
- if (tv_diff(timeout, &tmp, NULL) > 0)
- *timeout = tmp;
- return -1; /* we did not modify the fd sets */
+ if (tv_diff(&s->timeout, &tmp, NULL) > 0)
+ s->timeout = tmp;
+ t->ret = 1;
}
static void compute_next_chunk(unsigned chunk_time,
pord->next_chunk.tv_usec);
}
-static int ortp_recv_post_select(struct receiver_node *rn,
- __a_unused int select_ret, __a_unused fd_set *rfds,
- __a_unused fd_set *wfds)
+static void ortp_recv_post_select(struct sched *s, struct task *t)
{
+ struct receiver_node *rn = t->private_data;
struct private_ortp_recv_data *pord = rn->private_data;
mblk_t *mp;
- int ret, packet_type, stream_type;
+ int packet_type, stream_type;
char tmpbuf[CHUNK_SIZE + 3];
- struct timeval now;
unsigned chunk_time;
- gettimeofday(&now, NULL);
// PARA_DEBUG_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
- if (pord->start.tv_sec) {
- struct timeval diff;
- if (tv_diff(&now, &pord->next_chunk, &diff) < 0)
- return 1;
- }
+ t->ret = 1;
+ if (pord->start.tv_sec)
+ if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0)
+ return;
mp = rtp_session_recvm_with_ts(pord->session, pord->timestamp);
if (!mp) {
struct timeval min_delay = {0, 100};
// PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n",
// pord->timestamp, rn->loaded, pord->c_bad);
pord->c_bad++;
+ t->ret = -E_TOO_MANY_BAD_CHUNKS;
if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000)
- return -E_TOO_MANY_BAD_CHUNKS;
- tv_add(&now, &min_delay, &pord->next_chunk);
- return 1;
+ return;
+ t->ret = 1;
+ tv_add(&s->now, &min_delay, &pord->next_chunk);
+ return;
}
/* okay, we have a chunk of data */
if (!pord->start.tv_sec)
- pord->start = now;
- ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
-// PARA_DEBUG_LOG("have it ts = %d, chunk_ts = %d, loaded: %d, "
-// "bad: %d, len: %d\n", pord->timestamp, pord->chunk_ts,
-// rn->loaded, pord->c_bad, ret);
- if (ret < ORTP_AUDIO_HEADER_LEN) {
- if (ret < 0)
- ret = -E_MSG_TO_BUF;
+ pord->start = s->now;
+ t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
+ if (t->ret < ORTP_AUDIO_HEADER_LEN) {
+ rn->eof = 1;
+ if (t->ret < 0)
+ t->ret = -E_MSG_TO_BUF;
else
- ret = 0;
+ t->ret = -E_ORTP_RECV_EOF;
goto err_out;
}
packet_type = READ_PACKET_TYPE(tmpbuf);
switch (packet_type) {
unsigned header_len, payload_len;
case ORTP_EOF:
- ret = 0;
+ rn->eof = 1;
+ t->ret = -E_ORTP_RECV_EOF;
goto err_out;
case ORTP_BOF:
- PARA_INFO_LOG("bof (%d)\n", ret);
+ PARA_INFO_LOG("bof (%d)\n", t->ret);
pord->have_header = 1;
/* fall through */
case ORTP_DATA:
if (!pord->have_header && stream_type)
/* can't use the data, wait for header */
goto success;
- if (ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
- ret = -E_OVERRUN;
+ if (t->ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
+ t->ret = -E_OVERRUN;
goto err_out;
}
- if (ret > ORTP_AUDIO_HEADER_LEN) {
+ if (t->ret > ORTP_AUDIO_HEADER_LEN) {
memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN,
- ret - ORTP_AUDIO_HEADER_LEN);
- rn->loaded += ret - ORTP_AUDIO_HEADER_LEN;
+ t->ret - ORTP_AUDIO_HEADER_LEN);
+ rn->loaded += t->ret - ORTP_AUDIO_HEADER_LEN;
}
goto success;
case ORTP_HEADER:
header_len = READ_HEADER_LEN(tmpbuf);
PARA_DEBUG_LOG("header packet (%d bytes), header len: %d\n",
- ret, header_len);
+ t->ret, header_len);
if (!pord->have_header) {
pord->have_header = 1;
memcpy(rn->buf, tmpbuf + ORTP_AUDIO_HEADER_LEN,
- ret - ORTP_AUDIO_HEADER_LEN);
- rn->loaded = ret - ORTP_AUDIO_HEADER_LEN;
+ t->ret - ORTP_AUDIO_HEADER_LEN);
+ rn->loaded = t->ret - ORTP_AUDIO_HEADER_LEN;
goto success;
}
- if (header_len + ORTP_AUDIO_HEADER_LEN > ret) {
- ret = -E_INVALID_HEADER;
+ if (header_len + ORTP_AUDIO_HEADER_LEN > t->ret) {
+ t->ret = -E_INVALID_HEADER;
goto err_out;
}
- payload_len = ret - ORTP_AUDIO_HEADER_LEN - header_len;
+ payload_len = t->ret - ORTP_AUDIO_HEADER_LEN - header_len;
// PARA_INFO_LOG("len: %d header_len: %d, payload_len: %d, loaded: %d\n", ret,
// header_len, payload_len, rn->loaded);
if (rn->loaded + payload_len > CHUNK_SIZE) {
- ret = -E_OVERRUN;
+ t->ret = -E_OVERRUN;
goto err_out;
}
if (payload_len)
memcpy(rn->buf + rn->loaded, tmpbuf
- + (ret - payload_len), payload_len);
+ + (t->ret - payload_len), payload_len);
rn->loaded += payload_len;
goto success;
}
success:
+ t->ret = 1;
freemsg(mp);
if (pord->c_bad) {
pord->c_bad = 0;
- pord->next_chunk = now;
+ pord->next_chunk = s->now;
}
compute_next_chunk(chunk_time, pord);
- return 1;
+ return;
err_out:
freemsg(mp);
- return ret;
+ return;
}
static void ortp_shutdown(void)
*/
#include "para.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "recv.cmdline.h"
#include "fd.h"
#include "error.h"
+#include "stdout.h"
struct gengetopt_args_info conf;
return check_receiver_arg(conf.receiver_arg, receiver_num);
}
+#if 0
int main(int argc, char *argv[])
{
int ret, eof = 0, max, r_opened = 0, receiver_num;
PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
return ret;
}
+#endif
+
+void rn_event_handler(struct task *t)
+{
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
+void stdout_event_handler(struct task *t)
+{
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
+int main(int argc, char *argv[])
+{
+ int ret, eof = 0, max, r_opened = 0, receiver_num;
+ struct timeval timeout;
+ struct receiver *r = NULL;
+ fd_set rfds, wfds;
+ struct receiver_node rn;
+ struct stdout_task sot;
+ struct sched s;
+
+ init_sched();
+ s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_usec = 0;
+
+ memset(&rn, 0, sizeof(struct receiver_node));
+ for (ret = 0; receivers[ret].name; ret++)
+ receivers[ret].init(&receivers[ret]);
+ ret = -E_RECV_SYNTAX;
+ rn.conf = parse_config(argc, argv, &receiver_num);
+ if (!rn.conf) {
+ PARA_EMERG_LOG("%s", "parse failed\n");
+ goto out;
+ }
+ r = &receivers[receiver_num];
+ rn.receiver = r;
+ ret = r->open(&rn);
+ if (ret < 0)
+ goto out;
+ r_opened = 1;
+
+ sot.task.private_data = &sot;
+ sot.task.pre_select = stdout_pre_select;
+ sot.task.post_select = stdout_post_select;
+ sot.task.event_handler = stdout_event_handler;
+ sot.task.flags = 0;
+ sprintf(sot.task.status, "stdout writer");
+ sot.buf = rn.buf;
+ sot.loaded = &rn.loaded;
+ sot.eof = &rn.eof;
+ register_task(&sot.task);
+
+ rn.task.private_data = &rn;
+ rn.task.pre_select = r->pre_select;
+ rn.task.post_select = r->post_select;
+ rn.task.event_handler = rn_event_handler;
+ rn.task.flags = 0;
+ sprintf(rn.task.status, "receiver node");
+ register_task(&rn.task);
+
+
+ ret = sched(&s);
+out:
+ if (r_opened)
+ r->close(&rn);
+ if (r)
+ r->shutdown();
+ if (ret < 0)
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+ return ret;
+}
int eof;
/** pointer to the configuration data for this instance */
void *conf;
+ /** the task associated with this instance */
+ struct task task;
};
/**
*
* The pre_select function gets called from the driving application before
* entering its select loop. The receiver may use this hook to add any file
- * descriptors to \a rfds and \a wfds in order to check the result later in the
- * post_select hook.
+ * descriptors to the sets of file descriptors given by \a s.
*
- * \a timeout is a value-result parameter, initially containing the timeout for
- * select() which was set by the application or by another receiver node. If
- * the receiver wants its pre_select function to be called at some earlier time
- * than the time determined by \a timeout, it may set \a timeout to an
- * appropriate smaller value. However, it must never increase this timeout.
*
- * This function must return the highest-numbered descriptor it wants to being
- * checked, or -1 if no file descriptors should be checked for this run.
- *
- * \sa select(2), receiver_node:private_data, time.c
+ * \sa select(2), time.c struct task, struct sched
*/
- int (*pre_select)(struct receiver_node *rn, fd_set *rfds,
- fd_set *wfds, struct timeval *timeout);
+ void (*pre_select)(struct sched *s, struct task *t);
/**
*
*
* evaluate the result from select()
*
- * If the call to select() was succesful, this hook gets called. It should
- * check all file descriptors which were added to any of the the fd sets during
- * the previous call to pre_select. According to the result, it may then use
- * any non-blocking I/O to establish a connection or to receive the audio data.
+ * This hook gets called after the call to select(). It should check all file
+ * descriptors which were added to any of the the fd sets during the previous
+ * call to pre_select. According to the result, it may then use any
+ * non-blocking I/O to establish a connection or to receive the audio data.
*
- * A negative return value is interpreted as an error.
*
* \sa select(2), struct receiver
*/
- int (*post_select)(struct receiver_node *rn, int select_ret,
- fd_set *rfds, fd_set *wfds);
+ void (*post_select)(struct sched *s, struct task *t);
};
#include "para.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "string.h"
--- /dev/null
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdout.h"
+
+void stdout_pre_select(struct sched *s, struct task *t)
+{
+ struct stdout_task *sot = t->private_data;
+
+ t->ret = 1;
+ sot->check_fd = 0;
+ if (!*sot->loaded)
+ return;
+ sot->check_fd = 1;
+ para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+}
+
+void stdout_post_select(struct sched *s, struct task *t)
+{
+ struct stdout_task *sot = t->private_data;
+ ssize_t ret;
+
+ t->ret = 1;
+ if (!sot->check_fd) {
+ if (*sot->eof)
+ t->ret = -E_STDOUT_EOF;
+ return;
+ }
+ if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
+ return;
+ t->ret = -E_STDOUT_WRITE;
+ ret = write(STDOUT_FILENO, sot->buf, *sot->loaded);
+ if (ret <= 0)
+ return;
+ *sot->loaded -= ret;
+ t->ret = 1;
+}
--- /dev/null
+struct stdout_task {
+ char *buf;
+ size_t *bufsize;
+ size_t *loaded;
+ int *eof;
+ struct task task;
+ int check_fd;
+};
+
+void stdout_pre_select(struct sched *s, struct task *t);
+void stdout_post_select(struct sched *s, struct task *t);
if (t->ret != -E_STDIN_EOF)
PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
else
- PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
}
int main(int argc, char *argv[])