<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Reference</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Reference</h1>

<p>The <code>pprocess</code> module defines a simple parallel processing API
for Python, inspired somewhat by the <code>thread</code> module, slightly less
by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>

<p>This document complements the <a href="tutorial.html">tutorial</a> by
providing an overview of the different styles of parallel programming supported
by the module. For an introduction and in order to get a clearer idea of the
most suitable styles for your own programs, consult the
<a href="tutorial.html">tutorial</a>.</p>

<ul>
<li><a href="#Thread">Thread-style Processing</a></li>
<li><a href="#Fork">Fork-style Processing</a></li>
<li><a href="#Exchanges">Message Exchanges</a></li>
<li><a href="#Convenient">Convenient Message Exchanges</a></li>
<li><a href="#Queues">Exchanges as Queues</a></li>
<li><a href="#Maps">Exchanges as Maps</a></li>
<li><a href="#Managed">Managed Callables</a></li>
<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
<li><a href="#Map">Map-style Processing</a></li>
<li><a href="#Reusing">Reusing Processes and Channels</a></li>
<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
<li><a href="#Implementation">Implementation Notes</a></li>
</ul>

<h2 id="Thread">Thread-style Processing</h2>

<p>To create new processes to run a function or any callable object, specify the
"callable" and any
arguments as follows:</p>

<pre>
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument, followed by the specified arguments:</p>

<pre>
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # The return value is ignored.
    ...
</pre>

<h2 id="Fork">Fork-style Processing</h2>

<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
(ie. the <code>fork</code> system call on Unix-like operating systems), use the following
method:</p>

<pre>
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
</pre>

<h2 id="Exchanges">Message Exchanges</h2>

<p>When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.</p>

<p>One solution to the problem of knowing when to read from channels is to create
an <code>Exchange</code> object, optionally initialising it with a list of channels
through which data is expected to arrive:</p>

<pre>
exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels
</pre>

<p>We can add channels to the exchange using the <code>add</code> method:</p>

<pre>
exchange.add(channel)
</pre>

<p>To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the <code>active</code> method, which
returns all channels being monitored by the exchange:</p>

<pre>
channels = exchange.active()
</pre>

<p>We may then check the exchange to see whether any data is ready to be received;
for example:</p>

<pre>
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
</pre>

<p>If we do not wish to wait indefinitely for any of the channels to become ready,
we can pass a timeout value to the <code>ready</code> method (a floating point
number specifying the timeout in seconds, where <code>0</code> means a
non-blocking poll, as stated in the documentation for the <code>select</code>
module's <code>select</code> function).</p>

<h2 id="Convenient">Convenient Message Exchanges</h2>

<p>A more convenient form of message exchange can be adopted by defining a subclass
of the <code>Exchange</code> class and defining its <code>store_data</code> method,
which is given each channel that has data ready to be received:</p>

<pre>
class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
</pre>

<p>The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:</p>

<pre>
exchange = MyExchange()           # populate the exchange later
exchange = MyExchange(limit=10)   # set a limit for later population
</pre>

<p>The exchange can now be used in a simpler fashion than that shown above.
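</p>

<p>Before turning to those conveniences, it may help to see the idea behind an
exchange in isolation. The following is only a sketch, assuming a Unix-like
system and using nothing but the standard library (<code>os.fork</code>,
<code>os.pipe</code>, <code>select.select</code> and <code>pickle</code>). The
<code>MiniExchange</code> class and its methods are hypothetical stand-ins that
imitate the <code>start</code>, <code>ready</code>, <code>store_data</code> and
<code>finish</code> pattern described in this document; they are not the actual
<code>pprocess</code> implementation. The sketch uses pipes for brevity, although
the implementation notes below suggest that sockets behave more predictably.</p>

<pre>
import os
import pickle
import select


class MiniExchange:
    """Hypothetical, simplified stand-in for an exchange (not pprocess itself).

    Each started process sends one pickled result through a pipe, and
    store_data is called for each pipe once data is ready to be read."""

    def __init__(self):
        self.pending = {}   # maps a pipe's read end to the child's process ID
        self.results = []

    def start(self, fn, *args):
        # Create a pipe and fork; the child computes fn(*args), writes the
        # pickled result to the pipe and exits without returning.
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            try:
                os.close(read_fd)
                with os.fdopen(write_fd, "wb") as out:
                    pickle.dump(fn(*args), out)
            finally:
                os._exit(0)
        os.close(write_fd)
        self.pending[os.fdopen(read_fd, "rb")] = pid

    def ready(self, timeout=None):
        # Use select to find read ends with data (or end-of-file) available;
        # a timeout of 0 makes this a non-blocking poll.
        if not self.pending:
            return []
        readable, _, _ = select.select(list(self.pending), [], [], timeout)
        return readable

    def store_data(self, reader):
        # Override point: here the received data is simply kept in a list.
        self.results.append(pickle.load(reader))

    def finish(self):
        # Handle results in whatever order they become ready, then reap
        # each child so that no zombie processes remain.
        while self.pending:
            for reader in self.ready():
                self.store_data(reader)
                pid = self.pending.pop(reader)
                reader.close()
                os.waitpid(pid, 0)


def square(x):
    return x * x


exchange = MiniExchange()
for value in range(5):
    exchange.start(square, value)
exchange.finish()
print(sorted(exchange.results))   # prints [0, 1, 4, 9, 16]
</pre>

<p>Because <code>finish</code> handles whichever process is ready first, results
arrive in completion order rather than start order, which is why the example sorts
them (preserving start order is the purpose of the <code>Map</code> class described
below). If the function raises an exception in the child, nothing is written and
<code>pickle.load</code> fails with <code>EOFError</code> in the parent; a fuller
version would send the exception back instead.</p>

<p>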
We can add channels as before using the <code>add</code> method, or we can use
the <code>add_wait</code> method, which waits until adding the channel would no
longer exceed the specified limit:</p>

<pre>
exchange.add(channel)        # add a channel as normal
exchange.add_wait(channel)   # add a channel, waiting if the limit would be
                             # exceeded
</pre>

<p>Or we can request that the exchange create a channel on our behalf:</p>

<pre>
channel = exchange.create()
</pre>

<p>We can even start processes and monitor channels without ever handling the
channel ourselves:</p>

<pre>
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>We can explicitly wait for "free space" for channels by calling the
<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
methods already wait when necessary, so an explicit call is rarely needed:</p>

<pre>
exchange.wait()
</pre>

<p>Finally, when finishing the computation, we can simply call the
<code>finish</code> method and have the remaining data processed automatically:</p>

<pre>
exchange.finish()
</pre>

<p>This approach is less flexible than the raw message exchange API described
above.
However, it permits much simpler and clearer
code.</p>

<h2 id="Queues">Exchanges as Queues</h2>

<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
define the <code>store_data</code> method, it might be more desirable to let
the exchange manage the communications between created and creating processes,
and to let the creating process simply consume received data as it arrives,
without particular regard for the order of the received data (perhaps the
creating process has its own way of managing such issues).</p>

<p>For such situations, the <code>Queue</code> class may be instantiated and
channels added to the queue using the various methods provided:</p>

<pre>
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # In the created process: do some computation and send the
    # result through the channel.
    pprocess.exit(channel)
</pre>

<p>The results can then be consumed by treating the queue like an iterator:</p>

<pre>
for result in queue:
    # Capture each result.
    ...
</pre>

<p>This approach does not, of course, require the direct handling of channels.
One could instead use the <code>start</code> method on the queue to create
processes and to initiate computations (since a queue is merely an enhanced
exchange with a specific implementation of the <code>store_data</code>
method).</p>

<h2 id="Maps">Exchanges as Maps</h2>

<p>Where the <code>Queue</code> class above seems an attractive way of managing
the results of computations, but the order in which the creating process
consumes them remains important, the <code>Map</code> class may offer a suitable
way of collecting and accessing results:</p>

<pre>
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
</pre>

<p>The results can then be consumed in an order corresponding to the order of the
computations which produced them:</p>

<pre>
for result in results:
    # Process each result.
    ...
</pre>

<p>Internally, the <code>Map</code> object records the order in which the
computations were started, so that each received result can be matched to its
position in that order and the results can be made available with the order
preserved.</p>

<h2 id="Managed">Managed Callables</h2>

<p>A further simplification of the convenient use of message exchanges described
above involves the creation of callables (eg. functions) which are automatically
monitored by an exchange.
We create such a callable by calling the
<code>manage</code> method on an exchange:</p>

<pre>
myfn = exchange.manage(fn)
</pre>

<p>This callable can then be invoked instead of using the exchange's
<code>start</code> method:</p>

<pre>
myfn(arg1, arg2, named1=value1, named2=value2)
</pre>

<p>The exchange's <code>finish</code> method can be used as usual to process
incoming data.</p>

<h2 id="MakeParallel">Making Existing Functions Parallel</h2>

<p>In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the <code>MakeParallel</code>
class to wrap unmodified functions so that their return values are sent back
through the channels provided. For example:</p>

<pre>
fn = pprocess.MakeParallel(originalfn)
</pre>

<h2 id="Map">Map-style Processing</h2>

<p>In situations where a callable would normally be used in conjunction with the
Python built-in <code>map</code> function, an alternative solution can be adopted by using
the <code>pmap</code> function:</p>

<pre>
pprocess.pmap(fn, sequence)
</pre>

<p>Here, each element of the sequence must contain the parameters required by the
specified callable, <code>fn</code>. Note that the callable does not need to be a
parallel-aware function which has a <code>channel</code> argument: the
<code>pmap</code> function automatically wraps the given callable internally.</p>

<h2 id="Reusing">Reusing Processes and Channels</h2>

<p>So far, all parallel computations have been done with newly-created
processes.
However, this can seem somewhat inefficient, especially if processes
are being continually created and destroyed (although if this happens very
often, each process may be doing too little work to benefit from parallelism
anyway). One solution is to retain processes after they have done their work and
request that they perform more work for each new parallel task or invocation. To
enable the reuse of processes in this way, the special <code>reuse</code> keyword
argument may be specified when creating <code>Exchange</code> instances (and
instances of subclasses such as <code>Map</code> and <code>Queue</code>). For
example:</p>

<pre>
exchange = MyExchange(limit=10, reuse=1)   # reuse up to 10 processes
</pre>

<p>Code invoked through such exchanges must be aware of channels and be
constructed in such a way that it does not terminate after sending a result
back to the creating process. Instead, it should repeatedly wait for subsequent
sets of parameters (compatible either with the signature of a callable or with
the original values read from the channel).
To stop reusable code, the creating process sends the special value
<code>None</code> to the created process, indicating that no more parameters
will be sent; on receiving it, the code should terminate.</p>

<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>

<p>An easier way of making reusable code sections for parallel use is to employ the
<code>MakeReusable</code> class to wrap an existing callable:</p>

<pre>
fn = pprocess.MakeReusable(originalfn)
</pre>

<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
provides the necessary mechanisms described above for reusable code.</p>

<h2 id="Implementation">Implementation Notes</h2>

<h3>Signals and Waiting</h3>

<p>When created (child) processes terminate, one would typically want to be
informed of such conditions using a <code>SIGCHLD</code> signal handler.
Unfortunately, Python seems to have issues with restartable reads from file
descriptors when interrupted by signals:</p>

<ul>
<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
</ul>

<h3>Select and Poll</h3>

<p>The exact combination of conditions indicating closed pipes remains relatively
obscure.
The following message thread describes them (in the context of another
topic):</p>

<ul>
<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
</ul>

<p>It would seem, from using sockets and from studying the <code>asyncore</code>
module, that sockets are more predictable than pipes.</p>

<p>Notes about <code>poll</code> implementations can be found here:</p>

<ul>
<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
</ul>

</body>
</html>