1.1 --- a/pprocess.py Thu Jun 19 21:44:53 2008 +0200
1.2 +++ b/pprocess.py Sat Nov 24 00:00:15 2007 +0000
1.3 @@ -18,289 +18,9 @@
1.4
1.5 You should have received a copy of the GNU Lesser General Public License along
1.6 with this program. If not, see <http://www.gnu.org/licenses/>.
1.7 -
1.8 ---------
1.9 -
1.10 -Thread-style Processing
1.11 ------------------------
1.12 -
1.13 -To create new processes to run a function or any callable object, specify the
1.14 -"callable" and any arguments as follows:
1.15 -
1.16 -channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
1.17 -
1.18 -This returns a channel which can then be used to communicate with the created
1.19 -process. Meanwhile, in the created process, the given callable will be invoked
1.20 -with another channel as its first argument followed by the specified arguments:
1.21 -
1.22 -def fn(channel, arg1, arg2, named1, named2):
1.23 -    # Read from and write to the channel.
1.24 -    # Return value is ignored.
1.25 -    ...
1.26 -
1.27 -Fork-style Processing
1.28 ----------------------
1.29 -
1.30 -To create new processes in a similar way to that employed when using os.fork
1.31 -(i.e. the fork system call on various operating systems), use the following
1.32 -method:
1.33 -
1.34 -channel = pprocess.create()
1.35 -if channel.pid == 0:
1.36 -    # This code is run by the created process.
1.37 -    # Read from and write to the channel to communicate with the
1.38 -    # creating/calling process.
1.39 -    # An explicit exit of the process may be desirable to prevent the process
1.40 -    # from running code which is intended for the creating/calling process.
1.41 -    ...
1.42 -    pprocess.exit(channel)
1.43 -else:
1.44 -    # This code is run by the creating/calling process.
1.45 -    # Read from and write to the channel to communicate with the created
1.46 -    # process.
1.47 -    ...
1.48 -
1.49 -Message Exchanges
1.50 ------------------
1.51 -
1.52 -When creating many processes, each providing results for the consumption of the
1.53 -main process, the collection of those results in an efficient fashion can be
1.54 -problematic: if some processes take longer than others, and if we decide to read
1.55 -from those processes when they are not ready instead of other processes which
1.56 -are ready, the whole activity will take much longer than necessary.
1.57 -
1.58 -One solution to the problem of knowing when to read from channels is to create
1.59 -an Exchange object, optionally initialising it with a list of channels through
1.60 -which data is expected to arrive:
1.61 -
1.62 -exchange = pprocess.Exchange()         # populate the exchange later
1.63 -exchange = pprocess.Exchange(channels) # populate the exchange with channels
1.64 -
1.65 -We can add channels to the exchange using the add method:
1.66 -
1.67 -exchange.add(channel)
1.68 -
1.69 -To test whether an exchange is active - that is, whether it is actually
1.70 -monitoring any channels - we can use the active method which returns all
1.71 -channels being monitored by the exchange:
1.72 -
1.73 -channels = exchange.active()
1.74 -
1.75 -We may then check the exchange to see whether any data is ready to be received;
1.76 -for example:
1.77 -
1.78 -for channel in exchange.ready():
1.79 -    # Read from and write to the channel.
1.80 -    ...
1.81 -
1.82 -If we do not wish to wait indefinitely for a list of channels, we can set a
1.83 -timeout value as an argument to the ready method (as a floating point number
1.84 -specifying the timeout in seconds, where 0 means a non-blocking poll as stated
1.85 -in the select module's select function documentation).
1.86 -
1.87 -Convenient Message Exchanges
1.88 -----------------------------
1.89 -
1.90 -A convenient form of message exchanges can be adopted by defining a subclass of
1.91 -the Exchange class and defining a particular method:
1.92 -
1.93 -class MyExchange(pprocess.Exchange):
1.94 -    def store_data(self, channel):
1.95 -        data = channel.receive()
1.96 -        # Do something with data here.
1.97 -
1.98 -The exact operations performed on the received data might be as simple as
1.99 -storing it on an instance attribute. To make use of the exchange, we would
1.100 -instantiate it as usual:
1.101 -
1.102 -exchange = MyExchange()         # populate the exchange later
1.103 -exchange = MyExchange(limit=10) # set a limit for later population
1.104 -
1.105 -The exchange can now be used in a simpler fashion than that shown above. We can
1.106 -add channels as before using the add method, or we can choose to only add
1.107 -channels if the specified limit of channels is not exceeded:
1.108 -
1.109 -exchange.add(channel)      # add a channel as normal
1.110 -exchange.add_wait(channel) # add a channel, waiting if the limit would be
1.111 -                           # exceeded
1.112 -
1.113 -Or we can request that the exchange create a channel on our behalf:
1.114 -
1.115 -channel = exchange.create()
1.116 -
1.117 -We can even start processes and monitor channels without ever handling the
1.118 -channel ourselves:
1.119 -
1.120 -exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
1.121 -
1.122 -We can explicitly wait for "free space" for channels by calling the wait method,
1.123 -although the start and add_wait methods make this less interesting:
1.124 -
1.125 -exchange.wait()
1.126 -
1.127 -Finally, when finishing the computation, we can choose to merely call the finish
1.128 -method and have the remaining data processed automatically:
1.129 -
1.130 -exchange.finish()
1.131 -
1.132 -This approach is less flexible than the raw message exchange API described
1.133 -above, but it is more convenient and permits much simpler and clearer
1.134 -code.
1.135 -
1.136 -Exchanges as Queues
1.137 --------------------
1.138 -
1.139 -Instead of having to subclass the pprocess.Exchange class and to define the
1.140 -store_data method, it might be more desirable to let the exchange manage the
1.141 -communications between created and creating processes and to let the creating
1.142 -process just consume received data as it arrives, without particular regard for
1.143 -the order of the received data - perhaps the creating process has its own way of
1.144 -managing such issues.
1.145 -
1.146 -For such situations, the Queue class may be instantiated and channels added to
1.147 -the queue using the various methods provided:
1.148 -
1.149 -queue = pprocess.Queue(limit=10)
1.150 -channel = queue.create()
1.151 -if channel:
1.152 -    # Do some computation.
1.153 -    pprocess.exit(channel)
1.154 -
1.155 -The results can then be consumed by treating the queue like an iterator:
1.156 -
1.157 -for result in queue:
1.158 -    # Capture each result.
1.159 -
1.160 -This approach does not, of course, require the direct handling of channels. One
1.161 -could instead use the start method on the queue to create processes and to
1.162 -initiate computations (since a queue is merely an enhanced exchange with a
1.163 -specific implementation of the store_data method).
1.164 -
1.165 -Exchanges as Maps
1.166 ------------------
1.167 -
1.168 -Where the above Queue class appears to be an attractive solution for the
1.169 -management of the results of computations, but where the order of their
1.170 -consumption by the creating process remains important, the Map class may offer a
1.171 -suitable way of collecting and accessing results:
1.172 -
1.173 -results = pprocess.Map(limit=10)
1.174 -for value in inputs:
1.175 -    results.start(fn, args)
1.176 -
1.177 -The results can then be consumed in an order corresponding to the order of the
1.178 -computations which produced them:
1.179 -
1.180 -for result in results:
1.181 -    # Process each result.
1.182 -
1.183 -Internally, the Map object records a particular ordering of channels, ensuring
1.184 -that the received results can be mapped to this ordering, and that the results
1.185 -can be made available with this ordering preserved.
1.186 -
1.187 -Managed Callables
1.188 ------------------
1.189 -
1.190 -A further simplification of the above convenient use of message exchanges
1.191 -involves the creation of callables (e.g. functions) which are automatically
1.192 -monitored by an exchange. We create such a callable by calling the manage method
1.193 -on an exchange:
1.194 -
1.195 -myfn = exchange.manage(fn)
1.196 -
1.197 -This callable can then be invoked instead of using the exchange's start method:
1.198 -
1.199 -myfn(arg1, arg2, named1=value1, named2=value2)
1.200 -
1.201 -The exchange's finish method can be used as usual to process incoming data.
1.202 -
1.203 -Making Existing Functions Parallel
1.204 -----------------------------------
1.205 -
1.206 -In making a program parallel, existing functions which only return results can
1.207 -be manually modified to accept and use channels to communicate results back to
1.208 -the main process. However, a simple alternative is to use the MakeParallel class
1.209 -to provide a wrapper around unmodified functions which will return the results
1.210 -from those functions in the channels provided. For example:
1.211 -
1.212 -fn = pprocess.MakeParallel(originalfn)
1.213 -
1.214 -Map-style Processing
1.215 ---------------------
1.216 -
1.217 -In situations where a callable would normally be used in conjunction with the
1.218 -Python built-in map function, an alternative solution can be adopted by using
1.219 -the pmap function:
1.220 -
1.221 -pprocess.pmap(fn, sequence)
1.222 -
1.223 -Here, the sequence would have to contain elements that each contain the required
1.224 -parameters of the specified callable, fn. Note that the callable does not need
1.225 -to be a parallel-aware function which has a channel argument: the pmap function
1.226 -automatically wraps the given callable internally.
1.227 -
1.228 -Reusing Processes and Channels
1.229 -------------------------------
1.230 -
1.231 -So far, all parallel computations have been done with newly-created processes.
1.232 -However, this can seem somewhat inefficient, especially if processes are being
1.233 -continually created and destroyed (although if this happens too often, the
1.234 -amount of work done by each process may be too little, anyway). One solution is
1.235 -to retain processes after they have done their work and request that they
1.236 -perform more work for each new parallel task or invocation. To enable the reuse
1.237 -of processes in this way, a special keyword argument may be specified when
1.238 -creating Exchange objects (and subclasses such as Map and Queue). For example:
1.239 -
1.240 -exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
1.241 -
1.242 -Code invoked through such exchanges must be aware of channels and be constructed
1.243 -in such a way that it does not terminate after sending a result back to the
1.244 -creating process. Instead, it should repeatedly wait for subsequent sets of
1.245 -parameters (compatible with those either in the signature of a callable or with
1.246 -the original values read from the channel). Reusable code is terminated when the
1.247 -special value of None is sent from the creating process to the created process,
1.248 -indicating that no more parameters will be sent; this should cause the code to
1.249 -terminate.
1.250 -
1.251 -Making Existing Functions Parallel and Reusable
1.252 ------------------------------------------------
1.253 -
1.254 -An easier way of making reusable code sections for parallel use is to employ the
1.255 -MakeReusable class to wrap an existing callable:
1.256 -
1.257 -fn = pprocess.MakeReusable(originalfn)
1.258 -
1.259 -This wraps the callable in a similar fashion to MakeParallel, but provides the
1.260 -necessary mechanisms described above for reusable code.
1.261 -
1.262 -Signals and Waiting
1.263 --------------------
1.264 -
1.265 -When created/child processes terminate, one would typically want to be informed
1.266 -of such conditions using a signal handler. Unfortunately, Python seems to have
1.267 -issues with restartable reads from file descriptors when interrupted by signals:
1.268 -
1.269 -http://mail.python.org/pipermail/python-dev/2002-September/028572.html
1.270 -http://twistedmatrix.com/bugs/issue733
1.271 -
1.272 -Select and Poll
1.273 ----------------
1.274 -
1.275 -The exact combination of conditions indicating closed pipes remains relatively
1.276 -obscure. Here is a message/thread describing them (in the context of another
1.277 -topic):
1.278 -
1.279 -http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
1.280 -
1.281 -It would seem, from using sockets and from studying the asyncore module, that
1.282 -sockets are more predictable than pipes.
1.283 -
1.284 -Notes about poll implementations can be found here:
1.285 -
1.286 -http://www.greenend.org.uk/rjk/2001/06/poll.html
1.287 """
1.288
1.289 -__version__ = "0.3"
1.290 +__version__ = "0.3.1"
1.291
1.292 import os
1.293 import sys