1 #!/usr/bin/env python 2 3 """ 4 A simple example of parallel computation using message exchanges directly. 5 """ 6 7 import pprocess 8 import time 9 10 # Array size and a limit on the number of processes. 11 12 N = 10 13 limit = 10 14 delay = 1 15 16 # Work function and monitoring class. 17 18 def calculate(ch, i, j): 19 20 """ 21 A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to 22 communicate with the parent process. 23 """ 24 25 time.sleep(delay) 26 ch.send((i, j, i * N + j)) 27 28 class MyExchange(pprocess.Exchange): 29 30 "Parallel convenience class containing the array assignment operation." 31 32 def store_data(self, ch): 33 i, j, result = ch.receive() 34 self.D[i*N+j] = result 35 36 # Main program. 37 38 if __name__ == "__main__": 39 40 t = time.time() 41 42 # Initialise the communications exchange with a limit on the number of 43 # channels/processes. 44 45 exchange = MyExchange(limit=limit) 46 47 # Initialise an array - it is stored in the exchange to permit automatic 48 # assignment of values as the data arrives. 49 50 results = exchange.D = [0] * N * N 51 52 # Perform the work. 53 54 print "Calculating..." 55 for i in range(0, N): 56 for j in range(0, N): 57 exchange.start(calculate, i, j) 58 59 # Wait for the results. 60 61 print "Finishing..." 62 exchange.finish() 63 64 # Show the results. 65 66 for i in range(0, N): 67 for result in results[i*N:i*N+N]: 68 print result, 69 print 70 71 print "Time taken:", time.time() - t 72 73 # vim: tabstop=4 expandtab shiftwidth=4