# HG changeset patch # User paulb # Date 1127842004 0 # Node ID 0432c66b661e33c11fce6536632289399c7d26ab # Parent b18381a29301284b06faa6d874ddc3b0c12a0f08 [project @ 2005-09-27 17:26:44 by paulb] Changed the Exchange ready method to use poll rather than select. This should now detect pipe closures and automatically remove channels. Made a separate wait method for the Channel class. Changed the start function to explicitly close the open channel. diff -r b18381a29301 -r 0432c66b661e parallel.py --- a/parallel.py Tue Sep 27 14:18:01 2005 +0000 +++ b/parallel.py Tue Sep 27 17:26:44 2005 +0000 @@ -77,6 +77,7 @@ issues with restartable reads from file descriptors when interrupted by signals: http://mail.python.org/pipermail/python-dev/2002-September/028572.html +http://twistedmatrix.com/bugs/issue733 """ import os @@ -120,6 +121,12 @@ self.closed = 1 self.read_pipe.close() self.write_pipe.close() + self.wait() + + def wait(self): + + "Wait for the created process, if any, to exit." + if self.pid != 0: try: os.waitpid(self.pid, os.WNOHANG) @@ -155,8 +162,10 @@ "Initialise the exchange with the given 'channels'." self.readables = {} + self.poller = select.poll() for channel in channels: - self.readables[channel.read_pipe] = channel + self.readables[channel.read_pipe.fileno()] = channel + self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR) def active(self): @@ -172,17 +181,22 @@ to be read from. """ - try: - if timeout is not None: - t = select.select(self.readables.keys(), [], [], timeout) - else: - t = select.select(self.readables.keys(), [], []) - except select.error: - return [] + fds = self.poller.poll(timeout) + readables = [] + for fd, status in fds: + channel = self.readables[fd] + + # Record readable channels. - readable_fds, writable_fds, exceptional_fds = t - readable = [self.readables[fd] for fd in readable_fds] - return readable + if status & select.POLLIN: + readables.append(channel) + + # Remove ended/error channels. + + if status & (select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR): + self.remove(channel) + + return readables def remove(self, channel): @@ -190,7 +204,8 @@ Remove the given 'channel' from the exchange. """ - del self.readables[channel.read_pipe] + del self.readables[channel.read_pipe.fileno()] + self.poller.unregister(channel.read_pipe.fileno()) def create(): @@ -227,7 +242,8 @@ exc_type, exc_value, exc_traceback = sys.exc_info() channel.send(exc_value) finally: - sys.exit(0) + channel.close() + raise SystemExit else: return channel