--- /dev/null
+/*
+ * Copyright (C) 2013 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file sync_filter.c Playback synchronization filter. */
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <regex.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
+
+#include "para.h"
+#include "sync_filter.cmdline.h"
+#include "list.h"
+#include "net.h"
+#include "sched.h"
+#include "ggo.h"
+#include "buffer_tree.h"
+#include "filter.h"
+#include "string.h"
+#include "fd.h"
+#include "error.h"
+
+struct sync_buddy_info {
+ const char *url;
+ char *host;
+ int port;
+ struct addrinfo *ai;
+ bool disabled;
+};
+
+/* per open/close data */
+struct sync_buddy {
+ int fd;
+ struct sync_buddy_info *sbi;
+ bool ping_received;
+ struct list_head node;
+};
+
+struct sync_filter_context {
+ int listen_fd;
+ struct list_head buddies;
+ struct timeval timeout;
+ bool ping_sent;
+};
+
+struct sync_filter_config {
+ struct sync_filter_args_info *conf;
+ struct sync_buddy_info *buddy_info;
+};
+
+#define FOR_EACH_BUDDY(_buddy, _list) \
+ list_for_each_entry(_buddy, _list, node)
+#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
+ list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
+
+static void sync_close_buddy(struct sync_buddy *buddy)
+{
+ if (buddy->fd < 0)
+ return;
+ PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url);
+ close(buddy->fd);
+ buddy->fd = -1;
+}
+
+static void sync_close_buddies(struct sync_filter_context *ctx)
+{
+ struct sync_buddy *buddy;
+
+ FOR_EACH_BUDDY(buddy, &ctx->buddies)
+ sync_close_buddy(buddy);
+}
+
+static void sync_close(struct filter_node *fn)
+{
+ struct sync_filter_context *ctx = fn->private_data;
+
+ sync_close_buddies(ctx);
+ if (ctx->listen_fd >= 0) {
+ close(ctx->listen_fd);
+ ctx->listen_fd = -1;
+ }
+ free(ctx);
+ fn->private_data = NULL;
+}
+
+static void sync_free_config(void *conf)
+{
+ struct sync_filter_config *sfc = conf;
+ int i;
+
+ for (i = 0; i < sfc->conf->buddy_given; i++) {
+ free(sfc->buddy_info[i].host);
+ freeaddrinfo(sfc->buddy_info[i].ai);
+ }
+ sync_filter_cmdline_parser_free(sfc->conf);
+ free(sfc);
+}
+
+static void sync_open(struct filter_node *fn)
+{
+ int i, ret;
+ struct sync_filter_config *sfc = fn->conf;
+ struct sync_buddy *buddy;
+ struct sync_filter_context *ctx;
+
+ assert(sfc);
+
+ ctx = fn->private_data = para_calloc(sizeof(*ctx));
+ INIT_LIST_HEAD(&ctx->buddies);
+ ctx->listen_fd = -1;
+
+ /* create socket to listen for incoming packets */
+ ret = makesock(
+ IPPROTO_UDP,
+ true /* passive */,
+ NULL /* no host required */,
+ sfc->conf->port_arg,
+ NULL /* no flowopts */
+ );
+ if (ret < 0) {
+ PARA_ERROR_LOG("could not create UDP listening socket %d\n",
+ sfc->conf->port_arg);
+ return;
+ }
+ ctx->listen_fd = ret;
+ PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
+
+ for (i = 0; i < sfc->conf->buddy_given; i++) {
+ struct sync_buddy_info *sbi = sfc->buddy_info + i;
+ const char *url = sfc->conf->buddy_arg[i];
+ int fd;
+
+ /* make buddy udp socket from address info */
+ assert(sbi->ai);
+ ret = makesock_addrinfo(
+ IPPROTO_UDP,
+ false /* not passive */,
+ sbi->ai,
+ NULL /* no flowopts */
+ );
+ if (ret < 0) {
+ PARA_WARNING_LOG("could not make socket for %s\n",
+ url);
+ goto fail;
+ }
+ fd = ret;
+ ret = mark_fd_nonblocking(fd);
+ if (ret < 0) {
+ PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
+ url);
+ close(fd);
+ goto fail;
+ }
+ buddy = para_malloc(sizeof(*buddy));
+ buddy->fd = fd;
+ buddy->sbi = sbi;
+ buddy->ping_received = false;
+ para_list_add(&buddy->node, &ctx->buddies);
+
+ PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
+ continue;
+fail:
+ PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+ }
+}
+
+static int sync_parse_config(int argc, char **argv, void **result)
+{
+ int i, ret, n;
+ struct sync_filter_config *sfc;
+ struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
+
+ sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
+ sfc = para_calloc(sizeof(*sfc));
+ sfc->conf = conf;
+ n = conf->buddy_given;
+ sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
+ PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
+ for (i = 0; i < n; i++) {
+ const char *url = conf->buddy_arg[i];
+ size_t len = strlen(url);
+ char *host = para_malloc(len + 1);
+ int port;
+ struct addrinfo *ai;
+ struct sync_buddy_info *sbi = sfc->buddy_info + i;
+
+ if (!parse_url(url, host, len, &port)) {
+ free(host);
+ PARA_ERROR_LOG("could not parse url %s\n", url);
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ goto fail;
+ }
+ if (port < 0)
+ port = conf->port_arg;
+ ret = lookup_address(IPPROTO_UDP, false /* not passive */,
+ host, port, &ai);
+ if (ret < 0) {
+ PARA_ERROR_LOG("host lookup failure for %s\n", url);
+ free(host);
+ goto fail;
+ }
+ sbi->url = url;
+ sbi->host = host;
+ sbi->port = port;
+ sbi->ai = ai;
+ sbi->disabled = false;
+ PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
+ }
+ *result = sfc;
+ return 1;
+fail:
+ assert(ret < 0);
+ PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ sync_free_config(sfc);
+ return ret;
+}
+
+/*
+ * True if we sent a packet to all budies and received a packet from each
+ * enabled buddy.
+ */
+static bool sync_complete(struct sync_filter_context *ctx)
+{
+ struct sync_buddy *buddy;
+
+ if (!ctx->ping_sent)
+ return false;
+ FOR_EACH_BUDDY(buddy, &ctx->buddies)
+ if (!buddy->sbi->disabled && !buddy->ping_received)
+ return false;
+ return true;
+}
+
+static void sync_disable_active_buddies(struct sync_filter_context *ctx)
+{
+ struct sync_buddy *buddy;
+
+ FOR_EACH_BUDDY(buddy, &ctx->buddies) {
+ if (buddy->sbi->disabled)
+ continue;
+ PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
+ buddy->sbi->disabled = true;
+ }
+}
+
+static void sync_set_timeout(struct sync_filter_context *ctx,
+ struct sync_filter_config *sfc)
+{
+ struct timeval to;
+
+ ms2tv(sfc->conf->timeout_arg, &to);
+ tv_add(now, &to, &ctx->timeout);
+}
+
+static void sync_pre_select(struct sched *s, struct task *t)
+{
+ int ret;
+ struct filter_node *fn = container_of(t, struct filter_node, task);
+ struct sync_filter_context *ctx = fn->private_data;
+ struct sync_filter_config *sfc = fn->conf;
+
+ if (list_empty(&ctx->buddies))
+ return sched_min_delay(s);
+ if (ctx->listen_fd < 0)
+ return sched_min_delay(s);
+ ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
+ if (ret < 0)
+ return sched_min_delay(s);
+ para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
+ if (ret == 0)
+ return;
+ if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
+ sync_set_timeout(ctx, sfc);
+ return sched_min_delay(s);
+ }
+ if (sync_complete(ctx)) /* push down what we have */
+ return sched_min_delay(s);
+ sched_request_barrier_or_min_delay(&ctx->timeout, s);
+}
+
+static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
+ struct list_head *list)
+{
+ struct sync_buddy *buddy;
+
+ FOR_EACH_BUDDY(buddy, list)
+ if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
+ return buddy;
+ return NULL;
+}
+
+static int sync_post_select(__a_unused struct sched *s, struct task *t)
+{
+ int ret;
+ struct filter_node *fn = container_of(t, struct filter_node, task);
+ struct sync_filter_context *ctx = fn->private_data;
+ struct sync_filter_config *sfc = fn->conf;
+ struct sync_buddy *buddy, *tmp;
+
+ if (list_empty(&ctx->buddies))
+ goto success;
+ ret = -E_SYNC_LISTEN_FD;
+ if (ctx->listen_fd < 0)
+ goto fail;
+ ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
+ if (ret < 0)
+ goto fail;
+ if (ret == 0)
+ return 0;
+ if (ctx->timeout.tv_sec == 0)
+ sync_set_timeout(ctx, sfc);
+ else {
+ if (tv_diff(&ctx->timeout, now, NULL) < 0) {
+ sync_disable_active_buddies(ctx);
+ goto success;
+ }
+ }
+ if (!ctx->ping_sent) {
+ FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
+ char c = '\0';
+ PARA_INFO_LOG("pinging %s (%s)\n",
+ buddy->sbi->url, buddy->sbi->disabled?
+ "disabled" : "enabled");
+ ret = xwrite(buddy->fd, &c, 1);
+ sync_close_buddy(buddy);
+ if (ret < 0) {
+ PARA_WARNING_LOG("failed to write to %s: %s\n",
+ buddy->sbi->url, para_strerror(-ret));
+ list_del(&buddy->node);
+ }
+ }
+ ctx->ping_sent = true;
+ }
+ if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
+ char c;
+ for (;;) {
+ struct sockaddr src_addr;
+ socklen_t len = sizeof(src_addr);
+ ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
+ &src_addr, &len);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+ ret = -ERRNO_TO_PARA_ERROR(errno);
+ goto fail;
+ }
+ buddy = sync_find_buddy(&src_addr, &ctx->buddies);
+ if (!buddy) {
+ PARA_NOTICE_LOG("pinged by unknown\n");
+ continue;
+ }
+ PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
+ if (buddy->sbi->disabled) {
+ PARA_NOTICE_LOG("enabling %s\n",
+ buddy->sbi->url);
+ buddy->sbi->disabled = false;
+ }
+ list_del(&buddy->node);
+ }
+ }
+ if (!sync_complete(ctx))
+ return 1;
+ /*
+ * Although all enabled buddies are in sync we do not splice out
+ * ourselves immediately. We rather wait until the timout expires,
+ * or the buddy list has become empty. This opens a time window
+ * for disabled buddies to become enabled by sending us a packet.
+ */
+ btr_pushdown(fn->btrn);
+ return 1;
+success:
+ ret = -E_SYNC_COMPLETE; /* success */
+ goto out;
+fail:
+ PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+out:
+ sync_close_buddies(ctx);
+ btr_splice_out_node(&fn->btrn);
+ assert(ret < 0);
+ return ret;
+}
+
+/**
+ * The synchronization filter.
+ *
+ * \param f Pointer to the struct to initialize.
+ */
+void sync_filter_init(struct filter *f)
+{
+ struct sync_filter_args_info dummy;
+
+ sync_filter_cmdline_parser_init(&dummy);
+ f->open = sync_open;
+ f->close = sync_close;
+ f->pre_select = sync_pre_select;
+ f->post_select = sync_post_select;
+ f->parse_config = sync_parse_config;
+ f->free_config = sync_free_config;
+ f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
+}