pprocess

Annotated pprocess.py

139:49b5b0a3fa61
2008-05-13 paulb [project @ 2008-05-13 18:05:40 by paulb] Updated release notes.
paulb@40 1
#!/usr/bin/env python
paulb@40 2
paulb@40 3
"""
paulb@40 4
A simple parallel processing API for Python, inspired somewhat by the thread
paulb@40 5
module, slightly less by pypar, and slightly less still by pypvm.
paulb@40 6
paulb@135 7
Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>
paulb@41 8
paulb@79 9
This program is free software; you can redistribute it and/or modify it under
paulb@79 10
the terms of the GNU Lesser General Public License as published by the Free
paulb@79 11
Software Foundation; either version 3 of the License, or (at your option) any
paulb@79 12
later version.
paulb@41 13
paulb@79 14
This program is distributed in the hope that it will be useful, but WITHOUT
paulb@79 15
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paulb@79 16
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
paulb@79 17
details.
paulb@41 18
paulb@79 19
You should have received a copy of the GNU Lesser General Public License along
paulb@79 20
with this program.  If not, see <http://www.gnu.org/licenses/>.
paulb@40 21
"""
paulb@40 22
paulb@135 23
__version__ = "0.3.2"
paulb@40 24
paulb@40 25
import os
paulb@40 26
import sys
paulb@40 27
import select
paulb@40 28
import socket
paulb@40 29
paulb@40 30
try:
paulb@40 31
    import cPickle as pickle
paulb@40 32
except ImportError:
paulb@40 33
    import pickle
paulb@40 34
paulb@135 35
try:
paulb@135 36
    set
paulb@135 37
except NameError:
paulb@135 38
    from sets import Set as set
paulb@135 39
paulb@84 40
# Communications.
paulb@84 41
paulb@40 42
class AcknowledgementError(Exception):

    "Raised when a sent object is not acknowledged with the expected reply."
paulb@40 44
paulb@40 45
class Channel:

    """
    A communications channel exchanging pickled objects with another process
    through a pair of file-like pipe objects.

    Received data is unpickled as-is, so the process at the other end of the
    channel must be trusted.
    """

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close the pipes upon
        # finalisation. The created process is not waited for here: use the
        # 'wait' method for that.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Calling this method more than once has
        no further effect.
        """

        if not self.closed:
            self.closed = 1

            # Make sure that the write pipe is closed even where closing the
            # read pipe fails.

            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. Errors raised by os.waitpid are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, supplying 'obj', if the reply is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, raise it instead.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even where a received exception is being
        # raised, so that the sender is always released.

        try:
            return self._receive()
        finally:
            self._send("OK")
paulb@40 133
paulb@84 134
# Management of processes and communications.
paulb@84 135
paulb@40 136
class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose

        # Computations (callable, args, kw) queued while the limit is reached.

        self.waiting = []

        # A mapping from read pipe file descriptors to channels.

        self.readables = {}

        # The channels removed by the most recent call to 'ready'.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # An explicit list is made since dictionary views are provided instead
        # of lists by some Python versions.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from. The 'timeout' is passed unchanged to the poll object.

        Channels found to have ended or failed are removed from the exchange
        and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, omitting those which have just been
            # removed and closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where the exchange was
        created with a true 'autoclose' value, the channel is also closed and
        its process waited for.
        """

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                self.add(start(callable, *args, **kw))

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        """
        For each ready channel, process the incoming data and then start any
        waiting computation.
        """

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        Where the limit on active channels has already been reached, the
        computation is queued instead and started later by 'start_waiting'.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)
paulb@84 347
paulb@84 348
class ManagedCallable:

    "A callable whose invocations are started through an exchange."

    def __init__(self, callable, exchange):

        """
        Remember the given 'callable' together with the 'exchange' which will
        start it and monitor the channels created for communications between
        this and the created processes. Note that the 'callable' must be
        parallel-aware (that is, have a 'channel' parameter). Use the
        MakeParallel class to wrap other kinds of callable objects.
        """

        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        "Ask the exchange to start the callable with the supplied arguments."

        exchange = self.exchange
        exchange.start(self.callable, *args, **kw)
paulb@84 370
paulb@84 371
# Abstractions and utilities.
paulb@84 372
paulb@84 373
class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        """
        Initialise the exchange using the same arguments as Exchange.
        """

        # Prepare the bookkeeping first: the Exchange initialiser calls 'add'
        # for any supplied channels, and 'add' needs these attributes.

        self.init()
        Exchange.__init__(self, *args, **kw)

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        number = self.channel_number
        self.channels[channel] = number
        self.channel_number = number + 1

        # Channels added directly, without 'start' or 'create' having made a
        # placeholder, still need a position in the results.

        while len(self.results) <= number:
            self.results.append(None)

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Any
        previously recorded results are discarded. Return this object, through
        which the results can be accessed.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete and return the result at index 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete and return an iterator over the results."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]
