1.1 --- a/parallel.py Wed Sep 28 22:43:40 2005 +0000
1.2 +++ b/parallel.py Thu Sep 29 16:20:06 2005 +0000
1.3 @@ -52,10 +52,15 @@
1.4 are ready, the whole activity will take much longer than necessary.
1.5
1.6 One solution to the problem of knowing when to read from channels is to create
1.7 -an Exchange object, initialising it with a list of channels through which data
1.8 -is expected to arrive:
1.9 +an Exchange object, optionally initialising it with a list of channels through
1.10 +which data is expected to arrive:
1.11
1.12 -exchange = Exchange(channels)
1.13 +exchange = Exchange() # populate the exchange later
1.14 +exchange = Exchange(channels) # populate the exchange with channels
1.15 +
1.16 +We can add channels to the exchange using the add method:
1.17 +
1.18 +exchange.add(channel)
1.19
1.20 To test whether an exchange is active - that is, whether it is actually
1.21 monitoring any channels - we can use the active method which returns all
1.22 @@ -93,6 +98,9 @@
1.23 topic):
1.24
1.25 http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
1.26 +
1.27 +It would seem, from using sockets and from studying the asycore module, that
1.28 +sockets are more predictable than pipes.
1.29 """
1.30
1.31 __version__ = "0.1"
1.32 @@ -107,6 +115,9 @@
1.33 except ImportError:
1.34 import pickle
1.35
1.36 +class AcknowledgementError(Exception):
1.37 + pass
1.38 +
1.39 class Channel:
1.40
1.41 "A communications channel."
1.42 @@ -129,17 +140,17 @@
1.43 # Since signals don't work well with I/O, we close pipes and wait for
1.44 # created processes upon finalisation.
1.45
1.46 - if not self.closed:
1.47 - self.close()
1.48 + self.close()
1.49
1.50 def close(self):
1.51
1.52 "Explicitly close the channel."
1.53
1.54 - self.closed = 1
1.55 - self.read_pipe.close()
1.56 - self.write_pipe.close()
1.57 - self.wait(os.WNOHANG)
1.58 + if not self.closed:
1.59 + self.closed = 1
1.60 + self.read_pipe.close()
1.61 + self.write_pipe.close()
1.62 + #self.wait(os.WNOHANG)
1.63
1.64 def wait(self, options=0):
1.65
1.66 @@ -151,14 +162,27 @@
1.67 except OSError:
1.68 pass
1.69
1.70 - def send(self, obj):
1.71 + def _send(self, obj):
1.72
1.73 "Send the given object 'obj' through the channel."
1.74
1.75 pickle.dump(obj, self.write_pipe)
1.76 self.write_pipe.flush()
1.77
1.78 - def receive(self):
1.79 + def send(self, obj):
1.80 +
1.81 + """
1.82 + Send the given object 'obj' through the channel. Then wait for an
1.83 + acknowledgement. (The acknowledgement makes the caller wait, thus
1.84 + preventing processes from exiting and disrupting the communications
1.85 + channel and losing data.)
1.86 + """
1.87 +
1.88 + self._send(obj)
1.89 + if self._receive() != "OK":
1.90 + raise AcknowledgementError, obj
1.91 +
1.92 + def _receive(self):
1.93
1.94 "Receive an object through the channel, returning the object."
1.95
1.96 @@ -168,6 +192,21 @@
1.97 else:
1.98 return obj
1.99
1.100 + def receive(self):
1.101 +
1.102 + """
1.103 + Receive an object through the channel, returning the object. Send an
1.104 + acknowledgement of receipt. (The acknowledgement makes the sender wait,
1.105 + thus preventing processes from exiting and disrupting the communications
1.106 + channel and losing data.)
1.107 + """
1.108 +
1.109 + try:
1.110 + obj = self._receive()
1.111 + return obj
1.112 + finally:
1.113 + self._send("OK")
1.114 +
1.115 class Exchange:
1.116
1.117 """
1.118 @@ -175,15 +214,27 @@
1.119 ready to communicate.
1.120 """
1.121
1.122 - def __init__(self, channels):
1.123 + def __init__(self, channels=None, autoclose=1):
1.124
1.125 - "Initialise the exchange with the given 'channels'."
1.126 + """
1.127 + Initialise the exchange with an optional list of 'channels'. If the
1.128 + optional 'autoclose' parameter is set to a false value, channels will
1.129 + not be closed automatically when they are removed from the exchange - by
1.130 + default they are closed when removed.
1.131 + """
1.132
1.133 + self.autoclose = autoclose
1.134 self.readables = {}
1.135 self.poller = select.poll()
1.136 - for channel in channels:
1.137 - self.readables[channel.read_pipe.fileno()] = channel
1.138 - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
1.139 + for channel in channels or []:
1.140 + self.add(channel)
1.141 +
1.142 + def add(self, channel):
1.143 +
1.144 + "Add the given 'channel' to the exchange."
1.145 +
1.146 + self.readables[channel.read_pipe.fileno()] = channel
1.147 + self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
1.148
1.149 def active(self):
1.150
1.151 @@ -224,6 +275,9 @@
1.152
1.153 del self.readables[channel.read_pipe.fileno()]
1.154 self.poller.unregister(channel.read_pipe.fileno())
1.155 + if self.autoclose:
1.156 + channel.close()
1.157 + channel.wait()
1.158
1.159 def create():
1.160