pprocess

Annotated pprocess.py

159:448313b5b34e
2009-06-05 Paul Boddie Added more documentation about continuous communications.
paulb@40 1
#!/usr/bin/env python
paulb@40 2
paulb@40 3
"""
paulb@40 4
A simple parallel processing API for Python, inspired somewhat by the thread
paulb@40 5
module, slightly less by pypar, and slightly less still by pypvm.
paulb@40 6
paul@155 7
Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk>
paulb@41 8
paulb@79 9
This program is free software; you can redistribute it and/or modify it under
paulb@79 10
the terms of the GNU Lesser General Public License as published by the Free
paulb@79 11
Software Foundation; either version 3 of the License, or (at your option) any
paulb@79 12
later version.
paulb@41 13
paulb@79 14
This program is distributed in the hope that it will be useful, but WITHOUT
paulb@79 15
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paulb@79 16
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
paulb@79 17
details.
paulb@41 18
paulb@79 19
You should have received a copy of the GNU Lesser General Public License along
paulb@79 20
with this program.  If not, see <http://www.gnu.org/licenses/>.
paulb@40 21
"""
paulb@40 22
paul@155 23
__version__ = "0.4.1"
paulb@40 24
paulb@40 25
import os
paulb@40 26
import sys
paulb@40 27
import select
paulb@40 28
import socket
paulb@150 29
import platform
paulb@40 30
paulb@40 31
try:
paulb@40 32
    import cPickle as pickle
paulb@40 33
except ImportError:
paulb@40 34
    import pickle
paulb@40 35
paulb@135 36
try:
paulb@135 37
    set
paulb@135 38
except NameError:
paulb@135 39
    from sets import Set as set
paulb@135 40
paul@156 41
# Special values.
paul@156 42
paul@156 43
class Undefined:

    """
    A special marker value, distinct from any real data, which is used as a
    placeholder (for example, for Map results which have not yet been received).
    """
paul@156 44
paulb@84 45
# Communications.
paulb@84 46
paulb@40 47
class AcknowledgementError(Exception):

    """
    Raised by Channel.send when the reply received after sending an object is
    not the expected "OK" acknowledgement.
    """
