pprocess

Annotated pprocess.py

154:83f5e82fe033
2008-09-06 Paul Boddie Added tag rel-0-4 for changeset d973429408ad
paulb@40 1
#!/usr/bin/env python
paulb@40 2
paulb@40 3
"""
paulb@40 4
A simple parallel processing API for Python, inspired somewhat by the thread
paulb@40 5
module, slightly less by pypar, and slightly less still by pypvm.
paulb@40 6
paulb@135 7
Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>
paulb@41 8
paulb@79 9
This program is free software; you can redistribute it and/or modify it under
paulb@79 10
the terms of the GNU Lesser General Public License as published by the Free
paulb@79 11
Software Foundation; either version 3 of the License, or (at your option) any
paulb@79 12
later version.
paulb@41 13
paulb@79 14
This program is distributed in the hope that it will be useful, but WITHOUT
paulb@79 15
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paulb@79 16
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
paulb@79 17
details.
paulb@41 18
paulb@79 19
You should have received a copy of the GNU Lesser General Public License along
paulb@79 20
with this program.  If not, see <http://www.gnu.org/licenses/>.
paulb@40 21
"""
paulb@40 22
paulb@142 23
# Version string of this module.
__version__ = "0.4"
paulb@40 24
paulb@40 25
import os
paulb@40 26
import sys
paulb@40 27
import select
paulb@40 28
import socket
paulb@150 29
import platform
paulb@40 30
paulb@40 31
try:
paulb@40 32
    import cPickle as pickle
paulb@40 33
except ImportError:
paulb@40 34
    import pickle
paulb@40 35
paulb@135 36
try:
paulb@135 37
    set
paulb@135 38
except NameError:
paulb@135 39
    from sets import Set as set
paulb@135 40
paulb@84 41
# Communications.
paulb@84 42
paulb@40 43
class AcknowledgementError(Exception):

    "Raised by a sender when its message is not acknowledged with \"OK\"."
