From: Andre Noll Date: Sat, 12 Apr 2008 12:35:16 +0000 (+0200) Subject: chunk_queue: Store a pointer to the data and the chunk size. X-Git-Tag: v0.3.3~84 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=e61fefa8429ed4c87098f7a81373a8cf952ee8c7;p=paraslash.git chunk_queue: Store a pointer to the data and the chunk size. Storing the chunk number has the disadvantage that the queuing code must call into vss to get the chunk. This is unneccessary and requires the map pointer of vss.c to be global. --- diff --git a/chunk_queue.c b/chunk_queue.c index 84f4a4a8..f7f503ab 100644 --- a/chunk_queue.c +++ b/chunk_queue.c @@ -31,10 +31,10 @@ struct chunk_queue { /** Describes one queued chunk in a chunk queue. */ struct queued_chunk { - /** The number of the queued chunk, -1U means header. */ - unsigned chunk_num; - /** The number of bytes already sent. */ - unsigned sent; + /** Pointer to the data to be queued. */ + const char *buf; + /** The number of bytes of this chunk. */ + size_t num_bytes; /** Position of the chunk in the chunk queue. */ struct list_head node; }; @@ -43,32 +43,21 @@ struct queued_chunk { * Add a chunk to the given queue. * * \param cq the queue to add the chunk to. - * \param chunk_num The number of the chunk to be queued. - * \param sent The number of bytes of this chunk that the sender was able to - * send. + * \param buf Pointer to the data to be queued. + * \param num_bytes The size of \a buf. * - * \return Positive on success, negative on errors. + * \return Standard. */ -int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num, - size_t sent) +int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes) { struct queued_chunk *qc; - char *buf; - size_t len; - int ret; - if (chunk_num != -1U) { - ret = vss_get_chunk(chunk_num, &buf, &len); - if (ret < 0) - return ret; - } else - vss_get_header(&buf, &len); - if (cq->num_pending + len > cq->max_pending) + if (cq->num_pending + num_bytes > cq->max_pending) return -E_QUEUE; qc = para_malloc(sizeof(struct queued_chunk)); - cq->num_pending += len; - qc->chunk_num = chunk_num; - qc->sent = sent; + cq->num_pending += num_bytes; + qc->buf = buf; + qc->num_bytes = num_bytes; list_add_tail(&qc->node, &cq->q); PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q); return 1; @@ -111,7 +100,8 @@ void cq_update(struct chunk_queue *cq, size_t sent) { struct queued_chunk *qc = cq_peek(cq); assert(qc); - qc->sent += sent; + qc->num_bytes -= sent; + qc->buf += sent; cq->num_pending -= sent; } @@ -124,19 +114,10 @@ void cq_update(struct chunk_queue *cq, size_t sent) * * \return Positive on success, negative on errors. */ -int cq_get(struct queued_chunk *qc, char **buf, size_t *len) +int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes) { - int ret; - - if (qc->chunk_num != -1U) { - ret = vss_get_chunk(qc->chunk_num, buf, len); - if (ret < 0) - return ret; - } else - vss_get_header(buf, len); - assert(*len > qc->sent); - *buf += qc->sent; - *len -= qc->sent; + *buf = qc->buf; + *num_bytes = qc->num_bytes; return 1; } diff --git a/chunk_queue.h b/chunk_queue.h index a902ee05..9959b74c 100644 --- a/chunk_queue.h +++ b/chunk_queue.h @@ -9,10 +9,10 @@ struct chunk_queue; struct queued_chunk; -int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num, size_t sent); +int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes); struct queued_chunk *cq_peek(struct chunk_queue *cq); void cq_dequeue(struct chunk_queue *cq); void cq_update(struct chunk_queue *cq, size_t sent); -int cq_get(struct queued_chunk *qc, char **buf, size_t *len); +int cq_get(struct queued_chunk *qc, const char **buf, size_t *len); struct chunk_queue *cq_new(size_t max_pending); void cq_destroy(struct chunk_queue *cq); diff --git a/send_common.c b/send_common.c index 43aeb2b3..c542e7e6 100644 --- a/send_common.c +++ b/send_common.c @@ -123,10 +123,9 @@ static int write_nonblock(int fd, const char *buf, size_t len, } static int queue_chunk_or_shutdown(struct sender_client *sc, - struct sender_status *ss, long unsigned chunk_num, - size_t sent) + struct sender_status *ss, const char *buf, size_t num_bytes) { - int ret = cq_enqueue(sc->cq, chunk_num, sent); + int ret = cq_enqueue(sc->cq, buf, num_bytes); if (ret < 0) shutdown_client(sc, ss); return ret; @@ -138,7 +137,7 @@ static int send_queued_chunks(struct sender_client *sc, { struct queued_chunk *qc; while ((qc = cq_peek(sc->cq))) { - char *buf; + const char *buf; size_t len; int ret; cq_get(qc, &buf, &len); @@ -178,7 +177,7 @@ void send_chunk(struct sender_client *sc, struct sender_status *ss, vss_get_header(&header_buf, &header_len); if (header_buf && header_len > 0) { - ret = queue_chunk_or_shutdown(sc, ss, -1U, 0); + ret = queue_chunk_or_shutdown(sc, ss, header_buf, header_len); if (ret < 0) goto out; } @@ -192,7 +191,7 @@ void send_chunk(struct sender_client *sc, struct sender_status *ss, if (!len) goto out; if (!ret) { /* still data left in the queue */ - ret = queue_chunk_or_shutdown(sc, ss, current_chunk, 0); + ret = queue_chunk_or_shutdown(sc, ss, buf, len); goto out; } ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write); @@ -201,7 +200,7 @@ void send_chunk(struct sender_client *sc, struct sender_status *ss, goto out; } if (ret != len) - ret = queue_chunk_or_shutdown(sc, ss, current_chunk, ret); + ret = queue_chunk_or_shutdown(sc, ss, buf + ret, len - ret); out: if (ret < 0) PARA_NOTICE_LOG("%s\n", para_strerror(-ret));