pprocess

Changeset

129:7e614f0cccb6
2007-11-24 paulb raw files shortlog changelog graph [project @ 2007-11-24 00:00:18 by paulb] Moved the reference material from the module docstring to a separate document.
docs/reference.html (file)
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/docs/reference.html	Sat Nov 24 00:00:18 2007 +0000
     1.3 @@ -0,0 +1,367 @@
     1.4 +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
     1.5 +<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
     1.6 +<head>
     1.7 +  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
     1.8 +  <title>pprocess - Reference</title>
     1.9 +  <link href="styles.css" rel="stylesheet" type="text/css" />
    1.10 +</head>
    1.11 +<body>
    1.12 +
    1.13 +<h1>pprocess - Reference</h1>
    1.14 +
    1.15 +<p>The <code>pprocess</code> module defines a simple parallel processing API
    1.16 +for Python, inspired somewhat by the <code>thread</code> module, slightly less
    1.17 +by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
    1.18 +less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>
    1.19 +
    1.20 +<p>This document complements the <a href="tutorial.html">tutorial</a> by
    1.21 +providing an overview of the different styles of parallel programming supported
    1.22 +by the module. For an introduction and in order to get a clearer idea of the
    1.23 +most suitable styles for your own programs, consult the
    1.24 +<a href="tutorial.html">tutorial</a>.</p>
    1.25 +
    1.26 +<ul>
    1.27 +<li><a href="#Thread">Thread-style Processing</a></li>
    1.28 +<li><a href="#Fork">Fork-style Processing</a></li>
    1.29 +<li><a href="#Exchanges">Message Exchanges</a></li>
    1.30 +<li><a href="#Convenient">Convenient Message Exchanges</a></li>
    1.31 +<li><a href="#Queues">Exchanges as Queues</a></li>
    1.32 +<li><a href="#Maps">Exchanges as Maps</a></li>
    1.33 +<li><a href="#Managed">Managed Callables</a></li>
    1.34 +<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
    1.35 +<li><a href="#Map">Map-style Processing</a></li>
    1.36 +<li><a href="#Reusing">Reusing Processes and Channels</a></li>
    1.37 +<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
    1.38 +<li><a href="#Implementation">Implementation Notes</a></li>
    1.39 +</ul>
    1.40 +
    1.41 +<h2 id="Thread">Thread-style Processing</h2>
    1.42 +
    1.43 +<p>To create new processes to run a function or any callable object, specify the
    1.44 +"callable" and any arguments as follows:</p>
    1.45 +
    1.46 +<pre>
    1.47 +channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
    1.48 +</pre>
    1.49 +
    1.50 +<p>This returns a channel which can then be used to communicate with the created
    1.51 +process. Meanwhile, in the created process, the given callable will be invoked
    1.52 +with another channel as its first argument followed by the specified arguments:</p>
    1.53 +
    1.54 +<pre>
    1.55 +def fn(channel, arg1, arg2, named1, named2):
    1.56 +    # Read from and write to the channel.
    1.57 +    # Return value is ignored.
    1.58 +    ...
    1.59 +</pre>
    1.60 +
    1.61 +<h2 id="Fork">Fork-style Processing</h2>
    1.62 +
    1.63 +<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
    1.64 +(ie. the <code>fork</code> system call on various operating systems), use the following
    1.65 +method:</p>
    1.66 +
    1.67 +<pre>
    1.68 +channel = pprocess.create()
    1.69 +if channel.pid == 0:
    1.70 +    # This code is run by the created process.
    1.71 +    # Read from and write to the channel to communicate with the
    1.72 +    # creating/calling process.
    1.73 +    # An explicit exit of the process may be desirable to prevent the process
    1.74 +    # from running code which is intended for the creating/calling process.
    1.75 +    ...
    1.76 +    pprocess.exit(channel)
    1.77 +else:
    1.78 +    # This code is run by the creating/calling process.
    1.79 +    # Read from and write to the channel to communicate with the created
    1.80 +    # process.
    1.81 +    ...
    1.82 +</pre>
    1.83 +
    1.84 +<h2 id="Exchanges">Message Exchanges</h2>
    1.85 +
    1.86 +<p>When creating many processes, each providing results for the consumption of the
    1.87 +main process, the collection of those results in an efficient fashion can be
    1.88 +problematic: if some processes take longer than others, and if we decide to read
    1.89 +from those processes when they are not ready instead of other processes which
    1.90 +are ready, the whole activity will take much longer than necessary.</p>
    1.91 +
    1.92 +<p>One solution to the problem of knowing when to read from channels is to create
    1.93 +an <code>Exchange</code> object, optionally initialising it with a list of channels
    1.94 +through which data is expected to arrive:</p>
    1.95 +
    1.96 +<pre>
    1.97 +exchange = pprocess.Exchange()           # populate the exchange later
    1.98 +exchange = pprocess.Exchange(channels)   # populate the exchange with channels
    1.99 +</pre>
   1.100 +
   1.101 +<p>We can add channels to the exchange using the <code>add</code> method:</p>
   1.102 +
   1.103 +<pre>
   1.104 +exchange.add(channel)
   1.105 +</pre>
   1.106 +
   1.107 +<p>To test whether an exchange is active - that is, whether it is actually
   1.108 +monitoring any channels - we can use the <code>active</code> method which
   1.109 +returns all channels being monitored by the exchange:</p>
   1.110 +
   1.111 +<pre>
   1.112 +channels = exchange.active()
   1.113 +</pre>
   1.114 +
   1.115 +<p>We may then check the exchange to see whether any data is ready to be received;
   1.116 +for example:</p>
   1.117 +
   1.118 +<pre>
   1.119 +for channel in exchange.ready():
   1.120 +    # Read from and write to the channel.
   1.121 +    ...
   1.122 +</pre>
   1.123 +
   1.124 +<p>If we do not wish to wait indefinitely for a list of channels, we can set a
   1.125 +timeout value as an argument to the <code>ready</code> method (as a floating
   1.126 +point number specifying the timeout in seconds, where <code>0</code> means a
   1.127 +non-blocking poll as stated in the <code>select</code> module's <code>select</code>
   1.128 +function documentation).</p>
   1.129 +
   1.130 +<h2 id="Convenient">Convenient Message Exchanges</h2>
   1.131 +
   1.132 +<p>A convenient form of message exchanges can be adopted by defining a subclass of
   1.133 +the <code>Exchange</code> class and defining a particular method:</p>
   1.134 +
   1.135 +<pre>
   1.136 +class MyExchange(pprocess.Exchange):
   1.137 +    def store_data(self, channel):
   1.138 +        data = channel.receive()
   1.139 +        # Do something with data here.
   1.140 +</pre>
   1.141 +
   1.142 +<p>The exact operations performed on the received data might be as simple as
   1.143 +storing it on an instance attribute. To make use of the exchange, we would
   1.144 +instantiate it as usual:</p>
   1.145 +
   1.146 +<pre>
   1.147 +exchange = MyExchange()         # populate the exchange later
   1.148 +exchange = MyExchange(limit=10) # set a limit for later population
   1.149 +</pre>
   1.150 +
   1.151 +<p>The exchange can now be used in a simpler fashion than that shown above. We can
   1.152 +add channels as before using the <code>add</code> method, or we can choose to only
   1.153 +add channels if the specified limit of channels is not exceeded:</p>
   1.154 +
   1.155 +<pre>
   1.156 +exchange.add(channel)           # add a channel as normal
   1.157 +exchange.add_wait(channel)      # add a channel, waiting if the limit would be
   1.158 +                                # exceeded
   1.159 +</pre>
   1.160 +
   1.161 +<p>Or we can request that the exchange create a channel on our behalf:</p>
   1.162 +
   1.163 +<pre>
   1.164 +channel = exchange.create()
   1.165 +</pre>
   1.166 +
   1.167 +<p>We can even start processes and monitor channels without ever handling the
   1.168 +channel ourselves:</p>
   1.169 +
   1.170 +<pre>
   1.171 +exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
   1.172 +</pre>
   1.173 +
   1.174 +<p>We can explicitly wait for "free space" for channels by calling the
   1.175 +<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
   1.176 +methods make this less interesting:</p>
   1.177 +
   1.178 +<pre>
   1.179 +exchange.wait()
   1.180 +</pre>
   1.181 +
   1.182 +<p>Finally, when finishing the computation, we can choose to merely call the
   1.183 +<code>finish</code> method and have the remaining data processed automatically:</p>
   1.184 +
   1.185 +<pre>
   1.186 +exchange.finish()
   1.187 +</pre>
   1.188 +
   1.189 +<p>Clearly, this approach is less flexible but more convenient than the raw message
   1.190 +exchange API as described above. However, it permits much simpler and clearer
   1.191 +code.</p>
   1.192 +
   1.193 +<h2 id="Queues">Exchanges as Queues</h2>
   1.194 +
   1.195 +<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
   1.196 +to define the <code>store_data</code> method, it might be more desirable to let
   1.197 +the exchange manage the communications between created and creating processes
   1.198 +and to let the creating process just consume received data as it arrives,
   1.199 +without particular regard for the order of the received data - perhaps the
   1.200 +creating process has its own way of managing such issues.</p>
   1.201 +
   1.202 +<p>For such situations, the <code>Queue</code> class may be instantiated and
   1.203 +channels added to the queue using the various methods provided:</p>
   1.204 +
   1.205 +<pre>
   1.206 +queue = pprocess.Queue(limit=10)
   1.207 +channel = queue.create()
   1.208 +if channel:
   1.209 +    # Do some computation.
   1.210 +    pprocess.exit(channel)
   1.211 +</pre>
   1.212 +
   1.213 +<p>The results can then be consumed by treating the queue like an iterator:</p>
   1.214 +
   1.215 +<pre>
   1.216 +for result in queue:
   1.217 +    # Capture each result.
   1.218 +</pre>
   1.219 +
   1.220 +<p>This approach does not, of course, require the direct handling of channels.
   1.221 +One could instead use the <code>start</code> method on the queue to create
   1.222 +processes and to initiate computations (since a queue is merely an enhanced
   1.223 +exchange with a specific implementation of the <code>store_data</code>
   1.224 +method).</p>
   1.225 +
   1.226 +<h2 id="Maps">Exchanges as Maps</h2>
   1.227 +
   1.228 +<p>Where the above <code>Queue</code> class appears like an attractive solution
   1.229 +for the management of the results of computations, but where the order of their
   1.230 +consumption by the creating process remains important, the <code>Map</code>
   1.231 +class may offer a suitable way of collecting and accessing results:</p>
   1.232 +
   1.233 +<pre>
   1.234 +results = pprocess.Map(limit=10)
   1.235 +for value in inputs:
   1.236 +    results.start(fn, args)
   1.237 +</pre>
   1.238 +
   1.239 +<p>The results can then be consumed in an order corresponding to the order of the
   1.240 +computations which produced them:</p>
   1.241 +
   1.242 +<pre>
   1.243 +for result in results:
   1.244 +    # Process each result.
   1.245 +</pre>
   1.246 +
   1.247 +<p>Internally, the <code>Map</code> object records a particular ordering of
   1.248 +channels, ensuring that the received results can be mapped to this ordering,
   1.249 +and that the results can be made available with this ordering preserved.</p>
   1.250 +
   1.251 +<h2 id="Managed">Managed Callables</h2>
   1.252 +
   1.253 +<p>A further simplification of the above convenient use of message exchanges
   1.254 +involves the creation of callables (eg. functions) which are automatically
   1.255 +monitored by an exchange. We create such a callable by calling the
   1.256 +<code>manage</code> method on an exchange:</p>
   1.257 +
   1.258 +<pre>
   1.259 +myfn = exchange.manage(fn)
   1.260 +</pre>
   1.261 +
   1.262 +<p>This callable can then be invoked instead of using the exchange's
   1.263 +<code>start</code> method:</p>
   1.264 +
   1.265 +<pre>
   1.266 +myfn(arg1, arg2, named1=value1, named2=value2)
   1.267 +</pre>
   1.268 +
   1.269 +<p>The exchange's <code>finish</code> method can be used as usual to process
   1.270 +incoming data.</p>
   1.271 +
   1.272 +<h2 id="MakeParallel">Making Existing Functions Parallel</h2>
   1.273 +
   1.274 +<p>In making a program parallel, existing functions which only return results can
   1.275 +be manually modified to accept and use channels to communicate results back to
   1.276 +the main process. However, a simple alternative is to use the <code>MakeParallel</code>
   1.277 +class to provide a wrapper around unmodified functions which will return the results
   1.278 +from those functions in the channels provided. For example:</p>
   1.279 +
   1.280 +<pre>
   1.281 +fn = pprocess.MakeParallel(originalfn)
   1.282 +</pre>
   1.283 +
   1.284 +<h2 id="Map">Map-style Processing</h2>
   1.285 +
   1.286 +<p>In situations where a callable would normally be used in conjunction with the
   1.287 +Python built-in <code>map</code> function, an alternative solution can be adopted by using
   1.288 +the <code>pmap</code> function:</p>
   1.289 +
   1.290 +<pre>
   1.291 +pprocess.pmap(fn, sequence)
   1.292 +</pre>
   1.293 +
   1.294 +<p>Here, the sequence would have to contain elements that each contain the
   1.295 +required parameters of the specified callable, <code>fn</code>. Note that the
   1.296 +callable does not need to be a parallel-aware function which has a
   1.297 +<code>channel</code> argument: the <code>pmap</code> function automatically
   1.298 +wraps the given callable internally.</p>
   1.299 +
   1.300 +<h2 id="Reusing">Reusing Processes and Channels</h2>
   1.301 +
   1.302 +<p>So far, all parallel computations have been done with newly-created
   1.303 +processes. However, this can seem somewhat inefficient, especially if processes
   1.304 +are being continually created and destroyed (although if this happens too
   1.305 +often, the amount of work done by each process may be too little, anyway). One
   1.306 +solution is to retain processes after they have done their work and request
   1.307 +that they perform more work for each new parallel task or invocation. To enable
   1.308 +the reuse of processes in this way, a special keyword argument may be specified
   1.309 +when creating <code>Exchange</code> instances (and instances of subclasses such
   1.310 +as <code>Map</code> and <code>Queue</code>). For example:</p>
   1.311 +
   1.312 +<pre>
   1.313 +exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
   1.314 +</pre>
   1.315 +
   1.316 +<p>Code invoked through such exchanges must be aware of channels and be
   1.317 +constructed in such a way that it does not terminate after sending a result
   1.318 +back to the creating process. Instead, it should repeatedly wait for subsequent
   1.319 +sets of parameters (compatible with those either in the signature of a callable
   1.320 +or with the original values read from the channel). Reusable code is terminated
   1.321 +when the special value of <code>None</code> is sent from the creating process
   1.322 +to the created process, indicating that no more parameters will be sent; this
   1.323 +should cause the code to terminate.</p>
   1.324 +
   1.325 +<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>
   1.326 +
   1.327 +<p>An easier way of making reusable code sections for parallel use is to employ the
   1.328 +<code>MakeReusable</code> class to wrap an existing callable:</p>
   1.329 +
   1.330 +<pre>
   1.331 +fn = pprocess.MakeReusable(originalfn)
   1.332 +</pre>
   1.333 +
   1.334 +<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
   1.335 +provides the necessary mechanisms described above for reusable code.</p>
   1.336 +
   1.337 +<h2 id="Implementation">Implementation Notes</h2>
   1.338 +
   1.339 +<h3>Signals and Waiting</h3>
   1.340 +
   1.341 +<p>When created/child processes terminate, one would typically want to be informed
   1.342 +of such conditions using a signal handler. Unfortunately, Python seems to have
   1.343 +issues with restartable reads from file descriptors when interrupted by signals:</p>
   1.344 +
   1.345 +<ul>
   1.346 +<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
   1.347 +<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
   1.348 +</ul>
   1.349 +
   1.350 +<h3>Select and Poll</h3>
   1.351 +
   1.352 +<p>The exact combination of conditions indicating closed pipes remains relatively
   1.353 +obscure. Here is a message/thread describing them (in the context of another
   1.354 +topic):</p>
   1.355 +
   1.356 +<ul>
   1.357 +<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
   1.358 +</ul>
   1.359 +
   1.360 +<p>It would seem, from using sockets and from studying the <code>asyncore</code>
   1.361 +module, that sockets are more predictable than pipes.</p>
   1.362 +
   1.363 +<p>Notes about <code>poll</code> implementations can be found here:</p>
   1.364 +
   1.365 +<ul>
   1.366 +<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
   1.367 +</ul>
   1.368 +
   1.369 +</body>
   1.370 +</html>