paulb@40 45
paulb@40 46
class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of 0 marks the channel as the one held by a created process;
        'wait' does nothing for such channels.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, pipes are closed upon
        # finalisation. The created process is not waited for here; use 'wait'
        # explicitly to reap it.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Closing an already-closed channel has no
        effect. The pipes are detached from the channel before being closed,
        and the write pipe is closed even if closing the read pipe fails.
        """

        read_pipe, write_pipe = self.read_pipe, self.write_pipe
        self.read_pipe = None
        self.write_pipe = None

        try:
            if read_pipe is not None:
                read_pipe.close()
        finally:
            if write_pipe is not None:
                write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit, passing 'options' to
        os.waitpid. Errors from os.waitpid are deliberately ignored (for
        example, when the process has already been reaped).
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, carrying 'obj' as its single argument, if
        the reply received is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception instance, it is raised instead.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        try:
            return self._receive()
        finally:
            # Acknowledge even when a received exception is being re-raised,
            # since the sender is blocked in 'send' awaiting this reply.
            self._send("OK")
paulb@40 135
paulb@140 136
class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a socket
        'endpoint' on which connections are accepted, and the 'address' (the
        socket file) at which the endpoint can be contacted. Pipes are only
        created once a peer connects.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        """
        Close the persistent channel and its listening endpoint, and remove the
        socket file.
        """

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:
            pass

        # With the socket file removed, no peer can contact the endpoint any
        # more, so release its descriptor instead of leaving it to the GC.

        self.endpoint.close()

    def _ensure_pipes(self):

        """
        Ensure that the channel is capable of communicating, blocking to
        accept a connection if none is currently established.
        """

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections.

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        """
        Discard the existing connection, closing its pipes, and listen for a
        new one.
        """

        # The descriptor must be obtained before the pipes are closed.

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)

        # Close (rather than merely drop) the broken connection's pipes. The
        # base class method is used so that the socket file is kept.

        Channel.close(self)
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, replacing broken
        connections with newly accepted ones. The optional 'timeout' is passed
        to the poller; if polling reports nothing, this method simply returns.
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break
            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)
paulb@140 226
paulb@84 227
# Management of processes and communications.
paulb@84 228
paulb@40 229
class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        # Queued (callable, args, kw) entries: added at the front, taken from
        # the end, so that they are started in the order they were queued.
        self.waiting = []
        # Maps read pipe file descriptors to their channels.
        self.readables = {}
        # Channels removed by the most recent call to 'ready'.
        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        """
        Return a list of active channels. The list is a snapshot: changing it
        does not affect the exchange.
        """

        # list() keeps the documented return type under Python 3, where
        # dict.values() returns a view rather than a list.
        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (in
        milliseconds, as for select.poll; or until communication is possible)
        and return a list of channels which are ready to be read from.

        Channels reporting a hang-up or error condition are removed from the
        exchange and recorded in the 'removed' attribute, which is reset on each
        call. Such a channel is only returned as readable when it also has data
        pending and 'autoclose' is false.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange, closing it and waiting
        for its process if 'autoclose' is set.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.

        Return a (callable, args, kw) tuple when a queued invocation exists and
        channels are not reused. When channels are reused, the queued arguments
        (or None, if nothing is queued) are sent over 'channel' instead, and
        None is returned.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        creating process, and None to the created process.
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)
paulb@84 481
paulb@140 482
class Persistent:

    """
    Mix-in supplying exchanges with methods for processes that are reached
    through persistent, address-based communications channels.
    """

    def start_waiting(self, channel):

        """
        Given that data was received on 'channel', start any queued invocation
        at that channel's address and add the resulting channel here.
        """

        details = self._get_waiting(channel)
        if details is None:
            return
        target, call_args, call_kw = details
        new_channel = start_persistent(channel.address, target, *call_args, **call_kw)
        self.add(new_channel)

    def start(self, address, callable, *args, **kw):

        """
        Start a process at 'address' running 'callable' with the remaining
        arguments, unless the invocation has to be queued because the limit on
        active channels has been reached.
        """

        queued = self._set_waiting(callable, args, kw)
        if not queued:
            start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a process located at 'address'. The created process receives
        its communications channel; the creating process receives None, with
        the channel being managed by this exchange.
        """

        return self._get_channel_for_process(create_persistent(address))

    def manage(self, address, callable):

        """
        Publish 'callable' at 'address', returning an object which is invoked
        like 'callable' but whose processes and communications are managed by
        this exchange.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to the process contactable via 'address' and monitor it."

        self.add_wait(connect_persistent(address))
paulb@140 542
paulb@84 543
class ManagedCallable:

    "A callable whose invocations are started and monitored by an exchange."

    def __init__(self, callable, exchange):

        """
        Remember the parallel-aware 'callable' (one accepting a leading
        'channel' parameter) and the 'exchange' that starts each invocation in
        a new process and monitors its channel. Wrap other callables with
        MakeParallel first.
        """

        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        "Ask the exchange to start the callable with the given arguments."

        exchange, target = self.exchange, self.callable
        exchange.start(target, *args, **kw)
paulb@84 565
paulb@140 566
class PersistentCallable:

    "A callable whose invocations run in processes reachable at an address."

    def __init__(self, address, callable, exchange):

        """
        Remember the 'address', the parallel-aware 'callable' (one accepting a
        leading 'channel' parameter) and the 'exchange' which will start and
        monitor a background process for each invocation. Wrap other callables
        with MakeParallel first.
        """

        self.address = address
        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        "Ask the exchange to start the callable at the stored address."

        exchange = self.exchange
        exchange.start(self.address, self.callable, *args, **kw)
paulb@140 590
paulb@142 591
class BackgroundCallable:

    """
    A callable that runs in a background process reachable at an address,
    without being managed by an exchange.
    """

    def __init__(self, address, callable):

        """
        Remember the 'address' and the parallel-aware 'callable' (one accepting
        a leading 'channel' parameter). Each invocation of this object runs the
        callable in a background process. Wrap other callables with
        MakeParallel first.
        """

        self.address = address
        self.callable = callable

    def __call__(self, *args, **kw):

        "Run the callable in a background process with the given arguments."

        address, target = self.address, self.callable
        start_persistent(address, target, *args, **kw)
paulb@142 616
paulb@84 617
# Abstractions and utilities.
paulb@84 618
paulb@84 619
class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        """
        Reset per-invocation state. Channels are numbered in the order they
        are added, so that results can be stored in input order.
        """

        self.results = []
        self.channels = {}
        self.channel_number = 0

    def add(self, channel):

        "Add 'channel' to the exchange, recording its position among results."

        Exchange.add(self, channel)
        position = self.channel_number
        self.channels[channel] = position
        self.channel_number = position + 1

    def start(self, callable, *args, **kw):

        """
        Reserve a result slot, then start (or queue) a process running
        'callable' with the given arguments and monitor its channel.
        """

        self.results.append(None)
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Reserve a result slot and create a new process. The created process
        receives its channel; the creating process receives None, with the
        channel being managed by this exchange.
        """

        self.results.append(None)
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Start 'callable' (wrapped with MakeParallel unless it already is one)
        once for each element of 'sequence', returning this object, through
        which the results are accessed.
        """

        if isinstance(callable, MakeParallel):
            wrapped = callable
        else:
            wrapped = MakeParallel(callable)

        self.init()
        for item in sequence:
            self.start(wrapped, item)
        return self

    def __getitem__(self, i):

        "Wait for all processes, then return result 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all processes, then iterate over the results in order."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Receive data from 'channel' into that channel's result slot."

        data = channel.receive()
        position = self.channels[channel]
        self.results[position] = data
        del self.channels[channel]
paulb@84 701
paulb@97 702
class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        # Received data: new items are inserted at the front and items are
        # taken from the end, giving first-in, first-out order.
        self.queue = []

    def store_data(self, channel):

        "Receive data from the given 'channel' and add it to the queue."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the queue, reading from ready channels until
        data is available. Raise StopIteration once the queue is empty and no
        channels remain active.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        raise StopIteration

    # Python 3 looks up '__next__' for the iterator protocol; Python 2 uses
    # 'next' above.
    __next__ = next
paulb@97 735
paulb@84 736
class MakeParallel:

    "Adapter letting an ordinary callable report its result over a channel."

    def __init__(self, callable):

        """
        Wrap 'callable' so that this object accepts a leading 'channel'
        argument. The result of 'callable' is then sent through that channel
        back to the invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Call the wrapped callable with the arguments and send its result."

        result = self.callable(*args, **kw)
        channel.send(result)
