libxl: Permit multithreaded event waiting
authorIan Jackson <ian.jackson@eu.citrix.com>
Fri, 27 Jan 2012 17:01:23 +0000 (17:01 +0000)
committerIan Jackson <ian.jackson@eu.citrix.com>
Fri, 27 Jan 2012 17:01:23 +0000 (17:01 +0000)
Previously, the context would be locked whenever we were waiting in
libxl's own call to poll (waiting for operating system events).

This would mean that multiple simultaneous calls to libxl_event_wait
in different threads with different parameters would not work
properly.

If we simply unlock the context, it would be possible for another
thread to discover the occurrence of the event we were waiting for,
without us even waking up, and we would remain in poll.  So we need a
way to wake up other threads: a pipe, one for each thread in poll.

We also need to move some variables from globals in the ctx to be
per-polling-thread.

Signed-off-by: Ian Jackson <ian.jackson@eu.citrix.com>
Acked-by: Ian Campbell <ian.campbell@citrix.com>
Committed-by: Ian Jackson <Ian.Jackson@eu.citrix.com>
tools/libxl/libxl.c
tools/libxl/libxl_event.c
tools/libxl/libxl_internal.h

index 07725ded4e028830bdb93b699c93aaaf04293b43..aa546a0a1f13d0db30617b4d31ca78e0b2bea378 100644 (file)
@@ -49,8 +49,9 @@ int libxl_ctx_alloc(libxl_ctx **pctx, int version,
 
     ctx->osevent_hooks = 0;
 
-    ctx->fd_polls = 0;
-    ctx->fd_rindex = 0;
+    LIBXL_LIST_INIT(&ctx->pollers_event);
+    LIBXL_LIST_INIT(&ctx->pollers_idle);
+
     LIBXL_LIST_INIT(&ctx->efds);
     LIBXL_TAILQ_INIT(&ctx->etimes);
 
@@ -61,6 +62,9 @@ int libxl_ctx_alloc(libxl_ctx **pctx, int version,
     LIBXL_TAILQ_INIT(&ctx->death_list);
     libxl__ev_xswatch_init(&ctx->death_watch);
 
+    rc = libxl__poller_init(ctx, &ctx->poller_app);
+    if (rc) goto out;
+
     if ( stat(XENSTORE_PID_FILE, &stat_buf) != 0 ) {
         LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "Is xenstore daemon running?\n"
                      "failed to stat %s", XENSTORE_PID_FILE);
@@ -135,8 +139,14 @@ int libxl_ctx_free(libxl_ctx *ctx)
     libxl_version_info_dispose(&ctx->version_info);
     if (ctx->xsh) xs_daemon_close(ctx->xsh);
 
-    free(ctx->fd_polls);
-    free(ctx->fd_rindex);
+    libxl__poller_dispose(&ctx->poller_app);
+    assert(LIBXL_LIST_EMPTY(&ctx->pollers_event));
+    libxl__poller *poller, *poller_tmp;
+    LIBXL_LIST_FOREACH_SAFE(poller, &ctx->pollers_idle, entry, poller_tmp) {
+        libxl__poller_dispose(poller);
+        free(poller);
+    }
+
     free(ctx->watch_slots);
 
     discard_events(&ctx->occurred);
index 69ad318a2d7bcbf0725c5f8ebac0c8c3277d46ba..73dfd9d90a342ba1bc96875000a2278abcc73e63 100644 (file)
@@ -510,9 +510,9 @@ void libxl__ev_xswatch_deregister(libxl__gc *gc, libxl__ev_xswatch *w)
  * osevent poll
  */
 
-static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
-                               struct pollfd *fds, int *timeout_upd,
-                               struct timeval now)
+static int beforepoll_internal(libxl__gc *gc, libxl__poller *poller,
+                               int *nfds_io, struct pollfd *fds,
+                               int *timeout_upd, struct timeval now)
 {
     libxl__ev_fd *efd;
     int rc;
@@ -534,7 +534,7 @@ static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
          * not to mess with fd_rindex.
          */
 
-        int maxfd = 0;
+        int maxfd = poller->wakeup_pipe[0] + 1;
         LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
             if (!efd->events)
                 continue;
@@ -542,30 +542,39 @@ static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
                 maxfd = efd->fd + 1;
         }
         /* make sure our array is as big as *nfds_io */
-        if (CTX->fd_rindex_allocd < maxfd) {
+        if (poller->fd_rindex_allocd < maxfd) {
             assert(maxfd < INT_MAX / sizeof(int) / 2);
-            int *newarray = realloc(CTX->fd_rindex, sizeof(int) * maxfd);
+            int *newarray = realloc(poller->fd_rindex, sizeof(int) * maxfd);
             if (!newarray) { rc = ERROR_NOMEM; goto out; }
-            memset(newarray + CTX->fd_rindex_allocd, 0,
-                   sizeof(int) * (maxfd - CTX->fd_rindex_allocd));
-            CTX->fd_rindex = newarray;
-            CTX->fd_rindex_allocd = maxfd;
+            memset(newarray + poller->fd_rindex_allocd, 0,
+                   sizeof(int) * (maxfd - poller->fd_rindex_allocd));
+            poller->fd_rindex = newarray;
+            poller->fd_rindex_allocd = maxfd;
         }
     }
 
     int used = 0;
-    LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
-        if (!efd->events)
-            continue;
-        if (used < *nfds_io) {
-            fds[used].fd = efd->fd;
-            fds[used].events = efd->events;
-            fds[used].revents = 0;
-            assert(efd->fd < CTX->fd_rindex_allocd);
-            CTX->fd_rindex[efd->fd] = used;
-        }
-        used++;
-    }
+
+#define REQUIRE_FD(req_fd, req_events, efd) do{                 \
+        if ((req_events)) {                                     \
+            if (used < *nfds_io) {                              \
+                fds[used].fd = (req_fd);                        \
+                fds[used].events = (req_events);                \
+                fds[used].revents = 0;                          \
+                assert((req_fd) < poller->fd_rindex_allocd);    \
+                poller->fd_rindex[(req_fd)] = used;             \
+            }                                                   \
+            used++;                                             \
+        }                                                       \
+    }while(0)
+
+    LIBXL_LIST_FOREACH(efd, &CTX->efds, entry)
+        REQUIRE_FD(efd->fd, efd->events, efd);
+
+    REQUIRE_FD(poller->wakeup_pipe[0], POLLIN, 0);
+
+#undef REQUIRE_FD
+
     rc = used <= *nfds_io ? 0 : ERROR_BUFFERFULL;
 
     *nfds_io = used;
