#!/usr/bin/env python

"""
A simple example of parallel computation with pprocess, shown three ways:
starting work through an exchange directly, calling a callable managed by the
exchange, and map-style processing with a parallel-enabled function.
"""

import pprocess
import time

# Array size, a limit on the number of processes, and the pause in seconds made
# by each calculation.

N = 10
limit = 10
delay = 1

def make_array(n):

    "Make an 'n' * 'n' array initialised with zeros."

    # Every row is a separate list, so assigning to one cell never affects
    # another row.

    return [[0] * n for _ in range(n)]

# Work function and monitoring class.

def calculate(ch, i, j):

    """
    A time-consuming calculation, using 'ch' to communicate with the parent
    process, with 'i' and 'j' as operands.

    After sleeping for 'delay' seconds, the tuple (i, j, i * N + j) is sent
    over 'ch'. Nothing is returned.
    """

    time.sleep(delay)
    value = i * N + j
    ch.send((i, j, value))

class MyExchange(pprocess.Exchange):

    """
    Parallel convenience class containing the array assignment operation.

    The 'D' attribute must be assigned an array (see 'make_array') before
    'store_data' is used, since that method writes into it.
    """

    def store_data(self, ch):

        """
        Receive an (i, j, result) tuple from 'ch' and store the result at
        position [i][j] of the 'D' array.
        """

        row, column, value = ch.receive()
        self.D[row][column] = value

# Alternative work function.

def calculate_seq(i, j):

    """
    A version of 'calculate' with a simpler result suitable for sequential
    collection. It accepts no channel, pretending to be a function used in a
    non-parallel context.

    Return i * N + j after sleeping for 'delay' seconds.
    """

    time.sleep(delay)
    return i * N + j

# The alternative work function converted into a parallel-enabled version.

calculate2 = pprocess.MakeParallel(calculate_seq)

# Output helper.

def show_rows(rows):

    "Print a blank line and then each of the given 'rows' on a line of its own."

    # Only single-argument, parenthesised prints are used in this file: they
    # produce the same output whether print is a statement or a function.

    print("")
    for row in rows:
        print(row)

# Main program.

if __name__ == "__main__":

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    exchange.D = make_array(N)

    # The parallel computation.

    print("Calculating...")
    for i in range(N):
        for j in range(N):
            exchange.start(calculate, i, j)
            print("%s %s" % (i, j))

    # NOTE(review): finish() presumably waits until all outstanding results
    # have been stored - confirm against the pprocess documentation.

    print("Finishing...")
    exchange.finish()

    # Show the result.

    show_rows(exchange.D)

    # Try again with managed callables: a fresh array is used, and the managed
    # callable is invoked with the operands only.

    exchange.D = make_array(N)
    calc = exchange.manage(calculate)

    print("Calculating...")
    for i in range(N):
        for j in range(N):
            calc(i, j)
            print("%s %s" % (i, j))

    print("Finishing...")
    exchange.finish()

    show_rows(exchange.D)

    # Try again with a map, building the results sequentially: one map call is
    # made per row, over the (i, j) pairs belonging to that row.

    mymap = pprocess.pmap
    results = []

    print("Calculating...")
    for i in range(N):
        sequence = [(i, j) for j in range(N)]
        results.append(mymap(calculate2, sequence))
        print(i)

    show_rows(results)

# vim: tabstop=4 expandtab shiftwidth=4