#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.3.1"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a sent object is not 'OK'."

class _ExactReader:

    """
    A minimal file-like wrapper giving pickle the 'read' and 'readline' methods
    it needs, where 'read' keeps reading until the requested number of bytes
    has arrived (or the stream ends). An unbuffered socket stream may otherwise
    return fewer bytes than requested, which pickle treats as truncated data.
    The wrapper never reads more than was asked for, so nothing is held back
    from a later poll on the underlying descriptor.
    """

    def __init__(self, stream):

        "Initialise the reader with the underlying 'stream'."

        self.stream = stream

    def read(self, size):

        "Return 'size' bytes from the stream, or fewer only at end of stream."

        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.stream.read(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def readline(self):

        "Return the next line from the stream."

        return self.stream.readline()

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of zero indicates that there is no created process to wait for
        (see the 'wait' method).
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes upon
        # finalisation.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Closing an already closed channel has no
        effect. The created process is not waited for here: use 'wait'.
        """

        if not self.closed:
            self.closed = 1

            # Make sure that the write pipe is closed even if closing the read
            # pipe fails.

            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit, passing 'options' to
        os.waitpid. Errors from os.waitpid (such as the process having already
        been waited for) are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, with 'obj' as its argument, if the reply is
        anything other than "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception instance, raise it instead.
        """

        # NOTE: unpickling is only safe because the peer is a process created
        # by this module; never attach a channel to an untrusted data source.

        obj = pickle.load(_ExactReader(self.read_pipe))
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even if receiving raises an exception.
        """

        try:
            return self._receive()
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.waiting = []       # pending (callable, args, kw) computations
        self.readables = {}     # read descriptor -> channel
        self.removed = []       # channels removed by the last 'ready' call
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from. The 'timeout' is passed unchanged to the poll object,
        which interprets it in milliseconds; None means wait indefinitely.

        Channels found to have ended or to be in error are removed from the
        exchange and recorded in the 'removed' attribute.
        """

        ended = select.POLLHUP | select.POLLNVAL | select.POLLERR

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & ended:
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except those which have just been
            # removed and closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If the exchange was
        created with a true 'autoclose' value, also close the channel and wait
        for its process.
        """

        # The descriptor must be obtained before the channel is closed.

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                self.add(start(callable, *args, **kw))

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This base implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        If the limit on active channels has been reached, the computation is
        queued instead and started later by 'start_waiting'.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}      # channel -> position in 'results'
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Return
        this object, through which the results are accessed.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete and return result number 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete and return an iterator over results."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data, adding it to the queue."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):

        "Return this object as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, blocking while channels remain
        active and no data is available. Raise StopIteration when the queue is
        empty and no channels remain active.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Support the iterator protocol of Python 3 as well as that of Python 2.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel'. Then
        keep receiving (args, kw) tuples through the channel, invoking the
        callable for each and sending back the result, until None is received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Utility functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process, the
    channel has a 'pid' of zero; in the creating process, the channel's 'pid' is
    the identifier of the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # The streams are binary, since pickled data is sent through them, and
    # unbuffered, so that polling the descriptor reflects all unread data.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb", 0), child.makefile("wb", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Any exception raised by the 'callable' in the created process is sent
    through the channel; the created process always terminates afterwards.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)

            # Deliberately catch everything: whatever went wrong in the created
            # process is forwarded to the creating process.

            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            exit(channel)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once there are no more processes to wait for.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4