* osevent poll
*/
-static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
- struct pollfd *fds, int *timeout_upd,
- struct timeval now)
+static int beforepoll_internal(libxl__gc *gc, libxl__poller *poller,
+ int *nfds_io, struct pollfd *fds,
+ int *timeout_upd, struct timeval now)
{
libxl__ev_fd *efd;
int rc;
* not to mess with fd_rindex.
*/
- int maxfd = 0;
+ int maxfd = poller->wakeup_pipe[0] + 1;
LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
if (!efd->events)
continue;
maxfd = efd->fd + 1;
}
/* make sure our array is as big as *nfds_io */
- if (CTX->fd_rindex_allocd < maxfd) {
+ if (poller->fd_rindex_allocd < maxfd) {
assert(maxfd < INT_MAX / sizeof(int) / 2);
- int *newarray = realloc(CTX->fd_rindex, sizeof(int) * maxfd);
+ int *newarray = realloc(poller->fd_rindex, sizeof(int) * maxfd);
if (!newarray) { rc = ERROR_NOMEM; goto out; }
- memset(newarray + CTX->fd_rindex_allocd, 0,
- sizeof(int) * (maxfd - CTX->fd_rindex_allocd));
- CTX->fd_rindex = newarray;
- CTX->fd_rindex_allocd = maxfd;
+ memset(newarray + poller->fd_rindex_allocd, 0,
+ sizeof(int) * (maxfd - poller->fd_rindex_allocd));
+ poller->fd_rindex = newarray;
+ poller->fd_rindex_allocd = maxfd;
}
}
int used = 0;
- LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
- if (!efd->events)
- continue;
- if (used < *nfds_io) {
- fds[used].fd = efd->fd;
- fds[used].events = efd->events;
- fds[used].revents = 0;
- assert(efd->fd < CTX->fd_rindex_allocd);
- CTX->fd_rindex[efd->fd] = used;
- }
- used++;
- }
+
+#define REQUIRE_FD(req_fd, req_events, efd) do{ \
+ if ((req_events)) { \
+ if (used < *nfds_io) { \
+ fds[used].fd = (req_fd); \
+ fds[used].events = (req_events); \
+ fds[used].revents = 0; \
+ assert((req_fd) < poller->fd_rindex_allocd); \
+ poller->fd_rindex[(req_fd)] = used; \
+ } \
+ used++; \
+ } \
+ }while(0)
+
+ LIBXL_LIST_FOREACH(efd, &CTX->efds, entry)
+ REQUIRE_FD(efd->fd, efd->events, efd);
+
+ REQUIRE_FD(poller->wakeup_pipe[0], POLLIN, 0);
+
+#undef REQUIRE_FD
+
rc = used <= *nfds_io ? 0 : ERROR_BUFFERFULL;
*nfds_io = used;
{
EGC_INIT(ctx);
CTX_LOCK;
- int rc = beforepoll_internal(gc, nfds_io, fds, timeout_upd, now);
+ int rc = beforepoll_internal(gc, &ctx->poller_app,
+ nfds_io, fds, timeout_upd, now);
CTX_UNLOCK;
EGC_FREE;
return rc;
}
-static int afterpoll_check_fd(libxl_ctx *ctx,
+static int afterpoll_check_fd(libxl__poller *poller,
const struct pollfd *fds, int nfds,
int fd, int events)
/* returns mask of events which were requested and occurred */
{
- if (fd >= ctx->fd_rindex_allocd)
+ if (fd >= poller->fd_rindex_allocd)
/* added after we went into poll, have to try again */
return 0;
- int slot = ctx->fd_rindex[fd];
+ int slot = poller->fd_rindex[fd];
if (slot >= nfds)
/* stale slot entry; again, added afterwards */
return revents;
}
-static void afterpoll_internal(libxl__egc *egc,
+static void afterpoll_internal(libxl__egc *egc, libxl__poller *poller,
int nfds, const struct pollfd *fds,
struct timeval now)
{
EGC_GC;
libxl__ev_fd *efd;
+
LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
if (!efd->events)
continue;
- int revents = afterpoll_check_fd(CTX,fds,nfds, efd->fd,efd->events);
+ int revents = afterpoll_check_fd(poller,fds,nfds, efd->fd,efd->events);
if (revents)
efd->func(egc, efd, efd->fd, efd->events, revents);
}
+ if (afterpoll_check_fd(poller,fds,nfds, poller->wakeup_pipe[0],POLLIN)) {
+ char buf[256];
+ int r = read(poller->wakeup_pipe[0], buf, sizeof(buf));
+ if (r < 0)
+ if (errno != EINTR && errno != EWOULDBLOCK)
+ LIBXL__EVENT_DISASTER(egc, "read wakeup", errno, 0);
+ }
+
for (;;) {
libxl__ev_time *etime = LIBXL_TAILQ_FIRST(&CTX->etimes);
if (!etime)
{
EGC_INIT(ctx);
CTX_LOCK;
- afterpoll_internal(egc, nfds, fds, now);
+ afterpoll_internal(egc, &ctx->poller_app, nfds, fds, now);
CTX_UNLOCK;
EGC_FREE;
}
LIBXL_TAILQ_INSERT_TAIL(&egc->occurred_for_callback, event, link);
return;
} else {
+ libxl__poller *poller;
LIBXL_TAILQ_INSERT_TAIL(&CTX->occurred, event, link);
+ LIBXL_LIST_FOREACH(poller, &CTX->pollers_event, entry)
+ libxl__poller_wakeup(egc, poller);
}
}
return rc;
}
-static int eventloop_iteration(libxl__egc *egc) {
+/*
+ * Manipulation of pollers
+ */
+
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p)
+{
+ int r, rc;
+ p->fd_polls = 0;
+ p->fd_rindex = 0;
+
+ r = pipe(p->wakeup_pipe);
+ if (r) {
+ LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot create poller pipe");
+ rc = ERROR_FAIL;
+ goto out;
+ }
+
+ rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[0], 1);
+ if (rc) goto out;
+
+ rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[1], 1);
+ if (rc) goto out;
+
+ return 0;
+
+ out:
+ libxl__poller_dispose(p);
+ return rc;
+}
+
+void libxl__poller_dispose(libxl__poller *p)
+{
+ if (p->wakeup_pipe[1] > 0) close(p->wakeup_pipe[1]);
+ if (p->wakeup_pipe[0] > 0) close(p->wakeup_pipe[0]);
+ free(p->fd_polls);
+ free(p->fd_rindex);
+}
+
+libxl__poller *libxl__poller_get(libxl_ctx *ctx)
+{
+ /* must be called with ctx locked */
+ int rc;
+
+ libxl__poller *p = LIBXL_LIST_FIRST(&ctx->pollers_idle);
+ if (p)
+ return p;
+
+ p = malloc(sizeof(*p));
+ if (!p) {
+ LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot allocate poller");
+ return 0;
+ }
+ memset(p, 0, sizeof(*p));
+
+ rc = libxl__poller_init(ctx, p);
+ if (rc) return NULL;
+
+ return p;
+}
+
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p)
+{
+ LIBXL_LIST_INSERT_HEAD(&ctx->pollers_idle, p, entry);
+}
+
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p)
+{
+ static const char buf[1] = "";
+
+ for (;;) {
+ int r = write(p->wakeup_pipe[1], buf, 1);
+ if (r==1) return;
+ assert(r==-1);
+ if (errno == EINTR) continue;
+ if (errno == EWOULDBLOCK) return;
+ LIBXL__EVENT_DISASTER(egc, "cannot poke watch pipe", errno, 0);
+ return;
+ }
+}
+
+/*
+ * Main event loop iteration
+ */
+
+static int eventloop_iteration(libxl__egc *egc, libxl__poller *poller) {
+ /* The CTX must be locked EXACTLY ONCE so that this function
+ * can unlock it when it polls.
+ */
EGC_GC;
int rc;
struct timeval now;
int timeout;
for (;;) {
- int nfds = CTX->fd_polls_allocd;
+ int nfds = poller->fd_polls_allocd;
timeout = -1;
- rc = beforepoll_internal(gc, &nfds, CTX->fd_polls, &timeout, now);
+ rc = beforepoll_internal(gc, poller, &nfds, poller->fd_polls,
+ &timeout, now);
if (!rc) break;
if (rc != ERROR_BUFFERFULL) goto out;
struct pollfd *newarray =
(nfds > INT_MAX / sizeof(struct pollfd) / 2) ? 0 :
- realloc(CTX->fd_polls, sizeof(*newarray) * nfds);
+ realloc(poller->fd_polls, sizeof(*newarray) * nfds);
if (!newarray) { rc = ERROR_NOMEM; goto out; }
- CTX->fd_polls = newarray;
- CTX->fd_polls_allocd = nfds;
+ poller->fd_polls = newarray;
+ poller->fd_polls_allocd = nfds;
}
- rc = poll(CTX->fd_polls, CTX->fd_polls_allocd, timeout);
+ CTX_UNLOCK;
+ rc = poll(poller->fd_polls, poller->fd_polls_allocd, timeout);
+ CTX_LOCK;
+
if (rc < 0) {
if (errno == EINTR)
return 0; /* will go round again if caller requires */
rc = libxl__gettimeofday(gc, &now);
if (rc) goto out;
- afterpoll_internal(egc, CTX->fd_polls_allocd, CTX->fd_polls, now);
+ afterpoll_internal(egc, poller,
+ poller->fd_polls_allocd, poller->fd_polls, now);
CTX_UNLOCK;
libxl_event_predicate *pred, void *pred_user)
{
int rc;
+ libxl__poller *poller = NULL;
EGC_INIT(ctx);
CTX_LOCK;
+ poller = libxl__poller_get(ctx);
+ if (!poller) { rc = ERROR_FAIL; goto out; }
+
for (;;) {
rc = event_check_internal(egc, event_r, typemask, pred, pred_user);
if (rc != ERROR_NOT_READY) goto out;
- rc = eventloop_iteration(egc);
+ rc = eventloop_iteration(egc, poller);
if (rc) goto out;
/* we unlock and cleanup the egc each time we go through this loop,
}
out:
+ libxl__poller_put(ctx, poller);
+
CTX_UNLOCK;
EGC_FREE;
return rc;
_hidden void
libxl__evdisable_disk_eject(libxl__gc*, libxl_evgen_disk_eject*);
+typedef struct libxl__poller libxl__poller;
+struct libxl__poller {
+ /*
+ * These are used to allow other threads to wake up a thread which
+ * may be stuck in poll, because whatever it was waiting for
+ * hadn't happened yet. Threads which generate events will write
+ * a byte to each pipe. A thread which is waiting will empty its
+ * own pipe, and put its poller on the pollers_event list, before
+ * releasing the ctx lock and going into poll; when it comes out
+ * of poll it will take the poller off the pollers_event list.
+ *
+ * When a thread is done with a poller it should put it onto
+ * pollers_idle, where it can be reused later.
+ *
+ * The "poller_app" is never idle, but is sometimes on
+ * pollers_event.
+ */
+ LIBXL_LIST_ENTRY(libxl__poller) entry;
+
+ struct pollfd *fd_polls;
+ int fd_polls_allocd;
+
+ int fd_rindex_allocd;
+ int *fd_rindex; /* see libxl_osevent_beforepoll */
+
+ int wakeup_pipe[2]; /* 0 means no fd allocated */
+};
struct libxl__ctx {
xentoollog_logger *lg;
/* See the comment for OSEVENT_HOOK_INTERN in libxl_event.c
* for restrictions on the use of the osevent fields. */
- struct pollfd *fd_polls;
- int fd_polls_allocd;
- int fd_rindex_allocd;
- int *fd_rindex; /* see libxl_osevent_beforepoll */
+ libxl__poller poller_app; /* libxl_osevent_beforepoll and _afterpoll */
+ LIBXL_LIST_HEAD(, libxl__poller) pollers_event, pollers_idle;
+
LIBXL_LIST_HEAD(, libxl__ev_fd) efds;
LIBXL_TAILQ_HEAD(, libxl__ev_time) etimes;
libxl__event_disaster(egc, msg, errnoval, type, __FILE__,__LINE__,__func__)
+/* Fills in, or disposes of, the resources held by, a poller whose
+ * space the caller has allocated. ctx must be locked. */
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p);
+void libxl__poller_dispose(libxl__poller *p);
+
+/* Obtain a fresh poller from malloc or the idle list, and put it
+ * away again afterwards. _get can fail, returning NULL.
+ * ctx must be locked. */
+libxl__poller *libxl__poller_get(libxl_ctx *ctx);
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p);
+
+/* Notifies whoever is polling using p that they should wake up.
+ * ctx must be locked. */
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p);
+
+
/* from xl_dom */
_hidden libxl_domain_type libxl__domain_type(libxl__gc *gc, uint32_t domid);
_hidden int libxl__domain_shutdown_reason(libxl__gc *gc, uint32_t domid);