From: Andre Date: Tue, 21 Feb 2006 14:52:45 +0000 (+0100) Subject: fix the plm database tool X-Git-Tag: v0.2.11~66 X-Git-Url: http://git.tue.mpg.de/?a=commitdiff_plain;h=c5517c98d28a74a89086b0641e3633d16eaf9df7;p=paraslash.git fix the plm database tool (Re)loading the playlist uses the new mutex and shm helpers and is hopefully a race-free implementation. It works as follows: The plm init function (parent, server context) reserves a shared memory area (client data) as well as two unlocked mutexes. The first mutex serializes access for racing clients that want to replace the playlist. The second mutex serializes the access between parent and child (com_lpl()). com_lpl() loads the playlist from the client into a tmp buffer. It then creates a shm area (client_shm) and copies that buffer to the area. Further, another mutex (shm) is created. This mutex is initially locked. Next, it grabs the client lock and the server lock, and updates client_data with the id of client_shm. Then the parent is woken up (SIGUSR1) and the server lock is released. com_lpl() finally goes to sleep by acquiring the shm lock. In this situation, only the parent may run. It grabs the server lock, reads client_data to attach client_shm and reloads the playlist with data from client_shm. When ready, the parent detaches client_shm, and drops the shm lock and the server lock. This wakes up com_lpl() which destroys client_shm, the shm mutex and the tmp buffer. --- diff --git a/db.h b/db.h index 69d4d9ed..3347d3ad 100644 --- a/db.h +++ b/db.h @@ -20,6 +20,13 @@ #include +enum supported_dbtools {DBT_DOPEY, +#ifdef HAVE_MYSQL + DBT_MYSQL, +#endif + NUM_DBTOOLS +}; + int find_audio_files(const char *dirname, int (*f)(const char *, const char *)); /** @@ -112,6 +119,9 @@ int (*pre_select)(fd_set *rfds, fd_set *wfds); * which are ready for reading/writing. */ void (*post_select)(fd_set *rfds, fd_set *wfds); +/** + * each dbtool has its private data pointer */ +void *private_data; }; int mysql_dbtool_init(struct dbtool*); diff --git a/error.h b/error.h index 0c4a05f4..98ca4ec6 100644 --- a/error.h +++ b/error.h @@ -233,6 +233,7 @@ extern const char **para_errlist[]; PARA_ERROR(SEM_REMOVE, "can not remove semaphore"), \ PARA_ERROR(SHM_GET, "failed to allocate shared memory area"), \ PARA_ERROR(SHM_DESTROY, "failed to destroy shared memory area"), \ + PARA_ERROR(SHM_ATTACH, "can not attach shared memory area"), \ PARA_ERROR(SHM_DETACH, "can not detach shared memory area"), \ /* these do not need error handling (yet) */ diff --git a/ipc.c b/ipc.c index 9ae2b3a0..ae9ad574 100644 --- a/ipc.c +++ b/ipc.c @@ -11,7 +11,7 @@ int mutex_new(void) return ret < 0? -E_SEM_GET : ret; } -int mutex_remove(int id) +int mutex_destroy(int id) { int ret = semctl(id, 0, IPC_RMID); return ret < 0? -E_SEM_REMOVE : 1; @@ -92,12 +92,16 @@ int shm_destroy(int id) * * \sa semop(2) */ -void *shm_attach(int id, enum shm_attach_mode mode) +int shm_attach(int id, enum shm_attach_mode mode, void **result) { - if (mode == ATTACH_RW) - return shmat(id, NULL, 0); - return shmat(id, NULL, SHM_RDONLY); + if (mode == ATTACH_RW) { + *result = shmat(id, NULL, 0); + return *result? 1 : -E_SHM_ATTACH; + } + *result = shmat(id, NULL, SHM_RDONLY); + return *result? 1 : -E_SHM_ATTACH; } + int shm_detach(void *addr) { int ret = shmdt(addr); diff --git a/ipc.h b/ipc.h index 25c56d26..733cba6d 100644 --- a/ipc.h +++ b/ipc.h @@ -3,5 +3,10 @@ enum shm_attach_mode {ATTACH_RO, ATTACH_RW}; int mutex_new(void); +int mutex_destroy(int id); void mutex_lock(int id); void mutex_unlock(int id); +int shm_new(size_t size); +int shm_attach(int id, enum shm_attach_mode mode, void **result); +int shm_detach(void *addr); +int shm_destroy(int id); diff --git a/para.h b/para.h index e645cb1c..049262d3 100644 --- a/para.h +++ b/para.h @@ -225,9 +225,3 @@ __printf_2_3 void para_log(int, char*, ...); -enum supported_dbtools {DBT_DOPEY, -#ifdef HAVE_MYSQL - DBT_MYSQL, -#endif - NUM_DBTOOLS -}; diff --git a/plm_dbtool.c b/plm_dbtool.c index fd18f254..4597e15c 100644 --- a/plm_dbtool.c +++ b/plm_dbtool.c @@ -16,21 +16,43 @@ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111, USA. */ -/** \file plm_dbtool.c Simple playlist manager for paraslash */ +/** \file plm_dbtool.c Playlist manager for paraslash */ -#include /* gettimeofday */ -#include "server.cmdline.h" #include "server.h" #include "db.h" #include "error.h" #include "net.h" #include "string.h" +#include "ipc.h" -#define MAX_PLAYLIST_LEN 10000 +struct plm_client_data { + size_t size; +/** allocated and set by com_lpl() (child) */ + int shm_id; +/** initially locked, gets unlocked by parent when it is done */ + int mutex; +/** return value, set by parent */ + int retval; +}; + +/** data specific to the plm database tool */ +struct private_plm_data { +/** guards against concurrent client access */ + int client_mutex; +/** guards against concurrent parent-child access */ + int server_mutex; +/** pointer to the client data */ + struct plm_client_data *client_data; +/** id of the shm corresponding to \a client_data */ + int client_data_shm_id; +}; + +/** we refuse to load playlists bigger than that */ #define MAX_PLAYLIST_BYTES (1024 * 1024) static unsigned playlist_len, playlist_size, current_playlist_entry; static char **playlist; +static struct dbtool *self; static int com_ppl(int, int, char **); static int com_lpl(int, int, char **); @@ -53,8 +75,8 @@ static struct server_command cmds[] = { .description = "load playlist", .synopsis = "lpl", .help = -"Read a new playlist from stdin" - +"Read a new playlist from stdin. Example:\n" +"\tfind /audio -name '*.mp3' | para_client lpl" }, { .name = NULL, } @@ -63,54 +85,95 @@ static struct server_command cmds[] = { static void playlist_add(char *path) { if (playlist_len >= playlist_size) { - if (playlist_size >= MAX_PLAYLIST_LEN) - return; - playlist_size *= 2; + playlist_size = 2 * playlist_size + 1; playlist = para_realloc(playlist, playlist_size * sizeof(char *)); } - PARA_DEBUG_LOG("adding #%d: %s\n", playlist_len, path); - playlist[playlist_len] = para_strdup(path); - playlist_len++; + PARA_DEBUG_LOG("adding #%d/%d: %s\n", playlist_len, playlist_size, path); + playlist[playlist_len++] = para_strdup(path); +} + +static int send_playlist_to_server(const char *buf, size_t size) +{ + struct private_plm_data *ppd = self->private_data; + int ret, shm_mutex = -1, shm_id = -1; + void *shm = NULL; + + PARA_DEBUG_LOG("new playlist (%d bytes)\n", size); + + ret = mutex_new(); + if (ret < 0) + return ret; + shm_mutex = ret; + + ret = shm_new(size); + if (ret < 0) + goto out; + shm_id = ret; + + ret = shm_attach(shm_id, ATTACH_RW, &shm); + if (ret < 0) + goto out; + mutex_lock(shm_mutex); + memcpy(shm, buf, size); + mutex_lock(ppd->client_mutex); + mutex_lock(ppd->server_mutex); + ppd->client_data->size = size; + ppd->client_data->shm_id = shm_id; + ppd->client_data->mutex = shm_mutex; + kill(getppid(), SIGUSR1); /* wake up the server */ + mutex_unlock(ppd->server_mutex); + mutex_lock(shm_mutex); /* wait until server is done */ + mutex_unlock(shm_mutex); + ret = ppd->client_data->retval; + mutex_unlock(ppd->client_mutex); + shm_detach(shm); +out: + if (shm_id >= 0) + shm_destroy(shm_id); + mutex_destroy(shm_mutex); + PARA_DEBUG_LOG("returning %d\n", ret); + return ret; } static int com_lpl(int fd, __unused int argc, __unused char *argv[]) { - unsigned i, loaded = 0; - char buf[_POSIX_PATH_MAX]; + unsigned loaded = 0; + size_t bufsize = 4096; /* guess that's enough */ + char *buf = para_malloc(bufsize); ssize_t ret; - - PARA_DEBUG_LOG("freeing playlist (%d entries)\n", playlist_len); - for (i = 0; i < playlist_len; i++) - free(playlist[i]); - current_playlist_entry = 0; - playlist_len = 0; ret = send_buffer(fd, AWAITING_DATA_MSG); if (ret < 0) - return ret; + goto out; again: - ret = recv_bin_buffer(fd, buf + loaded, sizeof(buf) - loaded); + ret = recv_bin_buffer(fd, buf + loaded, bufsize - loaded); if (ret < 0) - goto err_out; + goto out; if (!ret) { - PARA_DEBUG_LOG("loaded playlist (%d entries)\n", playlist_len); - return playlist_len; + ret = send_playlist_to_server(buf, loaded); + goto out; } loaded += ret; - loaded = for_each_line(buf, loaded, &playlist_add, 0); - if (loaded >= sizeof(buf)) - goto err_out; + ret = -E_LOAD_PLAYLIST; + if (loaded >= MAX_PLAYLIST_BYTES) + goto out; + if (loaded >= bufsize) { + bufsize *= 2; + buf = para_realloc(buf, bufsize); + } goto again; -err_out: - return -E_LOAD_PLAYLIST; +out: + free(buf); + return ret; } static int com_ppl(int fd, __unused int argc, __unused char *argv[]) { unsigned i; - PARA_DEBUG_LOG("sending playlist (%d entries)\n", playlist_len); + PARA_DEBUG_LOG("sending playlist to client (%d entries)\n", playlist_len); for (i = 0; i < playlist_len; i++) { - int ret = send_buffer(fd, playlist[i]); + int ret = send_va_buffer(fd, "%s\n", playlist[ + (i + current_playlist_entry) % playlist_len]); if (ret < 0) return ret; } @@ -122,7 +185,6 @@ static char **plm_get_audio_file_list(unsigned int num) char **file_list; unsigned i; - return NULL; num = MIN(num, playlist_len); if (!num) return NULL; @@ -135,13 +197,74 @@ static char **plm_get_audio_file_list(unsigned int num) return file_list; } +static void free_playlist_contents(void) +{ + int i; + + PARA_DEBUG_LOG("freeing playlist (%d entries)\n", playlist_len); + for (i = 0; i < playlist_len; i++) + free(playlist[i]); + current_playlist_entry = 0; + playlist_len = 0; +} + static void plm_shutdown(void) { - /* free the playlist */ + struct private_plm_data *ppd = self->private_data; + + shm_detach(ppd->client_data); + shm_destroy(ppd->client_data_shm_id); + mutex_destroy(ppd->server_mutex); + mutex_destroy(ppd->client_mutex); + free(ppd); + free_playlist_contents(); + free(playlist); + playlist = NULL; + playlist_len = 0; + playlist_size = 0; +} + +static void plm_post_select(__unused fd_set *rfds, __unused fd_set *wfds) +{ + struct private_plm_data *ppd = self->private_data; + struct plm_client_data *pcd = ppd->client_data; + int ret; + void *shm; + + mutex_lock(ppd->server_mutex); + if (!pcd->size) + goto out; + free_playlist_contents(); + ret = shm_attach(pcd->shm_id, ATTACH_RW, &shm); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); + goto out; + } + PARA_DEBUG_LOG("loading new playlist (%d bytes)\n", pcd->size); + ret = for_each_line((char *)shm, pcd->size, &playlist_add, 0); + shm_detach(shm); + PARA_NOTICE_LOG("new playlist (%d entries)\n", playlist_len); + pcd->retval = 1; + pcd->size = 0; + mutex_unlock(pcd->mutex); +out: + mutex_unlock(ppd->server_mutex); +} + +void plm_update_audio_file(char *audio_file) +{ + unsigned i; + + for (i = 0; i < playlist_len; i++) { + unsigned j = (current_playlist_entry + i) % playlist_len; + if (strcmp(playlist[j], audio_file)) + continue; + current_playlist_entry = (j + 1) % playlist_len; + } } /** - * the init function for the plm database tool + * the init function for the plm database tool * * Init all function pointers of \a db * @@ -149,11 +272,53 @@ static void plm_shutdown(void) */ int plm_dbtool_init(struct dbtool *db) { - playlist = para_calloc(100 * sizeof(char *)); /* guess 100 is enough */ - playlist_size = 100; - sprintf(mmd->dbinfo, "plm initialized"); + int ret; + struct private_plm_data *ppd = NULL; + void *shm = NULL; + + self = db; db->cmd_list = cmds; db->get_audio_file_list = plm_get_audio_file_list; db->shutdown = plm_shutdown; + db->post_select = plm_post_select; + db->update_audio_file = plm_update_audio_file; + ppd = para_calloc(sizeof(struct private_plm_data)); + db->private_data = ppd; + + ppd->client_mutex = -1; + ppd->server_mutex = -1; + ppd->client_data_shm_id = -1; + ppd->client_data = NULL; + + ret = mutex_new(); + if (ret < 0) + goto err_out; + ppd->client_mutex = ret; + + ret = mutex_new(); + if (ret < 0) + goto err_out; + ppd->server_mutex = ret; + + ret = shm_new(sizeof(struct plm_client_data)); + if (ret < 0) + goto err_out; + ppd->client_data_shm_id = ret; + + ret = shm_attach(ppd->client_data_shm_id, ATTACH_RW, &shm); + if (ret < 0) + goto err_out; + ppd->client_data = shm; + ppd->client_data->size = 0; + sprintf(mmd->dbinfo, "plm initialized"); return 1; +err_out: + if (ppd->client_data_shm_id >= 0) + shm_destroy(ppd->client_data_shm_id); + if (ppd->client_mutex >= 0) + mutex_destroy(ppd->client_mutex); + if (ppd->server_mutex >= 0) + mutex_destroy(ppd->server_mutex); + free(ppd); + return ret; } diff --git a/server.c b/server.c index c185b3e1..a6fca8b6 100644 --- a/server.c +++ b/server.c @@ -297,9 +297,9 @@ static void setup_signal_handling(void) ret += para_install_sighandler(SIGTERM); ret += para_install_sighandler(SIGHUP); ret += para_install_sighandler(SIGCHLD); + ret += para_install_sighandler(SIGUSR1); signal(SIGPIPE, SIG_IGN); - signal(SIGUSR1, SIG_IGN); - if (ret != 4) { + if (ret != 5) { PARA_EMERG_LOG("%s", "could not install signal handlers\n"); exit(EXIT_FAILURE); } @@ -503,6 +503,10 @@ repeat: &max_fileno, &rfds, &wfds); } + if (dblist[mmd->dbt_num].pre_select) { + ret = dblist[mmd->dbt_num].pre_select(&rfds, &wfds); + max_fileno = MAX(max_fileno, ret); + } mmd_unlock(); // PARA_DEBUG_LOG("%s: select (max = %i)\n", __func__, max_fileno); ret = select(max_fileno + 1, &rfds, &wfds, NULL, timeout); @@ -511,6 +515,8 @@ repeat: mmd_lock(); if (mmd->dbt_change >= 0) handle_dbt_change(); + if (dblist[mmd->dbt_num].post_select) + dblist[mmd->dbt_num].post_select(&rfds, &wfds); if (ret < 0 && err == EINTR) goto repeat; if (ret < 0) { diff --git a/server.h b/server.h index ba9b39e4..fe6d8581 100644 --- a/server.h +++ b/server.h @@ -147,8 +147,6 @@ struct misc_meta_data{ int dbt_change; /** used by the sender command */ struct sender_command_data sender_cmd_data; -/** each dbtool has its private data pointer */ - void *private_dbtool_data[NUM_DBTOOLS]; };