#!/usr/bin/env python

"""
A small demonstration of parallel computation with pprocess, using background
callables and a persistent queue.

Without options, the background tasks are started and their results are then
collected. With "--start", the tasks are only started; with "--reconnect", the
results of previously started tasks are only collected.
"""

import pprocess
import time
#import random
import sys

# The size of the (N x N) result array, a limit on the number of processes
# used by each task, and the duration in seconds of each simulated calculation.

N = 10
limit = 2 # kept small because N background processes are started as well
delay = 1

# Work function.

def calculate(i, j):

    """
    Pretend to perform an expensive calculation on 'i' and 'j' by sleeping for
    'delay' seconds, then return the tuple (i, j, i * N + j).
    """

    # A randomised delay is a possible alternative (needs the random module):
    #time.sleep(delay * random.random())
    time.sleep(delay)
    value = i * N + j
    return (i, j, value)

# Background computation.

def task(i):

    """
    Compute row 'i' of the result array by invoking 'calculate' for every
    column through a managed queue. Return a tuple containing 'i' and the list
    of N values making up the row.
    """

    # Set up the communications queue, restricting the number of
    # channels/processes, and obtain a managed wrapper around the work function.

    channel_queue = pprocess.Queue(limit=limit)
    parallel_calc = channel_queue.manage(pprocess.MakeParallel(calculate))

    # The row to be filled in.

    row = [0] * N

    # Request every calculation for this row.

    print("Calculating...")
    for column in range(N):
        parallel_calc(i, column)

    # Record each value as it arrives; the row number echoed back by the work
    # function is the same as 'i' and is therefore ignored.

    print("Finishing...")
    for _, column, value in channel_queue:
        row[column] = value

    return i, row

# Main program.

if __name__ == "__main__":

    started = time.time()

    # Start the background tasks unless only reconnecting.

    if "--reconnect" not in sys.argv:

        # Make the task function usable in another process.

        parallel_task = pprocess.MakeParallel(task)

        # Give each row its own background callable, identified by a socket
        # filename, and set it going.

        for row_number in range(N):
            background_task = pprocess.BackgroundCallable(
                "task-%d.socket" % row_number, parallel_task)
            background_task(row_number)

        # Drop the wrapped task.

        del parallel_task
        print("Discarded the callable.")

    # Collect the results unless only starting the tasks.

    if "--start" not in sys.argv:

        # Open a persistent queue and connect it to the socket of every task.

        print("Opening a queue.")
        persistent_queue = pprocess.PersistentQueue()
        for row_number in range(N):
            persistent_queue.connect("task-%d.socket" % row_number)

        # One entry per row; each is replaced by a list of values on arrival.

        table = [0] * N

        print("Waiting for persistent results")
        for row_number, row in persistent_queue:
            table[row_number] = row

        # Show each row on a line of its own with values separated by spaces.

        for row in table:
            print(" ".join(map(str, row)))

    elapsed = time.time() - started
    print("Time taken: %s" % elapsed)

# vim: tabstop=4 expandtab shiftwidth=4