pprocess

examples/simple_persistent_managed_queue.py

145:614aa092ba4e
2008-06-02 paulb [project @ 2008-06-02 22:43:43 by paulb] Added details of the persistent and background features and examples.
     1 #!/usr/bin/env python     2      3 """     4 A simple example of parallel computation using a queue, managed callables and     5 persistent communications.     6 """     7      8 import pprocess     9 import time    10 #import random    11 import sys    12     13 # Array size and a limit on the number of processes.    14     15 N = 10    16 limit = 10    17 delay = 1    18     19 # Work function and monitoring class.    20     21 def calculate(i, j):    22     23     """    24     A supposedly time-consuming calculation on 'i' and 'j'.    25     """    26     27     #time.sleep(delay * random.random())    28     time.sleep(delay)    29     return (i, j, i * N + j)    30     31 # Main computation.    32     33 def task():    34     35     # Initialise the communications queue with a limit on the number of    36     # channels/processes.    37     38     queue = pprocess.Queue(limit=limit)    39     40     # Initialise an array.    41     42     results = [0] * N * N    43     44     # Wrap the calculate function and manage it.    45     46     calc = queue.manage(pprocess.MakeParallel(calculate))    47     48     # Perform the work.    49     50     print "Calculating..."    51     for i in range(0, N):    52         for j in range(0, N):    53             calc(i, j)    54     55     # Store the results as they arrive.    56     57     print "Finishing..."    58     for i, j, result in queue:    59         results[i*N+j] = result    60     61     return results    62     63 # Main program.    64     65 if __name__ == "__main__":    66     67     t = time.time()    68     69     if "--reconnect" not in sys.argv:    70     71         queue = pprocess.PersistentQueue(limit=1)    72     73         # Wrap the computation and manage it.    74     75         ptask = queue.manage("task.socket", pprocess.MakeParallel(task))    76     77         # Perform the work.    78     79         ptask()    80     81         # Close down the queue.    82     83         del queue    84         print "Closed original queue."    85     86     if "--start" not in sys.argv:    87     88         # Open another queue.    89     90         print "Opening new queue."    91         queue = pprocess.PersistentQueue(limit=1)    92     93         # Reconnect to the task.    94     95         queue.connect("task.socket")    96     97         # Wait for the results.    98     99         print "Waiting for persistent results"   100         for results in queue:   101             pass # should only be one element   102    103         # Show the results.   104    105         for i in range(0, N):   106             for result in results[i*N:i*N+N]:   107                 print result,   108             print   109    110         print "Time taken:", time.time() - t   111    112 # vim: tabstop=4 expandtab shiftwidth=4