#!/usr/bin/env python

"""
A simple example of parallel computation using exchanges and managed callables.

Every cell of an N x N array is computed by a managed 'calculate' call, and
the values are written into a flat list held by the exchange as they arrive.
"""

import pprocess
import time

# Array size and a limit on the number of processes.

N = 10
limit = 10
delay = 1

# Work function and monitoring class.

def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.

    Sleeps for 'delay' seconds and then returns the tuple (i, j, value),
    where value is i * N + j.
    """

    time.sleep(delay)
    value = i * N + j
    return (i, j, value)

class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):

        """
        Receive one (row, column, value) triple from the channel 'ch' and
        store the value in the flat array 'self.D' at offset row * N + column.
        """

        # NOTE(review): presumably called by the exchange whenever a channel
        # has data ready - confirm against the pprocess documentation.
        row, col, value = ch.receive()
        offset = row * N + col
        self.D[offset] = value

# Main program.

def main():

    """
    Compute all N * N cells through a managed callable, print the resulting
    array one row per line, and report the elapsed wall-clock time.
    """

    # Each print below takes a single parenthesised argument, so the Python 2
    # print statement emits exactly that string followed by a newline.

    started = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    exchange.D = [0] * (N * N)
    results = exchange.D

    # Wrap the calculate function and manage it.

    calc = exchange.manage(pprocess.MakeParallel(calculate))

    # Perform the work, row by row.

    print("Calculating...")
    for row in range(N):
        for col in range(N):
            calc(row, col)

    # Wait for the results.

    print("Finishing...")
    exchange.finish()

    # Show the results: each slice of N consecutive entries is one row.

    for start in range(0, N * N, N):
        print(" ".join([str(value) for value in results[start:start + N]]))

    print("%s %s" % ("Time taken:", time.time() - started))

if __name__ == "__main__":
    main()

# vim: tabstop=4 expandtab shiftwidth=4