pprocess

examples/simple_persistent_queue.py

149:eeaa043dbfb1
2008-06-04 paulb [project @ 2008-06-04 22:13:31 by paulb] Added a link to the reference document.
     1 #!/usr/bin/env python     2      3 """     4 A simple example of parallel computation using persistent queues and     5 communications.     6 """     7      8 import pprocess     9 import time    10 #import random    11 import sys    12     13 # Array size and a limit on the number of processes.    14     15 N = 10    16 limit = 2 # since N background processes will be used, this is reduced    17 delay = 1    18     19 # Work function and monitoring class.    20     21 def calculate(i, j):    22     23     """    24     A supposedly time-consuming calculation on 'i' and 'j'.    25     """    26     27     #time.sleep(delay * random.random())    28     time.sleep(delay)    29     return (i, j, i * N + j)    30     31 # Background computation.    32     33 def task(i):    34     35     # Initialise the communications queue with a limit on the number of    36     # channels/processes.    37     38     queue = pprocess.Queue(limit=limit)    39     40     # Initialise an array.    41     42     results = [0] * N    43     44     # Wrap the calculate function and manage it.    45     46     calc = queue.manage(pprocess.MakeParallel(calculate))    47     48     # Perform the work.    49     50     print "Calculating..."    51     for j in range(0, N):    52         calc(i, j)    53     54     # Store the results as they arrive.    55     56     print "Finishing..."    57     for i, j, result in queue:    58         results[j] = result    59     60     return i, results    61     62 # Main program.    63     64 if __name__ == "__main__":    65     66     t = time.time()    67     68     if "--reconnect" not in sys.argv:    69     70         # Wrap the computation and manage it.    71     72         ptask = pprocess.MakeParallel(task)    73     74         for i in range(0, N):    75     76             # Make a distinct callable for each part of the computation.    77     78             ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)    79     80             # Perform the work.    81     82             ptask_i(i)    83     84         # Discard the callable.    85     86         del ptask    87         print "Discarded the callable."    88     89     if "--start" not in sys.argv:    90     91         # Open a queue and reconnect to the task.    92     93         print "Opening a queue."    94         queue = pprocess.PersistentQueue()    95         for i in range(0, N):    96             queue.connect("task-%d.socket" % i)    97     98         # Initialise an array.    99    100         results = [0] * N   101    102         # Wait for the results.   103    104         print "Waiting for persistent results"   105         for i, result in queue:   106             results[i] = result   107    108         # Show the results.   109    110         for i in range(0, N):   111             for result in results[i]:   112                 print result,   113             print   114    115         print "Time taken:", time.time() - t   116    117 # vim: tabstop=4 expandtab shiftwidth=4