paulb@84 756
paulb@119 757
class MakeReusable(MakeParallel):

    """
    Adapter letting an ordinary callable report results over a channel
    repeatedly. After each result is sent, further (args, kw) requests are
    received and served until None is received.
    """

    def __call__(self, channel, *args, **kw):

        "Serve the initial call and any further requests received on 'channel'."

        request = (args, kw)
        while request is not None:
            call_args, call_kw = request
            channel.send(self.callable(*call_args, **call_kw))
            request = channel.receive()
paulb@119 774
paulb@140 775
# Persistent variants.
paulb@140 776
paulb@140 777
class PersistentExchange(Persistent, Exchange):

    "An exchange whose communications are persistent (Persistent combined with Exchange)."
paulb@140 782
paulb@140 783
class PersistentQueue(Persistent, Queue):

    "A queue whose communications are persistent (Persistent combined with Queue)."
paulb@140 788
paulb@142 789
# Convenience functions.
paulb@142 790
paulb@142 791
def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address' and return a
    persistent queue (created with limit=1) through which that process's
    results can be obtained.
    """

    background = PersistentQueue(limit=1)
    background.connect(address)
    return background
paulb@142 801
paulb@142 802
def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function, with an optional process
    'limit' that is passed on to Map. The given 'callable' should not be
    parallel-aware (that is, it should not take a 'channel' parameter), since
    it will be wrapped for parallel communications before being invoked.

    Return the results of processing 'sequence', where each element of the
    sequence is processed by a different process.
    """

    return Map(limit=limit)(callable, sequence)