paulb@84 455
paulb@97 456
class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange using the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)

        # New data is inserted at the front and consumed from the end.

        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data in the order in which it is received."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):

        "Return this object, which acts as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, waiting for data from the active
        channels where necessary. Raise StopIteration where the queue is empty
        and no active channels remain.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        raise StopIteration

    # Support both spellings of the iterator protocol method.

    __next__ = next
paulb@97 489
paulb@84 490
class MakeParallel:

    "A wrapper giving plain functions the ability to send back their results."

    def __init__(self, callable):

        """
        Wrap the given 'callable'. The resulting object accepts a 'channel'
        parameter when invoked, and forwards the result of the given 'callable'
        via the channel provided back to the invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Call the wrapped callable and send its result via the given 'channel'."

        result = self.callable(*args, **kw)
        channel.send(result)
paulb@84 510
paulb@119 511
class MakeReusable(MakeParallel):

    """
    A wrapper giving plain functions the ability to send back their results
    and then to be invoked again with further arguments from the channel.
    """

    def __call__(self, channel, *args, **kw):

        """
        Call the wrapped callable and send its result via the given 'channel'.
        Then keep receiving (args, kw) pairs from the channel, calling the
        callable again for each, until None is received.
        """

        channel.send(self.callable(*args, **kw))

        while 1:
            request = channel.receive()
            if request is None:
                break
            args, kw = request
            channel.send(self.callable(*args, **kw))
paulb@119 528
paulb@84 529
# Utility functions.
paulb@79 530
paulb@135 531
# The cpuinfo fields which together identify a distinct core.

_cpuinfo_fields = "physical id", "core id"

def get_number_of_cores(filename="/proc/cpuinfo"):

    """
    Return the number of distinct, genuine processor cores, found by counting
    the distinct combinations of the "physical id" and "core id" fields in the
    cpuinfo-style file having the given 'filename'. If the file cannot be opened
    or read (as on platforms not supported by this function), None is returned.
    """

    try:
        f = open(filename)
        try:
            processors = set()
            processor = [None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break
                else:

                    # A new "processor" entry ends the details of the previous
                    # one, which are recorded if a physical id was found.

                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            # Record the details of the final entry.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # Opening a missing file raises IOError in some Python versions and OSError
    # in others.

    except (IOError, OSError):
        return None
paulb@135 567
paulb@40 568
def create():

    """
    Fork a new process. In each of the creating and created processes, return
    a communications channel to the other, built upon one end of a socket pair.
    The channel in the created process has a process identifier of zero.
    """

    parent, child = socket.socketpair()
    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    # Each process keeps its own end of the socket pair and closes the other.

    if pid == 0:
        own, other = child, parent
    else:
        own, other = parent, child

    other.close()
    return Channel(pid, own.makefile("r", 0), own.makefile("w", 0))
paulb@40 586
paulb@97 587
def exit(channel):

    """
    Terminate a created process, closing the given 'channel'. The process is
    terminated using os._exit with a status of zero, and this function never
    returns.
    """

    # Terminate even where closing the channel fails: otherwise, a created
    # process would carry on running the code of the creating process.

    try:
        channel.close()
    finally:
        os._exit(0)
paulb@97 595
paulb@84 596
def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Any additional arguments given to this function are passed on to the
    'callable'.

    In the creating process, return a communications channel to the created
    process. In the created process, the 'callable' is given a channel as its
    'channel' parameter so that it may send data back to the creating process;
    the created process is terminated once the 'callable' has finished.
    """

    channel = create()

    # The creating process just gets the channel.

    if channel.pid != 0:
        return channel

    # The created process runs the callable, sending back anything raised by
    # it (whatever its kind) in place of a result.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
paulb@40 620
paulb@40 621
def waitall():

    "Block until every created process has terminated."

    # os.wait raises OSError once there is nothing left to wait for.

    try:
        while True:
            os.wait()
    except OSError:
        return
paulb@40 630
paulb@89 631
def pmap(callable, sequence, limit=None):

    """
    Apply 'callable' to each element of 'sequence' in the manner of the
    built-in map function, but using a Map exchange with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the Map object providing the processed 'sequence', where each
    element in the sequence is processed by a different process.
    """

    return Map(limit=limit)(callable, sequence)
paulb@84 645
paulb@40 646
# vim: tabstop=4 expandtab shiftwidth=4