# HG changeset patch # User paulb # Date 1128010806 0 # Node ID ab1476a5606eb46ecfa445fcc6ac557a8cc63962 # Parent f6dc55a422875235085c25823db220c74080a390 [project @ 2005-09-29 16:20:06 by paulb] Introduced acknowledgements into the Channel methods to prevent data loss when a sending process exits and disrupts/destroys the socket communications. Removed the wait call in the Channel class's close method. Introduced automatic channel closing in the Exchange class, where channels are closed and the created processes explicitly waited for. Added an add method to the Exchange class, and made the initialiser accept no arguments. diff -r f6dc55a42287 -r ab1476a5606e parallel.py --- a/parallel.py Wed Sep 28 22:43:40 2005 +0000 +++ b/parallel.py Thu Sep 29 16:20:06 2005 +0000 @@ -52,10 +52,15 @@ are ready, the whole activity will take much longer than necessary. One solution to the problem of knowing when to read from channels is to create -an Exchange object, initialising it with a list of channels through which data -is expected to arrive: +an Exchange object, optionally initialising it with a list of channels through +which data is expected to arrive: -exchange = Exchange(channels) +exchange = Exchange() # populate the exchange later +exchange = Exchange(channels) # populate the exchange with channels + +We can add channels to the exchange using the add method: + +exchange.add(channel) To test whether an exchange is active - that is, whether it is actually monitoring any channels - we can use the active method which returns all @@ -93,6 +98,9 @@ topic): http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html + +It would seem, from using sockets and from studying the asycore module, that +sockets are more predictable than pipes. """ __version__ = "0.1" @@ -107,6 +115,9 @@ except ImportError: import pickle +class AcknowledgementError(Exception): + pass + class Channel: "A communications channel." @@ -129,17 +140,17 @@ # Since signals don't work well with I/O, we close pipes and wait for # created processes upon finalisation. - if not self.closed: - self.close() + self.close() def close(self): "Explicitly close the channel." - self.closed = 1 - self.read_pipe.close() - self.write_pipe.close() - self.wait(os.WNOHANG) + if not self.closed: + self.closed = 1 + self.read_pipe.close() + self.write_pipe.close() + #self.wait(os.WNOHANG) def wait(self, options=0): @@ -151,14 +162,27 @@ except OSError: pass - def send(self, obj): + def _send(self, obj): "Send the given object 'obj' through the channel." pickle.dump(obj, self.write_pipe) self.write_pipe.flush() - def receive(self): + def send(self, obj): + + """ + Send the given object 'obj' through the channel. Then wait for an + acknowledgement. (The acknowledgement makes the caller wait, thus + preventing processes from exiting and disrupting the communications + channel and losing data.) + """ + + self._send(obj) + if self._receive() != "OK": + raise AcknowledgementError, obj + + def _receive(self): "Receive an object through the channel, returning the object." @@ -168,6 +192,21 @@ else: return obj + def receive(self): + + """ + Receive an object through the channel, returning the object. Send an + acknowledgement of receipt. (The acknowledgement makes the sender wait, + thus preventing processes from exiting and disrupting the communications + channel and losing data.) + """ + + try: + obj = self._receive() + return obj + finally: + self._send("OK") + class Exchange: """ @@ -175,15 +214,27 @@ ready to communicate. """ - def __init__(self, channels): + def __init__(self, channels=None, autoclose=1): - "Initialise the exchange with the given 'channels'." + """ + Initialise the exchange with an optional list of 'channels'. If the + optional 'autoclose' parameter is set to a false value, channels will + not be closed automatically when they are removed from the exchange - by + default they are closed when removed. + """ + self.autoclose = autoclose self.readables = {} self.poller = select.poll() - for channel in channels: - self.readables[channel.read_pipe.fileno()] = channel - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) + for channel in channels or []: + self.add(channel) + + def add(self, channel): + + "Add the given 'channel' to the exchange." + + self.readables[channel.read_pipe.fileno()] = channel + self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) def active(self): @@ -224,6 +275,9 @@ del self.readables[channel.read_pipe.fileno()] self.poller.unregister(channel.read_pipe.fileno()) + if self.autoclose: + channel.close() + channel.wait() def create():