/** Describes one queued chunk in a chunk queue. */
struct queued_chunk {
- /** The number of the queued chunk, -1U means header. */
- unsigned chunk_num;
- /** The number of bytes already sent. */
- unsigned sent;
+ /** Pointer to the data to be queued. */
+ const char *buf;
+ /** The number of bytes of this chunk. */
+ size_t num_bytes;
/** Position of the chunk in the chunk queue. */
struct list_head node;
};
* Add a chunk to the given queue.
*
* \param cq the queue to add the chunk to.
- * \param chunk_num The number of the chunk to be queued.
- * \param sent The number of bytes of this chunk that the sender was able to
- * send.
+ * \param buf Pointer to the data to be queued.
+ * \param num_bytes The size of \a buf.
*
- * \return Positive on success, negative on errors.
+ * \return Standard.
*/
-int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
- size_t sent)
+int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
{
struct queued_chunk *qc;
- char *buf;
- size_t len;
- int ret;
- if (chunk_num != -1U) {
- ret = vss_get_chunk(chunk_num, &buf, &len);
- if (ret < 0)
- return ret;
- } else
- vss_get_header(&buf, &len);
- if (cq->num_pending + len > cq->max_pending)
+ if (cq->num_pending + num_bytes > cq->max_pending)
return -E_QUEUE;
qc = para_malloc(sizeof(struct queued_chunk));
- cq->num_pending += len;
- qc->chunk_num = chunk_num;
- qc->sent = sent;
+ cq->num_pending += num_bytes;
+ qc->buf = buf;
+ qc->num_bytes = num_bytes;
list_add_tail(&qc->node, &cq->q);
PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
return 1;
{
struct queued_chunk *qc = cq_peek(cq);
assert(qc);
- qc->sent += sent;
+ qc->num_bytes -= sent;
+ qc->buf += sent;
cq->num_pending -= sent;
}
*
* \return Positive on success, negative on errors.
*/
-int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
+int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
{
- int ret;
-
- if (qc->chunk_num != -1U) {
- ret = vss_get_chunk(qc->chunk_num, buf, len);
- if (ret < 0)
- return ret;
- } else
- vss_get_header(buf, len);
- assert(*len > qc->sent);
- *buf += qc->sent;
- *len -= qc->sent;
+ *buf = qc->buf;
+ *num_bytes = qc->num_bytes;
return 1;
}
struct chunk_queue;
struct queued_chunk;
-int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num, size_t sent);
+int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes);
struct queued_chunk *cq_peek(struct chunk_queue *cq);
void cq_dequeue(struct chunk_queue *cq);
void cq_update(struct chunk_queue *cq, size_t sent);
-int cq_get(struct queued_chunk *qc, char **buf, size_t *len);
+int cq_get(struct queued_chunk *qc, const char **buf, size_t *len);
struct chunk_queue *cq_new(size_t max_pending);
void cq_destroy(struct chunk_queue *cq);
}
static int queue_chunk_or_shutdown(struct sender_client *sc,
- struct sender_status *ss, long unsigned chunk_num,
- size_t sent)
+ struct sender_status *ss, const char *buf, size_t num_bytes)
{
- int ret = cq_enqueue(sc->cq, chunk_num, sent);
+ int ret = cq_enqueue(sc->cq, buf, num_bytes);
if (ret < 0)
shutdown_client(sc, ss);
return ret;
{
struct queued_chunk *qc;
while ((qc = cq_peek(sc->cq))) {
- char *buf;
+ const char *buf;
size_t len;
int ret;
cq_get(qc, &buf, &len);
vss_get_header(&header_buf, &header_len);
if (header_buf && header_len > 0) {
- ret = queue_chunk_or_shutdown(sc, ss, -1U, 0);
+ ret = queue_chunk_or_shutdown(sc, ss, header_buf, header_len);
if (ret < 0)
goto out;
}
if (!len)
goto out;
if (!ret) { /* still data left in the queue */
- ret = queue_chunk_or_shutdown(sc, ss, current_chunk, 0);
+ ret = queue_chunk_or_shutdown(sc, ss, buf, len);
goto out;
}
ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
goto out;
}
if (ret != len)
- ret = queue_chunk_or_shutdown(sc, ss, current_chunk, ret);
+ ret = queue_chunk_or_shutdown(sc, ss, buf + ret, len - ret);
out:
if (ret < 0)
PARA_NOTICE_LOG("%s\n", para_strerror(-ret));