1.1 --- a/parallel.py Tue Sep 27 14:18:01 2005 +0000
1.2 +++ b/parallel.py Tue Sep 27 17:26:44 2005 +0000
1.3 @@ -77,6 +77,7 @@
1.4 issues with restartable reads from file descriptors when interrupted by signals:
1.5
1.6 http://mail.python.org/pipermail/python-dev/2002-September/028572.html
1.7 +http://twistedmatrix.com/bugs/issue733
1.8 """
1.9
1.10 import os
1.11 @@ -120,6 +121,12 @@
1.12 self.closed = 1
1.13 self.read_pipe.close()
1.14 self.write_pipe.close()
1.15 + self.wait()
1.16 +
1.17 + def wait(self):
1.18 +
1.19 + "Wait for the created process, if any, to exit."
1.20 +
1.21 if self.pid != 0:
1.22 try:
1.23 os.waitpid(self.pid, os.WNOHANG)
1.24 @@ -155,8 +162,10 @@
1.25 "Initialise the exchange with the given 'channels'."
1.26
1.27 self.readables = {}
1.28 + self.poller = select.poll()
1.29 for channel in channels:
1.30 - self.readables[channel.read_pipe] = channel
1.31 + self.readables[channel.read_pipe.fileno()] = channel
1.32 + self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
1.33
1.34 def active(self):
1.35
1.36 @@ -172,17 +181,22 @@
1.37 to be read from.
1.38 """
1.39
1.40 - try:
1.41 - if timeout is not None:
1.42 - t = select.select(self.readables.keys(), [], [], timeout)
1.43 - else:
1.44 - t = select.select(self.readables.keys(), [], [])
1.45 - except select.error:
1.46 - return []
1.47 + fds = self.poller.poll(timeout)
1.48 + readables = []
1.49 + for fd, status in fds:
1.50 + channel = self.readables[fd]
1.51 +
1.52 + # Record readable channels.
1.53
1.54 - readable_fds, writable_fds, exceptional_fds = t
1.55 - readable = [self.readables[fd] for fd in readable_fds]
1.56 - return readable
1.57 + if status & select.POLLIN:
1.58 + readables.append(channel)
1.59 +
1.60 + # Remove ended/error channels.
1.61 +
1.62 + if status & (select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR):
1.63 + self.remove(channel)
1.64 +
1.65 + return readables
1.66
1.67 def remove(self, channel):
1.68
1.69 @@ -190,7 +204,8 @@
1.70 Remove the given 'channel' from the exchange.
1.71 """
1.72
1.73 - del self.readables[channel.read_pipe]
1.74 + del self.readables[channel.read_pipe.fileno()]
1.75 + self.poller.unregister(channel.read_pipe.fileno())
1.76
1.77 def create():
1.78
1.79 @@ -227,7 +242,8 @@
1.80 exc_type, exc_value, exc_traceback = sys.exc_info()
1.81 channel.send(exc_value)
1.82 finally:
1.83 - sys.exit(0)
1.84 + channel.close()
1.85 + raise SystemExit
1.86 else:
1.87 return channel
1.88