para_fd_set(purd->fd, &s->rfds, &s->max_fileno);
}
-static int enough_space(size_t packet_size, size_t loaded)
+static int enough_space(size_t nbytes, size_t loaded)
{
- return packet_size + loaded < UDP_RECV_CHUNK_SIZE + UDP_AUDIO_HEADER_LEN;
+ return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
}
static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
return;
ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE);
if (ret < 0) {
- t->error = 0;
- if (errno != EINTR && errno != EAGAIN)
- t->error = -ERRNO_TO_PARA_ERROR(errno);
+ if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN))
+ goto success;
+ t->error = ret;
return;
}
+ t->error = -E_RECV_EOF;
+ if (!ret)
+ return;
packet_size = ret;
- if (packet_size < UDP_AUDIO_HEADER_LEN) {
- t->error = -E_UDP_SHORT_PACKET; /* FIXME: We shouldn't fail here */
+ t->error = -E_UDP_SHORT_PACKET;
+ if (packet_size < UDP_AUDIO_HEADER_LEN)
return;
- }
- if (udp_check_magic(tmpbuf, packet_size) < 0) {
- t->error = -E_UDP_NO_MAGIC;
+ t->error = -E_UDP_NO_MAGIC;
+ if (udp_check_magic(tmpbuf, packet_size) < 0)
return;
- }
stream_type = udp_read_stream_type(tmpbuf);
packet_type = udp_read_packet_type(tmpbuf);
// PARA_INFO_LOG("packet type: %d, stream type: %d,"
// (unsigned) stream_type, rn->loaded);
switch (packet_type) {
uint16_t header_len, payload_len;
+
case UDP_EOF_PACKET:
t->error = -E_RECV_EOF;
return;
/* fall through */
case UDP_DATA_PACKET:
if (!purd->have_header && stream_type == UDP_HEADER_STREAM)
- /* can't use the data, wait for header */
+ /* can't use the data, wait for header */
goto success;
- if (!enough_space(packet_size, rn->loaded)) {
- t->error = -E_OVERRUN;
+ payload_len = packet_size - UDP_AUDIO_HEADER_LEN;
+ if (!payload_len)
+ goto success;
+ t->error = -E_OVERRUN;
+ if (!enough_space(payload_len, rn->loaded))
return;
- }
- if (packet_size > UDP_AUDIO_HEADER_LEN) {
- memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN,
- packet_size - UDP_AUDIO_HEADER_LEN);
- rn->loaded += packet_size - UDP_AUDIO_HEADER_LEN;
- }
+ memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN,
+ payload_len);
+ rn->loaded += payload_len;
goto success;
case UDP_HEADER_PACKET:
- if (!enough_space(packet_size, rn->loaded)) {
- t->error = -E_OVERRUN;
- return;
- }
header_len = udp_read_header_len(tmpbuf);
if (header_len + UDP_AUDIO_HEADER_LEN > packet_size) {
t->error = -E_INVALID_HEADER;
// PARA_DEBUG_LOG("header packet (%zu bytes), header len: %d\n",
// packet_size, header_len);
if (!purd->have_header) {
+ t->error = -E_OVERRUN;
+ if (!enough_space(header_len, rn->loaded))
+ return;
purd->have_header = 1;
rn->loaded = header_len;
memcpy(rn->buf, tmpbuf + UDP_AUDIO_HEADER_LEN,
rn->loaded);
-// sleep(1);
goto success;
}
payload_len = packet_size - UDP_AUDIO_HEADER_LEN - header_len;
- if (rn->loaded + payload_len > UDP_RECV_CHUNK_SIZE) {
- t->error = -E_OVERRUN;
+ if (!payload_len)
+ goto success;
+ t->error = -E_OVERRUN;
+ if (!enough_space(payload_len, rn->loaded))
return;
- }
- if (payload_len)
- memcpy(rn->buf + rn->loaded, tmpbuf
- + (packet_size - payload_len), payload_len);
+ memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN +
+ header_len, payload_len);
rn->loaded += payload_len;
}
success:
if (ut->fd < 0)
continue;
ret = write_all(ut->fd, buf, &written);
- if (ret < 0)
+ if (ret < 0) /* TODO: Use chunk queueing */
return udp_delete_target(ut, "send error");
if (written != len)
PARA_WARNING_LOG("short write %zu/%zu\n", written, len);
}
}
-static void udp_init_session(struct udp_target *ut)
+static int udp_init_session(struct udp_target *ut)
{
+ int ret;
+
+ if (ut->fd >= 0) /* nothing to do */
+ return 0;
PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ut), ut->port);
- ut->fd = create_udp_send_socket(TARGET_ADDR(ut), ut->port, 10);
+ /* TODO: Make ttl configurable. */
+ ret = create_udp_send_socket(TARGET_ADDR(ut), ut->port, 10);
+ if (ret < 0)
+ return ret;
+ ut->fd = ret;
+ return mark_fd_nonblocking(ut->fd);
}
static void udp_shutdown_targets(void)
struct udp_target *ut, *tmp;
udp_write_packet_type(buf, UDP_EOF_PACKET);
+ udp_write_magic(buf);
list_for_each_entry_safe(ut, tmp, &targets, node) {
if (ut->fd < 0)
continue;
- PARA_INFO_LOG("sending eof to udp target %s:%d\n",
- TARGET_ADDR(ut), ut->port);
write(ut->fd, buf, UDP_AUDIO_HEADER_LEN);
+ close(ut->fd);
+ ut->fd = -1;
}
}
{
struct udp_target *ut, *tmp;
size_t sendbuf_len;
- int packet_type = UDP_DATA_PACKET;
+ uint8_t packet_type = UDP_DATA_PACKET;
+ int ret;
char *sendbuf;
struct timeval *chunk_tv;
uint8_t stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM;
if (list_empty(&targets))
return;
list_for_each_entry_safe(ut, tmp, &targets, node) {
- if (ut->fd < 0)
- udp_init_session(ut);
+ ret = udp_init_session(ut);
+ if (ret < 0)
+ udp_delete_target(ut, para_strerror(-ret));
}
if (!need_extra_header(current_chunk))
header_len = 0;