paulb@40 49
paulb@40 50
class Channel:

    """
    A communications channel.

    Objects are exchanged in pickled form: they are written to 'write_pipe' and
    read from 'read_pipe'. The 'send' and 'receive' methods add an "OK"
    acknowledgement to each exchange, whereas '_send' and '_receive' transfer a
    single object without any acknowledgement.

    NOTE: received data is unpickled, so a channel must only ever be connected
    to a trusted peer.
    """

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, the pipes are closed upon
        # finalisation. The created process is not waited for here: use the
        # 'wait' method for that.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Both pipes are released and forgotten, so
        that calling this method again has no further effect.
        """

        read_pipe, write_pipe = self.read_pipe, self.write_pipe
        self.read_pipe = None
        self.write_pipe = None

        # Make sure that the write pipe is closed even if closing the read pipe
        # fails.

        try:
            if read_pipe is not None:
                read_pipe.close()
        finally:
            if write_pipe is not None:
                write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. Nothing is done where the process identifier is
        zero.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:

                # Deliberately ignored: presumably the process may already have
                # been collected.

                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        An AcknowledgementError is raised, carrying 'obj', if the reply is not
        the expected acknowledgement.
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead of being
        returned.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even if receiving raised an exception.

        try:
            return self._receive()
        finally:
            self._send("OK")
paulb@40 139
paulb@140 140
class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a socket
        'endpoint' on which connections will be accepted, and the 'address'
        associated with that socket. The pipes are only created once a
        connection has been accepted.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        # NOTE(review): the listening 'endpoint' socket itself is not closed
        # here - confirm whether that is intended.

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:

            # Removal is best effort: the socket file may not exist.

            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections. (This blocks until a peer
            # connects.)

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        """
        Discard the existing connection, closing its pipes, and listen for a
        new one.
        """

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)

        stale_pipes = (self.read_pipe, self.write_pipe)
        self.read_pipe = None
        self.write_pipe = None

        # Release the descriptors of the broken connection explicitly instead
        # of relying on garbage collection. Errors are ignored since the
        # connection is already known to be unusable.

        for pipe in stale_pipes:
            if pipe is not None:
                try:
                    pipe.close()
                except (IOError, OSError, socket.error):
                    pass

        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible. The optional 'timeout'
        is passed to the poll operation used to test the connection.
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break

            # No error condition was reported for the connection.

            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)
paulb@140 230
paulb@84 231
# Management of processes and communications.
paulb@84 232
paulb@40 233
class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.

    Once exchanges are populated with active channels, use of the principal
    methods of the exchange typically cause the 'store' method to be invoked,
    resulting in the processing of any incoming data.
    """

    def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations, but the callable will
        be invoked for each computation.

        If the optional 'continuous' parameter is set to a true value, channels
        and processes will be retained after receiving data sent from such
        processes, since it will be assumed that they will communicate more
        data.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.continuous = continuous

        # Queued (callable, args, kw) tuples awaiting a free channel.

        self.waiting = []

        # A mapping from read pipe file descriptors to channels.

        self.readables = {}

        # The channels removed by the most recent call to 'ready'.

        self.removed = []
        self.poller = select.poll()

        for channel in channels or []:
            self.add(channel)

    # Core methods, registering and reporting on channels.

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # A real list is always produced, rather than a view of the mapping.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from.

        Channels reporting an ended or erroneous connection are removed from
        the exchange and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, excluding those which have just been
            # removed and closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where the exchange was
        configured with 'autoclose', the channel is also closed and its process
        waited for.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def unfinished(self):

        "Return whether the exchange still has work scheduled or in progress."

        return self.active() or self.waiting

    def busy(self):

        "Return whether the exchange uses as many channels as it is allowed to."

        return self.limit is not None and len(self.active()) >= self.limit

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.busy():
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.unfinished():
            self.store()

    def store(self, timeout=None):

        """
        For each ready channel, process the incoming data. If the optional
        'timeout' parameter (a duration in milliseconds) is specified, wait only
        for the specified duration if no channels are ready to provide data.

        Where no channels are active at all, waiting callables are started
        instead, for as long as the channel limit permits.
        """

        # Either process input from active channels.

        if self.active():
            for channel in self.ready(timeout):
                self.store_data(channel)
                self.start_waiting(channel)

        # Or schedule new processes and channels.

        else:
            while self.waiting and not self.busy():
                target, args, kw = self.waiting.pop()
                self.start(target, *args, **kw)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.

        Return a (callable, args, kw) tuple where a new process should be
        started by the caller, or None otherwise.
        """

        # For continuous channels, no scheduling is requested.

        if self.waiting and not self.continuous:

            # Schedule this callable and arguments.

            target, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))

            else:
                return target, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation.
        """

        # Entries are inserted at the front and popped from the end, so the
        # queued callables are started in the order in which they were queued.

        if self.busy():
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        creating process, and None to the created process.
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            target, args, kw = details
            self.add(start(target, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Where the exchange is busy, the 'callable' is queued for later
        invocation instead.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)
paulb@84 532
paulb@140 533
class Persistent:

    """
    A mix-in class giving exchanges the means to manage persistent
    communications, where processes are identified by an address.
    """

    def start_waiting(self, channel):

        """
        Given that data has been received on 'channel', start any waiting
        process at the address of that channel.
        """

        pending = self._get_waiting(channel)
        if pending is None:
            return

        target, args, kw = pending
        self.add(start_persistent(channel.address, target, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided. Where the exchange
        is busy, the 'callable' is queued instead.
        """

        # NOTE(review): unlike Exchange.start, the result of start_persistent
        # is not added to the exchange here - confirm that this is intended.

        queued = self._set_waiting(callable, args, kw)
        if not queued:
            start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        return self._get_channel_for_process(create_persistent(address))

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        wrapper = PersistentCallable(address, callable, self)
        return wrapper

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        self.add_wait(connect_persistent(address))
paulb@140 593
paulb@84 594
class ManagedCallable:

    "A callable whose invocations are started through an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        """
        Ask the exchange to start the callable with the supplied arguments.
        Nothing is returned.
        """

        exchange = self.exchange
        exchange.start(self.callable, *args, **kw)
paulb@84 616
paulb@140 617
class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.address = address
        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        """
        Ask the exchange to start the callable at the stored address with the
        supplied arguments. Nothing is returned.
        """

        exchange = self.exchange
        exchange.start(self.address, self.callable, *args, **kw)
paulb@140 641
paulb@142 642
class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but which is
    not managed by any exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', wrap the given 'callable'. This object can
        then be invoked, but the wrapped callable will be run in a background
        process. Note that the 'callable' must be parallel-aware (that is, have
        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
        of callable objects.
        """

        self.address = address
        self.callable = callable

    def __call__(self, *args, **kw):

        """
        Start the callable at the stored address with the supplied arguments.
        Nothing is returned.
        """

        address, target = self.address, self.callable
        start_persistent(address, target, *args, **kw)
paulb@142 667
paulb@84 668
# Abstractions and utilities.
paulb@84 669
paulb@84 670
class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        # The number to be given to the next channel added.

        self.channel_number = 0

        # A mapping from channels to their positions in the results.

        self.channels = {}

        # The results, holding Undefined where data has yet to be received.

        self.results = []

        # The position of the next result to be provided upon iteration.

        self.current_index = 0

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(Undefined) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(Undefined) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Any
        previously collected results are discarded. This object is returned,
        providing access to the results.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the map, processing incoming data until it
        becomes available. StopIteration is raised once no work remains.
        """

        try:
            return self._next()
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._next()
            except IndexError:
                pass

        raise StopIteration

    # Support both the 'next' and '__next__' spellings of the iterator
    # protocol.

    __next__ = next

    def __getitem__(self, i):

        """
        Return element 'i' from the map, processing incoming data until it
        becomes available. IndexError is raised where the element cannot be
        provided and no work remains.
        """

        try:
            return self._get(i)
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._get(i)
            except IndexError:
                pass

        raise IndexError(i)

    # Helper methods for the above access methods.

    def _next(self):

        "Return the result at the current position and advance the position."

        result = self._get(self.current_index)
        self.current_index += 1
        return result

    def _get(self, i):

        """
        Return the result at index or slice 'i', raising IndexError if it is
        absent or if any part of it has not yet been received.
        """

        result = self.results[i]
        if result is Undefined or (isinstance(i, slice) and Undefined in result):
            raise IndexError(i)
        return result
paul@156 797
paulb@97 798
class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange with the given arguments and an empty queue."

        Exchange.__init__(self, *args, **kw)

        # Items are inserted at the front of this list and removed from the
        # back, so that they are handed out in the order of their arrival.

        self.queue = []

    def store_data(self, channel):

        "Receive data from the given 'channel' and add it to the queue."

        self.queue.insert(0, channel.receive())

    def __iter__(self):

        "Return this queue, which serves as its own iterator (see 'next')."

        return self

    def next(self):

        """
        Return the next element in the queue. Where the queue is empty, 'store'
        is called repeatedly for as long as 'unfinished' reports outstanding
        work, the queue being checked again after each call. StopIteration is
        raised once nothing is unfinished and the queue is still empty.
        """

        while 1:

            # Hand out the oldest queued item where one is available.

            if self.queue:
                return self.queue.pop()

            # Without outstanding work, no more items will arrive.

            if not self.unfinished():
                raise StopIteration

            # Otherwise, gather more data before checking again.

            self.store()

    def __len__(self):

        "Return the number of items currently held in the queue."

        return len(self.queue)
paul@158 838
paulb@84 839
class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. When this object is
        invoked, its first argument is taken to be a 'channel' and is not
        passed on to the 'callable'; instead, the result of the 'callable' is
        sent via that channel.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable with the given positional and keyword arguments,
        sending its result via the given 'channel'.
        """

        result = self.callable(*args, **kw)
        channel.send(result)
paulb@84 859
paulb@119 860
class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and send its result via the given 'channel', then
        repeat this for each subsequent (args, kw) pair received over the
        channel. Receiving None ends the loop.
        """

        while 1:
            channel.send(self.callable(*args, **kw))

            # Wait for the next set of arguments, or for the signal to stop.

            request = channel.receive()
            if request is None:
                break

            args, kw = request
paulb@119 877
paulb@140 878
# Persistent variants.
paulb@140 879
paulb@140 880
class PersistentExchange(Persistent, Exchange):

    """
    An exchange which manages persistent communications. All behaviour is
    inherited from the Persistent and Exchange classes.
    """
paulb@140 885
paulb@140 886
class PersistentQueue(Persistent, Queue):

    """
    A queue which manages persistent communications. All behaviour is
    inherited from the Persistent and Queue classes.
    """
paulb@140 891
paulb@142 892
# Convenience functions.
paulb@142 893
paulb@142 894
def BackgroundQueue(address):

    """
    Return a new PersistentQueue, created with a limit of 1 and connected to
    the process reachable via the given 'address', so that the results of that
    process can be accessed via the queue.
    """

    background = PersistentQueue(limit=1)
    background.connect(address)
    return background
paulb@142 904
paulb@142 905
def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the result of invoking a new Map, created with the given 'limit', on
    the 'callable' and 'sequence': the processed 'sequence' where each element
    in the sequence is processed by a different process.
    """

    return Map(limit=limit)(callable, sequence)
paulb@142 919
paulb@84 920
# Utility functions.
paulb@79 921
paul@155 922
_cpuinfo_fields = "processor", "physical id", "core id"

def _get_number_of_cores():

    """
    Return the number of distinct processors described by /proc/cpuinfo, each
    being identified by the combination of its "processor", "physical id" and
    "core id" values (the latter two being None where not given).

    If the platform is not supported by this function - the file cannot be
    opened or read, or it describes no processors in the expected
    "name : integer" form - None is returned.
    """

    try:
        f = open("/proc/cpuinfo")
        try:
            processors = set()

            # Use the _cpuinfo_fields values as "digits" in a larger unique
            # processor identifier.

            processor = [None, None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if not line.startswith(field):
                        continue

                    # Ignore lines which resemble a field but which do not
                    # provide an integer value.

                    try:
                        value = int(line.split(":")[1].strip())
                    except (IndexError, ValueError):
                        break

                    # Where a new processor description is started, record the
                    # completed identifier of the previous description first,
                    # so that each identifier only contains its own values.

                    if i == 0 and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None, None]

                    processor[i] = value
                    break

            # At the end of reading the file, add any unrecorded processor.

            if processor[0] is not None:
                processors.add(tuple(processor))

        finally:
            f.close()

    # The open built-in raises IOError, which is not an OSError in Python 2.

    except (IOError, OSError):
        return None

    # A file describing no processors at all indicates an unsupported platform.

    return len(processors) or None
paulb@135 971
paul@152 972
def _get_number_of_cores_solaris():

    """
    Return the integer printed by the "psrinfo -p" command, which is used as
    the number of cores for OpenSolaris 2008.05 and possibly other editions of
    Solaris. A ValueError is raised where the output of the command is not an
    integer.
    """

    pipe = os.popen("psrinfo -p")
    try:
        output = pipe.read()
        return int(output.strip())
    finally:
        pipe.close()
paul@152 984
paulb@142 985
# Low-level functions.
paulb@142 986
paulb@150 987
def create_socketpair():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    The channel in each process is built upon that process's end of a socket
    pair, the other end being closed. The channel's pid is zero in the created
    process and is the identifier of the created process in the creating
    process.
    """

    parent, child = socket.socketpair()
    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    # Each process keeps its own end of the pair and discards the other.

    if pid == 0:
        unused, used = parent, child
    else:
        unused, used = child, parent

    unused.close()
    return Channel(pid, used.makefile("r", 0), used.makefile("w", 0))
paulb@40 1005
paulb@150 1006
def create_pipes():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    This function uses pipes instead of a socket pair, since some platforms
    seem to have problems with poll and such socket pairs.

    The channel's pid is zero in the created process and is the identifier of
    the created process in the creating process.
    """

    # One pipe carries data from the created process to the creating process,
    # the other carries data in the opposite direction.

    pr, cw = os.pipe()
    cr, pw = os.pipe()

    pid = os.fork()

    # Each process keeps its own reading and writing descriptors and closes
    # those belonging to the other process.

    if pid == 0:
        unused, reader, writer = (pr, pw), cr, cw
    else:
        unused, reader, writer = (cr, cw), pr, pw

    for fd in unused:
        os.close(fd)

    return Channel(pid, os.fdopen(reader, "r", 0), os.fdopen(writer, "w", 0))
paulb@150 1028
paulb@150 1029
# Select the platform-specific implementations behind the public names 'create'
# and 'get_number_of_cores': pipes and psrinfo on SunOS, a socket pair and
# /proc/cpuinfo everywhere else.

if platform.system() != "SunOS":
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores
else:
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
paulb@150 1035
paulb@140 1036
def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.

    The created process receives a PersistentChannel (with a pid of zero) built
    upon a socket bound to the 'address'; the creating process receives a plain
    Channel carrying the identifier of the created process.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    if pid != 0:

        # In the creating process, only the parent socket is needed. Note that
        # it is not connected to the 'address' at this point.

        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

    # In the created process, only the bound child socket is needed.

    parent.close()
    return PersistentChannel(pid, child, address)
paulb@140 1064
paulb@140 1065
def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'. The returned channel is given a pid of zero and is
    built upon a blocking UNIX domain stream socket connected to the 'address'.
    """

    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    client.setblocking(1)
    client.connect(address)

    reader = client.makefile("r", 0)
    writer = client.makefile("w", 0)
    return Channel(0, reader, writer)
paulb@140 1076
paulb@97 1077
def exit(channel):

    """
    Terminate a created process, closing the given 'channel'. The process is
    ended immediately using os._exit with a status of zero, meaning that this
    function does not return.
    """

    channel.close()
    os._exit(0)
paulb@97 1085
paulb@84 1086
def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Any exception raised by the 'callable' is sent over the channel as a value,
    and the created process is always terminated using 'exit' once the
    'callable' has finished: it never returns from this function.
    """

    channel = create()

    # The creating process merely receives the channel.

    if channel.pid != 0:
        return channel

    # The created process runs the callable and then terminates.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
paulb@40 1110
paulb@140 1111
def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.

    The created process closes its standard streams before running the
    'callable'. Any exception raised by the 'callable' is sent over the channel
    as a value, and the created process is always terminated using 'exit' once
    the 'callable' has finished: it never returns from this function.
    """

    channel = create_persistent(address)

    # The creating process merely receives the channel.

    if channel.pid != 0:
        return channel

    # The created process detaches from the standard streams, runs the callable
    # and then terminates.

    close_streams()
    try:
        try:
            callable(channel, *args, **kw)
        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
paulb@140 1140
paulb@140 1141
def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes: the file descriptors underlying the standard input, standard
    output and standard error streams are closed, in that order.
    """

    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.close(stream.fileno())
paulb@140 1151
paulb@40 1152
def waitall():

    """
    Wait for all created processes to terminate: os.wait is called repeatedly
    until it raises OSError, at which point this function returns.
    """

    while True:
        try:
            os.wait()
        except OSError:
            return
paulb@40 1161
paulb@40 1162
# vim: tabstop=4 expandtab shiftwidth=4