1 #!/usr/bin/env python 2 3 """ 4 A simple example of parallel computation using a queue, managed callables and 5 persistent communications. 6 """ 7 8 import pprocess 9 import time 10 #import random 11 import sys 12 13 # Array size and a limit on the number of processes. 14 15 N = 10 16 limit = 10 17 delay = 1 18 19 # Work function and monitoring class. 20 21 def calculate(i, j): 22 23 """ 24 A supposedly time-consuming calculation on 'i' and 'j'. 25 """ 26 27 #time.sleep(delay * random.random()) 28 time.sleep(delay) 29 return (i, j, i * N + j) 30 31 # Main computation. 32 33 def task(): 34 35 # Initialise the communications queue with a limit on the number of 36 # channels/processes. 37 38 queue = pprocess.Queue(limit=limit) 39 40 # Initialise an array. 41 42 results = [0] * N * N 43 44 # Wrap the calculate function and manage it. 45 46 calc = queue.manage(pprocess.MakeParallel(calculate)) 47 48 # Perform the work. 49 50 print "Calculating..." 51 for i in range(0, N): 52 for j in range(0, N): 53 calc(i, j) 54 55 # Store the results as they arrive. 56 57 print "Finishing..." 58 for i, j, result in queue: 59 results[i*N+j] = result 60 61 return results 62 63 # Main program. 64 65 if __name__ == "__main__": 66 67 t = time.time() 68 69 if "--reconnect" not in sys.argv: 70 71 queue = pprocess.PersistentQueue(limit=1) 72 73 # Wrap the computation and manage it. 74 75 ptask = queue.manage("task.socket", pprocess.MakeParallel(task)) 76 77 # Perform the work. 78 79 ptask() 80 81 # Close down the queue. 82 83 del queue 84 print "Closed original queue." 85 86 if "--start" not in sys.argv: 87 88 # Open another queue. 89 90 print "Opening new queue." 91 queue = pprocess.PersistentQueue(limit=1) 92 93 # Reconnect to the task. 94 95 queue.connect("task.socket") 96 97 # Wait for the results. 98 99 print "Waiting for persistent results" 100 for results in queue: 101 pass # should only be one element 102 103 # Show the results. 104 105 for i in range(0, N): 106 for result in results[i*N:i*N+N]: 107 print result, 108 print 109 110 print "Time taken:", time.time() - t 111 112 # vim: tabstop=4 expandtab shiftwidth=4