1 #!/usr/bin/env python 2 3 """ 4 A simple example of parallel computation using exchanges, managed callables and 5 persistent communications. 6 """ 7 8 import pprocess 9 import time 10 #import random 11 import sys 12 13 # Array size and a limit on the number of processes. 14 15 N = 10 16 limit = 10 17 delay = 1 18 19 # Work function and monitoring class. 20 21 def calculate(i, j): 22 23 """ 24 A supposedly time-consuming calculation on 'i' and 'j'. 25 """ 26 27 #time.sleep(delay * random.random()) 28 time.sleep(delay) 29 return (i, j, i * N + j) 30 31 class MyExchange(pprocess.Exchange): 32 33 "Parallel convenience class containing the array assignment operation." 34 35 def store_data(self, ch): 36 i, j, result = ch.receive() 37 self.D[i*N+j] = result 38 39 class MyPersistentExchange(pprocess.PersistentExchange): 40 41 "A persistent exchange." 42 43 def store_data(self, ch): 44 self.D[:] = ch.receive() 45 46 # Main computation. 47 48 def task(): 49 50 # Initialise the communications exchange with a limit on the number of 51 # channels/processes. 52 53 exchange = MyExchange(limit=limit) 54 55 # Initialise an array - it is stored in the exchange to permit automatic 56 # assignment of values as the data arrives. 57 58 results = exchange.D = [0] * N * N 59 60 # Wrap the calculate function and manage it. 61 62 calc = exchange.manage(pprocess.MakeParallel(calculate)) 63 64 # Perform the work. 65 66 for i in range(0, N): 67 for j in range(0, N): 68 calc(i, j) 69 70 # Wait for the results. 71 72 exchange.finish() 73 return results 74 75 # Main program. 76 77 if __name__ == "__main__": 78 79 t = time.time() 80 81 # Initialise an array - it is stored in the exchange to permit automatic 82 # assignment of values as the data arrives. 83 84 results = [0] * N * N 85 86 if "--reconnect" not in sys.argv: 87 88 # Initialise the persistent exchange with a single process for the main 89 # computation. 90 91 exchange = MyPersistentExchange(limit=1) 92 93 # Wrap the computation and manage it. 94 95 ptask = exchange.manage("task.socket", pprocess.MakeParallel(task)) 96 97 # Perform the work. 98 99 ptask() 100 101 # Close down the exchange. 102 103 del exchange 104 print "Closed original exchange." 105 106 if "--start" not in sys.argv: 107 108 # Open another exchange. 109 110 print "Opening new exchange." 111 exchange = MyPersistentExchange(limit=1) 112 exchange.D = results 113 114 # Reconnect to the task. 115 116 exchange.connect("task.socket") 117 118 # Wait for the results. 119 120 print "Waiting for persistent results" 121 exchange.finish() 122 123 # Show the results. 124 125 for i in range(0, N): 126 for result in results[i*N:i*N+N]: 127 print result, 128 print 129 130 print "Time taken:", time.time() - t 131 132 # vim: tabstop=4 expandtab shiftwidth=4