1 #!/usr/bin/env python 2 3 """ 4 A simple example of parallel computation using message exchanges directly. 5 """ 6 7 import pprocess 8 import time 9 10 # Array size and a limit on the number of processes. 11 12 N = 10 13 limit = 10 14 delay = 1 15 16 # Work function and monitoring class. 17 18 def calculate(ch, i, j): 19 20 """ 21 A time-consuming calculation, using 'ch' to communicate with the parent 22 process, with 'i' and 'j' as operands. 23 """ 24 25 time.sleep(delay) 26 ch.send((i, j, i * N + j)) 27 28 class MyExchange(pprocess.Exchange): 29 30 "Parallel convenience class containing the array assignment operation." 31 32 def store_data(self, ch): 33 i, j, result = ch.receive() 34 self.D[i*N+j] = result 35 36 # Main program. 37 38 if __name__ == "__main__": 39 40 # Initialise the communications exchange with a limit on the number of 41 # channels/processes. 42 43 exchange = MyExchange(limit=limit) 44 t = time.time() 45 46 # Initialise an array - it is stored in the exchange to permit automatic 47 # assignment of values as the data arrives. 48 49 exchange.D = [0] * N * N 50 51 # The parallel computation. 52 53 print "Calculating..." 54 for i in range(0, N): 55 for j in range(0, N): 56 exchange.start(calculate, i, j) 57 58 print "Finishing..." 59 exchange.finish() 60 61 # Show the result. 62 63 print "Time taken:", time.time() - t 64 for i in range(0, N): 65 for result in exchange.D[i*N:i*N+N]: 66 print result, 67 print 68 69 # vim: tabstop=4 expandtab shiftwidth=4