<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Reference</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Reference</h1>

<p>The <code>pprocess</code> module defines a simple parallel processing API
for Python, inspired somewhat by the <code>thread</code> module, slightly less
by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>

<p>This document complements the <a href="tutorial.html">tutorial</a> by
providing an overview of the different styles of parallel programming supported
by the module. For an introduction, and for a clearer idea of which styles best
suit your own programs, consult the
<a href="tutorial.html">tutorial</a>.</p>

<ul>
<li><a href="#Thread">Thread-style Processing</a></li>
<li><a href="#Fork">Fork-style Processing</a></li>
<li><a href="#Exchanges">Message Exchanges</a></li>
<li><a href="#Convenient">Convenient Message Exchanges</a></li>
<li><a href="#Queues">Exchanges as Queues</a></li>
<li><a href="#Maps">Exchanges as Maps</a></li>
<li><a href="#Managed">Managed Callables</a></li>
<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
<li><a href="#Map">Map-style Processing</a></li>
<li><a href="#Reusing">Reusing Processes and Channels</a></li>
<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
<li><a href="#Continuous">Continuous Processes and Channels</a></li>
<li><a href="#BackgroundCallable">Background Processes and Callables</a></li>
<li><a href="#PersistentQueue">Background and Persistent Queues</a></li>
<li><a href="#Implementation">Implementation Notes</a></li>
</ul>

<h2 id="Thread">Thread-style Processing</h2>

<p>To create a new process which runs a function or any other callable object,
specify the "callable" and any arguments as follows:</p>

<pre>
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument, followed by the specified arguments:</p>

<pre>
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
</pre>

<h2 id="Fork">Fork-style Processing</h2>

<p>To create new processes in a way similar to using <code>os.fork</code>
(i.e. the <code>fork</code> system call on various operating systems), use the
following method:</p>

<pre>
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
</pre>

<h2 id="Exchanges">Message Exchanges</h2>

<p>When many processes are created, each providing results for consumption by the
main process, collecting those results efficiently can be problematic: if some
processes take longer than others, and we read from processes which are not yet
ready instead of from other processes which are, the whole activity will take
much longer than necessary.</p>

<p>One solution to the problem of knowing when to read from channels is to create
an <code>Exchange</code> object, optionally initialising it with a list of channels
through which data is expected to arrive:</p>

<pre>
exchange = pprocess.Exchange()          # populate the exchange later
exchange = pprocess.Exchange(channels)  # populate the exchange with channels
</pre>

<p>We can add channels to the exchange using the <code>add</code> method:</p>

<pre>
exchange.add(channel)
</pre>

<p>To test whether an exchange is active, that is, whether it is actually
monitoring any channels, we can use the <code>active</code> method, which
returns all channels being monitored by the exchange:</p>

<pre>
channels = exchange.active()
</pre>

<p>We may then check the exchange to see whether any data is ready to be received;
for example:</p>

<pre>
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
</pre>

<p>If we do not wish to wait indefinitely for channels to become ready, we can
pass a timeout value as an argument to the <code>ready</code> method: a floating
point number specifying the timeout in seconds, where <code>0</code> means a
non-blocking poll, as stated in the documentation of the <code>select</code>
module's <code>select</code> function.</p>

<h2 id="Convenient">Convenient Message Exchanges</h2>

<p>A more convenient form of message exchange can be adopted by defining a
subclass of the <code>Exchange</code> class which provides a particular method,
<code>store_data</code>:</p>

<pre>
class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
</pre>

<p>The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:</p>

<pre>
exchange = MyExchange()          # populate the exchange later
exchange = MyExchange(limit=10)  # set a limit for later population
</pre>

<p>The exchange can now be used in a simpler fashion than that shown above.
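</p>

<p>To make the mechanics concrete, here is a minimal sketch of what an exchange
of this kind does on our behalf. It uses only the Python standard library rather
than <code>pprocess</code>, and it should not be read as a description of how
<code>pprocess</code> is implemented. The <code>PipeExchange</code> class and the
<code>square</code> function are hypothetical. The sketch assumes a UNIX-like
system providing <code>os.fork</code>, one pipe per created process, and exactly
one pickled result per process. Its <code>ready</code> method follows the
<code>select.select</code> timeout convention described above. Its
<code>store_data</code>, <code>start</code> and <code>finish</code> methods
merely borrow the names used in this document.</p>

<pre>
import os
import pickle
import select


class PipeExchange:
    """Hypothetical stand-in for an exchange (not part of pprocess).

    Each created process gets its own pipe; the creating process watches
    the read ends with select and calls store_data for each ready one.
    """

    def __init__(self):
        self.readers = {}  # maps read-end file descriptor to child pid
        self.results = []

    def start(self, fn, *args):
        r, w = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Created process: send one pickled result, then exit at once so
            # that none of the creating process's code runs here.
            try:
                os.close(r)
                with os.fdopen(w, "wb") as out:
                    pickle.dump(fn(*args), out)
            finally:
                os._exit(0)
        # Creating process: keep only the read end and remember the child.
        os.close(w)
        self.readers[r] = pid

    def ready(self, timeout=None):
        # Return the read ends with data (or end-of-file) waiting. A timeout
        # of 0 polls without blocking; None waits indefinitely.
        if not self.readers:
            return []
        readable, _, _ = select.select(list(self.readers), [], [], timeout)
        return readable

    def store_data(self, fd):
        # Read the single result from this pipe, then reap the child.
        with os.fdopen(fd, "rb") as inp:
            self.results.append(pickle.load(inp))
        os.waitpid(self.readers.pop(fd), 0)

    def finish(self):
        # Keep collecting until no channels remain to be monitored.
        while self.readers:
            for fd in self.ready():
                self.store_data(fd)


def square(x):
    # Hypothetical example task run in each created process.
    return x * x


exchange = PipeExchange()
for value in [1, 2, 3, 4]:
    exchange.start(square, value)
exchange.finish()
print(sorted(exchange.results))  # [1, 4, 9, 16]
</pre>

<p>Results are appended in whichever order the processes finish, so the example
sorts them before printing. Keeping results in the order of the computations
that produced them is the job of the <code>Map</code> class described below.</p>

<p>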
We can
add channels as before using the <code>add</code> method, or we can choose to add
a channel only when doing so will not exceed the specified limit of channels,
waiting until then if necessary:</p>

<pre>
exchange.add(channel)       # add a channel as normal
exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                            # exceeded
</pre>

<p>Or we can request that the exchange create a channel on our behalf:</p>

<pre>
channel = exchange.create()
</pre>

<p>We can even start processes and monitor channels without ever handling the
channel ourselves:</p>

<pre>
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>We can explicitly wait for "free space" for channels by calling the
<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
methods make this rarely necessary:</p>

<pre>
exchange.wait()
</pre>

<p>Finally, when finishing the computation, we can simply call the
<code>finish</code> method and have the remaining data processed automatically:</p>

<pre>
exchange.finish()
</pre>

<p>Clearly, this approach is less flexible than the raw message exchange API
described above.
However, it permits much simpler and clearer
code.</p>

<h2 id="Queues">Exchanges as Queues</h2>

<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
define the <code>store_data</code> method, it might be more desirable to let
the exchange manage the communications between created and creating processes,
and to let the creating process just consume received data as it arrives,
without particular regard for the order of the received data; perhaps the
creating process has its own way of managing such issues.</p>

<p>For such situations, the <code>Queue</code> class may be instantiated and
channels added to the queue using the various methods provided:</p>

<pre>
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)
</pre>

<p>The results can then be consumed by treating the queue like an iterator:</p>

<pre>
for result in queue:
    # Capture each result.
    ...
</pre>

<p>This approach does not, of course, require the direct handling of channels.
One could instead use the <code>start</code> method on the queue to create
processes and to initiate computations (since a queue is merely an enhanced
exchange with a specific implementation of the <code>store_data</code>
method).</p>

<h2 id="Maps">Exchanges as Maps</h2>

<p>Where the <code>Queue</code> class above seems an attractive way of managing
the results of computations, but the order in which the creating process
consumes them remains important, the <code>Map</code> class may offer a suitable
way of collecting and accessing results:</p>

<pre>
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
</pre>

<p>The results can then be consumed in an order corresponding to the order of the
computations which produced them:</p>

<pre>
for result in results:
    # Process each result.
    ...
</pre>

<p>Internally, the <code>Map</code> object records an ordering of channels
corresponding to the order of those computations, so that received results can
be mapped to this ordering and made available with the ordering preserved.</p>

<h2 id="Managed">Managed Callables</h2>

<p>A further simplification of the above convenient use of message exchanges
involves the creation of callables (e.g. functions) which are automatically
monitored by an exchange.
We create such a callable by calling the
<code>manage</code> method on an exchange:</p>

<pre>
myfn = exchange.manage(fn)
</pre>

<p>This callable can then be invoked instead of using the exchange's
<code>start</code> method:</p>

<pre>
myfn(arg1, arg2, named1=value1, named2=value2)
</pre>

<p>The exchange's <code>finish</code> method can be used as usual to process
incoming data.</p>

<h2 id="MakeParallel">Making Existing Functions Parallel</h2>

<p>In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the <code>MakeParallel</code>
class to provide a wrapper around unmodified functions, which sends the results
of those functions back through the channels provided. For example:</p>

<pre>
fn = pprocess.MakeParallel(originalfn)
</pre>

<h2 id="Map">Map-style Processing</h2>

<p>In situations where a callable would normally be used in conjunction with the
Python built-in <code>map</code> function, an alternative solution can be adopted by using
the <code>pmap</code> function:</p>

<pre>
pprocess.pmap(fn, sequence)
</pre>

<p>Here, each element of the sequence must supply the required parameters of
the specified callable, <code>fn</code>. Note that the callable does not need to
be a parallel-aware function which has a <code>channel</code> argument: the
<code>pmap</code> function automatically wraps the given callable internally.</p>

<h2 id="Reusing">Reusing Processes and Channels</h2>

<p>So far, all parallel computations have been done with newly created
processes.
However, this can seem somewhat inefficient, especially if processes
are being continually created and destroyed (although if this happens too
often, the amount of work done by each process may be too little anyway). One
solution is to retain processes after they have done their work and request
that they perform more work for each new parallel task or invocation. To enable
the reuse of processes in this way, the <code>reuse</code> keyword argument may
be specified when creating <code>Exchange</code> instances (and instances of
subclasses such as <code>Map</code> and <code>Queue</code>). For example:</p>

<pre>
exchange = MyExchange(limit=10, reuse=1)  # reuse up to 10 processes
</pre>

<p>Code invoked through such exchanges must be aware of channels and be
constructed in such a way that it does not terminate after sending a result
back to the creating process. Instead, it should repeatedly wait for subsequent
sets of parameters (compatible either with the signature of a callable or with
the original values read from the channel).
To end such reusable code,
the creating process sends the special value <code>None</code> to the created
process, indicating that no more parameters will be sent; on receiving it, the
code should terminate.</p>

<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>

<p>An easier way of making reusable code sections for parallel use is to employ the
<code>MakeReusable</code> class to wrap an existing callable:</p>

<pre>
fn = pprocess.MakeReusable(originalfn)
</pre>

<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
provides the necessary mechanisms described above for reusable code.</p>

<h2 id="Continuous">Continuous Processes and Channels</h2>

<p>Much of the usage of exchanges so far has concentrated on processes which
are created, whose callables are invoked, and then, once those callables have
returned, either they are invoked again in the same process (when reused) or
in a new process (when not reused).
However, the underlying mechanisms
also support processes whose callables, rather than only receiving input at the
start of their execution and sending output at the end, provide output on a
continuous basis (much like iterator or generator objects).</p>

<p>To enable support for continuous communications between processes, the
<code>continuous</code> keyword argument must be specified when creating an
<code>Exchange</code> instance (or an instance of a subclass of
<code>Exchange</code> such as <code>Map</code> or <code>Queue</code>):</p>

<pre>
exchange = MyExchange(limit=10, continuous=1)  # support up to 10 processes
</pre>

<p>Code invoked in this mode of communication must be aware of channels, since
it will need to explicitly send data via a channel to the creating process,
instead of terminating and sending data only once (as would be done
automatically using convenience classes such as
<code>MakeParallel</code>).</p>

<h2 id="BackgroundCallable">Background Processes and Callables</h2>

<p>So far, all parallel computations have involved created processes which
depend on the existence of the creating process to collect their results and to
communicate with them. This prevents the creating process from terminating, even
if the created processes actually perform work and potentially create output
which need not concern the process which created them.
In order to
separate creating and created processes, the concept of a background process
(also known as a daemon process) is introduced.</p>

<p>The <code>BackgroundCallable</code> class acts somewhat like the
<code>manage</code> method on exchange-based objects, although no exchange is
immediately involved: instances of <code>BackgroundCallable</code> provide
wrappers around existing parallel-aware callables, which can then be invoked in
order to initiate a background computation in a created process. For example:</p>

<pre>
backgroundfn = pprocess.BackgroundCallable(address, fn)
</pre>

<p>This wraps the supplied callable (which can itself be the result of using
<code>MakeParallel</code>), and the resulting wrapper can be invoked like any
other function. One distinguishing feature is the <code>address</code>: in order
to contact the background process after invocation (amongst other things, to
receive any result), a specific address must be given to define the contact
point between the created process and any processes seeking to connect to it.
Since these "persistent" communications employ special files (specifically
UNIX-domain sockets), the address must be a suitable filename.</p>

<h2 id="PersistentQueue">Background and Persistent Queues</h2>

<p>Background processes employing persistent communications require adaptations
of the facilities described in the sections above. For a single background
process, the <code>BackgroundQueue</code> function is sufficient to create a
queue-like object which can monitor the communications channel between the
connecting process and a background process.
For example:</p>

<pre>
queue = pprocess.BackgroundQueue(address)
</pre>

<p>This code will cause the process reachable via the given <code>address</code>
to be contacted and any results made available via the created queue-like
object.</p>

<p>Where many background processes have been created, a single
<code>PersistentQueue</code> object can monitor their communications by being
connected to them all, as in the following example:</p>

<pre>
queue = pprocess.PersistentQueue()
for address in addresses:
    queue.connect(address)
</pre>

<p>Here, the queue monitors all previously created processes whose addresses
are listed in the <code>addresses</code> sequence. Upon iterating over the queue,
results will be taken from whichever process happens to have data available,
in no particular pre-defined order.</p>

<h2 id="Implementation">Implementation Notes</h2>

<h3>Signals and Waiting</h3>

<p>When created (child) processes terminate, one would typically want to be
informed of such conditions using a signal handler. Unfortunately, Python seems to
have issues with restartable reads from file descriptors when interrupted by
signals:</p>

<ul>
<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
</ul>

<h3>Select and Poll</h3>

<p>The exact combination of conditions indicating closed pipes remains relatively
obscure.
Here is a message thread describing them (in the context of another
topic):</p>

<ul>
<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
</ul>

<p>It would seem, from using sockets and from studying the <code>asyncore</code>
module, that sockets are more predictable than pipes.</p>

<p>Notes about <code>poll</code> implementations can be found here:</p>

<ul>
<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
</ul>

</body>
</html>