pprocess

examples/simple_background_queue.py

147:6d1f2970de78
2008-06-02 paulb [project @ 2008-06-02 22:44:40 by paulb] Updated release information.
     1 #!/usr/bin/env python     2      3 """     4 A simple example of parallel computation using a queue, background callables and     5 persistent communications.     6 """     7      8 import pprocess     9 import time    10 #import random    11 import sys    12     13 # Array size and a limit on the number of processes.    14     15 N = 10    16 limit = 10    17 delay = 1    18     19 # Work function and monitoring class.    20     21 def calculate(i, j):    22     23     """    24     A supposedly time-consuming calculation on 'i' and 'j'.    25     """    26     27     #time.sleep(delay * random.random())    28     time.sleep(delay)    29     return (i, j, i * N + j)    30     31 # Main computation.    32     33 def task():    34     35     # Initialise the communications queue with a limit on the number of    36     # channels/processes.    37     38     queue = pprocess.Queue(limit=limit)    39     40     # Initialise an array.    41     42     results = [0] * N * N    43     44     # Wrap the calculate function and manage it.    45     46     calc = queue.manage(pprocess.MakeParallel(calculate))    47     48     # Perform the work.    49     50     print "Calculating..."    51     for i in range(0, N):    52         for j in range(0, N):    53             calc(i, j)    54     55     # Store the results as they arrive.    56     57     print "Finishing..."    58     for i, j, result in queue:    59         results[i*N+j] = result    60     61     return results    62     63 # Main program.    64     65 if __name__ == "__main__":    66     67     t = time.time()    68     69     if "--reconnect" not in sys.argv:    70     71         # Wrap the computation and manage it.    72     73         ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))    74     75         # Perform the work.    76     77         ptask()    78     79         # Discard the callable.    80     81         del ptask    82         print "Discarded the callable."    83     84     if "--start" not in sys.argv:    85     86         # Open a queue and reconnect to the task.    87     88         print "Opening a queue."    89         queue = pprocess.BackgroundQueue("task.socket")    90     91         # Wait for the results.    92     93         print "Waiting for persistent results"    94         for results in queue:    95             pass # should only be one element    96     97         # Show the results.    98     99         for i in range(0, N):   100             for result in results[i*N:i*N+N]:   101                 print result,   102             print   103    104         print "Time taken:", time.time() - t   105    106 # vim: tabstop=4 expandtab shiftwidth=4