From: Yiyang Zhou Date: Wed, 4 Aug 2021 13:55:04 +0000 (+0800) Subject: [PATCH] THRIFT-5449: Use poll instead of select in Python TNonblockingServer if avail... X-Git-Tag: archive/raspbian/0.16.0-5+rpi1^2~2 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=162ee2ca2c6dd761d99647857b8ebef786c734e7;p=thrift.git [PATCH] THRIFT-5449: Use poll instead of select in Python TNonblockingServer if available Client: Python Gbp-Pq: Name THRIFT-5449.patch --- diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py index ac06496..fdf6779 100644 --- a/lib/py/src/server/TNonblockingServer.py +++ b/lib/py/src/server/TNonblockingServer.py @@ -253,6 +253,7 @@ class TNonblockingServer(object): self._read, self._write = socket.socketpair() self.prepared = False self._stop = False + self.poll = select.poll() if hasattr(select, 'poll') else None def setNumThreads(self, num): """Set the number of worker threads that should be created.""" @@ -318,13 +319,53 @@ class TNonblockingServer(object): else: return select.select(readable, writable, readable) + (True,) + def _poll_select(self): + """Does poll on open connections, if available.""" + remaining = [] + + self.poll.register(self.socket.handle.fileno(), select.POLLIN | select.POLLRDNORM) + self.poll.register(self._read.fileno(), select.POLLIN | select.POLLRDNORM) + + for i, connection in list(self.clients.items()): + if connection.is_readable(): + self.poll.register(connection.fileno(), select.POLLIN | select.POLLRDNORM | select.POLLERR | select.POLLHUP | select.POLLNVAL) + if connection.remaining or connection.received: + remaining.append(connection.fileno()) + if connection.is_writeable(): + self.poll.register(connection.fileno(), select.POLLOUT | select.POLLWRNORM) + if connection.is_closed(): + try: + self.poll.unregister(i) + except KeyError: + logger.debug("KeyError in unregistering connections...") + del self.clients[i] + if remaining: + return remaining, [], [], False + + rlist = [] + wlist = [] + xlist = [] + pollres = self.poll.poll() + for fd, event in pollres: + if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL): + xlist.append(fd) + elif event & (select.POLLOUT | select.POLLWRNORM): + wlist.append(fd) + elif event & (select.POLLIN | select.POLLRDNORM): + rlist.append(fd) + else: # should be impossible + logger.debug("reached an impossible state in _poll_select") + xlist.append(fd) + + return rlist, wlist, xlist, True + def handle(self): """Handle requests. WARNING! You must call prepare() BEFORE calling handle() """ assert self.prepared, "You have to call prepare before handle" - rset, wset, xset, selected = self._select() + rset, wset, xset, selected = self._select() if not self.poll else self._poll_select() for readable in rset: if readable == self._read.fileno(): # don't care i just need to clean readable flag @@ -343,6 +384,8 @@ class TNonblockingServer(object): connection.read() if connection.received: connection.status = WAIT_PROCESS + if self.poll: + self.poll.unregister(connection.fileno()) msg = connection.received.popleft() itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset) otransport = TTransport.TMemoryBuffer() @@ -354,7 +397,6 @@ class TNonblockingServer(object): self.clients[writeable].write() for oob in xset: self.clients[oob].close() - del self.clients[oob] def close(self): """Closes the server."""