paulb@142 816
paulb@84 817
# Utility functions.
paulb@79 818
paulb@135 819
_cpuinfo_fields = "physical id", "core id"
paulb@135 820
paul@152 821
def _get_number_of_cores():
paulb@135 822
paulb@135 823
    """
paulb@135 824
    Return the number of distinct, genuine processor cores. If the platform is
paulb@135 825
    not supported by this function, None is returned.
paulb@135 826
    """
paulb@135 827
paulb@135 828
    try:
paulb@135 829
        f = open("/proc/cpuinfo")
paulb@135 830
        try:
paulb@135 831
            processors = set()
paulb@135 832
            processor = [None, None]
paulb@135 833
paulb@135 834
            for line in f.xreadlines():
paulb@135 835
                for i, field in enumerate(_cpuinfo_fields):
paulb@135 836
                    if line.startswith(field):
paulb@135 837
                        t = line.split(":")
paulb@135 838
                        processor[i] = int(t[1].strip())
paulb@135 839
                        break
paulb@135 840
                else:
paulb@135 841
                    if line.startswith("processor") and processor[0] is not None:
paulb@135 842
                        processors.add(tuple(processor))
paulb@135 843
                        processor = [None, None]
paulb@135 844
paulb@135 845
            if processor[0] is not None:
paulb@135 846
                processors.add(tuple(processor))
paulb@135 847
paulb@135 848
            return len(processors)
paulb@135 849
paulb@135 850
        finally:
paulb@135 851
            f.close()
paulb@135 852
paulb@135 853
    except OSError:
paulb@135 854
        return None
paulb@135 855
paul@152 856
def _get_number_of_cores_solaris():
paul@152 857
paul@152 858
    """
paul@152 859
    Return the number of cores for OpenSolaris 2008.05 and possibly other
paul@152 860
    editions of Solaris.
paul@152 861
    """
paul@152 862
paul@152 863
    f = os.popen("psrinfo -p")
paul@152 864
    try:
paul@152 865
        return int(f.read().strip())
paul@152 866
    finally:
paul@152 867
        f.close()
paul@152 868
paulb@142 869
# Low-level functions.
paulb@142 870
paulb@150 871
def create_socketpair():

    """
    Fork a new process connected to the current one by a socket pair, and
    return a Channel for whichever side of the fork is running. In the created
    process the channel's pid is 0; in the creating process it is the pid that
    os.fork returned for the created process.
    """

    parent_end, child_end = socket.socketpair()
    for end in (parent_end, child_end):
        end.setblocking(1)

    pid = os.fork()
    if pid == 0:
        kept, unused = child_end, parent_end
    else:
        kept, unused = parent_end, child_end

    # Each process only needs its own end of the pair.

    unused.close()
    return Channel(pid, kept.makefile("r", 0), kept.makefile("w", 0))
paulb@40 889
paulb@150 890
def create_pipes():

    """
    Fork a new process connected to the current one, and return a Channel for
    whichever side of the fork is running. In the created process the
    channel's pid is 0; in the creating process it is the created process's
    pid.

    A pair of pipes (one per direction) is used instead of a socket pair,
    since some platforms seem to have problems with poll and such socket
    pairs.
    """

    parent_read, child_write = os.pipe()
    child_read, parent_write = os.pipe()

    pid = os.fork()
    if pid == 0:
        kept = (child_read, child_write)
        unused = (parent_read, parent_write)
    else:
        kept = (parent_read, parent_write)
        unused = (child_read, child_write)

    # Close the descriptors belonging to the other side of the fork.

    for fd in unused:
        os.close(fd)

    read_fd, write_fd = kept
    return Channel(pid, os.fdopen(read_fd, "r", 0), os.fdopen(write_fd, "w", 0))
