static bool conn_can_read(struct connection *conn)
{
- return conn->funcs->can_read(conn) && !conn->is_ignored;
+ if (!conn->funcs->can_read(conn))
+ return false;
+
+ if (conn->is_ignored)
+ return false;
+
+ /*
+ * For stalled connection, we want to process the pending
+ * command as soon as live-update has aborted.
+ */
+ if (conn->is_stalled)
+ return !lu_is_pending();
+
+ return true;
}
static bool conn_can_write(struct connection *conn)
if (!list_empty(&conn->out_list))
events |= POLLOUT;
conn->pollfd_idx = set_fd(conn->fd, events);
+ /*
+ * For stalled connection, we want to process the
+ * pending command as soon as live-update has aborted.
+ */
+ if (conn->is_stalled && !lu_is_pending())
+ *ptimeout = 0;
}
}
}
struct connection *conn = req->data;
struct buffered_data *saved_in = conn->in;
+ if (lu_is_pending())
+ return false;
+
/*
* Part of process_message() expects conn->in to contains the
* processed response. So save the current conn->in and restore it
sockmsg_string(conn->in->hdr.msg.type),
conn->in->hdr.msg.len, conn);
+ conn->is_stalled = false;
+ /*
+ * Currently, Live-Update is not supported if there is active
+ * transactions. In order to reduce the number of retry, delay
+ * any new request to start a transaction if Live-Update is pending
+ * and there are no transactions in-flight.
+ *
+ * If we can't delay the request, then mark the connection as
+ * stalled. This will ignore new requests until Live-Update happened
+ * or it was aborted.
+ */
+ if (lu_is_pending() && conn->transaction_started == 0 &&
+ conn->in->hdr.msg.type == XS_TRANSACTION_START) {
+ trace("Delaying transaction start for connection %p req_id %u\n",
+ conn, conn->in->hdr.msg.req_id);
+
+ if (delay_request(conn, conn->in, process_delayed_message,
+ conn, false) != 0) {
+ trace("Stalling connection %p\n", conn);
+ conn->is_stalled = true;
+ }
+ return;
+ }
+
process_message(conn, conn->in);
assert(conn->in == NULL);
new->pollfd_idx = -1;
new->funcs = funcs;
new->is_ignored = false;
+ new->is_stalled = false;
new->transaction_started = 0;
INIT_LIST_HEAD(&new->out_list);
INIT_LIST_HEAD(&new->watches);