1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/docs/reference.html Sat Nov 24 00:00:18 2007 +0000
1.3 @@ -0,0 +1,367 @@
1.4 +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
1.5 +<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
1.6 +<head>
1.7 + <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
1.8 + <title>pprocess - Reference</title>
1.9 + <link href="styles.css" rel="stylesheet" type="text/css" />
1.10 +</head>
1.11 +<body>
1.12 +
1.13 +<h1>pprocess - Reference</h1>
1.14 +
1.15 +<p>The <code>pprocess</code> module defines a simple parallel processing API
1.16 +for Python, inspired somewhat by the <code>thread</code> module, slightly less
1.17 +by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
1.18 +less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>
1.19 +
1.20 +<p>This document complements the <a href="tutorial.html">tutorial</a> by
1.21 +providing an overview of the different styles of parallel programming supported
1.22 +by the module. For an introduction and in order to get a clearer idea of the
1.23 +most suitable styles for your own programs, consult the
1.24 +<a href="tutorial.html">tutorial</a>.</p>
1.25 +
1.26 +<ul>
1.27 +<li><a href="#Thread">Thread-style Processing</a></li>
1.28 +<li><a href="#Fork">Fork-style Processing</a></li>
1.29 +<li><a href="#Exchanges">Message Exchanges</a></li>
1.30 +<li><a href="#Convenient">Convenient Message Exchanges</a></li>
1.31 +<li><a href="#Queues">Exchanges as Queues</a></li>
1.32 +<li><a href="#Maps">Exchanges as Maps</a></li>
1.33 +<li><a href="#Managed">Managed Callables</a></li>
1.34 +<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
1.35 +<li><a href="#Map">Map-style Processing</a></li>
1.36 +<li><a href="#Reusing">Reusing Processes and Channels</a></li>
1.37 +<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
1.38 +<li><a href="#Implementation">Implementation Notes</a></li>
1.39 +</ul>
1.40 +
1.41 +<h2 id="Thread">Thread-style Processing</h2>
1.42 +
1.43 +<p>To create new processes to run a function or any callable object, specify the
1.44 +"callable" and any arguments as follows:</p>
1.45 +
1.46 +<pre>
1.47 +channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
1.48 +</pre>
1.49 +
1.50 +<p>This returns a channel which can then be used to communicate with the created
1.51 +process. Meanwhile, in the created process, the given callable will be invoked
1.52 +with another channel as its first argument followed by the specified arguments:</p>
1.53 +
1.54 +<pre>
1.55 +def fn(channel, arg1, arg2, named1, named2):
1.56 + # Read from and write to the channel.
1.57 + # Return value is ignored.
1.58 + ...
1.59 +</pre>
1.60 +
1.61 +<h2 id="Fork">Fork-style Processing</h2>
1.62 +
1.63 +<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
1.64 +(ie. the <code>fork</code> system call on various operating systems), use the following
1.65 +method:</p>
1.66 +
1.67 +<pre>
1.68 +channel = pprocess.create()
1.69 +if channel.pid == 0:
1.70 + # This code is run by the created process.
1.71 + # Read from and write to the channel to communicate with the
1.72 + # creating/calling process.
1.73 + # An explicit exit of the process may be desirable to prevent the process
1.74 + # from running code which is intended for the creating/calling process.
1.75 + ...
1.76 + pprocess.exit(channel)
1.77 +else:
1.78 + # This code is run by the creating/calling process.
1.79 + # Read from and write to the channel to communicate with the created
1.80 + # process.
1.81 + ...
1.82 +</pre>
1.83 +
1.84 +<h2 id="Exchanges">Message Exchanges</h2>
1.85 +
1.86 +<p>When creating many processes, each providing results for the consumption of the
1.87 +main process, the collection of those results in an efficient fashion can be
1.88 +problematic: if some processes take longer than others, and if we decide to read
1.89 +from those processes when they are not ready instead of other processes which
1.90 +are ready, the whole activity will take much longer than necessary.</p>
1.91 +
1.92 +<p>One solution to the problem of knowing when to read from channels is to create
1.93 +an <code>Exchange</code> object, optionally initialising it with a list of channels
1.94 +through which data is expected to arrive:</p>
1.95 +
1.96 +<pre>
1.97 +exchange = pprocess.Exchange() # populate the exchange later
1.98 +exchange = pprocess.Exchange(channels) # populate the exchange with channels
1.99 +</pre>
1.100 +
1.101 +<p>We can add channels to the exchange using the <code>add</code> method:</p>
1.102 +
1.103 +<pre>
1.104 +exchange.add(channel)
1.105 +</pre>
1.106 +
1.107 +<p>To test whether an exchange is active - that is, whether it is actually
1.108 +monitoring any channels - we can use the <code>active</code> method which
1.109 +returns all channels being monitored by the exchange:</p>
1.110 +
1.111 +<pre>
1.112 +channels = exchange.active()
1.113 +</pre>
1.114 +
1.115 +<p>We may then check the exchange to see whether any data is ready to be received;
1.116 +for example:</p>
1.117 +
1.118 +<pre>
1.119 +for channel in exchange.ready():
1.120 + # Read from and write to the channel.
1.121 + ...
1.122 +</pre>
1.123 +
1.124 +<p>If we do not wish to wait indefinitely for a list of channels, we can set a
1.125 +timeout value as an argument to the <code>ready</code> method (as a floating
1.126 +point number specifying the timeout in seconds, where <code>0</code> means a
1.127 +non-blocking poll as stated in the <code>select</code> module's <code>select</code>
1.128 +function documentation).</p>
1.129 +
1.130 +<h2 id="Convenient">Convenient Message Exchanges</h2>
1.131 +
1.132 +<p>A convenient form of message exchanges can be adopted by defining a subclass of
1.133 +the <code>Exchange</code> class and defining a particular method:</p>
1.134 +
1.135 +<pre>
1.136 +class MyExchange(pprocess.Exchange):
1.137 + def store_data(self, channel):
1.138 + data = channel.receive()
1.139 + # Do something with data here.
1.140 +</pre>
1.141 +
1.142 +<p>The exact operations performed on the received data might be as simple as
1.143 +storing it on an instance attribute. To make use of the exchange, we would
1.144 +instantiate it as usual:</p>
1.145 +
1.146 +<pre>
1.147 +exchange = MyExchange() # populate the exchange later
1.148 +exchange = MyExchange(limit=10) # set a limit for later population
1.149 +</pre>
1.150 +
1.151 +<p>The exchange can now be used in a simpler fashion than that shown above. We can
1.152 +add channels as before using the <code>add</code> method, or we can choose to only
1.153 +add channels if the specified limit of channels is not exceeded:</p>
1.154 +
1.155 +<pre>
1.156 +exchange.add(channel) # add a channel as normal
1.157 +exchange.add_wait(channel) # add a channel, waiting if the limit would be
1.158 + # exceeded
1.159 +</pre>
1.160 +
1.161 +<p>Or we can request that the exchange create a channel on our behalf:</p>
1.162 +
1.163 +<pre>
1.164 +channel = exchange.create()
1.165 +</pre>
1.166 +
1.167 +<p>We can even start processes and monitor channels without ever handling the
1.168 +channel ourselves:</p>
1.169 +
1.170 +<pre>
1.171 +exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
1.172 +</pre>
1.173 +
1.174 +<p>We can explicitly wait for "free space" for channels by calling the
1.175 +<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
1.176 +methods make this less interesting:</p>
1.177 +
1.178 +<pre>
1.179 +exchange.wait()
1.180 +</pre>
1.181 +
1.182 +<p>Finally, when finishing the computation, we can choose to merely call the
1.183 +<code>finish</code> method and have the remaining data processed automatically:</p>
1.184 +
1.185 +<pre>
1.186 +exchange.finish()
1.187 +</pre>
1.188 +
1.189 +<p>Clearly, this approach is less flexible but more convenient than the raw message
1.190 +exchange API as described above. However, it permits much simpler and clearer
1.191 +code.</p>
1.192 +
1.193 +<h2 id="Queues">Exchanges as Queues</h2>
1.194 +
1.195 +<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
1.196 +to define the <code>store_data</code> method, it might be more desirable to let
1.197 +the exchange manage the communications between created and creating processes
1.198 +and to let the creating process just consume received data as it arrives,
1.199 +without particular regard for the order of the received data - perhaps the
1.200 +creating process has its own way of managing such issues.</p>
1.201 +
1.202 +<p>For such situations, the <code>Queue</code> class may be instantiated and
1.203 +channels added to the queue using the various methods provided:</p>
1.204 +
1.205 +<pre>
1.206 +queue = pprocess.Queue(limit=10)
1.207 +channel = queue.create()
1.208 +if channel:
1.209 + # Do some computation.
1.210 + pprocess.exit(channel)
1.211 +</pre>
1.212 +
1.213 +<p>The results can then be consumed by treating the queue like an iterator:</p>
1.214 +
1.215 +<pre>
1.216 +for result in queue:
1.217 + # Capture each result.
1.218 +</pre>
1.219 +
1.220 +<p>This approach does not, of course, require the direct handling of channels.
1.221 +One could instead use the <code>start</code> method on the queue to create
1.222 +processes and to initiate computations (since a queue is merely an enhanced
1.223 +exchange with a specific implementation of the <code>store_data</code>
1.224 +method).</p>
1.225 +
1.226 +<h2 id="Maps">Exchanges as Maps</h2>
1.227 +
1.228 +<p>Where the above <code>Queue</code> class appears like an attractive solution
1.229 +for the management of the results of computations, but where the order of their
1.230 +consumption by the creating process remains important, the <code>Map</code>
1.231 +class may offer a suitable way of collecting and accessing results:</p>
1.232 +
1.233 +<pre>
1.234 +results = pprocess.Map(limit=10)
1.235 +for value in inputs:
1.236 + results.start(fn, args)
1.237 +</pre>
1.238 +
1.239 +<p>The results can then be consumed in an order corresponding to the order of the
1.240 +computations which produced them:</p>
1.241 +
1.242 +<pre>
1.243 +for result in results:
1.244 + # Process each result.
1.245 +</pre>
1.246 +
1.247 +<p>Internally, the <code>Map</code> object records a particular ordering of
1.248 +channels, ensuring that the received results can be mapped to this ordering,
1.249 +and that the results can be made available with this ordering preserved.</p>
1.250 +
1.251 +<h2 id="Managed">Managed Callables</h2>
1.252 +
1.253 +<p>A further simplification of the above convenient use of message exchanges
1.254 +involves the creation of callables (eg. functions) which are automatically
1.255 +monitored by an exchange. We create such a callable by calling the
1.256 +<code>manage</code> method on an exchange:</p>
1.257 +
1.258 +<pre>
1.259 +myfn = exchange.manage(fn)
1.260 +</pre>
1.261 +
1.262 +<p>This callable can then be invoked instead of using the exchange's
1.263 +<code>start</code> method:</p>
1.264 +
1.265 +<pre>
1.266 +myfn(arg1, arg2, named1=value1, named2=value2)
1.267 +</pre>
1.268 +
1.269 +<p>The exchange's <code>finish</code> method can be used as usual to process
1.270 +incoming data.</p>
1.271 +
1.272 +<h2 id="MakeParallel">Making Existing Functions Parallel</h2>
1.273 +
1.274 +<p>In making a program parallel, existing functions which only return results can
1.275 +be manually modified to accept and use channels to communicate results back to
1.276 +the main process. However, a simple alternative is to use the <code>MakeParallel</code>
1.277 +class to provide a wrapper around unmodified functions which will return the results
1.278 +from those functions in the channels provided. For example:</p>
1.279 +
1.280 +<pre>
1.281 +fn = pprocess.MakeParallel(originalfn)
1.282 +</pre>
1.283 +
1.284 +<h2 id="Map">Map-style Processing</h2>
1.285 +
1.286 +<p>In situations where a callable would normally be used in conjunction with the
1.287 +Python built-in <code>map</code> function, an alternative solution can be adopted by using
1.288 +the <code>pmap</code> function:</p>
1.289 +
1.290 +<pre>
1.291 +pprocess.pmap(fn, sequence)
1.292 +</pre>
1.293 +
1.294 +<p>Here, the sequence would have to contain elements that each contain the
1.295 +required parameters of the specified callable, <code>fn</code>. Note that the
1.296 +callable does not need to be a parallel-aware function which has a
1.297 +<code>channel</code> argument: the <code>pmap</code> function automatically
1.298 +wraps the given callable internally.</p>
1.299 +
1.300 +<h2 id="Reusing">Reusing Processes and Channels</h2>
1.301 +
1.302 +<p>So far, all parallel computations have been done with newly-created
1.303 +processes. However, this can seem somewhat inefficient, especially if processes
1.304 +are being continually created and destroyed (although if this happens too
1.305 +often, the amount of work done by each process may be too little, anyway). One
1.306 +solution is to retain processes after they have done their work and request
1.307 +that they perform more work for each new parallel task or invocation. To enable
1.308 +the reuse of processes in this way, a special keyword argument may be specified
1.309 +when creating <code>Exchange</code> instances (and instances of subclasses such
1.310 +as <code>Map</code> and <code>Queue</code>). For example:</p>
1.311 +
1.312 +<pre>
1.313 +exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
1.314 +</pre>
1.315 +
1.316 +<p>Code invoked through such exchanges must be aware of channels and be
1.317 +constructed in such a way that it does not terminate after sending a result
1.318 +back to the creating process. Instead, it should repeatedly wait for subsequent
1.319 +sets of parameters (compatible with those either in the signature of a callable
1.320 +or with the original values read from the channel). Reusable code is terminated
1.321 +when the special value of <code>None</code> is sent from the creating process
1.322 +to the created process, indicating that no more parameters will be sent; this
1.323 +should cause the code to terminate.</p>
1.324 +
1.325 +<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>
1.326 +
1.327 +<p>An easier way of making reusable code sections for parallel use is to employ the
1.328 +<code>MakeReusable</code> class to wrap an existing callable:</p>
1.329 +
1.330 +<pre>
1.331 +fn = pprocess.MakeReusable(originalfn)
1.332 +</pre>
1.333 +
1.334 +<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
1.335 +provides the necessary mechanisms described above for reusable code.</p>
1.336 +
1.337 +<h2 id="Implementation">Implementation Notes</h2>
1.338 +
1.339 +<h3>Signals and Waiting</h3>
1.340 +
1.341 +<p>When created/child processes terminate, one would typically want to be informed
1.342 +of such conditions using a signal handler. Unfortunately, Python seems to have
1.343 +issues with restartable reads from file descriptors when interrupted by signals:</p>
1.344 +
1.345 +<ul>
1.346 +<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
1.347 +<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
1.348 +</ul>
1.349 +
1.350 +<h3>Select and Poll</h3>
1.351 +
1.352 +<p>The exact combination of conditions indicating closed pipes remains relatively
1.353 +obscure. Here is a message/thread describing them (in the context of another
1.354 +topic):</p>
1.355 +
1.356 +<ul>
1.357 +<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
1.358 +</ul>
1.359 +
1.360 +<p>It would seem, from using sockets and from studying the <code>asyncore</code>
1.361 +module, that sockets are more predictable than pipes.</p>
1.362 +
1.363 +<p>Notes about <code>poll</code> implementations can be found here:</p>
1.364 +
1.365 +<ul>
1.366 +<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
1.367 +</ul>
1.368 +
1.369 +</body>
1.370 +</html>