1.1 --- a/pprocess.py Sun Jan 14 22:28:43 2007 +0000
1.2 +++ b/pprocess.py Sun Jan 14 22:52:38 2007 +0000
1.3 @@ -4,7 +4,7 @@
1.4 A simple parallel processing API for Python, inspired somewhat by the thread
1.5 module, slightly less by pypar, and slightly less still by pypvm.
1.6
1.7 -Copyright (C) 2005, 2006 Paul Boddie <paul@boddie.org.uk>
1.8 +Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>
1.9
1.10 This software is free software; you can redistribute it and/or
1.11 modify it under the terms of the GNU General Public License as
1.12 @@ -23,6 +23,12 @@
1.13
1.14 --------
1.15
1.16 +The recommended styles of programming using pprocess are described in the
1.17 +"Thread-style Processing" and "Convenient Message Exchanges" sections below,
1.18 +although developers may wish to read the "Message Exchanges" section for more
1.19 +details of the API concerned, and the "Fork-style Processing" section may be of
1.20 +interest to those with experience of large-scale parallel processing systems.
1.21 +
1.22 Thread-style Processing
1.23 -----------------------
1.24
1.25 @@ -99,6 +105,45 @@
1.26 specifying the timeout in seconds, where 0 means a non-blocking poll as stated
1.27 in the select module's select function documentation).
1.28
1.29 +Convenient Message Exchanges
1.30 +----------------------------
1.31 +
1.32 +A convenient form of message exchanges can be adopted by defining a subclass of
1.33 +the Exchange class and defining a particular method:
1.34 +
1.35 +class MyExchange(Exchange):
1.36 + def store_data(self, channel):
1.37 + data = channel.receive()
1.38 + # Do something with data here.
1.39 +
1.40 +The exact operations performed on the received data might be as simple as
1.41 +storing it on an instance attribute. To make use of the exchange, we would
1.42 +instantiate it as usual:
1.43 +
1.44 +exchange = MyExchange() # populate the exchange later
1.45 +exchange = MyExchange(limit=10) # set a limit for later population
1.46 +
1.47 +The exchange can now be used in a simpler fashion than that shown above. We can
1.48 +add channels as before using the add method, or we can choose to wait before
1.49 +adding a channel if the specified limit of channels would be exceeded:
1.50 +
1.51 +exchange.add(channel) # add a channel as normal
1.52 +exchange.add_wait(channel) # add a channel, waiting if the limit would be
1.53 + # exceeded
1.54 +
1.55 +We can explicitly wait for "free space" for channels by calling the wait method:
1.56 +
1.57 +exchange.wait()
1.58 +
1.59 +Finally, when finishing the computation, we can choose to merely call the finish
1.60 +method and have the remaining data processed automatically:
1.61 +
1.62 +exchange.finish()
1.63 +
1.64 +Clearly, this approach is less flexible than the raw message exchange API
1.65 +described above. However, it is more convenient and permits much simpler and
1.66 +clearer code.
1.67 +
1.68 Signals and Waiting
1.69 -------------------
1.70
1.71 @@ -126,7 +171,7 @@
1.72 http://www.greenend.org.uk/rjk/2001/06/poll.html
1.73 """
1.74
1.75 -__version__ = "0.2.2"
1.76 +__version__ = "0.2.3"
1.77
1.78 import os
1.79 import sys
1.80 @@ -234,18 +279,26 @@
1.81
1.82 """
1.83 A communications exchange that can be used to detect channels which are
1.84 - ready to communicate.
1.85 + ready to communicate. Subclasses of this class can define the 'store_data'
1.86 + method in order to enable the 'add_wait', 'wait' and 'finish' methods.
1.87 """
1.88
1.89 - def __init__(self, channels=None, autoclose=1):
1.90 + def __init__(self, channels=None, limit=None, autoclose=1):
1.91
1.92 """
1.93 - Initialise the exchange with an optional list of 'channels'. If the
1.94 - optional 'autoclose' parameter is set to a false value, channels will
1.95 - not be closed automatically when they are removed from the exchange - by
1.96 - default they are closed when removed.
1.97 + Initialise the exchange with an optional list of 'channels'.
1.98 +
1.99 + If the optional 'limit' is specified, restrictions on the addition of
1.100 + new channels can be enforced and observed through the 'add_wait', 'wait'
1.101 + and 'finish' methods. To make use of these methods, create a subclass of
1.102 + this class and define a working 'store_data' method.
1.103 +
1.104 + If the optional 'autoclose' parameter is set to a false value, channels
1.105 + will not be closed automatically when they are removed from the exchange
1.106 + - by default they are closed when removed.
1.107 """
1.108
1.109 + self.limit = limit
1.110 self.autoclose = autoclose
1.111 self.readables = {}
1.112 self.removed = []
1.113 @@ -309,6 +362,55 @@
1.114 channel.close()
1.115 channel.wait()
1.116
1.117 + # Enhanced exchange methods involving channel limits.
1.118 +
1.119 + def add_wait(self, channel):
1.120 +
1.121 + """
1.122 + Add the given 'channel' to the exchange, waiting if the limit on active
1.123 + channels would be exceeded by adding the channel.
1.124 + """
1.125 +
1.126 + self.wait()
1.127 + self.add(channel)
1.128 +
1.129 + def wait(self):
1.130 +
1.131 + """
1.132 + Test for the limit on channels, blocking and reading incoming data until
1.133 + the number of channels is below the limit.
1.134 + """
1.135 +
1.136 + # If limited, block until channels have been closed.
1.137 +
1.138 + while self.limit is not None and len(self.active()) >= self.limit:
1.139 + self.store()
1.140 +
1.141 + def finish(self):
1.142 +
1.143 + """
1.144 + Finish the use of the exchange by waiting for all channels to complete.
1.145 + """
1.146 +
1.147 + while self.active():
1.148 + self.store()
1.149 +
1.150 + def store(self):
1.151 +
1.152 + "For each ready channel, process the incoming data."
1.153 +
1.154 + for channel in self.ready():
1.155 + self.store_data(channel)
1.156 +
1.157 + def store_data(self, channel):
1.158 +
1.159 + """
1.160 + Store incoming data from the specified 'channel'. In subclasses of this
1.161 + class, such data could be stored using instance attributes.
1.162 + """
1.163 +
1.164 + raise NotImplementedError, "store_data"
1.165 +
1.166 def create():
1.167
1.168 """