pprocess

examples/simple_persistent_managed.py

151:db8366eed1d8
2008-06-08 paulb [project @ 2008-06-08 19:44:10 by paulb] Fixed pipe closure.
     1 #!/usr/bin/env python     2      3 """     4 A simple example of parallel computation using exchanges, managed callables and     5 persistent communications.     6 """     7      8 import pprocess     9 import time    10 #import random    11 import sys    12     13 # Array size and a limit on the number of processes.    14     15 N = 10    16 limit = 10    17 delay = 1    18     19 # Work function and monitoring class.    20     21 def calculate(i, j):    22     23     """    24     A supposedly time-consuming calculation on 'i' and 'j'.    25     """    26     27     #time.sleep(delay * random.random())    28     time.sleep(delay)    29     return (i, j, i * N + j)    30     31 class MyExchange(pprocess.Exchange):    32     33     "Parallel convenience class containing the array assignment operation."    34     35     def store_data(self, ch):    36         i, j, result = ch.receive()    37         self.D[i*N+j] = result    38     39 class MyPersistentExchange(pprocess.PersistentExchange):    40     41     "A persistent exchange."    42     43     def store_data(self, ch):    44         self.D[:] = ch.receive()    45     46 # Main computation.    47     48 def task():    49     50     # Initialise the communications exchange with a limit on the number of    51     # channels/processes.    52     53     exchange = MyExchange(limit=limit)    54     55     # Initialise an array - it is stored in the exchange to permit automatic    56     # assignment of values as the data arrives.    57     58     results = exchange.D = [0] * N * N    59     60     # Wrap the calculate function and manage it.    61     62     calc = exchange.manage(pprocess.MakeParallel(calculate))    63     64     # Perform the work.    65     66     for i in range(0, N):    67         for j in range(0, N):    68             calc(i, j)    69     70     # Wait for the results.    71     72     exchange.finish()    73     return results    74     75 # Main program.    76     77 if __name__ == "__main__":    78     79     t = time.time()    80     81     # Initialise an array - it is stored in the exchange to permit automatic    82     # assignment of values as the data arrives.    83     84     results = [0] * N * N    85     86     if "--reconnect" not in sys.argv:    87     88         # Initialise the persistent exchange with a single process for the main    89         # computation.    90     91         exchange = MyPersistentExchange(limit=1)    92     93         # Wrap the computation and manage it.    94     95         ptask = exchange.manage("task.socket", pprocess.MakeParallel(task))    96     97         # Perform the work.    98     99         ptask()   100    101         # Close down the exchange.   102    103         del exchange   104         print "Closed original exchange."   105    106     if "--start" not in sys.argv:   107    108         # Open another exchange.   109    110         print "Opening new exchange."   111         exchange = MyPersistentExchange(limit=1)   112         exchange.D = results   113    114         # Reconnect to the task.   115    116         exchange.connect("task.socket")   117    118         # Wait for the results.   119    120         print "Waiting for persistent results"   121         exchange.finish()   122    123         # Show the results.   124    125         for i in range(0, N):   126             for result in results[i*N:i*N+N]:   127                 print result,   128             print   129    130         print "Time taken:", time.time() - t   131    132 # vim: tabstop=4 expandtab shiftwidth=4