From: Andre Noll Date: Mon, 22 Oct 2007 20:23:31 +0000 (+0200) Subject: Switch to the new afs. X-Git-Tag: v0.3.0~272 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=61250cf03241bf73662dac3753e44660a572fa2a;p=paraslash.git Switch to the new afs. This obsoletes get_audio_file() of vss.c. It is being replaced by a call to the afs layer from vss_post_select() which requests a struct audio_format_data for the next audio file. The afd struct, the chunk table and the path of the new audio file are stored in a shared memory area. The id of that area is sent through the afs-server socket. An open fd for the underlying audio file is send to the server process as well using usual socket magic. The vss task of the server process attaches the shared memory area and mmaps the open fd to start audio streaming. The code is still quite buggy, but let's do bug fixes and removal of the old audio file selectors in susequent patches. --- diff --git a/aac_afh.c b/aac_afh.c index d0d437e9..76b2849d 100644 --- a/aac_afh.c +++ b/aac_afh.c @@ -11,10 +11,11 @@ /** \file aac_afh.c para_server's aac audio format handler */ #include "para.h" -#include "afh.h" -#include "server.h" #include "error.h" #include "string.h" +#include "afh.h" +#include "afs.h" +#include "server.h" #include "aac.h" static int aac_find_stsz(unsigned char *buf, size_t buflen, off_t *skip) diff --git a/afs.c b/afs.c index c4e8d169..2e11539c 100644 --- a/afs.c +++ b/afs.c @@ -10,15 +10,15 @@ #include #include "server.cmdline.h" #include "para.h" +#include "error.h" #include "string.h" #include "afh.h" +#include "afs.h" #include "server.h" -#include "error.h" #include /* readdir() */ #include #include #include "net.h" -#include "afs.h" #include "ipc.h" #include "list.h" #include "sched.h" @@ -75,6 +75,11 @@ struct command_task { struct task task; }; +static int server_socket; +static struct command_task command_task_struct; +static struct signal_task signal_task_struct; + + /** * A random number used to "authenticate" the connection. * @@ -425,6 +430,40 @@ int stdin_command(int fd, struct osl_object *arg_obj, callback_function *f, return ret; } +int pass_afd(int fd, char *buf, size_t size) +{ + struct msghdr msg = {.msg_iov = NULL}; + struct cmsghdr *cmsg; + char control[255]; + int ret; + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = size; + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + *(int *)CMSG_DATA(cmsg) = fd; + + /* Sum of the length of all control messages in the buffer */ + msg.msg_controllen = cmsg->cmsg_len; + PARA_NOTICE_LOG("passing %zu bytes and fd %d\n", size, fd); + ret = sendmsg(server_socket, &msg, 0); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + return ret; + } + return 1; +} + /** * Open the audio file with highest score. * @@ -437,33 +476,35 @@ int stdin_command(int fd, struct osl_object *arg_obj, callback_function *f, * * \sa close_audio_file(), open_and_update_audio_file(). */ -int open_next_audio_file(struct audio_file_data *afd) +int open_next_audio_file(void) { struct osl_row *aft_row; - int ret; - for (;;) { - ret = score_get_best(&aft_row, &afd->score); - if (ret < 0) - return ret; - ret = open_and_update_audio_file(aft_row, afd); - if (ret >= 0) - return ret; - } -} + struct audio_file_data afd; + int ret, shmid; + char buf[8]; -/** - * Free all resources which were allocated by open_next_audio_file(). - * - * \param afd The structure previously filled in by open_next_audio_file(). - * - * \return The return value of the underlying call to para_munmap(). - * - * \sa open_next_audio_file(). - */ -int close_audio_file(struct audio_file_data *afd) -{ - free(afd->afhi.chunk_table); - return para_munmap(afd->map.data, afd->map.size); + PARA_NOTICE_LOG("getting next af\n"); + ret = score_get_best(&aft_row, &afd.score); + if (ret < 0) + return ret; + ret = open_and_update_audio_file(aft_row, &afd); + if (ret < 0) + return ret; + shmid = ret; + PARA_NOTICE_LOG("shmid: %u\n", shmid); + if (!write_ok(server_socket)) { + PARA_EMERG_LOG("afs_socket not writable\n"); + goto destroy; + } + *(uint32_t *)buf = NEXT_AUDIO_FILE; + *(uint32_t *)(buf + 4) = (uint32_t)shmid; + ret = pass_afd(afd.fd, buf, 8); + if (ret >= 0) + return ret; + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); +destroy: + shm_destroy(shmid); + return ret; } static enum play_mode init_admissible_files(void) @@ -572,10 +613,6 @@ static int open_afs_tables(void) return ret; } -static int server_socket; -static struct command_task command_task_struct; -static struct signal_task signal_task_struct; - static void unregister_tasks(void) { unregister_task(&command_task_struct.task); @@ -639,6 +676,7 @@ static void command_pre_select(struct sched *s, struct task *t) struct command_task *ct = t->private_data; struct afs_client *client; + para_fd_set(server_socket, &s->rfds, &s->max_fileno); para_fd_set(ct->fd, &s->rfds, &s->max_fileno); list_for_each_entry(client, &afs_client_list, node) para_fd_set(client->fd, &s->rfds, &s->max_fileno); @@ -700,6 +738,27 @@ out: return ret; } +static void execute_server_command(void) +{ + char buf[8]; + int ret = recv_bin_buffer(server_socket, buf, sizeof(buf) - 1); + + if (ret <= 0) { + if (ret < 0) + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); + return; + } + buf[ret] = '\0'; + PARA_NOTICE_LOG("received: %s\n", buf); + if (!strcmp(buf, "new")) { + ret = open_next_audio_file(); + PARA_NOTICE_LOG("ret: %d\n", ret); + return; + } + PARA_ERROR_LOG("unknown command\n"); + +} + static void execute_afs_command(int fd, uint32_t expected_cookie) { uint32_t cookie; @@ -741,7 +800,10 @@ static void command_post_select(struct sched *s, struct task *t) struct sockaddr_un unix_addr; struct afs_client *client, *tmp; - /* First, check the list of connected clients. */ + if (FD_ISSET(server_socket, &s->rfds)) + execute_server_command(); + + /* Check the list of connected clients. */ list_for_each_entry_safe(client, tmp, &afs_client_list, node) { if (FD_ISSET(client->fd, &s->rfds)) execute_afs_command(client->fd, ct->cookie); @@ -756,7 +818,7 @@ static void command_post_select(struct sched *s, struct task *t) list_del(&client->node); free(client); } - /* Next, accept connections on the local socket. */ + /* Accept connections on the local socket. */ if (!FD_ISSET(ct->fd, &s->rfds)) goto out; t->ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr)); diff --git a/afs.cmd b/afs.cmd index 15a30d0b..92f2e56a 100644 --- a/afs.cmd +++ b/afs.cmd @@ -3,7 +3,7 @@ SF: afs.c aft.c attribute.c HC: Prototypes for the commands of the audio file selector. CC: Array of commands for the audio file selector. AT: server_command -IN: para afh server list user_list +IN: para error string afh afs server list user_list SN: list of afs commands TM: mood lyr img pl --- diff --git a/afs.h b/afs.h index db931d98..61630726 100644 --- a/afs.h +++ b/afs.h @@ -72,11 +72,16 @@ enum play_mode {PLAY_MODE_MOOD, PLAY_MODE_PLAYLIST}; struct audio_file_data { enum play_mode current_play_mode; + int fd; long score; struct afs_info afsi; struct audio_format_info afhi; char *path; - struct osl_object map; +}; + +enum afs_server_code { + NEXT_AUDIO_FILE, + AFD_CHANGE }; /** Flags passed to for_each_matching_row(). */ @@ -124,7 +129,6 @@ int send_option_arg_callback_request(struct osl_object *options, int stdin_command(int fd, struct osl_object *arg_obj, callback_function *f, unsigned max_len, struct osl_object *result); int string_compare(const struct osl_object *obj1, const struct osl_object *obj2); -int open_next_audio_file(struct audio_file_data *afd); int close_audio_file(struct audio_file_data *afd); int for_each_matching_row(struct pattern_match_data *pmd); @@ -151,6 +155,7 @@ int get_attribute_text(uint64_t *atts, const char *delim, char **text); void aft_init(struct afs_table *t); int aft_get_row_of_path(const char *path, struct osl_row **row); int open_and_update_audio_file(struct osl_row *aft_row, struct audio_file_data *afd); +int load_afd(int shmid, struct audio_file_data *afd); int load_afsi(struct afs_info *afsi, struct osl_object *obj); void save_afsi(struct afs_info *afsi, struct osl_object *obj); int get_afsi_of_row(const struct osl_row *row, struct afs_info *afsi); diff --git a/afs_common.c b/afs_common.c index 17b4e16f..c45b3d90 100644 --- a/afs_common.c +++ b/afs_common.c @@ -12,16 +12,17 @@ #include "para.h" +#include "error.h" +#include "string.h" #include "fd.h" #include "server.cmdline.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "vss.h" #include /* readdir() */ #include /* stat */ #include /* mode_t */ -#include "error.h" -#include "string.h" /** * Traverse the given directory recursively. diff --git a/aft.c b/aft.c index aae6d66b..ff2691f9 100644 --- a/aft.c +++ b/aft.c @@ -17,6 +17,7 @@ #include "net.h" #include "vss.h" #include "fd.h" +#include "ipc.h" static struct osl_table *audio_file_table; @@ -372,11 +373,23 @@ enum chunk_info_offsets{ CHUNK_TABLE_OFFSET = 20, }; -/* TODO: audio format handlers could just produce this */ -static void save_chunk_info(struct audio_format_info *afhi, char *buf) +static void save_chunk_table(struct audio_format_info *afhi, char *buf) { int i; + for (i = 0; i < afhi->chunks_total; i++) + write_u32(buf + 4 * i, afhi->chunk_table[i]); +} +static void load_chunk_table(struct audio_format_info *afhi, char *buf) +{ + int i; + for (i = 0; i < afhi->chunks_total; i++) + afhi->chunk_table[i] = read_u32(buf + 4 * i); +} + +/* TODO: audio format handlers could just produce this */ +static void save_chunk_info(struct audio_format_info *afhi, char *buf) +{ if (!afhi) return; write_u32(buf + CHUNKS_TOTAL_OFFSET, afhi->chunks_total); @@ -384,14 +397,12 @@ static void save_chunk_info(struct audio_format_info *afhi, char *buf) write_u32(buf + HEADER_OFFSET_OFFSET, afhi->header_offset); write_u32(buf + CHUNK_TV_TV_SEC_OFFSET, afhi->chunk_tv.tv_sec); write_u32(buf + CHUNK_TV_TV_USEC, afhi->chunk_tv.tv_usec); - for (i = 0; i < afhi->chunks_total; i++) - write_u32(buf + CHUNK_TABLE_OFFSET + 4 * i, afhi->chunk_table[i]); + save_chunk_table(afhi, buf + CHUNK_TABLE_OFFSET); } static int load_chunk_info(struct osl_object *obj, struct audio_format_info *afhi) { char *buf = obj->data; - int i; if (obj->size < CHUNK_TABLE_OFFSET) return -E_BAD_DATA_SIZE; @@ -405,8 +416,7 @@ static int load_chunk_info(struct osl_object *obj, struct audio_format_info *afh if (afhi->chunks_total * 4 + CHUNK_TABLE_OFFSET > obj->size) return -E_BAD_DATA_SIZE; afhi->chunk_table = para_malloc(afhi->chunks_total * sizeof(size_t)); - for (i = 0; i < afhi->chunks_total; i++) - afhi->chunk_table[i] = read_u32(buf + CHUNK_TABLE_OFFSET + 4 * i); + load_chunk_table(afhi, buf + CHUNK_TABLE_OFFSET); return 1; } @@ -581,6 +591,7 @@ int get_afhi_of_row(const struct osl_row *row, struct audio_format_info *afhi) return 1; } +#if 0 /** * Get the chunk table of an audio file, given a row of the audio file table. * @@ -602,6 +613,58 @@ static int get_chunk_table_of_row(const struct osl_row *row, struct audio_format osl_close_disk_object(&obj); return ret; } +#endif + +/* returns shmid on success */ +static int save_afd(struct audio_file_data *afd) +{ + size_t path_size = strlen(afd->path) + 1; + size_t size = sizeof(*afd) + path_size + + 4 * afd->afhi.chunks_total; + + PARA_NOTICE_LOG("size: %zu\n", size); + int shmid, ret = shm_new(size); + void *shm_afd; + char *buf; + + if (ret < 0) + return ret; + shmid = ret; + ret = shm_attach(shmid, ATTACH_RW, &shm_afd); + if (ret < 0) + goto err; + *(struct audio_file_data *)shm_afd = *afd; + buf = shm_afd; + buf += sizeof(*afd); + strcpy(buf, afd->path); + buf += path_size; + save_chunk_table(&afd->afhi, buf); + shm_detach(shm_afd); + return shmid; +err: + shm_destroy(shmid); + return ret; +} + +int load_afd(int shmid, struct audio_file_data *afd) +{ + void *shm_afd; + char *buf; + int ret; + + ret = shm_attach(shmid, ATTACH_RO, &shm_afd); + if (ret < 0) + return ret; + *afd = *(struct audio_file_data *)shm_afd; + buf = shm_afd; + buf += sizeof(*afd); + afd->path = para_strdup(buf); + buf += strlen(buf) + 1; + afd->afhi.chunk_table = para_malloc(afd->afhi.chunks_total * sizeof(size_t)); + load_chunk_table(&afd->afhi, buf); + shm_detach(shm_afd); + return 1; +} /** * Mmap the given audio file and update statistics. @@ -622,6 +685,7 @@ int open_and_update_audio_file(struct osl_row *aft_row, struct audio_file_data * struct afs_info new_afsi; int ret = get_hash_of_row(aft_row, &aft_hash); struct afsi_change_event_data aced; + struct osl_object map, chunk_table_obj; if (ret < 0) return ret; @@ -637,29 +701,39 @@ int open_and_update_audio_file(struct osl_row *aft_row, struct audio_file_data * ret = get_afhi_of_row(aft_row, &afd->afhi); if (ret < 0) return ret; - ret = get_chunk_table_of_row(aft_row, &afd->afhi); + ret = osl_open_disk_object(audio_file_table, aft_row, + AFTCOL_CHUNKS, &chunk_table_obj); if (ret < 0) return ret; - ret = mmap_full_file(afd->path, O_RDONLY, &afd->map.data, - &afd->map.size, NULL); + ret = mmap_full_file(afd->path, O_RDONLY, &map.data, + &map.size, &afd->fd); if (ret < 0) goto err; - hash_function(afd->map.data, afd->map.size, file_hash); - ret = -E_HASH_MISMATCH; - if (hash_compare(file_hash, aft_hash)) + hash_function(map.data, map.size, file_hash); + ret = hash_compare(file_hash, aft_hash); + para_munmap(map.data, map.size); + if (ret) { + ret = -E_HASH_MISMATCH; goto err; + } new_afsi = afd->afsi; new_afsi.num_played++; new_afsi.last_played = time(NULL); save_afsi(&new_afsi, &afsi_obj); /* in-place update */ + ret = load_chunk_info(&chunk_table_obj, &afd->afhi); + if (ret < 0) + goto err; + aced.aft_row = aft_row; aced.old_afsi = &afd->afsi; afs_event(AFSI_CHANGE, NULL, &aced); - - return ret; -err: + ret = save_afd(afd); + if (ret < 0) + goto err; free(afd->afhi.chunk_table); +err: + osl_close_disk_object(&chunk_table_obj); return ret; } @@ -1550,7 +1624,7 @@ static int add_one_audio_file(const char *path, const void *private_data) const struct private_add_data *pad = private_data; struct audio_format_info afhi, *afhi_ptr = NULL; struct osl_row *pb = NULL, *hs = NULL; /* path brother/hash sister */ - struct osl_object map, obj = {.data = NULL}, query, result; + struct osl_object map, obj = {.data = NULL}, query, result = {.data = NULL}; HASH_TYPE hash[HASH_SIZE]; afhi.header_offset = 0; @@ -1617,7 +1691,7 @@ static int add_one_audio_file(const char *path, const void *private_data) save_audio_file_info(hash, path, afhi_ptr, pad->flags, format_num, &obj); /* Ask afs to consider this entry for adding. */ ret = send_callback_request(com_add_callback, &obj, &result); - if (result.data && result.size) { + if (ret >= 0 && result.data && result.size) { ret2 = send_va_buffer(pad->fd, "%s", (char *)result.data); free(result.data); if (ret >= 0 && ret2 < 0) diff --git a/command.c b/command.c index 97924f22..a84d3b86 100644 --- a/command.c +++ b/command.c @@ -13,17 +13,18 @@ #include #include "para.h" +#include "error.h" #include "server.cmdline.h" #include "afs_common.h" +#include "string.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "vss.h" #include "send.h" #include "rc4.h" -#include "error.h" #include "net.h" #include "daemon.h" -#include "string.h" #include "fd.h" #include "list.h" #include "user_list.h" diff --git a/dccp_send.c b/dccp_send.c index c506fd96..1ea8de25 100644 --- a/dccp_send.c +++ b/dccp_send.c @@ -16,14 +16,15 @@ #include "para.h" #include "error.h" +#include "string.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "net.h" #include "list.h" #include "vss.h" #include "send.h" #include "dccp.h" -#include "string.h" #include "fd.h" #include "close_on_fork.h" #include "chunk_queue.h" diff --git a/error.h b/error.h index 1c3aaa34..b6670df8 100644 --- a/error.h +++ b/error.h @@ -64,7 +64,6 @@ extern const char **para_errlist[]; PARA_ERROR(STAT, "can not stat file"), \ PARA_ERROR(FSTAT, "fstat error"), \ PARA_ERROR(RENAME, "rename failed"), \ - PARA_ERROR(MUNMAP, "munmap failed"), \ PARA_ERROR(WRITE, "write error"), \ PARA_ERROR(LSEEK, "lseek error"), \ PARA_ERROR(BUSY, "table is busy"), \ @@ -312,6 +311,8 @@ extern const char **para_errlist[]; PARA_ERROR(BAD_AUDIO_FILE_SUFFIX, "unknown suffix"), \ PARA_ERROR(AUDIO_FORMAT, "audio format not recognized"), \ PARA_ERROR(CHUNK, "unable to get chunk"), \ + PARA_ERROR(SHORT_AFS_READ, "short read from afs socket"), \ + PARA_ERROR(BAD_AFS_CODE, "received junk from afs"), \ #define AFS_COMMON_ERRORS \ @@ -409,6 +410,7 @@ extern const char **para_errlist[]; PARA_ERROR(CHDIR_PERM, "insufficient permissions to chdir"), \ PARA_ERROR(EMPTY, "file empty"), \ PARA_ERROR(MMAP, "mmap error"), \ + PARA_ERROR(MUNMAP, "munmap failed"), \ #define WRITE_ERRORS \ diff --git a/fd.c b/fd.c index 83c6cd5b..f9b3d9d0 100644 --- a/fd.c +++ b/fd.c @@ -332,3 +332,47 @@ out: *fd_ptr = fd; return ret; } + +/** + * A wrapper for munmap(2). + * + * \param start The start address of the memory mapping. + * \param length The size of the mapping. + * + * \return Positive on success, \p -E_MUNMAP on errors. + * + * \sa munmap(2), mmap_full_file(). + */ +int para_munmap(void *start, size_t length) +{ + if (munmap(start, length) >= 0) + return 1; + PARA_ERROR_LOG("munmap (%p/%zu) failed: %s\n", start, length, + strerror(errno)); + return -E_MUNMAP; +} + +/** + * check a file descriptor for writability + * + * \param fd the file descriptor + * + * \return positive if fd is ready for writing, zero if it isn't, negative if + * an error occurred. + */ + +int write_ok(int fd) +{ + struct timeval tv = {0, 0}; + fd_set wfds; + int ret; +again: + FD_ZERO(&wfds); + FD_SET(fd, &wfds); + tv.tv_sec = 0; + tv.tv_usec = 0; + ret = select(fd + 1, NULL, &wfds, NULL, &tv); + if (ret < 0 && errno == EINTR) + goto again; + return ret; +} diff --git a/fd.h b/fd.h index 0b25959e..224f832c 100644 --- a/fd.h +++ b/fd.h @@ -20,3 +20,5 @@ int para_fchdir(int fd); int para_chdir(const char *path); int mmap_full_file(const char *filename, int open_mode, void **map, size_t *size, int *fd_ptr); +int para_munmap(void *start, size_t length); +int write_ok(int fd); diff --git a/http_send.c b/http_send.c index 7ee3e556..7035f279 100644 --- a/http_send.c +++ b/http_send.c @@ -10,17 +10,18 @@ #include #include "para.h" +#include "error.h" +#include "string.h" #include "server.cmdline.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "http.h" #include "vss.h" #include "send.h" #include "list.h" #include "close_on_fork.h" -#include "error.h" #include "net.h" -#include "string.h" #include "fd.h" #include "chunk_queue.h" diff --git a/mp3_afh.c b/mp3_afh.c index ae7f117b..e36cdef9 100644 --- a/mp3_afh.c +++ b/mp3_afh.c @@ -17,10 +17,11 @@ */ #include "para.h" -#include "afh.h" -#include "server.h" #include "error.h" +#include "afh.h" #include "string.h" +#include "afs.h" +#include "server.h" /** \cond some defines and structs which are only used in this file */ diff --git a/mysql_selector.c b/mysql_selector.c index eaee16a9..5b145dca 100644 --- a/mysql_selector.c +++ b/mysql_selector.c @@ -12,17 +12,18 @@ /** \endcond */ #include "para.h" +#include "error.h" +#include "string.h" #include "server.cmdline.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "vss.h" #include "afs_common.h" #include #include #include -#include "error.h" #include "net.h" -#include "string.h" #include "list.h" #include "user_list.h" #include "mysql_selector_command_list.h" diff --git a/mysql_selector.cmd b/mysql_selector.cmd index 5bfcc957..a49d6d1e 100644 --- a/mysql_selector.cmd +++ b/mysql_selector.cmd @@ -3,7 +3,7 @@ SF: mysql_selector.c HC: prototypes for the commands of the mysql audio file selector CC: array of commands for the mysql audio file selector AT: server_command -IN: para afh server list user_list +IN: para error string afh afs server list user_list SN: list of mysql selector commands --- N: cam diff --git a/ogg_afh.c b/ogg_afh.c index 71ee7e30..39859c08 100644 --- a/ogg_afh.c +++ b/ogg_afh.c @@ -12,9 +12,10 @@ #include "para.h" #include "afh.h" -#include "server.h" #include "error.h" #include "string.h" +#include "afs.h" +#include "server.h" /** must be big enough to hold header */ #define CHUNK_SIZE 32768 diff --git a/ortp_send.c b/ortp_send.c index 9095a8f8..084fdf57 100644 --- a/ortp_send.c +++ b/ortp_send.c @@ -11,13 +11,15 @@ #include "server.cmdline.h" #include "para.h" +#include "error.h" +#include "string.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "vss.h" #include "send.h" #include "list.h" #include "ortp.h" -#include "string.h" /** \cond convert in_addr to ascii */ #define TARGET_ADDR(oc) inet_ntoa((oc)->addr) @@ -168,6 +170,8 @@ static void ortp_send(long unsigned current_chunk, long unsigned chunks_sent, if (self->status != SENDER_ON) return; + +// PARA_NOTICE_LOG("sending %lu\n", current_chunk); chunk_tv = vss_chunk_time(); if (!chunk_tv) return; diff --git a/osl.h b/osl.h index 1fec9865..cb3194be 100644 --- a/osl.h +++ b/osl.h @@ -183,21 +183,3 @@ ssize_t para_write_all(int fd, const void *buf, size_t size); int para_lseek(int fd, off_t *offset, int whence); int para_write_file(const char *filename, const void *buf, size_t size); -/** - * A wrapper for munmap(2). - * - * \param start The start address of the memory mapping. - * \param length The size of the mapping. - * - * \return Positive on success, \p -E_MUNMAP on errors. - * - * \sa munmap(2), mmap_full_file(). - */ -_static_inline_ int para_munmap(void *start, size_t length) -{ - if (munmap(start, length) >= 0) - return 1; - PARA_ERROR_LOG("munmap (%p/%zu) failed: %s\n", start, length, - strerror(errno)); - return -E_MUNMAP; -} diff --git a/playlist_selector.c b/playlist_selector.c index 6a74a922..2d076ee6 100644 --- a/playlist_selector.c +++ b/playlist_selector.c @@ -8,12 +8,13 @@ #include #include "para.h" +#include "error.h" +#include "string.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "afs_common.h" -#include "error.h" #include "net.h" -#include "string.h" #include "ipc.h" #include "list.h" #include "user_list.h" diff --git a/playlist_selector.cmd b/playlist_selector.cmd index 188b16e1..b8190258 100644 --- a/playlist_selector.cmd +++ b/playlist_selector.cmd @@ -3,7 +3,7 @@ SF: playlist_selector.c HC: prototypes for the commands of the playlist audio file selector CC: array of commands for the playlist audio file selector AT: server_command -IN: para afh server list user_list +IN: para error string afh afs server list user_list SN: list of playlist selector commands --- N: ppl diff --git a/random_selector.c b/random_selector.c index bcbdae69..ac82058d 100644 --- a/random_selector.c +++ b/random_selector.c @@ -8,13 +8,14 @@ #include /* gettimeofday */ #include "para.h" +#include "error.h" +#include "string.h" #include "server.cmdline.h" #include "afh.h" +#include "afs.h" #include "server.h" #include "afs_common.h" -#include "error.h" #include "net.h" -#include "string.h" #include "random_selector_command_list.h" extern struct misc_meta_data *mmd; diff --git a/random_selector.cmd b/random_selector.cmd index ef68cff4..67f07db1 100644 --- a/random_selector.cmd +++ b/random_selector.cmd @@ -3,7 +3,7 @@ SF: random_selector.c HC: prototypes for the commands of the random audio file selector CC: array of commands for the random audio file selector AT: server_command -IN: para afh server list user_list +IN: para error string afh afs server list user_list SN: list of random selector commands --- N: random_info diff --git a/send.h b/send.h index 8b4317e3..94d14cf8 100644 --- a/send.h +++ b/send.h @@ -84,27 +84,3 @@ struct sender { int (*client_cmds[NUM_SENDER_CMDS])(struct sender_command_data*); }; -/** - * check a file descriptor for writability - * - * \param fd the file descriptor - * - * \return positive if fd is ready for writing, zero if it isn't, negative if - * an error occurred. - */ - -static inline int write_ok(int fd) -{ - struct timeval tv = {0, 0}; - fd_set wfds; - int ret; -again: - FD_ZERO(&wfds); - FD_SET(fd, &wfds); - tv.tv_sec = 0; - tv.tv_usec = 0; - ret = select(fd + 1, NULL, &wfds, NULL, &tv); - if (ret < 0 && errno == EINTR) - goto again; - return ret; -} diff --git a/server.c b/server.c index 7171c6e2..5777ec43 100644 --- a/server.c +++ b/server.c @@ -20,25 +20,25 @@ #include #include "para.h" +#include "error.h" #include "server.cmdline.h" #include "afs_common.h" #include "afh.h" +#include "string.h" +#include "afs.h" #include "server.h" #include "vss.h" #include "config.h" #include "close_on_fork.h" #include "send.h" -#include "error.h" #include "net.h" #include "daemon.h" -#include "string.h" #include "ipc.h" #include "fd.h" #include "list.h" #include "sched.h" #include "signal.h" #include "user_list.h" -#include "afs.h" /** define the array of error lists needed by para_server */ INIT_SERVER_ERRLISTS; @@ -352,7 +352,7 @@ out: } uint32_t afs_socket_cookie; -static int afs_socket; +int afs_socket; static pid_t afs_pid; static void init_afs(void) @@ -499,14 +499,14 @@ repeat: /* check socket and signal pipe in any case */ para_fd_set(sockfd, &rfds, &max_fileno); para_fd_set(signal_pipe, &rfds, &max_fileno); - timeout = vss_preselect(); + timeout = vss_preselect(&rfds, &wfds, &max_fileno); status_refresh(); for (i = 0; senders[i].name; i++) { if (senders[i].status != SENDER_ON) continue; if (!senders[i].pre_select) continue; - senders[i].pre_select( &max_fileno, &rfds, &wfds); + senders[i].pre_select(&max_fileno, &rfds, &wfds); } if (selectors[mmd->selector_num].pre_select) { ret = selectors[mmd->selector_num].pre_select(&rfds, &wfds); @@ -515,6 +515,7 @@ repeat: mmd_unlock(); ret = para_select(max_fileno + 1, &rfds, &wfds, timeout); mmd_lock(); + vss_post_select(&rfds, &wfds); if (mmd->selector_change >= 0) change_selector(); if (selectors[mmd->selector_num].post_select) diff --git a/server.cmd b/server.cmd index da895480..62b90fda 100644 --- a/server.cmd +++ b/server.cmd @@ -3,7 +3,7 @@ SF: command.c HC: prototypes for the server command handlers CC: array of server commands AT: server_command -IN: para afh server list user_list +IN: para error string afh afs server list user_list SN: list of server commands --- N: chs diff --git a/server.h b/server.h index 9c98cf62..86d4f173 100644 --- a/server.h +++ b/server.h @@ -59,57 +59,60 @@ struct sender_command_data{ * date. */ struct misc_meta_data { -/** information on the current audio file */ + /** information on the current audio file */ struct audio_format_info afi; -/** the size of the current audio file in bytes */ + /** the size of the current audio file in bytes */ size_t size; -/** the full path of the current audio file */ + /** the full path of the current audio file */ char filename[_POSIX_PATH_MAX]; -/** the last modification file of the current audio file */ + /** the last modification file of the current audio file */ time_t mtime; -/** the number of the current audio format */ + /** the number of the current audio format */ int audio_format; -/** the "old" status flags -- commands may only read them */ + /** the "old" status flags -- commands may only read them */ unsigned int vss_status_flags; -/** the new status flags -- commands may set them **/ + /** The new status flags -- commands may set them. */ unsigned int new_vss_status_flags; -/** the number of data chunks sent for the current audio file */ + /** the number of data chunks sent for the current audio file */ long unsigned chunks_sent; -/** set by the jmp/ff commands to the new position in chunks */ + /** set by the jmp/ff commands to the new position in chunks */ long unsigned repos_request; -/** the number of the chunk currently sent out*/ + /** The number of the chunk currently sent out. */ long unsigned current_chunk; -/** the milliseconds that have been skipped of the current audio file */ + /** the milliseconds that have been skipped of the current audio file */ long offset; -/** the time para_server started to stream */ + /** the time para_server started to stream */ struct timeval stream_start; -/** the event counter - * - * commands may increase this to force a status update to be sent to all - * connected clients -*/ + /** + * The event counter. + * + * Commands may increase this to force a status update to be sent to all + * connected clients. + */ unsigned int events; -/** the number of audio files already sent */ + /** the number of audio files already sent */ unsigned int num_played; -/** the number of executed commands */ + /** the number of executed commands */ unsigned int num_commands; -/** the number of connections para_server received so far */ + /** the number of connections para_server received so far */ unsigned int num_connects; -/** the number of connections currently active */ + /** the number of connections currently active */ unsigned int active_connections; -/** the process id of para_server */ + /** the process id of para_server */ pid_t server_pid; -/** a string that gets filled in by the current audio file selector */ + /** a string that gets filled in by the current audio file selector */ char selector_info[MMD_INFO_SIZE]; -/** the number if the current audio file selector */ + /** The number of the current audio file selector. */ int selector_num; -/** commands set this to non-zero to change the current selector */ + /** commands set this to non-zero to change the current selector */ int selector_change; -/** used by the sender command */ + /** used by the sender command */ struct sender_command_data sender_cmd_data; + struct audio_file_data afd; }; extern struct server_args_info conf; +extern int afs_socket; int handle_connect(int fd, struct sockaddr_in *addr); void mmd_unlock(void); diff --git a/string.c b/string.c index bf107834..8c97ccf7 100644 --- a/string.c +++ b/string.c @@ -63,7 +63,8 @@ __must_check __malloc void *para_malloc(size_t size) void *p = malloc(size); if (!p) { - PARA_EMERG_LOG("%s", "malloc failed, aborting\n"); + PARA_EMERG_LOG("malloc failed (size = %zu), aborting\n", + size); exit(EXIT_FAILURE); } return p; diff --git a/vss.c b/vss.c index 5b7ed320..dbf8547c 100644 --- a/vss.c +++ b/vss.c @@ -17,14 +17,17 @@ #include #include "para.h" +#include "error.h" +#include "string.h" #include "afh.h" +#include "afs.h" #include "server.h" +#include "net.h" #include "server.cmdline.h" #include "afs_common.h" #include "vss.h" #include "send.h" -#include "error.h" -#include "string.h" +#include "ipc.h" #include "fd.h" extern const char *status_item_list[]; @@ -497,6 +500,14 @@ struct timeval *vss_chunk_time(void) return &mmd->afi.chunk_tv; } +enum afs_socket_status { + AFS_SOCKET_READY, + AFS_SOCKET_CHECK_FOR_WRITE, + AFS_SOCKET_AFD_PENDING +}; + +static enum afs_socket_status afsss; + /** * compute the timeout for para_server's main select-loop * @@ -514,12 +525,15 @@ struct timeval *vss_chunk_time(void) * \return A pointer to a struct timeval containing the timeout for the next * chunk of data to be sent, or NULL if we're not sending right now. */ -struct timeval *vss_preselect(void) +struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno) { struct audio_format_handler *af = NULL; int i, format; struct timeval *ret; -again: + + para_fd_set(afs_socket, rfds, max_fileno); + +//again: format = mmd->audio_format; if (format >= 0) af = afl + format; @@ -553,12 +567,113 @@ again: if (!ret && !map && vss_playing() && !(mmd->new_vss_status_flags & VSS_NOMORE)) { PARA_DEBUG_LOG("%s", "ready and playing, but no audio file\n"); - vss_get_audio_file(); - goto again; + //vss_get_audio_file(); + if (afsss == AFS_SOCKET_READY) { + para_fd_set(afs_socket, wfds, max_fileno); + afsss = AFS_SOCKET_CHECK_FOR_WRITE; + } +// goto again; } return ret; } +static int recv_afs_msg(int *fd, uint32_t *code, uint32_t *data) +{ + char control[255], buf[8]; + struct msghdr msg = {.msg_iov = NULL}; + struct cmsghdr *cmsg; + struct iovec iov; + int ret = 0; + + iov.iov_base = buf; + iov.iov_len = sizeof(buf); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + memset(buf, 0, sizeof(buf)); + ret = recvmsg(afs_socket, &msg, 0); + if (ret < 0) + return -ERRNO_TO_PARA_ERROR(errno); + if (iov.iov_len != sizeof(buf)) + return -E_SHORT_AFS_READ; + *code = *(uint32_t*)buf; + *data = *(uint32_t*)(buf + 4); + cmsg = CMSG_FIRSTHDR(&msg); + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET + || cmsg->cmsg_type != SCM_RIGHTS) + continue; + if ((cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int) != 1) + continue; + *fd = *(int *)CMSG_DATA(cmsg); + } + return 1; +} + +static void recv_afs_result(void) +{ + int ret, passed_fd = -1, shmid; + uint32_t afs_code = 0, afs_data = 0; + struct stat statbuf; + struct timeval now; + + ret = recv_afs_msg(&passed_fd, &afs_code, &afs_data); + if (ret < 0) + goto err; + PARA_NOTICE_LOG("got the fd: %d, code: %u, shmid: %u\n", + passed_fd, afs_code, afs_data); + ret = -E_BAD_AFS_CODE; + if (afs_code != NEXT_AUDIO_FILE) + goto err; + afsss = AFS_SOCKET_READY; + shmid = afs_data; + ret = load_afd(shmid, &mmd->afd); + if (ret < 0) + goto err; + shm_destroy(shmid); + PARA_NOTICE_LOG("next audio file: %s (%lu chunks)\n", mmd->afd.path, + mmd->afd.afhi.chunks_total); + ret = fstat(passed_fd, &statbuf); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + mmd->size = statbuf.st_size; + mmd->mtime = statbuf.st_mtime; + map = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE, + passed_fd, 0); + strcpy(mmd->filename, mmd->afd.path); /* FIXME: check length */ + mmd->afi.header_len = 0; /* default: no header */ + mmd->audio_format = mmd->afd.afsi.audio_format_id; + mmd->chunks_sent = 0; + mmd->current_chunk = 0; + mmd->offset = 0; + mmd->events++; + mmd->num_played++; + mmd->afi = mmd->afd.afhi; + mmd->new_vss_status_flags &= (~VSS_NEXT); + gettimeofday(&now, NULL); + tv_add(&now, &announce_tv, &data_send_barrier); + return; +err: + if (passed_fd >= 0) + close(passed_fd); + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); +} + +void vss_post_select(fd_set *rfds, fd_set *wfds) +{ + int ret; + + if (FD_ISSET(afs_socket, rfds)) + recv_afs_result(); + if (afsss != AFS_SOCKET_CHECK_FOR_WRITE || !FD_ISSET(afs_socket, wfds)) + return; + ret = send_buffer(afs_socket, "new"); + afsss = AFS_SOCKET_AFD_PENDING; +} + static void get_chunk(long unsigned chunk_num, char **buf, size_t *len) { size_t pos = mmd->afi.chunk_table[chunk_num]; diff --git a/vss.h b/vss.h index 9b10a5e8..0ef9892c 100644 --- a/vss.h +++ b/vss.h @@ -7,7 +7,8 @@ /** \file vss.h exported functions from vss.c (para_server) */ void vss_init(void); void vss_send_chunk(void); -struct timeval *vss_preselect(void); +struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno); +void vss_post_select(fd_set *rfds, fd_set *wfds); const char *audio_format_name(int); unsigned int vss_playing(void); unsigned int vss_next(void);