From 5667dca4697c3bbbe4830699ee91df800bf2efd3 Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Tue, 6 Apr 2010 19:51:17 +0200 Subject: [PATCH] udp_send: Force chunk queuing. This adds the public cq_force_enqueue() function to chunk_queue.c. Unlike cq_enqueue() the new function does not return an error if the queue is full, but drops old chunks from the queue in order to make space for the new chunk. The upd sender uses the new function to avoid kicking targets. --- chunk_queue.c | 28 ++++++++++++++++++++++++++++ chunk_queue.h | 1 + udp_send.c | 10 ++++------ 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/chunk_queue.c b/chunk_queue.c index ad28190a..5b102f28 100644 --- a/chunk_queue.c +++ b/chunk_queue.c @@ -92,6 +92,34 @@ void cq_dequeue(struct chunk_queue *cq) free(qc); } +/** + * Force to add a chunk to the given queue. + * + * \param cq See \ref cq_enqueue. + * \param buf See \ref cq_enqueue. + * \param num_bytes See \ref cq_enqueue. + * + * If queuing the given buffer would result in exceeding the maximal queue + * size, buffers are dropped from the beginning of the queue. Note that this + * function still might fail. + * + * \return Standard. + */ +int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes) +{ + int ret; + + if (num_bytes > cq->max_pending) + return -E_QUEUE; + for (;;) { + ret = cq_enqueue(cq, buf, num_bytes); + if (ret >= 0) + return ret; + cq_dequeue(cq); + } + /* never reached */ +} + /** * Change the number of bytes sent for the current queued chunk. * diff --git a/chunk_queue.h b/chunk_queue.h index 3c138eb5..9e794ba8 100644 --- a/chunk_queue.h +++ b/chunk_queue.h @@ -16,3 +16,4 @@ void cq_update(struct chunk_queue *cq, size_t sent); int cq_get(struct queued_chunk *qc, const char **buf, size_t *len); struct chunk_queue *cq_new(size_t max_pending); void cq_destroy(struct chunk_queue *cq); +int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes); diff --git a/udp_send.c b/udp_send.c index 533af63a..c84f416a 100644 --- a/udp_send.c +++ b/udp_send.c @@ -244,9 +244,8 @@ static int udp_send_fec(char *buf, size_t len, void *private_data) if (!len) return 0; if (!ret) { /* still data left in the queue */ - ret = cq_enqueue(ut->cq, buf, len); - if (ret < 0) - goto fail; + ret = cq_force_enqueue(ut->cq, buf, len); + assert(ret >= 0); } ret = write_nonblock(ut->fd, buf, len, 0); if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) @@ -254,9 +253,8 @@ static int udp_send_fec(char *buf, size_t len, void *private_data) if (ret < 0) goto fail; if (ret != len) { - ret = cq_enqueue(ut->cq, buf + ret, len - ret); - if (ret < 0) - goto fail; + ret = cq_force_enqueue(ut->cq, buf + ret, len - ret); + assert(ret >= 0); } return 1; fail: -- 2.39.5