#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This software is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.

This software is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public
License along with this library; see the file LICENCE.txt
If not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

--------

The recommended styles of programming using pprocess involve the "Thread-style
Processing" and "Convenient Message Exchanges" sections below, although
developers may wish to read the "Message Exchanges" section for more details of
the API concerned, and the "Fork-style Processing" section may be of interest to
those with experience of large scale parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a number specifying the
timeout in milliseconds, where 0 means a non-blocking poll, as stated in the
select module's documentation for the poll method of polling objects).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

class MyExchange(Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to only add
channels if the specified limit of channels is not exceeded:

exchange.add(channel)           # add a channel as normal
exchange.add_wait(channel)      # add a channel, waiting if the limit would be
                                # exceeded

We can explicitly wait for "free space" for channels by calling the wait method:

exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.2.3"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a message is not \"OK\"."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Both pipes are file-like objects; since pickled data is exchanged, they
        should be opened in binary mode. A 'pid' of 0 denotes the created
        process's end of the channel (see 'create').
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes upon
        # finalisation.
        # The 'closed' attribute is missing if __init__ never ran (for example,
        # when it was called with the wrong arguments), in which case there is
        # nothing to close.

        if hasattr(self, "closed"):
            self.close()

    def close(self):

        """
        Explicitly close the channel. Closing an already closed channel does
        nothing. The created process is not waited for: see the 'wait' method.
        """

        if not self.closed:
            self.closed = 1

            # Make sure that a failure to close one pipe does not leave the
            # other pipe open.

            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. Errors from os.waitpid (such as the process
        having already been waited for) are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        """
        Send the given object 'obj' through the channel without waiting for an
        acknowledgement.
        """

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, providing 'obj', if the reply is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        object is an exception instance, it is raised instead of being
        returned.
        """

        # NOTE: Unpickling can execute arbitrary code. The channels made by
        # NOTE: 'create' only connect a process with a process forked from it;
        # NOTE: do not attach a channel to an untrusted peer.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is also sent when the received object is an
        exception which is then raised here.
        """

        try:
            return self._receive()
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.autoclose = autoclose

        # Channels indexed by the file descriptor of their reading pipe.

        self.readables = {}

        # Channels removed by the most recent call to 'ready'.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # A real list is returned even where dictionaries provide views.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from. Without a 'timeout', the wait
        is indefinite; a 'timeout' of 0 causes a non-blocking poll.

        Channels found to have ended or failed are removed from the exchange
        and recorded in the 'removed' attribute, which is reset upon each call.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []
        ended = select.POLLHUP | select.POLLNVAL | select.POLLERR

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & ended:
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except for those which have just been
            # closed as a consequence of being removed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If the exchange was
        initialised with a true 'autoclose' value, the channel is also closed
        and its created process is waited for.
        """

        # The file descriptor must be obtained before any closing occurs.

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

def _make_channel(pid, sock):

    "Return a channel for 'pid' which communicates over the socket 'sock'."

    # Binary mode is required for the exchange of pickled data.

    channel = Channel(pid, sock.makefile("rb"), sock.makefile("wb"))

    # Only the file objects are needed from here on: closing the socket object
    # lets the connection be shut down as soon as both file objects are closed.

    sock.close()
    return channel

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process, the 'pid' attribute of the returned channel is 0;
    in the creating process, it is the process identifier of the created
    process.
    """

    parent, child = socket.socketpair()
    for s in (parent, child):
        s.setblocking(1)

    try:
        pid = os.fork()
    except OSError:

        # Do not leak the sockets if no process could be created.

        parent.close()
        child.close()
        raise

    if pid == 0:
        parent.close()
        return _make_channel(pid, child)
    else:
        child.close()
        return _make_channel(pid, parent)

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    Any exception raised by the 'callable' is sent through the channel to the
    creating process. The created process always exits instead of returning
    from this function.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    try:
        try:
            callable(channel, *args, **kwargs)
        except:

            # This is the outermost level of the created process: everything
            # is caught so that it can be reported to the creating process.

            channel.send(sys.exc_info()[1])
    finally:

        # The created process must never run the creating process's code, so
        # the exit has to occur even if closing the channel fails.

        try:
            channel.close()
        finally:
            os._exit(0)

def waitall():

    """
    Wait for all created processes to terminate, returning when os.wait
    reports that no processes remain to be waited for.
    """

    try:
        while True:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4