@@ -599,22 +608,23 @@ int libxl_osevent_beforepoll(libxl_ctx *ctx, int *nfds_io,
 {
     EGC_INIT(ctx);
     CTX_LOCK;
-    int rc = beforepoll_internal(gc, nfds_io, fds, timeout_upd, now);
+    int rc = beforepoll_internal(gc, &ctx->poller_app,
+                                 nfds_io, fds, timeout_upd, now);
     CTX_UNLOCK;
     EGC_FREE;
     return rc;
 }
 
-static int afterpoll_check_fd(libxl_ctx *ctx,
+static int afterpoll_check_fd(libxl__poller *poller,
                               const struct pollfd *fds, int nfds,
                               int fd, int events)
     /* returns mask of events which were requested and occurred */
 {
-    if (fd >= ctx->fd_rindex_allocd)
+    if (fd >= poller->fd_rindex_allocd)
         /* added after we went into poll, have to try again */
         return 0;
 
-    int slot = ctx->fd_rindex[fd];
+    int slot = poller->fd_rindex[fd];
 
     if (slot >= nfds)
         /* stale slot entry; again, added afterwards */
@@ -630,22 +640,31 @@ static int afterpoll_check_fd(libxl_ctx *ctx,
     return revents;
 }
 
-static void afterpoll_internal(libxl__egc *egc,
+static void afterpoll_internal(libxl__egc *egc, libxl__poller *poller,
                                int nfds, const struct pollfd *fds,
                                struct timeval now)
 {
     EGC_GC;
     libxl__ev_fd *efd;
 
+
     LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
         if (!efd->events)
             continue;
 
-        int revents = afterpoll_check_fd(CTX,fds,nfds, efd->fd,efd->events);
+        int revents = afterpoll_check_fd(poller,fds,nfds, efd->fd,efd->events);
         if (revents)
             efd->func(egc, efd, efd->fd, efd->events, revents);
     }
 
+    if (afterpoll_check_fd(poller,fds,nfds, poller->wakeup_pipe[0],POLLIN)) {
+        char buf[256];
+        int r = read(poller->wakeup_pipe[0], buf, sizeof(buf));
+        if (r < 0)
+            if (errno != EINTR && errno != EWOULDBLOCK)
+                LIBXL__EVENT_DISASTER(egc, "read wakeup", errno, 0);
+    }
+
     for (;;) {
         libxl__ev_time *etime = LIBXL_TAILQ_FIRST(&CTX->etimes);
         if (!etime)
@@ -667,7 +686,7 @@ void libxl_osevent_afterpoll(libxl_ctx *ctx, int nfds, const struct pollfd *fds,
 {
     EGC_INIT(ctx);
     CTX_LOCK;
-    afterpoll_internal(egc, nfds, fds, now);
+    afterpoll_internal(egc, &ctx->poller_app, nfds, fds, now);
     CTX_UNLOCK;
     EGC_FREE;
 }
@@ -790,7 +809,10 @@ void libxl__event_occurred(libxl__egc *egc, libxl_event *event)
         LIBXL_TAILQ_INSERT_TAIL(&egc->occurred_for_callback, event, link);
         return;
     } else {
+        libxl__poller *poller;
         LIBXL_TAILQ_INSERT_TAIL(&CTX->occurred, event, link);
+        LIBXL_LIST_FOREACH(poller, &CTX->pollers_event, entry)
+            libxl__poller_wakeup(egc, poller);
     }
 }
 
@@ -858,7 +880,94 @@ int libxl_event_check(libxl_ctx *ctx, libxl_event **event_r,
     return rc;
 }
 
-static int eventloop_iteration(libxl__egc *egc) {
+/*
+ * Manipulation of pollers
+ */
+
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p)
+{
+    int r, rc;
+    p->fd_polls = 0;
+    p->fd_rindex = 0;
+
+    r = pipe(p->wakeup_pipe);
+    if (r) {
+        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot create poller pipe");
+        rc = ERROR_FAIL;
+        goto out;
+    }
+
+    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[0], 1);
+    if (rc) goto out;
+
+    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[1], 1);
+    if (rc) goto out;
+
+    return 0;
+
+ out:
+    libxl__poller_dispose(p);
+    return rc;
+}
+
+void libxl__poller_dispose(libxl__poller *p)
+{
+    if (p->wakeup_pipe[1] > 0) close(p->wakeup_pipe[1]);
+    if (p->wakeup_pipe[0] > 0) close(p->wakeup_pipe[0]);
+    free(p->fd_polls);
+    free(p->fd_rindex);
+}
+
+libxl__poller *libxl__poller_get(libxl_ctx *ctx)
+{
+    /* must be called with ctx locked */
+    int rc;
+
+    libxl__poller *p = LIBXL_LIST_FIRST(&ctx->pollers_idle);
+    if (p)
+        return p;
+
+    p = malloc(sizeof(*p));
+    if (!p) {
+        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot allocate poller");
+        return 0;
+    }
+    memset(p, 0, sizeof(*p));
+
+    rc = libxl__poller_init(ctx, p);
+    if (rc) return NULL;
+
+    return p;
+}
+
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p)
+{
+    LIBXL_LIST_INSERT_HEAD(&ctx->pollers_idle, p, entry);
+}
+
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p)
+{
+    static const char buf[1] = "";
+
+    for (;;) {
+        int r = write(p->wakeup_pipe[1], buf, 1);
+        if (r==1) return;
+        assert(r==-1);
+        if (errno == EINTR) continue;
+        if (errno == EWOULDBLOCK) return;
+        LIBXL__EVENT_DISASTER(egc, "cannot poke watch pipe", errno, 0);
+        return;
+    }
+}
+
+/*
+ * Main event loop iteration
+ */
+
+static int eventloop_iteration(libxl__egc *egc, libxl__poller *poller) {
+    /* The CTX must be locked EXACTLY ONCE so that this function
+     * can unlock it when it polls.
+     */
     EGC_GC;
     int rc;
     struct timeval now;
@@ -871,23 +980,27 @@ static int eventloop_iteration(libxl__egc *egc) {
     int timeout;
 
     for (;;) {
-        int nfds = CTX->fd_polls_allocd;
+        int nfds = poller->fd_polls_allocd;
         timeout = -1;
-        rc = beforepoll_internal(gc, &nfds, CTX->fd_polls, &timeout, now);
+        rc = beforepoll_internal(gc, poller, &nfds, poller->fd_polls,
+                                 &timeout, now);
         if (!rc) break;
         if (rc != ERROR_BUFFERFULL) goto out;
 
         struct pollfd *newarray =
             (nfds > INT_MAX / sizeof(struct pollfd) / 2) ? 0 :
-            realloc(CTX->fd_polls, sizeof(*newarray) * nfds);
+            realloc(poller->fd_polls, sizeof(*newarray) * nfds);
 
         if (!newarray) { rc = ERROR_NOMEM; goto out; }
 
-        CTX->fd_polls = newarray;
-        CTX->fd_polls_allocd = nfds;
+        poller->fd_polls = newarray;
+        poller->fd_polls_allocd = nfds;
     }
 
-    rc = poll(CTX->fd_polls, CTX->fd_polls_allocd, timeout);
+    CTX_UNLOCK;
+    rc = poll(poller->fd_polls, poller->fd_polls_allocd, timeout);
+    CTX_LOCK;
+
     if (rc < 0) {
         if (errno == EINTR)
             return 0; /* will go round again if caller requires */
@@ -900,7 +1013,8 @@ static int eventloop_iteration(libxl__egc *egc) {
     rc = libxl__gettimeofday(gc, &now);
     if (rc) goto out;
 
-    afterpoll_internal(egc, CTX->fd_polls_allocd, CTX->fd_polls, now);
+    afterpoll_internal(egc, poller,
+                       poller->fd_polls_allocd, poller->fd_polls, now);
 
     CTX_UNLOCK;
 
@@ -914,15 +1028,19 @@ int libxl_event_wait(libxl_ctx *ctx, libxl_event **event_r,
                      libxl_event_predicate *pred, void *pred_user)
 {
     int rc;
+    libxl__poller *poller = NULL;
 
     EGC_INIT(ctx);
     CTX_LOCK;
 
+    poller = libxl__poller_get(ctx);
+    if (!poller) { rc = ERROR_FAIL; goto out; }
+
     for (;;) {
         rc = event_check_internal(egc, event_r, typemask, pred, pred_user);
         if (rc != ERROR_NOT_READY) goto out;
 
-        rc = eventloop_iteration(egc);
+        rc = eventloop_iteration(egc, poller);
         if (rc) goto out;
 
         /* we unlock and cleanup the egc each time we go through this loop,
@@ -936,6 +1054,8 @@ int libxl_event_wait(libxl_ctx *ctx, libxl_event **event_r,
     }
 
  out:
+    libxl__poller_put(ctx, poller);
+
     CTX_UNLOCK;
     EGC_FREE;
     return rc;
index 8e80f24a22834810e6853110eeae629cac256a16..d1b96c16fda720e81471bcaacb3113db62a436be 100644 (file)
@@ -207,6 +207,33 @@ struct libxl__evgen_disk_eject {
 _hidden void
 libxl__evdisable_disk_eject(libxl__gc*, libxl_evgen_disk_eject*);
 
+typedef struct libxl__poller libxl__poller;
+struct libxl__poller {
+    /*
+     * These are used to allow other threads to wake up a thread which
+     * may be stuck in poll, because whatever it was waiting for
+     * hadn't happened yet.  Threads which generate events will write
+     * a byte to each pipe.  A thread which is waiting will empty its
+     * own pipe, and put its poller on the pollers_event list, before
+     * releasing the ctx lock and going into poll; when it comes out
+     * of poll it will take the poller off the pollers_event list.
+     *
+     * When a thread is done with a poller it should put it onto
+     * pollers_idle, where it can be reused later.
+     *
+     * The "poller_app" is never idle, but is sometimes on
+     * pollers_event.
+     */
+    LIBXL_LIST_ENTRY(libxl__poller) entry;
+
+    struct pollfd *fd_polls;
+    int fd_polls_allocd;
+
+    int fd_rindex_allocd;
+    int *fd_rindex; /* see libxl_osevent_beforepoll */
+
+    int wakeup_pipe[2]; /* 0 means no fd allocated */
+};
 
 struct libxl__ctx {
     xentoollog_logger *lg;
@@ -237,10 +264,9 @@ struct libxl__ctx {
       /* See the comment for OSEVENT_HOOK_INTERN in libxl_event.c
        * for restrictions on the use of the osevent fields. */
 
-    struct pollfd *fd_polls;
-    int fd_polls_allocd;
-    int fd_rindex_allocd;
-    int *fd_rindex; /* see libxl_osevent_beforepoll */
+    libxl__poller poller_app; /* libxl_osevent_beforepoll and _afterpoll */
+    LIBXL_LIST_HEAD(, libxl__poller) pollers_event, pollers_idle;
+
     LIBXL_LIST_HEAD(, libxl__ev_fd) efds;
     LIBXL_TAILQ_HEAD(, libxl__ev_time) etimes;
 
@@ -526,6 +552,22 @@ _hidden void libxl__event_disaster(libxl__egc*, const char *msg, int errnoval,
     libxl__event_disaster(egc, msg, errnoval, type, __FILE__,__LINE__,__func__)
 
 
+/* Fills in, or disposes of, the resources held by, a poller whose
+ * space the caller has allocated.  ctx must be locked. */
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p);
+void libxl__poller_dispose(libxl__poller *p);
+
+/* Obtain a fresh poller from malloc or the idle list, and put it
+ * away again afterwards.  _get can fail, returning NULL.
+ * ctx must be locked. */
+libxl__poller *libxl__poller_get(libxl_ctx *ctx);
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p);
+
+/* Notifies whoever is polling using p that they should wake up.
+ * ctx must be locked. */
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p);
+
+
 /* from xl_dom */
 _hidden libxl_domain_type libxl__domain_type(libxl__gc *gc, uint32_t domid);
 _hidden int libxl__domain_shutdown_reason(libxl__gc *gc, uint32_t domid);