From: Andre Noll Date: Sun, 11 Jan 2009 22:18:04 +0000 (+0100) Subject: Major udp sender/receiver improvements. X-Git-Tag: v0.3.4~75^2~18 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=01556501c32d5ec155cc7e2cd78307ec1f177a07;p=paraslash.git Major udp sender/receiver improvements. - Reserve some bytes in the udp header for future extensions. - Simplify udp audio header reading/writing. - Handle short reads in udp_recv. - Extended sanity checks for the udp audio header. - Fix a memory leak in the error path of udp_recv_open(). --- diff --git a/error.h b/error.h index 347327a6..897aeb28 100644 --- a/error.h +++ b/error.h @@ -208,11 +208,10 @@ extern const char **para_errlist[]; #define UDP_RECV_ERRORS \ - PARA_ERROR(UDP_NO_MAGIC, "not enough magic"), \ PARA_ERROR(UDP_SYNTAX, "udp_recv syntax error"), \ - PARA_ERROR(UDP_SHORT_PACKET, "udp data packet too short"), \ - PARA_ERROR(INVALID_HEADER, "invalid header packet"), \ - PARA_ERROR(OVERRUN, "output buffer overrun"), \ + PARA_ERROR(UDP_BAD_HEADER, "invalid udp audio header"), \ + PARA_ERROR(UDP_OVERRUN, "output buffer overrun"), \ + PARA_ERROR(UDP_BAD_STREAM_TYPE, "invalid stream type"), \ #define HTTP_RECV_ERRORS \ diff --git a/udp_header.h b/udp_header.h index 24f6bb87..83157e6b 100644 --- a/udp_header.h +++ b/udp_header.h @@ -13,14 +13,16 @@ * this header, the type of the current audio stream and the * type of this * data chunk is coded. */ -#define UDP_AUDIO_HEADER_LEN 8 +#define UDP_AUDIO_HEADER_LEN 16 /** The possible stream types. */ enum udp_stream_type { /** Used for mp3 and aac streams. */ UDP_PLAIN_STREAM, - /* Ogg vorbis streams. */ - UDP_HEADER_STREAM + /** Ogg vorbis streams. */ + UDP_HEADER_STREAM, + /** stream type not yet known. */ + UDP_UNKNOWN_STREAM }; /** The possible packet types. */ @@ -32,106 +34,66 @@ enum udp_audio_packet_type { /** Combined header/data packet (ogg only). */ UDP_HEADER_PACKET, /** Packet contains only audio file data. */ - UDP_DATA_PACKET + UDP_DATA_PACKET, + /** Invalid packet type. */ + UDP_UNKNOWN_PACKET +}; + +/** The contents of an udp audio header. */ +struct udp_audio_header { + /** see \ref udp_stream_type. */ + uint8_t stream_type; + /** see \ref udp_audio_packet_type. */ + uint8_t packet_type; + /** Non-zero only for header packets. */ + uint16_t header_len; + /** Length of header plus audio file data. */ + uint16_t payload_len; }; /** - * Write the magic bytes to the beginning of a buffer. + * Write a struct udp_audio_header to a buffer. + * + * \param buf The buffer to write to. + * \param h The audio header to write. + * + * Used by the udp sender. * - * \param buf The buffer. */ -_static_inline_ void udp_write_magic(char *buf) +_static_inline_ void write_udp_audio_header(char *buf, struct udp_audio_header *h) { memcpy(buf, "UDPM", 4); + write_u8(buf + 4, h->stream_type); + write_u8(buf + 5, h->packet_type); + write_u16(buf + 6, h->header_len); + write_u16(buf + 8, h->payload_len); + memset(buf + 10, 0, 6); } /** - * Check whether this buffer contains the magic bytes. + * Used by the udp receiver to read a struct udp_audio_header from a buffer. * - * \param buf The buffer. - * \param len The number of bytes of \a buf. + * \param buf The buffer to read from. + * \param len The length of \a buf. + * \param h Result pointer. * - * \return Positive if \a buf contains the magic bytes, - * -1 otherwise. + * \return 1 if \a buf contains a valid udp audio header, -1 else. */ -_static_inline_ int udp_check_magic(char *buf, size_t len) +_static_inline_ int read_udp_audio_header(char *buf, size_t len, + struct udp_audio_header *h) { if (len < 4) - return -1; + goto err; if (memcmp(buf, "UDPM", 4)) - return -1; + goto err; + h->stream_type = read_u8(buf + 4); + h->packet_type = read_u8(buf + 5); + h->header_len = read_u16(buf + 6); + h->payload_len = read_u16(buf + 8); return 1; -} - -/** - * Write the type of the audio stream to a buffer. - * - * \param buf The buffer. - * \param type The type to be written. - * - * \sa \ref udp_stream_type. - */ -_static_inline_ void udp_write_stream_type(char *buf, uint8_t type) -{ - write_u8(buf + 4, type); -} - -/** - * Read the type of the audio stream from a buffer. - * - * \param buf The buffer. - * - * \return Either UDP_PLAIN_STREAM or UDP_HEADER_STREAM. - * \sa \ref udp_stream_type. - */ -_static_inline_ uint8_t udp_read_stream_type(char *buf) -{ - return read_u8(buf + 4); -} - -/** - * Write the type of this packet to a buffer. - * - * \param buf The buffer. - * \param type The type to be written. - * - * \sa \ref udp_audio_packet_type. - */ -_static_inline_ void udp_write_packet_type(char *buf, uint8_t type) -{ - write_u8(buf + 5, type); -} - -/** - * Read the type of this buffer. - * - * \param buf The buffer. - * - * \return The packet type, see \ref udp_stream_type. - */ -_static_inline_ uint8_t udp_read_packet_type(char *buf) -{ - return read_u8(buf + 5); -} - -/** - * Write the length of the header (non-zero only for ogg streams). - * - * \param buf The buffer. - * \param len The length of the header in bytes. - */ -_static_inline_ void udp_write_header_len(char *buf, uint16_t len) -{ - write_u16(buf + 6, len); -} - -/** - * Read the length of the header. - * - * \param buf The buffer. - * \return The header length in bytes. - */ -_static_inline_ uint16_t udp_read_header_len(char *buf) -{ - return read_u16(buf + 6); +err: + h->stream_type = UDP_UNKNOWN_STREAM; + h->packet_type = UDP_UNKNOWN_PACKET; + h->header_len = h->payload_len = 0; + return -1; } diff --git a/udp_recv.c b/udp_recv.c index 66dc36b9..bdc25411 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -8,6 +8,7 @@ #include #include "para.h" +#include "error.h" #include "portable_io.h" #include "udp_header.h" #include "list.h" @@ -15,7 +16,6 @@ #include "ggo.h" #include "recv.h" #include "udp_recv.cmdline.h" -#include "error.h" #include "audiod.h" #include "string.h" #include "net.h" @@ -43,6 +43,10 @@ struct private_udp_recv_data { int have_header; /** The socket file descriptor. */ int fd; + /** Non-zero on short reads. */ + uint16_t need_more; + /** Copied from the first audio header received. */ + uint16_t stream_type; }; static void udp_recv_pre_select(struct sched *s, struct task *t) @@ -58,14 +62,74 @@ static int enough_space(size_t nbytes, size_t loaded) return nbytes + loaded < UDP_RECV_CHUNK_SIZE; } +/* + * Perform some sanity checks on an udp audio file header. + * + * return: negative on error, 0: discard data, 1: use data + */ +static int examine_audio_header(struct private_udp_recv_data *purd, + struct udp_audio_header *uah, size_t packet_size) +{ + /* payload_len includes header */ + if (uah->payload_len < uah->header_len) + return -E_UDP_BAD_HEADER; + switch (uah->packet_type) { + case UDP_EOF_PACKET: + return -E_RECV_EOF; + case UDP_BOF_PACKET: + purd->have_header = 1; + /* fall through */ + case UDP_DATA_PACKET: + if (uah->header_len) /* header in no-header packet */ + return -E_UDP_BAD_HEADER; + break; + case UDP_HEADER_PACKET: + if (!uah->header_len) /** no header in header packet */ + return -E_UDP_BAD_HEADER; + break; + default: /* bad packet type */ + return -E_UDP_BAD_HEADER; + } + /* check stream type */ + if (uah->stream_type != UDP_PLAIN_STREAM && + uah->stream_type != UDP_HEADER_STREAM) + return -E_UDP_BAD_STREAM_TYPE; + if (purd->stream_type == UDP_UNKNOWN_STREAM) + purd->stream_type = uah->stream_type; + /* stream type must not change */ + if (uah->stream_type != purd->stream_type) + return -E_UDP_BAD_STREAM_TYPE; + if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM) + /* can't use the data, wait for header packet */ + return 0; + if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN) + /* we read only a part of the package */ + purd->need_more = uah->payload_len + + UDP_AUDIO_HEADER_LEN - packet_size; + return 1; +} + +static int add_rn_output(struct receiver_node *rn, char *buf, size_t len) +{ + if (!len) + return 1; + if (!enough_space(len, rn->loaded)) + return -E_UDP_OVERRUN; + memcpy(rn->buf + rn->loaded, buf, len); + rn->loaded += len; + return 1; +} + static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_udp_recv_data *purd = rn->private_data; int ret; char tmpbuf[UDP_RECV_CHUNK_SIZE]; + uint16_t data_len; + char *data_buf; size_t packet_size; - uint8_t stream_type, packet_type; + struct udp_audio_header uah; if (rn->output_error && *rn->output_error < 0) { t->error = *rn->output_error; @@ -84,71 +148,45 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) if (!ret) return; packet_size = ret; - t->error = -E_UDP_SHORT_PACKET; - if (packet_size < UDP_AUDIO_HEADER_LEN) - return; - t->error = -E_UDP_NO_MAGIC; - if (udp_check_magic(tmpbuf, packet_size) < 0) - return; - stream_type = udp_read_stream_type(tmpbuf); - packet_type = udp_read_packet_type(tmpbuf); -// PARA_INFO_LOG("packet type: %d, stream type: %d," -// " loaded: %u\n", packet_type, -// (unsigned) stream_type, rn->loaded); - switch (packet_type) { - uint16_t header_len, payload_len; + for (;;) { + uint16_t num; - case UDP_EOF_PACKET: - t->error = -E_RECV_EOF; - return; - case UDP_BOF_PACKET: - PARA_INFO_LOG("bof (%zu)\n", packet_size); - purd->have_header = 1; - /* fall through */ - case UDP_DATA_PACKET: - if (!purd->have_header && stream_type == UDP_HEADER_STREAM) - /* can't use the data, wait for header */ - goto success; - payload_len = packet_size - UDP_AUDIO_HEADER_LEN; - if (!payload_len) - goto success; - t->error = -E_OVERRUN; - if (!enough_space(payload_len, rn->loaded)) - return; - memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN, - payload_len); - rn->loaded += payload_len; - goto success; - case UDP_HEADER_PACKET: - header_len = udp_read_header_len(tmpbuf); - if (header_len + UDP_AUDIO_HEADER_LEN > packet_size) { - t->error = -E_INVALID_HEADER; - return; + if (!purd->need_more) { + ret = read_udp_audio_header(tmpbuf, packet_size, &uah); + if (ret >= 0) + break; + goto success; /* drop data */ } -// PARA_DEBUG_LOG("header packet (%zu bytes), header len: %d\n", -// packet_size, header_len); - if (!purd->have_header) { - t->error = -E_OVERRUN; - if (!enough_space(header_len, rn->loaded)) - return; - purd->have_header = 1; - rn->loaded = header_len; - memcpy(rn->buf, tmpbuf + UDP_AUDIO_HEADER_LEN, - rn->loaded); + num = PARA_MIN(purd->need_more, (uint16_t)packet_size); + assert(num > 0); + t->error = add_rn_output(rn, tmpbuf, num); + if (t->error < 0) + return; + purd->need_more -= num; + if (packet_size <= num) goto success; + packet_size -= num; + memmove(tmpbuf, tmpbuf + num, packet_size); + } + assert(!purd->need_more); + t->error = examine_audio_header(purd, &uah, packet_size); + if (t->error <= 0) + return; + data_len = uah.payload_len; + data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN; + if (uah.packet_type == UDP_HEADER_PACKET) { + if (purd->have_header) { /* skip header */ + data_buf += uah.header_len; + data_len -= uah.header_len; + } else { /* only use the header */ + purd->have_header = 1; + data_len = uah.header_len; } - payload_len = packet_size - UDP_AUDIO_HEADER_LEN - header_len; - if (!payload_len) - goto success; - t->error = -E_OVERRUN; - if (!enough_space(payload_len, rn->loaded)) - return; - memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN + - header_len, payload_len); - rn->loaded += payload_len; } + t->error = add_rn_output(rn, data_buf, data_len); + return; success: - t->error = 0; + t->error = 1; } static void udp_shutdown(void) @@ -190,14 +228,19 @@ static int udp_recv_open(struct receiver_node *rn) purd = rn->private_data; ret = create_udp_recv_socket(c->host_arg, c->port_arg); if (ret < 0) - return ret; + goto err; purd->fd = ret; ret = mark_fd_nonblocking(purd->fd); if (ret < 0) - return ret; + goto err; + purd->stream_type = UDP_UNKNOWN_STREAM; PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, c->port_arg, purd->fd); return purd->fd; +err: + free(rn->private_data); + free(rn->buf); + return ret; } /** diff --git a/udp_send.c b/udp_send.c index e83239d3..b0f55bc3 100644 --- a/udp_send.c +++ b/udp_send.c @@ -136,9 +136,12 @@ static void udp_shutdown_targets(void) { char buf[UDP_AUDIO_HEADER_LEN]; struct udp_target *ut, *tmp; + struct udp_audio_header uah = { + .stream_type = UDP_UNKNOWN_STREAM, + .packet_type = UDP_EOF_PACKET, + }; - udp_write_packet_type(buf, UDP_EOF_PACKET); - udp_write_magic(buf); + write_udp_audio_header(buf, &uah); list_for_each_entry_safe(ut, tmp, &targets, node) { if (ut->fd < 0) continue; @@ -165,11 +168,10 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk const char *buf, size_t len, const char *header_buf, size_t header_len) { - size_t sendbuf_len; - uint8_t packet_type = UDP_DATA_PACKET; char *sendbuf; + size_t sendbuf_len; struct timeval *chunk_tv; - uint8_t stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM; + struct udp_audio_header uah; // PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len); if (sender_status != SENDER_ON) @@ -181,22 +183,22 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk return; if (list_empty(&targets)) return; - if (!need_extra_header(current_chunk)) - header_len = 0; + uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM; + uah.header_len = need_extra_header(current_chunk)? header_len : 0; if (!current_chunk) - packet_type = UDP_BOF_PACKET; - else if (header_len) - packet_type = UDP_HEADER_PACKET; - sendbuf_len = UDP_AUDIO_HEADER_LEN + header_len + len; + uah.packet_type = UDP_BOF_PACKET; + else if (uah.header_len) + uah.packet_type = UDP_HEADER_PACKET; + else + uah.packet_type = UDP_DATA_PACKET; + uah.payload_len = uah.header_len + len; + sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len; sendbuf = para_malloc(sendbuf_len); - udp_write_magic(sendbuf); - udp_write_stream_type(sendbuf, stream_type); - udp_write_packet_type(sendbuf, packet_type); - udp_write_header_len(sendbuf, header_len); - if (header_len) + write_udp_audio_header(sendbuf, &uah); + if (uah.header_len) memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf, - header_len); - memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + header_len, buf, len); + uah.header_len); + memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len); udp_send_buf(sendbuf, sendbuf_len); free(sendbuf); }