#!/usr/bin/env python

"""
A simple example of parallel computation using message exchanges and the create
function.

Each cell of an N x N array is handled by a separately created process which
sends an (i, j, value) message; the exchange stores each value in a flat,
row-major list as the messages are received.
"""

import pprocess
import time

# Array size and a limit on the number of processes.

N = 10
limit = 10
delay = 1

# Monitoring class.

class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):

        """
        Receive one (i, j, result) message from the channel 'ch' and store the
        result in the flat list 'D' at the row-major position i*N+j.

        The 'D' attribute is not created by this class: it must be assigned on
        the exchange before any data is stored (the main program does this).
        """

        i, j, result = ch.receive()
        self.D[i*N+j] = result

# Main program.

if __name__ == "__main__":

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)
    t = time.time()

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    exchange.D = [0] * N * N

    # The parallel computation.
    # NOTE: Could use the with statement in the loop to package the
    # NOTE: try...finally functionality.
    # NOTE: The print calls below take a single argument in parentheses so that
    # NOTE: they behave identically as Python 2 print statements and as Python 3
    # NOTE: print function calls.

    print("Calculating...")
    for i in range(0, N):
        for j in range(0, N):

            # NOTE(review): per the pprocess documentation, create() returns a
            # channel only in the created process and a false value in the
            # creating process, so only the created process does the work
            # below - confirm against the installed pprocess version.

            ch = exchange.create()
            if ch:
                try: # Calculation work.

                    time.sleep(delay)
                    ch.send((i, j, i * N + j))

                finally: # Important finalisation.

                    # Leave via pprocess.exit even if the work failed, so that
                    # this process never carries on with the loops above.

                    pprocess.exit(ch)

    print("Finishing...")
    exchange.finish()

    # Show the result: the elapsed time, then one line per row of the array
    # with the values separated by single spaces.

    print("Time taken: %s" % (time.time() - t))
    for i in range(0, N):
        print(" ".join([str(result) for result in exchange.D[i*N:i*N+N]]))

# vim: tabstop=4 expandtab shiftwidth=4