paulb@150 912
paulb@150 913
# Choose the process-creation and core-counting implementations for this
# platform: Solaris ("SunOS") uses pipes and psrinfo, everything else uses a
# socket pair and the /proc/cpuinfo parser.

if platform.system() != "SunOS":
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores
else:
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
paulb@150 919
paulb@140 920
def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. The created process's channel
    can be disconnected from the creating process and connected to another
    process, so it can be used to collect results from daemon processes.

    To allow reconnection to the created process, the 'address' of its
    communications endpoint must be given. This should be a filename, to which
    a Unix-domain stream socket is bound before the process is created.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)
    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()
    if pid != 0:
        child.close()

        # NOTE(review): the parent's socket is returned without being
        # connected to 'address' (a connect call here was disabled in earlier
        # revisions); confirm how callers expect to use this channel.

        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

    parent.close()
    return PersistentChannel(pid, child, address)
paulb@140 948
paulb@140 949
def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process reachable
    at the given 'address' (a Unix-domain socket filename). The returned
    channel's pid is 0, since no process is created here.
    """

    endpoint = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    endpoint.setblocking(1)
    endpoint.connect(address)

    reader = endpoint.makefile("r", 0)
    writer = endpoint.makefile("w", 0)
    return Channel(0, reader, writer)
paulb@140 960
paulb@97 961
def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.

    The process always ends via os._exit(0), even if closing the channel
    raises. Otherwise an exception here would let a forked child carry on
    running its creator's code.
    """

    try:
        channel.close()
    finally:
        os._exit(0)
paulb@97 969
paulb@84 970
def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable',
    which is called with a channel followed by any additional arguments given
    to this function.

    In the creating process, return a communications channel to the created
    process. In the created process, the channel supplied as the 'channel'
    parameter of 'callable' lets it send data back to the creating process.
    The created process never returns from this function: it terminates
    after 'callable' completes.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    try:
        try:
            callable(channel, *args, **kw)

        # Deliberately catch everything: any failure in the created process is
        # sent back to the creating process as the exception object.

        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
paulb@40 994
paulb@140 995
def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.

    In the created process, the standard streams are detached before 'callable'
    runs. A failure while detaching them is reported over the channel like an
    exception raised by 'callable', and the created process always terminates
    rather than returning into its creator's code.
    """

    channel = create_persistent(address)
    if channel.pid == 0:
        try:
            try:
                close_streams()
                callable(channel, *args, **kw)

            # Deliberately catch everything: any failure in the created
            # process is sent back as the exception object.

            except:
                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel
paulb@140 1024
paulb@140 1025
def close_streams():

    """
    Detach the current process from the standard streams (file descriptors 0,
    1 and 2) which keep it attached to any creating processes.

    Each descriptor is redirected to the null device rather than closed. The
    next file or socket this process opened would reuse a closed standard
    descriptor number, so stray writes to standard output or error could end
    up in that file or socket. A redirected descriptor stays valid and
    discards such output instead. Working on the descriptor numbers directly
    also avoids depending on sys.stdin/stdout/stderr being real files.
    """

    null_fd = os.open(getattr(os, "devnull", "/dev/null"), os.O_RDWR)
    try:
        for fd in (0, 1, 2):
            os.dup2(null_fd, fd)
    finally:

        # Keep the null device open if it was itself given a standard
        # descriptor number (because that descriptor had already been closed).

        if null_fd > 2:
            os.close(null_fd)
paulb@140 1035
paulb@40 1036
def waitall():

    """
    Wait for all created processes to terminate. Return once the current
    process has no remaining child processes, or once os.wait fails for any
    reason other than an interrupting signal.
    """

    import errno

    while 1:
        try:
            os.wait()
        except OSError:

            # Python 2 does not retry os.wait after a signal interrupts it
            # (EINTR), so keep waiting rather than returning while children
            # are still running. Any other error, such as ECHILD (no children
            # left), ends the wait as before.

            if sys.exc_info()[1].errno != errno.EINTR:
                break
paulb@40 1045
paulb@40 1046
# vim: tabstop=4 expandtab shiftwidth=4