pprocess

Annotated pprocess.py

149:eeaa043dbfb1
2008-06-04 paulb [project @ 2008-06-04 22:13:31 by paulb] Added a link to the reference document.
paulb@40 1
#!/usr/bin/env python
paulb@40 2
paulb@40 3
"""
paulb@40 4
A simple parallel processing API for Python, inspired somewhat by the thread
paulb@40 5
module, slightly less by pypar, and slightly less still by pypvm.
paulb@40 6
paulb@135 7
Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>
paulb@41 8
paulb@79 9
This program is free software; you can redistribute it and/or modify it under
paulb@79 10
the terms of the GNU Lesser General Public License as published by the Free
paulb@79 11
Software Foundation; either version 3 of the License, or (at your option) any
paulb@79 12
later version.
paulb@41 13
paulb@79 14
This program is distributed in the hope that it will be useful, but WITHOUT
paulb@79 15
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paulb@79 16
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
paulb@79 17
details.
paulb@41 18
paulb@79 19
You should have received a copy of the GNU Lesser General Public License along
paulb@79 20
with this program.  If not, see <http://www.gnu.org/licenses/>.
paulb@40 21
"""
paulb@40 22
paulb@142 23
__version__ = "0.4"
paulb@40 24
paulb@40 25
import os
paulb@40 26
import sys
paulb@40 27
import select
paulb@40 28
import socket
paulb@40 29
paulb@40 30
try:
paulb@40 31
    import cPickle as pickle
paulb@40 32
except ImportError:
paulb@40 33
    import pickle
paulb@40 34
paulb@135 35
try:
paulb@135 36
    set
paulb@135 37
except NameError:
paulb@135 38
    from sets import Set as set
paulb@135 39
paulb@84 40
# Communications.
paulb@84 41
paulb@40 42
class AcknowledgementError(Exception):

    "Raised when a sent object is not acknowledged with the expected reply."
paulb@40 44
paulb@40 45
class Channel:

    """
    A communications channel: a pair of file-like pipes over which pickled
    objects are exchanged with a peer, together with the identifier of the
    process associated with the channel.
    """

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, the pipes are closed upon
        # finalisation. Waiting for the created process is not done here: it
        # must be requested explicitly using the 'wait' method.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Each pipe is closed at most once, so this
        method may safely be called repeatedly.
        """

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid. Nothing is done where the process identifier is zero,
        and any OSError raised while waiting is ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)

        # Flush so that the peer sees the complete message immediately.

        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, with 'obj' as its argument, if the reply is
        not the expected acknowledgement.
        """

        self._send(obj)
        if self._receive() != "OK":

            # The call form of raise is valid in both Python 2 and Python 3.

            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead of being
        returned.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        try:
            obj = self._receive()
            return obj
        finally:

            # The acknowledgement is sent even if receiving raised an
            # exception, so that the sender is not left waiting.

            self._send("OK")
