1 #!/usr/bin/env python 2 3 """ 4 A simple example of parallel computation using exchanges and managed callables. 5 """ 6 7 import pprocess 8 import time 9 10 # Array size and a limit on the number of processes. 11 12 N = 10 13 limit = 10 14 delay = 1 15 16 # Work function and monitoring class. 17 18 def calculate(i, j): 19 20 """ 21 A supposedly time-consuming calculation on 'i' and 'j'. 22 """ 23 24 time.sleep(delay) 25 return (i, j, i * N + j) 26 27 class MyExchange(pprocess.Exchange): 28 29 "Parallel convenience class containing the array assignment operation." 30 31 def store_data(self, ch): 32 i, j, result = ch.receive() 33 self.D[i*N+j] = result 34 35 # Main program. 36 37 if __name__ == "__main__": 38 39 # Initialise the communications exchange with a limit on the number of 40 # channels/processes. 41 42 exchange = MyExchange(limit=limit) 43 t = time.time() 44 45 # Initialise an array - it is stored in the exchange to permit automatic 46 # assignment of values as the data arrives. 47 48 exchange.D = [0] * N * N 49 50 # Wrap the calculate function and manage it. 51 52 calc = exchange.manage(pprocess.MakeParallel(calculate)) 53 54 # Perform the work. 55 56 print "Calculating..." 57 for i in range(0, N): 58 for j in range(0, N): 59 calc(i, j) 60 61 # Wait for the results. 62 63 print "Finishing..." 64 exchange.finish() 65 66 # Show the results. 67 68 print "Time taken:", time.time() - t 69 for i in range(0, N): 70 for result in exchange.D[i*N:i*N+N]: 71 print result, 72 print 73 74 # vim: tabstop=4 expandtab shiftwidth=4