From: Andre Noll Date: Sat, 2 Feb 2008 20:28:51 +0000 (+0100) Subject: Intoduce send_common.c and use it from the dccp sender. X-Git-Tag: v0.3.1~54 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=2abba90245f87fab096edfc3faf7df61646b713f;p=paraslash.git Intoduce send_common.c and use it from the dccp sender. This new file contains code that was identical for the dccp and the http sender. A subsequent patch will convert the http sender to also use the shared code in send_common.c --- diff --git a/command.c b/command.c index 3185533d..995dc4a0 100644 --- a/command.c +++ b/command.c @@ -20,12 +20,12 @@ #include "afs.h" #include "server.h" #include "vss.h" +#include "list.h" #include "send.h" #include "rc4.h" #include "net.h" #include "daemon.h" #include "fd.h" -#include "list.h" #include "user_list.h" #include "server_command_list.h" #include "afs_command_list.h" diff --git a/configure.ac b/configure.ac index a8efcc15..c612a542 100644 --- a/configure.ac +++ b/configure.ac @@ -83,7 +83,7 @@ daemon stat crypt http_send close_on_fork ipc acl dccp_send fd user_list chunk_queue afs osl aft mood score attribute blob ringbuffer playlist sha1 rbtree sched audiod grab_client filter_chain wav compress http_recv dccp_recv recv_common write_common file_write audiod_command -client_common recv stdout filter stdin audioc write client fsck exec" +client_common recv stdout filter stdin audioc write client fsck exec send_common" all_executables="server audiod recv filter audioc write client fsck" recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline" @@ -117,7 +117,7 @@ server_cmdline_objs="server.cmdline server_command_list afs_command_list" server_errlist_objs="server afh_common mp3_afh vss command net string signal time daemon stat crypt http_send close_on_fork ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute - blob playlist sha1 rbtree sched acl" + blob playlist sha1 rbtree sched acl send_common" server_ldflags="" server_audio_formats=" mp3" diff --git a/dccp_send.c b/dccp_send.c index 887801a9..c68f5777 100644 --- a/dccp_send.c +++ b/dccp_send.c @@ -36,32 +36,19 @@ static int listen_fd = -1; /** Maximal number of bytes in a chunk queue. */ #define DCCP_MAX_PENDING_BYTES 40000 -/** describes one connected client */ -struct dccp_client { - /** the dccp socket */ - int fd; - /** The socket `name' of the client. */ - char *name; - /** the position of this client in the client list */ - struct list_head node; - /** non-zero if audio file header has been sent */ - int header_sent; - /** The list of pending chunks for this client. */ - struct chunk_queue *cq; -}; +/** Do not write more than that many bytes at once. */ +#define DCCP_MAX_BYTES_PER_WRITE 1024 static void dccp_pre_select(int *max_fileno, fd_set *rfds, __a_unused fd_set *wfds) { - if (listen_fd < 0) - return; - FD_SET(listen_fd, rfds); - *max_fileno = PARA_MAX(*max_fileno, listen_fd); + if (listen_fd >= 0) + para_fd_set(listen_fd, rfds, max_fileno); } static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds) { - struct dccp_client *dc; + struct sender_client *sc; int ret, fd; if (listen_fd < 0 || !FD_ISSET(listen_fd, rfds)) @@ -78,161 +65,51 @@ static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds) * reduce processing costs a bit. See analogous comment in dccp_recv.c. */ if (shutdown(fd, SHUT_RD) < 0) { - PARA_ERROR_LOG("shutdown(SHUT_RD): %s\n", strerror(errno)); + ret = -ERRNO_TO_PARA_ERROR(errno); goto err; } ret = mark_fd_nonblocking(fd); - if (ret < 0) { - PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + if (ret < 0) goto err; - } - dc = para_calloc(sizeof(struct dccp_client)); - dc->fd = fd; - dc->name = make_message("%s", remote_name(dc->fd)); - PARA_NOTICE_LOG("connection from %s\n", dc->name); - para_list_add(&dc->node, &clients); - add_close_on_fork_list(dc->fd); - dc->cq = cq_new(DCCP_MAX_PENDING_BYTES); + sc = para_calloc(sizeof(*sc)); + sc->fd = fd; + sc->name = make_message("%s", remote_name(sc->fd)); + PARA_NOTICE_LOG("connection from %s\n", sc->name); + para_list_add(&sc->node, &clients); + add_close_on_fork_list(sc->fd); + sc->cq = cq_new(DCCP_MAX_PENDING_BYTES); return; err: + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); close(fd); } -static int dccp_open(void) -{ - int ret = para_listen(AF_UNSPEC, IPPROTO_DCCP, conf.dccp_port_arg); - - if (ret < 0) - return ret; - listen_fd = ret; - ret = mark_fd_nonblocking(listen_fd); - if (ret < 0) { - PARA_EMERG_LOG("%s\n", para_strerror(-ret)); - exit(EXIT_FAILURE); - } - add_close_on_fork_list(listen_fd); - return 1; -} - -static void dccp_shutdown_client(struct dccp_client *dc) -{ - PARA_DEBUG_LOG("shutting down %s (fd %d)\n", dc->name, dc->fd); - free(dc->name); - close(dc->fd); - del_close_on_fork_list(dc->fd); - cq_destroy(dc->cq); - list_del(&dc->node); - free(dc); -} - -/* - * ret: Negative on errors, zero if nothing was written and write returned - * EAGAIN, number of bytes written else. - */ -static int dccp_write(int fd, const char *buf, size_t len) -{ - size_t written = 0; - int ret = 0; - - while (written < len) { - ret = write(fd, buf + written, PARA_MIN(1024, len - written)); - /* - * Error handling: CCID3 has a sending wait queue which fills - * up and is emptied asynchronously. The EAGAIN case means that - * there is currently no space in the wait queue, but this can - * change at any moment and is thus not an error condition. - */ - if (ret < 0 && errno == EAGAIN) - return written; - if (ret < 0) - return -ERRNO_TO_PARA_ERROR(errno); - written += ret; - } - return written; -} - -static int queue_chunk_or_shutdown(struct dccp_client *dc, long unsigned chunk_num, - size_t sent) -{ - int ret = cq_enqueue(dc->cq, chunk_num, sent); - if (ret < 0) { - PARA_NOTICE_LOG("enqueue error\n"); - dccp_shutdown_client(dc); - } - return ret; -} - -static int send_queued_chunks(struct dccp_client *dc) -{ - struct queued_chunk *qc; - while ((qc = cq_peek(dc->cq))) { - char *buf; - size_t len; - int ret; - cq_get(qc, &buf, &len); - ret = dccp_write(dc->fd, buf, len); - if (ret < 0) - return ret; - cq_update(dc->cq, ret); - if (ret != len) - return 1; - cq_dequeue(dc->cq); - } - return 1; -} - static void dccp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent, const char *buf, size_t len) { - struct dccp_client *dc, *tmp; - int ret; - char *header_buf; + struct sender_client *sc, *tmp; - list_for_each_entry_safe(dc, tmp, &clients, node) { - if (!dc->header_sent && current_chunk) { - size_t header_len; - header_buf = vss_get_header(&header_len); - if (header_buf && header_len > 0) { - if (queue_chunk_or_shutdown(dc, -1U, 0) < 0) - continue; - } - dc->header_sent = 1; - } - ret = send_queued_chunks(dc); - if (ret < 0) { - dccp_shutdown_client(dc); - continue; - } - if (!len) - continue; -// PARA_DEBUG_LOG("writing %d bytes to fd %d\n", len, dc->fd); - ret = dccp_write(dc->fd, buf, len); - if (ret < 0) { - PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); - dccp_shutdown_client(dc); - continue; - } - if (ret != len) - queue_chunk_or_shutdown(dc, current_chunk, ret); - } + list_for_each_entry_safe(sc, tmp, &clients, node) + send_chunk(sc, DCCP_MAX_BYTES_PER_WRITE, current_chunk, buf, + len); } static void dccp_shutdown_clients(void) { - struct dccp_client *dc, *tmp; + struct sender_client *sc, *tmp; - list_for_each_entry_safe(dc, tmp, &clients, node) - dccp_shutdown_client(dc); + list_for_each_entry_safe(sc, tmp, &clients, node) + shutdown_client(sc); } static char *dccp_info(void) { static char *buf; int num_clients = 0; - struct dccp_client *dc, *tmp; + struct sender_client *sc, *tmp; free(buf); - list_for_each_entry_safe(dc, tmp, &clients, node) + list_for_each_entry_safe(sc, tmp, &clients, node) num_clients++; buf = make_message("dccp connected clients: %d\n", num_clients); return buf; @@ -244,9 +121,9 @@ static char *dccp_help(void) } /** - * the init function of the dccp sender + * The init function of the dccp sender. * - * \param s pointer to the dccp sender struct + * \param s pointer to the dccp sender struct. * * It initializes all function pointers of \a s and starts * listening on the given port. @@ -268,7 +145,9 @@ void dccp_send_init(struct sender *s) s->client_cmds[SENDER_ALLOW] = NULL; s->client_cmds[SENDER_ADD] = NULL; s->client_cmds[SENDER_DELETE] = NULL; - ret = dccp_open(); + ret = open_sender(IPPROTO_DCCP, conf.dccp_port_arg); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + else + listen_fd = ret; } diff --git a/error.h b/error.h index ece054b4..3a05fe04 100644 --- a/error.h +++ b/error.h @@ -28,6 +28,7 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define RBTREE_ERRORS #define RECV_ERRORS #define ACL_ERRORS +#define SEND_COMMON_ERRORS extern const char **para_errlist[]; diff --git a/http_send.c b/http_send.c index 2646ebb8..b629d957 100644 --- a/http_send.c +++ b/http_send.c @@ -18,8 +18,8 @@ #include "server.h" #include "http.h" #include "vss.h" -#include "send.h" #include "list.h" +#include "send.h" #include "close_on_fork.h" #include "net.h" #include "fd.h" diff --git a/ortp_send.c b/ortp_send.c index 6a9d19c7..96f4b403 100644 --- a/ortp_send.c +++ b/ortp_send.c @@ -17,8 +17,8 @@ #include "afs.h" #include "server.h" #include "vss.h" -#include "send.h" #include "list.h" +#include "send.h" #include "ortp.h" /** Convert in_addr to ascii. */ diff --git a/send.h b/send.h index 94d8afd8..69ebd377 100644 --- a/send.h +++ b/send.h @@ -87,3 +87,24 @@ struct sender { */ int (*client_cmds[NUM_SENDER_CMDS])(struct sender_command_data*); }; + +/** Describes one client, connected to a paraslash sender. */ +struct sender_client { + /** The file descriptor of the client. */ + int fd; + /** The socket "name" of the client. */ + char *name; + /** The position of this client in the client list. */ + struct list_head node; + /** Non-zero if audio file header has been sent. */ + int header_sent; + /** The list of pending chunks for this client. */ + struct chunk_queue *cq; + /** Data specific to the particular sender. */ + void *private_data; +}; + +int open_sender(unsigned l4type, int port); +void shutdown_client(struct sender_client *sc); +void send_chunk(struct sender_client *sc, size_t max_bytes_per_write, + long unsigned current_chunk, const char *buf, size_t len); diff --git a/send_common.c b/send_common.c new file mode 100644 index 00000000..d6c734b5 --- /dev/null +++ b/send_common.c @@ -0,0 +1,191 @@ +/* + * Copyright (C) 2005-2008 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file send_common.c Functions used by more than one paraslash sender. */ + +#include +#include "para.h" +#include "error.h" +#include "string.h" +#include "fd.h" +#include "net.h" +#include "list.h" +#include "afh.h" +#include "afs.h" +#include "server.h" +#include "send.h" +#include "close_on_fork.h" +#include "chunk_queue.h" +#include "vss.h" + + +/** + * Open a passive socket of given layer4 type. + * + * Set the resultig file descriptor to nonblocking mode and add it to the list + * of fds that are being closed in the child process when the server calls + * fork(). + * + * \param l4type The transport-layer protocol. + * \param port The port number. + * + * \return The listening fd on success, negative on errors. + */ +int open_sender(unsigned l4type, int port) +{ + int fd, ret = para_listen(AF_UNSPEC, l4type, port); + + if (ret < 0) + return ret; + fd = ret; + ret = mark_fd_nonblocking(fd); + if (ret < 0) { + close(fd); + return ret; + } + add_close_on_fork_list(fd); + return fd; +} + +/** + * Shut down a connected client. + * + * \param sc The client to be shut down. + * + * Close the file descriptor given by \a sc, remove it from the close-on-fork + * list, destroy the chunk queue of this client, delete the client from the + * list of connected clients and free the sender_client struct. + */ +void shutdown_client(struct sender_client *sc) +{ + PARA_INFO_LOG("shutting down %s on fd %d\n", sc->name, sc->fd); + free(sc->name); + close(sc->fd); + del_close_on_fork_list(sc->fd); + cq_destroy(sc->cq); + list_del(&sc->node); + free(sc->private_data); + free(sc); +} + +/** + * Write a buffer to a non-blocking file descriptor. + * + * \param fd The file descriptor. + * \param buf the buffer to write. + * \param len the number of bytes of \a buf. + * \param max_bytes_per_write Do not write more than that many bytes at once. + * + * If \a max_bytes_per_write is non-zero, do not send more than that many bytes + * per write(). + * + * EAGAIN is not considered an error condition. For example CCID3 has a + * sending wait queue which fills up and is emptied asynchronously. The EAGAIN + * case means that there is currently no space in the wait queue, but this can + * change at any moment. + * + * \return Negative on errors, number of bytes written else. + */ +static int write_nonblock(int fd, const char *buf, size_t len, + size_t max_bytes_per_write) +{ + size_t written = 0; + int ret = 0; + + while (written < len) { + size_t num = len - written; + + if (max_bytes_per_write && max_bytes_per_write < num) + num = max_bytes_per_write; + ret = write(fd, buf + written, num); + if (ret < 0 && errno == EAGAIN) + return written; + if (ret < 0) + return -ERRNO_TO_PARA_ERROR(errno); + written += ret; + } + return written; +} + +static int queue_chunk_or_shutdown(struct sender_client *sc, + long unsigned chunk_num, size_t sent) +{ + int ret = cq_enqueue(sc->cq, chunk_num, sent); + if (ret < 0) + shutdown_client(sc); + return ret; +} + +/* return: negative on errors, zero if not everything was sent, one otherwise */ +static int send_queued_chunks(struct sender_client *sc, + size_t max_bytes_per_write) +{ + struct queued_chunk *qc; + while ((qc = cq_peek(sc->cq))) { + char *buf; + size_t len; + int ret; + cq_get(qc, &buf, &len); + ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write); + if (ret < 0) + return ret; + cq_update(sc->cq, ret); + if (ret != len) + return 0; + cq_dequeue(sc->cq); + } + return 1; +} + +/** + * Send one chunk of audio data to a connected client. + * + * \param sc The client. + * \param max_bytes_per_write Split writes to chunks of at most that many bytes. + * \param current_chunk The number of the chunk to write. + * \param buf The data to write. + * \param len The number of bytes of \a buf. + * + * On errors, the client is shut down. If only a part of the buffer could be + * written, the remainder is put into the chunk queue for that client. + */ +void send_chunk(struct sender_client *sc, size_t max_bytes_per_write, + long unsigned current_chunk, const char *buf, size_t len) +{ + int ret; + + if (!sc->header_sent && current_chunk) { + size_t header_len; + char *header_buf = vss_get_header(&header_len); + if (header_buf && header_len > 0) { + ret = queue_chunk_or_shutdown(sc, -1U, 0); + if (ret < 0) + goto out; + } + sc->header_sent = 1; + } + ret = send_queued_chunks(sc, max_bytes_per_write); + if (ret < 0) { + shutdown_client(sc); + goto out; + } + if (!len) + goto out; + if (!ret) { /* still data left in the queue */ + ret = queue_chunk_or_shutdown(sc, current_chunk, 0); + goto out; + } + ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write); + if (ret < 0) { + shutdown_client(sc); + goto out; + } + if (ret != len) + ret = queue_chunk_or_shutdown(sc, current_chunk, ret); +out: + if (ret < 0) + PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); +} diff --git a/server.c b/server.c index f6bc07d6..aa8fdda5 100644 --- a/server.c +++ b/server.c @@ -77,12 +77,12 @@ #include "vss.h" #include "config.h" #include "close_on_fork.h" +#include "list.h" #include "send.h" #include "net.h" #include "daemon.h" #include "ipc.h" #include "fd.h" -#include "list.h" #include "sched.h" #include "signal.h" #include "user_list.h" diff --git a/vss.c b/vss.c index 1f1f6c16..d74cec3c 100644 --- a/vss.c +++ b/vss.c @@ -25,6 +25,7 @@ #include "net.h" #include "server.cmdline.h" #include "vss.h" +#include "list.h" #include "send.h" #include "ipc.h" #include "fd.h"