paulb@40 134
paulb@140 135
class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', the socket
        'endpoint' on which connections are accepted, and the 'address' of that
        socket. The pipes are left unset until a connection is accepted.
        """

        Channel.__init__(self, pid, None, None)
        self.address = address
        self.endpoint = endpoint
        self.poller = select.poll()

        # Only start listening here: accepting a connection at this point
        # would block the process before it is interested in communicating.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)

        # Removal of the socket file is best-effort.

        try:
            os.unlink(self.address)
        except OSError:
            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is not None and self.write_pipe is not None:
            return

        # Accept any incoming connection and wrap it in unbuffered pipes.

        connection, peer = self.endpoint.accept()
        self.read_pipe = connection.makefile("r", 0)
        self.write_pipe = connection.makefile("w", 0)

        # Monitor the write pipe for error conditions.

        events = select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR
        self.poller.register(self.write_pipe.fileno(), events)

    def _reset_pipes(self):

        "Discard the existing connection and listen for a new one."

        self.poller.unregister(self.write_pipe.fileno())
        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, polling the write pipe
        using the optional 'timeout' and replacing any broken connection.
        """

        failure = select.POLLHUP | select.POLLNVAL | select.POLLERR

        while 1:
            self._ensure_pipes()
            current = self.write_pipe.fileno()

            broken = 0
            for fd, status in self.poller.poll(timeout):
                if fd == current and status & failure:
                    broken = 1
                    break

            if not broken:
                return

            # Broken connection: discard it and start all over again.

            self._reset_pipes()

    def _send(self, obj):

        """
        Send the given object 'obj' through the channel, first making sure that
        a working connection is available.
        """

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object, first
        making sure that a working connection is available.
        """

        self._ensure_communication()
        return Channel._receive(self)
paulb@140 225
paulb@84 226
# Management of processes and communications.
paulb@84 227
paulb@40 228
class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose

        # Queued (callable, args, kw) tuples awaiting an available channel.

        self.waiting = []

        # A mapping from read pipe file descriptors to channels.

        self.readables = {}

        # The channels removed by the most recent call to 'ready'.

        self.removed = []

        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # Always produce a genuine list, regardless of what kind of object the
        # dictionary provides for its values.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from. The 'timeout' is passed directly to the poll operation.

        Channels found to have ended or failed are removed from the exchange and
        recorded in the 'removed' attribute, which is reset upon each call.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except for those which have just been
            # removed and automatically closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where automatic closing is
        enabled, the channel is closed and its process is waited for.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.

        Return a (callable, args, kw) tuple where a new process should be
        started, or None where nothing is waiting or where the 'channel' has
        been reused to handle the waiting computation.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        """
        For each ready channel, process the incoming data and then start any
        waiting computation.
        """

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This implementation always raises NotImplementedError.
        """

        # The call form of raise is valid in both Python 2 and Python 3.

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation. Queuing occurs, with 1 being returned,
        where the limit on active channels has been reached; otherwise 0 is
        returned and nothing is queued.
        """

        if self.limit is not None and len(self.active()) >= self.limit:

            # Insert at the front: '_get_waiting' pops from the end, making the
            # queue first-in, first-out.

            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process, where the channel
        is instead added to this exchange. The created process is recognised
        by the channel having a process identifier of zero.
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details

            # Note that 'start' here is the module-level function, not the
            # method of this class.

            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Where the limit on active channels has been reached, the 'callable' is
        queued instead and no process is created immediately.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)
paulb@84 480
paulb@140 481
class Persistent:

    """
    A mix-in class giving exchanges the methods needed to manage persistent
    communications, where processes are contacted via an address.
    """

    def start_waiting(self, channel):

        """
        Given the reception of data on the given 'channel', start any waiting
        process using the address of that channel.
        """

        queued = self._get_waiting(channel)
        if queued is None:
            return

        target, args, kw = queued
        self.add(start_persistent(channel.address, target, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided. Where the callable
        is instead queued for later invocation, no process is created here.
        """

        # NOTE(review): the result of start_persistent is not added to the
        # exchange here; presumably the process is contacted later using
        # 'connect' - confirm against start_persistent.

        if not self._set_waiting(callable, args, kw):
            start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        return self._get_channel_for_process(create_persistent(address))

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        published = PersistentCallable(address, callable, self)
        return published

    def connect(self, address):

        """
        Connect to a process which is contactable via the given 'address',
        adding the resulting channel to this exchange (and waiting first if the
        limit on active channels requires it).
        """

        self.add_wait(connect_persistent(address))
paulb@140 541
paulb@84 542
class ManagedCallable:

    "A callable whose invocations are started and monitored by an exchange."

    def __init__(self, callable, exchange):

        """
        Remember the given 'callable' together with the 'exchange' which will
        monitor the channels created for communications between this and the
        created processes. The 'callable' must be parallel-aware (that is, have
        a 'channel' parameter); other kinds of callable objects should first be
        wrapped using the MakeParallel class.
        """

        self.callable, self.exchange = callable, exchange

    def __call__(self, *args, **kw):

        """
        Ask the exchange to start the callable with the supplied arguments.
        Nothing is returned.
        """

        begin = self.exchange.start
        begin(self.callable, *args, **kw)
paulb@84 564
paulb@140 565
class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Remember the given 'address', 'callable' and 'exchange'. The exchange
        monitors the channels created for communications between this and the
        created processes, so that when this object is called, a background
        process is started within which the 'callable' will run. The 'callable'
        must be parallel-aware (that is, have a 'channel' parameter); other
        kinds of callable objects should first be wrapped using the MakeParallel
        class.
        """

        self.address = address
        self.callable, self.exchange = callable, exchange

    def __call__(self, *args, **kw):

        """
        Ask the exchange to start the callable, at the remembered address, with
        the supplied arguments. Nothing is returned.
        """

        begin = self.exchange.start
        begin(self.address, self.callable, *args, **kw)
paulb@140 589
paulb@142 590
class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel without the
    involvement of an exchange.
    """

    def __init__(self, address, callable):

        """
        Remember the given 'address' and 'callable'. When this object is
        invoked, the wrapped callable will be run in a background process. The
        'callable' must be parallel-aware (that is, have a 'channel' parameter);
        other kinds of callable objects should first be wrapped using the
        MakeParallel class.
        """

        self.address = address
        self.callable = callable

    def __call__(self, *args, **kw):

        """
        Start the callable, at the remembered address, with the supplied
        arguments. Nothing is returned.
        """

        target = self.callable
        start_persistent(self.address, target, *args, **kw)
