#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.3.2"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a sent object is not 'OK'."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing more than once has no effect."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. Nothing is done where the channel's process
        identifier is zero, and any OSError from os.waitpid is ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, with 'obj' as its argument, if the reply
        received is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead of being
        returned.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even if receiving raised an exception.

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.waiting = []       # queued (callable, args, kw) tuples
        self.readables = {}     # read pipe file descriptor -> channel
        self.removed = []       # channels removed by the last call to 'ready'
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(
            fd,
            select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR
            )

    def active(self):

        "Return a list of active channels."

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        Channels reporting a hang-up, invalid descriptor or error condition are
        removed from the exchange and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels. A channel which has just been removed
            # is only reported where it was not also closed upon removal.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where 'autoclose' is set,
        the channel is also closed and its process waited for.
        """

        # The file descriptor must be obtained before the channel is closed.

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        if self.waiting:
            waiting_callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                self.add(start(waiting_callable, *args, **kw))

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        Where the limit on active channels has already been reached, the
        invocation is queued instead and no process is created by this call.
        """

        if self.limit is not None and len(self.active()) >= self.limit:

            # Queued at the front and popped from the back by 'start_waiting',
            # so that queued invocations are started in the order given.

            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}      # channel -> position in 'results'
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Return
        this object, through which the results are then obtained.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete and return result number 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete and return a results iterator."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data, adding it to the queue."

        # New data is inserted at the front and taken from the back by 'next'.

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):

        "Return this object as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, waiting for data from active
        channels if the queue is empty. Raise StopIteration where the queue is
        empty and no channels remain active.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Support the newer spelling of the iterator protocol as well.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel'. Then
        keep receiving (args, kw) tuples from the channel, invoking the callable
        for each one, until None is received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Utility functions.

_cpuinfo_fields = "physical id", "core id"

def get_number_of_cores():

    """
    Return the number of distinct, genuine processor cores. If the platform is
    not supported by this function, None is returned.
    """

    try:
        f = open("/proc/cpuinfo")
        try:
            processors = set()
            processor = [None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break
                else:

                    # A new "processor" entry ends the details collected for
                    # the previous one.

                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            # Record the details of the final entry.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # Opening a missing file raises IOError on older Python versions, where it
    # is not a kind of OSError.

    except (IOError, OSError):
        return None

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # The file objects are unbuffered and opened in binary mode for the pickled
    # data exchanged by channels.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb", 0), child.makefile("wb", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)

            # Anything raised in the created process is sent to the creating
            # process rather than being handled here.

            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            exit(channel)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once there are no more processes to wait for.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4