pprocess

examples/simple_managed.py

140:60e9dd615839
2008-06-01 paulb [project @ 2008-06-01 14:50:28 by paulb] Added support for persistent/background processes, which involves a certain amount of reorganisation to support different styles of process creation and interprocess communication.
     1 #!/usr/bin/env python     2      3 """     4 A simple example of parallel computation using exchanges and managed callables.     5 """     6      7 import pprocess     8 import time     9 #import random    10     11 # Array size and a limit on the number of processes.    12     13 N = 10    14 limit = 10    15 delay = 1    16     17 # Work function and monitoring class.    18     19 def calculate(i, j):    20     21     """    22     A supposedly time-consuming calculation on 'i' and 'j'.    23     """    24     25     #time.sleep(delay * random.random())    26     time.sleep(delay)    27     return (i, j, i * N + j)    28     29 class MyExchange(pprocess.Exchange):    30     31     "Parallel convenience class containing the array assignment operation."    32     33     def store_data(self, ch):    34         i, j, result = ch.receive()    35         self.D[i*N+j] = result    36     37 # Main program.    38     39 if __name__ == "__main__":    40     41     t = time.time()    42     43     # Initialise the communications exchange with a limit on the number of    44     # channels/processes.    45     46     exchange = MyExchange(limit=limit)    47     48     # Initialise an array - it is stored in the exchange to permit automatic    49     # assignment of values as the data arrives.    50     51     results = exchange.D = [0] * N * N    52     53     # Wrap the calculate function and manage it.    54     55     calc = exchange.manage(pprocess.MakeParallel(calculate))    56     57     # Perform the work.    58     59     print "Calculating..."    60     for i in range(0, N):    61         for j in range(0, N):    62             calc(i, j)    63     64     # Wait for the results.    65     66     print "Finishing..."    67     exchange.finish()    68     69     # Show the results.    70     71     for i in range(0, N):    72         for result in results[i*N:i*N+N]:    73             print result,    74         print    75     76     print "Time taken:", time.time() - t    77     78 # vim: tabstop=4 expandtab shiftwidth=4