paulb@142 615
paulb@84 616
# Abstractions and utilities.
paulb@84 617
paulb@84 618
class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        """
        Initialise the exchange, passing any arguments on to the Exchange class.
        """

        # Set up the result bookkeeping first: Exchange.__init__ calls 'add'
        # for any channels supplied, and 'add' needs these attributes to exist.

        self.init()
        Exchange.__init__(self, *args, **kw)

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0

        # A mapping from channels to their positions in the results.

        self.channels = {}
        self.results = []

    def add(self, channel):

        """
        Add the given 'channel' to the exchange, giving it the next position in
        the results.
        """

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

        # The 'start' and 'create' methods reserve a result placeholder before
        # adding a channel. Channels added directly have no such placeholder,
        # so one is provided here in order that 'store_data' can store a result.

        while len(self.results) < self.channel_number:
            self.results.append(None)

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Any
        previously recorded results are discarded. This object is returned,
        providing access to the results.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete and return the result at index 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete and return an iterator over the results."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]
paulb@84 700
paulb@97 701
class Queue(Exchange):

    """
    An exchange which behaves as a queue: data from created processes is made
    available in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        """
        Initialise the exchange, passing any arguments on to the Exchange class,
        and start with an empty queue.
        """

        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Receive data from the given 'channel' and place it in the queue."

        # New items go to the front of the list; 'next' takes from the end.

        self.queue.insert(0, channel.receive())

    def __iter__(self):

        "Return this object, which acts as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, reading incoming data while the
        queue is empty. Raise StopIteration when the queue is empty and there
        are no active channels left.
        """

        while not self.queue:
            if not self.active():
                raise StopIteration
            self.store()

        return self.queue.pop()
paulb@97 734
paulb@84 735
class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Wrap the given 'callable' so that the resulting object accepts a
        'channel' parameter when invoked, and forwards the result of the
        'callable' through that channel.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable with the supplied arguments and send its result via
        the given 'channel'. Nothing is returned.
        """

        result = self.callable(*args, **kw)
        channel.send(result)
paulb@84 755
paulb@119 756
class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion: after each result is sent, further arguments are awaited
    on the same channel.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and send its result via the given 'channel'. Then
        keep receiving (args, kw) pairs from the channel, invoking the callable
        for each of them, until None is received.
        """

        while 1:
            channel.send(self.callable(*args, **kw))

            # A value of None indicates that there is no more work.

            request = channel.receive()
            if request is None:
                break

            args, kw = request
paulb@119 773
paulb@140 774
# Persistent variants.
paulb@140 775
paulb@140 776
class PersistentExchange(Persistent, Exchange):

    """
    An exchange for persistent communications: the combination of Persistent
    and Exchange, with nothing added or overridden here.
    """
paulb@140 781
paulb@140 782
class PersistentQueue(Persistent, Queue):

    """
    A queue for persistent communications: the combination of Persistent and
    Queue, with nothing added or overridden here.
    """
paulb@140 787
paulb@142 788
# Convenience functions.
paulb@142 789
paulb@142 790
def BackgroundQueue(address):

    """
    Return a PersistentQueue, created with a limit of 1, which has been
    connected to the process reachable via the given 'address'. The results of
    that process are then obtained through the returned queue.
    """

    background = PersistentQueue(limit=1)
    background.connect(address)
    return background
paulb@142 800
paulb@142 801
def pmap(callable, sequence, limit=None):

    """
    Apply the given 'callable' to the given 'sequence' in the manner of the
    built-in map function, but using a Map object created with the optional
    process 'limit'.

    The 'callable' should be an ordinary function without a 'channel'
    parameter: it gets wrapped for parallel communications before being
    invoked.

    Return the result of invoking the Map object with the 'callable' and the
    'sequence'.
    """

    return Map(limit=limit)(callable, sequence)
paulb@142 815
paulb@84 816
# Utility functions.
paulb@79 817
paulb@135 818
_cpuinfo_fields = "physical id", "core id"

def get_number_of_cores():

    """
    Return the number of distinct, genuine processor cores, as determined by
    counting the distinct ("physical id", "core id") combinations listed in
    /proc/cpuinfo.

    If /proc/cpuinfo cannot be opened or read (the platform is not supported by
    this function), None is returned. If the file is readable but contains no
    "physical id" entries, the result is 0.
    """

    try:
        f = open("/proc/cpuinfo")
        try:
            processors = set()

            # One slot per entry in _cpuinfo_fields, filled in as the lines
            # describing a single logical processor are encountered.

            processor = [None, None]

            # Iterate over the file directly: this is equivalent to the
            # deprecated xreadlines method and also works where that method no
            # longer exists.

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break
                else:
                    # A "processor" line starts the next logical processor, so
                    # record the details collected for the previous one.

                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            # Record the final processor, which has no following "processor"
            # line to trigger its registration.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # Failure to open or read the file is reported as IOError by older Python
    # versions and as OSError by newer ones: both mean "not supported".

    except (IOError, OSError):
        return None
