# HG changeset patch # User paulb # Date 1168815158 0 # Node ID d4ddb6100cd3863c7775e003d6cc9a3c9c49698b # Parent 3a3b94c775772a31115ad6685fbb6bcb53e78e7c [project @ 2007-01-14 22:52:38 by paulb] Added extensions to the Exchange API in order to provide more convenient result processing through specially defined subclasses of Exchange. Updated the docstring and copyright information. diff -r 3a3b94c77577 -r d4ddb6100cd3 pprocess.py --- a/pprocess.py Sun Jan 14 22:28:43 2007 +0000 +++ b/pprocess.py Sun Jan 14 22:52:38 2007 +0000 @@ -4,7 +4,7 @@ A simple parallel processing API for Python, inspired somewhat by the thread module, slightly less by pypar, and slightly less still by pypvm. -Copyright (C) 2005, 2006 Paul Boddie +Copyright (C) 2005, 2006, 2007 Paul Boddie This software is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -23,6 +23,12 @@ -------- +The recommended styles of programming using pprocess involve the "Thread-style +Processing" and "Convenient Message Exchanges" sections below, although +developers may wish to read the "Message Exchanges" section for more details of +the API concerned, and the "Fork-style Processing" section may be of interest to +those with experience of large scale parallel processing systems. + Thread-style Processing ----------------------- @@ -99,6 +105,45 @@ specifying the timeout in seconds, where 0 means a non-blocking poll as stated in the select module's select function documentation). +Convenient Message Exchanges +---------------------------- + +A convenient form of message exchanges can be adopted by defining a subclass of +the Exchange class and defining a particular method: + +class MyExchange(Exchange): + def store_data(self, channel): + data = channel.receive() + # Do something with data here. + +The exact operations performed on the received data might be as simple as +storing it on an instance attribute. 
To make use of the exchange, we would +instantiate it as usual: + +exchange = MyExchange() # populate the exchange later +exchange = MyExchange(limit=10) # set a limit for later population + +The exchange can now be used in a simpler fashion than that shown above. We can +add channels as before using the add method, or we can choose to only add +channels if the specified limit of channels is not exceeded: + +exchange.add(channel) # add a channel as normal +exchange.add_wait(channel) # add a channel, waiting if the limit would be + # exceeded + +We can explicitly wait for "free space" for channels by calling the wait method: + +exchange.wait() + +Finally, when finishing the computation, we can choose to merely call the finish +method and have the remaining data processed automatically: + +exchange.finish() + +Clearly, this approach is less flexible than the raw message exchange API as +described above. However, it is more convenient and permits much simpler and +clearer code. + Signals and Waiting ------------------- @@ -126,7 +171,7 @@ http://www.greenend.org.uk/rjk/2001/06/poll.html """ -__version__ = "0.2.2" +__version__ = "0.2.3" import os import sys @@ -234,18 +279,26 @@ """ A communications exchange that can be used to detect channels which are - ready to communicate. + ready to communicate. Subclasses of this class can define the 'store_data' + method in order to enable the 'add_wait', 'wait' and 'finish' methods. """ - def __init__(self, channels=None, autoclose=1): + def __init__(self, channels=None, limit=None, autoclose=1): """ - Initialise the exchange with an optional list of 'channels'. If the - optional 'autoclose' parameter is set to a false value, channels will - not be closed automatically when they are removed from the exchange - by - default they are closed when removed. + Initialise the exchange with an optional list of 'channels'. 
+ + If the optional 'limit' is specified, restrictions on the addition of + new channels can be enforced and observed through the 'add_wait', 'wait' + and 'finish' methods. To make use of these methods, create a subclass of + this class and define a working 'store_data' method. + + If the optional 'autoclose' parameter is set to a false value, channels + will not be closed automatically when they are removed from the exchange + - by default they are closed when removed. """ + self.limit = limit self.autoclose = autoclose self.readables = {} self.removed = [] @@ -309,6 +362,55 @@ channel.close() channel.wait() + # Enhanced exchange methods involving channel limits. + + def add_wait(self, channel): + + """ + Add the given 'channel' to the exchange, waiting if the limit on active + channels would be exceeded by adding the channel. + """ + + self.wait() + self.add(channel) + + def wait(self): + + """ + Test for the limit on channels, blocking and reading incoming data until + the number of channels is below the limit. + """ + + # If limited, block until channels have been closed. + + while self.limit is not None and len(self.active()) >= self.limit: + self.store() + + def finish(self): + + """ + Finish the use of the exchange by waiting for all channels to complete. + """ + + while self.active(): + self.store() + + def store(self): + + "For each ready channel, process the incoming data." + + for channel in self.ready(): + self.store_data(channel) + + def store_data(self, channel): + + """ + Store incoming data from the specified 'channel'. In subclasses of this + class, such data could be stored using instance attributes. + """ + + raise NotImplementedError, "store_data" + def create(): """