#!/usr/bin/env python

"""
A simple example of parallel computation using exchanges and managed callables.

Each cell of an N x N array is computed by a separate call to 'calculate' made
through a managed callable; the exchange stores each result into a flat list
as it is received.
"""

import pprocess
import time

# Array size and a limit on the number of processes.

N = 10
limit = 10
delay = 1

# Work function and monitoring class.

def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.

    Sleep for 'delay' seconds to simulate work, then return the tuple
    (i, j, value) where value is i * N + j.
    """

    # NOTE: To simulate calculations of varying duration, import random and
    # sleep for delay * random.random() instead.
    time.sleep(delay)
    return (i, j, i * N + j)

class MyExchange(pprocess.Exchange):

    """
    Parallel convenience class containing the array assignment operation.

    The attribute 'D' is not created by this class: the caller must assign a
    list with at least N * N elements to it before any results are stored.
    """

    def store_data(self, ch):

        """
        Receive an (i, j, result) tuple from the channel 'ch' and store the
        result at index i * N + j of the flat array 'D'.
        """

        i, j, result = ch.receive()
        self.D[i * N + j] = result

def main():

    """
    Compute all N * N cells through the exchange, print the resulting array
    one row per line, and report the elapsed wall-clock time.
    """

    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = exchange.manage(pprocess.MakeParallel(calculate))

    # Perform the work.
    # The print calls below take a single argument so that they parse, and
    # produce the same output, under both Python 2 and Python 3.

    print("Calculating...")
    for i in range(N):
        for j in range(N):
            calc(i, j)

    # Wait for the results.

    print("Finishing...")
    exchange.finish()

    # Show the results, one space-separated row per line.

    for i in range(N):
        row = results[i * N:i * N + N]
        print(" ".join(str(result) for result in row))

    print("Time taken: %s" % (time.time() - t,))

# Main program.

if __name__ == "__main__":
    main()

# vim: tabstop=4 expandtab shiftwidth=4