paulb@135 854
paulb@142 855
# Low-level functions.
paulb@142 856
paulb@40 857
def create():

    """
    Fork the current process, having first set up a pair of connected sockets.
    Both the creating process and the created process return from this function
    with a Channel built on their own end of the socket pair.

    The channel's pid is that returned by the fork: zero in the created process
    and the created process's identifier in the creating process.
    """

    parent, child = socket.socketpair()
    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    # Each process keeps its own end of the pair and closes the other.

    if pid != 0:
        child.close()
        endpoint = parent
    else:
        parent.close()
        endpoint = child

    return Channel(pid, endpoint.makefile("r", 0), endpoint.makefile("w", 0))
paulb@40 875
paulb@140 876
def create_persistent(address):

    """
    Fork the current process, giving the created process a persistent
    communications channel bound to the given 'address'. Such a channel can be
    disconnected from the creating process and connected to another process,
    making it suitable for collecting results from daemon processes.

    The 'address' is the communications endpoint of the created process and is
    needed in order to reconnect to it later. It should be a filename.

    In the created process, a PersistentChannel is returned; in the creating
    process, a Channel is returned.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    if pid != 0:
        child.close()

        # NOTE(review): the parent socket is wrapped without having been
        # connected to 'address' (the connect call is disabled) - confirm how
        # the creating process is expected to attach.

        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

    parent.close()
    return PersistentChannel(pid, child, address)
paulb@140 904
paulb@140 905
def connect_persistent(address):

    """
    Open a blocking stream socket to the existing created process listening at
    the given 'address', returning a Channel (with a pid of zero) which reads
    from and writes to that socket.
    """

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.setblocking(1)
    sock.connect(address)

    reader = sock.makefile("r", 0)
    writer = sock.makefile("w", 0)
    return Channel(0, reader, writer)
paulb@140 916
paulb@97 917
def exit(channel):

    """
    Close the given 'channel' and then terminate the current (created) process
    immediately with an exit status of zero.
    """

    channel.close()
    os._exit(0)
paulb@97 925
paulb@84 926
def start(callable, *args, **kw):

    """
    Create a new process and run the given 'callable' inside it. Any further
    positional and keyword arguments given to this function are passed on to
    the 'callable'.

    In the creating process, the communications channel to the created process
    is returned. In the created process, the 'callable' receives a channel as
    its first ('channel') argument so that it may send data back to the
    creating process; any exception it raises is sent over that channel, and
    the created process is then terminated.
    """

    channel = create()

    # The creating process only needs the channel.

    if channel.pid != 0:
        return channel

    # The created process runs the callable and never leaves this function
    # normally: it is always terminated afterwards.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            # Report the exception value to the creating process.
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
paulb@40 950
paulb@140 951
def start_persistent(address, callable, *args, **kw):

    """
    Create a new process, reachable using the given 'address', and run the
    given 'callable' inside it. Any further positional and keyword arguments
    given to this function are passed on to the 'callable'.

    In the creating process, the communications channel to the created process
    is returned. In the created process, the standard streams are closed, the
    'callable' receives a channel as its first ('channel') argument so that it
    may send data back, any exception it raises is sent over that channel, and
    the created process is then terminated.

    The channel employed by the created process is persistent: it can withstand
    disconnection from the creating process and subsequent connections from
    other processes.
    """

    channel = create_persistent(address)

    # The creating process only needs the channel.

    if channel.pid != 0:
        return channel

    # The created process detaches from the creating process's streams, runs
    # the callable and is always terminated afterwards.

    close_streams()
    try:
        try:
            callable(channel, *args, **kw)
        except:
            # Report the exception value over the channel.
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
paulb@140 980
paulb@140 981
def close_streams():

    """
    Close the file descriptors behind the standard input, output and error
    streams, these being what keeps the current process attached to any
    creating processes.
    """

    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.close(stream.fileno())
paulb@140 991
paulb@40 992
def waitall():

    """
    Wait for all created processes to terminate, returning once waiting is no
    longer possible.
    """

    try:
        # Keep reaping child processes; os.wait raises OSError when no wait
        # can be performed, which ends the loop.
        while True:
            os.wait()
    except OSError:
        pass
paulb@40 1001
paulb@40 1002
# vim: tabstop=4 expandtab shiftwidth=4