This adds the public cq_force_enqueue() function to chunk_queue.c.
Unlike cq_enqueue() the new function does not return an error if the
queue is full, but drops old chunks from the queue in order to make
space for the new chunk.
The upd sender uses the new function to avoid kicking targets.
free(qc);
}
+/**
+ * Force to add a chunk to the given queue.
+ *
+ * \param cq See \ref cq_enqueue.
+ * \param buf See \ref cq_enqueue.
+ * \param num_bytes See \ref cq_enqueue.
+ *
+ * If queuing the given buffer would result in exceeding the maximal queue
+ * size, buffers are dropped from the beginning of the queue. Note that this
+ * function still might fail.
+ *
+ * \return Standard.
+ */
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+{
+ int ret;
+
+ if (num_bytes > cq->max_pending)
+ return -E_QUEUE;
+ for (;;) {
+ ret = cq_enqueue(cq, buf, num_bytes);
+ if (ret >= 0)
+ return ret;
+ cq_dequeue(cq);
+ }
+ /* never reached */
+}
+
/**
* Change the number of bytes sent for the current queued chunk.
*
int cq_get(struct queued_chunk *qc, const char **buf, size_t *len);
struct chunk_queue *cq_new(size_t max_pending);
void cq_destroy(struct chunk_queue *cq);
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes);
if (!len)
return 0;
if (!ret) { /* still data left in the queue */
- ret = cq_enqueue(ut->cq, buf, len);
- if (ret < 0)
- goto fail;
+ ret = cq_force_enqueue(ut->cq, buf, len);
+ assert(ret >= 0);
}
ret = write_nonblock(ut->fd, buf, len, 0);
if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
if (ret < 0)
goto fail;
if (ret != len) {
- ret = cq_enqueue(ut->cq, buf + ret, len - ret);
- if (ret < 0)
- goto fail;
+ ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+ assert(ret >= 0);
}
return 1;
fail: