1 #!/usr/bin/env python 2 3 """ 4 A simple example of parallel computation using message exchanges directly. 5 """ 6 7 import pprocess 8 import time 9 #import random 10 11 # Array size and a limit on the number of processes. 12 13 N = 10 14 limit = 10 15 delay = 1 16 17 # Work function and monitoring class. 18 19 def calculate(ch, i, j): 20 21 """ 22 A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to 23 communicate with the parent process. 24 """ 25 26 #time.sleep(delay * random.random()) 27 time.sleep(delay) 28 ch.send((i, j, i * N + j)) 29 30 class MyExchange(pprocess.Exchange): 31 32 "Parallel convenience class containing the array assignment operation." 33 34 def store_data(self, ch): 35 i, j, result = ch.receive() 36 self.D[i*N+j] = result 37 38 # Main program. 39 40 if __name__ == "__main__": 41 42 t = time.time() 43 44 # Initialise the communications exchange with a limit on the number of 45 # channels/processes. 46 47 exchange = MyExchange(limit=limit) 48 49 # Initialise an array - it is stored in the exchange to permit automatic 50 # assignment of values as the data arrives. 51 52 results = exchange.D = [0] * N * N 53 54 # Perform the work. 55 56 print "Calculating..." 57 for i in range(0, N): 58 for j in range(0, N): 59 exchange.start(calculate, i, j) 60 61 # Wait for the results. 62 63 print "Finishing..." 64 exchange.finish() 65 66 # Show the results. 67 68 for i in range(0, N): 69 for result in results[i*N:i*N+N]: 70 print result, 71 print 72 73 print "Time taken:", time.time() - t 74 75 # vim: tabstop=4 expandtab shiftwidth=4