int check_w;
/** The position of this client in the client list. */
struct list_head node;
- /** Te list of pending packets for this client. */
- struct list_head packet_queue;
+ /** The list of pending chunks for this client. */
+ struct list_head chunk_queue;
/** The number of pending bytes for this client. */
- unsigned long pq_bytes;
+ unsigned long cq_bytes;
};
/**
- * Describes one queued data packet for a client.
+ * Describes one queued chunk of the chunk queue.
*
* The send function of the http sender checks each client fd for writing. If a
- * client fd is not ready, it tries to queue that packet for this client until
+ * client fd is not ready, it tries to queue that chunk for this client until
* the number of queued bytes exceeds \p MAX_BACKLOG.
*/
-struct queued_packet {
- /** The length of the packet in bytes. */
- unsigned int len;
- /** Pointer to the packet data. */
- char *packet;
- /** Position of the packet in the packet list. */
+struct queued_chunk {
+ /** The number of the queued chunk, -1U means header. */
+ unsigned chunk_num;
+ /** The number of bytes already sent. */
+ unsigned sent;
+ /** Position of the chunk in the chunk queue. */
struct list_head node;
};
static void http_shutdown_client(struct http_client *hc, const char *msg)
{
- struct queued_packet *qp, *tmp;
+ struct queued_chunk *qc, *tmp;
PARA_INFO_LOG("shutting down %s on fd %d (%s)\n", CLIENT_ADDR(hc),
hc->fd, msg);
numclients--;
close(hc->fd);
del_close_on_fork_list(hc->fd);
- list_for_each_entry_safe(qp, tmp, &hc->packet_queue, node) {
- free(qp->packet);
- list_del(&qp->node);
- free(qp);
+ list_for_each_entry_safe(qc, tmp, &hc->chunk_queue, node) {
+ list_del(&qc->node);
+ free(qc);
}
list_del(&hc->node);
free(hc);
return http_send_msg(hc, HTTP_ERR_MSG);
}
-static int queue_packet(struct http_client *hc, const char *buf, size_t len)
+static int queue_chunk(struct http_client *hc, long unsigned chunk_num,
+ size_t sent)
{
- struct queued_packet *qp;
- if (hc->pq_bytes + len > MAX_BACKLOG) {
- http_shutdown_client(hc, "packet queue overrun");
+ struct queued_chunk *qc;
+ char *buf;
+ size_t len;
+ int ret;
+
+ if (chunk_num != -1U) {
+ ret = vss_get_chunk(chunk_num, &buf, &len);
+ if (ret < 0)
+ return ret;
+ } else
+ buf = vss_get_header(&len);
+ if (hc->cq_bytes + len > MAX_BACKLOG)
return -E_QUEUE;
- }
- qp = para_malloc(sizeof(struct queued_packet));
- hc->pq_bytes += len;
- qp->packet = para_malloc(len);
- memcpy(qp->packet, buf, len);
- qp->len = len;
- list_add_tail(&qp->node, &hc->packet_queue);
- PARA_INFO_LOG("%lu bytes queued for fd %d\n", hc->pq_bytes, hc->fd);
+ qc = para_malloc(sizeof(struct queued_chunk));
+ hc->cq_bytes += len;
+ qc->chunk_num = chunk_num;
+ qc->sent = sent;
+ list_add_tail(&qc->node, &hc->chunk_queue);
+ PARA_INFO_LOG("%lu bytes queued for fd %d\n", hc->cq_bytes, hc->fd);
return 1;
}
-static int send_queued_packets(struct http_client *hc)
+static int send_queued_chunks(struct http_client *hc)
{
int ret;
- struct queued_packet *qp, *tmp;
+ struct queued_chunk *qc, *tmp;
- if (list_empty(&hc->packet_queue))
+ if (list_empty(&hc->chunk_queue))
return 1;
- list_for_each_entry_safe(qp, tmp, &hc->packet_queue, node) {
+ list_for_each_entry_safe(qc, tmp, &hc->chunk_queue, node) {
+ char *buf;
+ size_t len;
ret = write_ok(hc->fd);
if (ret <= 0)
return ret? -E_WRITE_OK : 0;
- ret = write(hc->fd, qp->packet, qp->len);
+ if (qc->chunk_num != -1U) {
+ ret = vss_get_chunk(qc->chunk_num, &buf, &len);
+ if (ret < 0)
+ return ret;
+ } else
+ buf = vss_get_header(&len);
+ assert(len && len > qc->sent);
+ ret = write(hc->fd, buf + qc->sent, len - qc->sent);
if (ret < 0)
- return ret;
- if (ret != qp->len) {
- qp->len -= ret;
- memmove(qp->packet, qp->packet + ret, qp->len);
+ return -1; /* FIXME */
+ hc->cq_bytes -= ret;
+ if (ret != len - qc->sent) {
+ qc->sent += ret;
return 0;
}
- hc->pq_bytes -= qp->len;
- free(qp->packet);
- list_del(&qp->node);
- free(qp);
+ list_del(&qc->node);
+ free(qc);
}
return 1;
}
+static int queue_chunk_or_shutdown(struct http_client *hc, long unsigned chunk_num,
+ size_t sent)
+{
+ int ret = queue_chunk(hc, chunk_num, sent);
+ if (ret < 0)
+ http_shutdown_client(hc, "queue error");
+ return ret;
+}
+
static void http_send( long unsigned current_chunk,
__a_unused long unsigned chunks_sent, const char *buf, size_t len)
{
if (hbuf && hlen > 0 && current_chunk) {
/* need to send header */
PARA_INFO_LOG("queueing header: %d\n", hlen);
- if (queue_packet(hc, hbuf, hlen) < 0)
+ if (queue_chunk_or_shutdown(hc, -1U, 0) < 0)
continue;
} else
- PARA_INFO_LOG("%s", "no need to queue header\n");
+ PARA_INFO_LOG("no need to queue header\n");
hc->status = HTTP_STREAMING;
}
- ret = send_queued_packets(hc);
+ ret = send_queued_chunks(hc);
if (ret < 0) {
- http_shutdown_client(hc, "send error");
+ http_shutdown_client(hc, "queue send error");
continue;
}
if (!len)
continue;
if (!ret || write_ok(hc->fd) <= 0) {
PARA_INFO_LOG("fd %d not ready (%lu bytes queued),"
- " trying to queue packet\n", hc->fd,
- hc->pq_bytes);
- queue_packet(hc, buf, len);
+ " trying to queue chunk\n", hc->fd,
+ hc->cq_bytes);
+ queue_chunk_or_shutdown(hc, current_chunk, 0);
continue;
}
// PARA_DEBUG_LOG("sending %d -> %s\n", len, CLIENT_ADDR(hc));
ret = write(hc->fd, buf, len);
+// PARA_DEBUG_LOG("ret: %d\n", ret);
if (ret < 0) {
http_shutdown_client(hc, "send error");
continue;
}
if (ret != len)
- queue_packet(hc, buf + ret, len - ret);
+ queue_chunk_or_shutdown(hc, current_chunk, ret);
}
}
goto err_out;
}
hc->status = HTTP_CONNECTED;
- INIT_LIST_HEAD(&hc->packet_queue);
+ INIT_LIST_HEAD(&hc->chunk_queue);
PARA_INFO_LOG("accepted client #%d: %s (fd %d)\n", numclients,
CLIENT_ADDR(hc), hc->fd);
numclients++;
return ret;
}
+static void get_chunk(long unsigned chunk_num, char **buf, size_t *len)
+{
+ size_t pos = mmd->afi.chunk_table[chunk_num];
+ *buf = map + pos;
+ *len = mmd->afi.chunk_table[chunk_num + 1] - pos;
+}
+
+/**
+ * Get the data of the given chunk.
+ *
+ * \param chunk_num The number of the desired chunk.
+ * \param buf Chunk data.
+ * \param len Chunk length in bytes.
+ *
+ * \return Positive on success, negative on errors.
+ */
+int vss_get_chunk(long unsigned chunk_num, char **buf, size_t *len)
+{
+ if (mmd->audio_format < 0 || !map || !vss_playing())
+ return -E_CHUNK;
+ if (chunk_num >= mmd->afi.chunks_total)
+ return -E_CHUNK;
+ get_chunk(chunk_num, buf, len);
+ return 1;
+}
+
/**
* main sending function
*
{
int i;
struct audio_format_handler *af;
- ssize_t pos;
- size_t len;
struct timeval now, due;
if (mmd->audio_format < 0 || !map || !vss_playing())
mmd->new_vss_status_flags |= VSS_NEXT;
return vss_eof();
}
- pos = mmd->afi.chunk_table[mmd->current_chunk];
- len = mmd->afi.chunk_table[mmd->current_chunk + 1] - pos;
/*
* We call the send function also in case of empty chunks as they
* might have still some data queued which can be sent in this case.
mmd->offset = tv2ms(&tmp);
mmd->events++;
}
- for (i = 0; senders[i].name; i++)
- senders[i].send(mmd->current_chunk, mmd->chunks_sent,
- map + pos, len);
+ for (i = 0; senders[i].name; i++) {
+ char *buf;
+ size_t len;
+ get_chunk(mmd->current_chunk, &buf, &len);
+ senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len);
+ }
mmd->new_vss_status_flags |= VSS_PLAYING;
mmd->chunks_sent++;
mmd->current_chunk++;