paulb@69 | 1 | #!/usr/bin/env python |
paulb@69 | 2 | |
paulb@69 | 3 | """ |
paulb@85 | 4 | A simple example of parallel computation using message exchanges directly, |
paulb@85 | 5 | exchanges and managed callables, and map-style processing with parallel-enabled |
paulb@85 | 6 | functions. |
paulb@69 | 7 | """ |
paulb@69 | 8 | |
paulb@64 | 9 | import pprocess |
paulb@64 | 10 | import time |
paulb@64 | 11 | |
paulb@69 | 12 | # Array size and a limit on the number of processes. |
paulb@69 | 13 | |
paulb@77 | 14 | N = 10 |
paulb@64 | 15 | limit = 10 |
paulb@77 | 16 | delay = 1 |
paulb@64 | 17 | |
paulb@64 | 18 | def make_array(n): |
paulb@69 | 19 | |
paulb@69 | 20 | "Make an 'n' * 'n' array initialised with zeros." |
paulb@69 | 21 | |
paulb@64 | 22 | return [list(x) for x in [(0,) * n] * n] |
paulb@64 | 23 | |
paulb@85 | 24 | # Work function and monitoring class. |
paulb@85 | 25 | |
paulb@64 | 26 | def calculate(ch, i, j): |
paulb@69 | 27 | |
paulb@69 | 28 | """ |
paulb@69 | 29 | A time-consuming calculation, using 'ch' to communicate with the parent |
paulb@69 | 30 | process, with 'i' and 'j' as operands. |
paulb@69 | 31 | """ |
paulb@69 | 32 | |
paulb@77 | 33 | time.sleep(delay) |
paulb@64 | 34 | ch.send((i, j, i * N + j)) |
paulb@64 | 35 | |
paulb@69 | 36 | class MyExchange(pprocess.Exchange): |
paulb@69 | 37 | |
paulb@69 | 38 | "Parallel convenience class containing the array assignment operation." |
paulb@64 | 39 | |
paulb@69 | 40 | def store_data(self, ch): |
paulb@64 | 41 | i, j, result = ch.receive() |
paulb@69 | 42 | self.D[i][j] = result |
paulb@69 | 43 | |
paulb@85 | 44 | # Alternative work function. |
paulb@85 | 45 | |
paulb@85 | 46 | def calculate_seq(i, j): |
paulb@85 | 47 | |
paulb@85 | 48 | """ |
paulb@85 | 49 | A version of 'calculate' with a simpler result suitable for sequential |
paulb@85 | 50 | collection. It accepts no channel, pretending to be a function used in a |
paulb@85 | 51 | non-parallel context. |
paulb@85 | 52 | """ |
paulb@85 | 53 | |
paulb@85 | 54 | time.sleep(delay) |
paulb@85 | 55 | return i * N + j |
paulb@85 | 56 | |
paulb@85 | 57 | # The alternative work function converted into a parallel-enabled version. |
paulb@85 | 58 | |
paulb@85 | 59 | calculate2 = pprocess.MakeParallel(calculate_seq) |
paulb@85 | 60 | |
paulb@69 | 61 | # Main program. |
paulb@64 | 62 | |
paulb@64 | 63 | if __name__ == "__main__": |
paulb@69 | 64 | |
paulb@69 | 65 | # Initialise the communications exchange with a limit on the number of |
paulb@69 | 66 | # channels/processes. |
paulb@69 | 67 | |
paulb@69 | 68 | exchange = MyExchange(limit=limit) |
paulb@69 | 69 | |
paulb@69 | 70 | # Initialise an array - it is stored in the exchange to permit automatic |
paulb@69 | 71 | # assignment of values as the data arrives. |
paulb@69 | 72 | |
paulb@69 | 73 | exchange.D = make_array(N) |
paulb@69 | 74 | |
paulb@69 | 75 | # The parallel computation. |
paulb@64 | 76 | |
paulb@64 | 77 | print "Calculating..." |
paulb@64 | 78 | for i in range(0, N): |
paulb@64 | 79 | for j in range(0, N): |
paulb@81 | 80 | exchange.start(calculate, i, j) |
paulb@77 | 81 | print i, j |
paulb@64 | 82 | |
paulb@64 | 83 | print "Finishing..." |
paulb@69 | 84 | exchange.finish() |
paulb@69 | 85 | |
paulb@69 | 86 | # Show the result. |
paulb@64 | 87 | |
paulb@64 | 88 | print |
paulb@69 | 89 | for row in exchange.D: |
paulb@64 | 90 | print row |
paulb@64 | 91 | |
paulb@85 | 92 | # Try again with managed callables. |
paulb@85 | 93 | |
paulb@85 | 94 | exchange.D = make_array(N) |
paulb@85 | 95 | calc = exchange.manage(calculate) |
paulb@85 | 96 | |
paulb@85 | 97 | print "Calculating..." |
paulb@85 | 98 | for i in range(0, N): |
paulb@85 | 99 | for j in range(0, N): |
paulb@85 | 100 | calc(i, j) |
paulb@85 | 101 | print i, j |
paulb@85 | 102 | |
paulb@85 | 103 | print "Finishing..." |
paulb@85 | 104 | exchange.finish() |
paulb@85 | 105 | |
paulb@85 | 106 | print |
paulb@85 | 107 | for row in exchange.D: |
paulb@85 | 108 | print row |
paulb@85 | 109 | |
paulb@85 | 110 | # Try again with a map, building the results sequentially. |
paulb@85 | 111 | |
paulb@85 | 112 | mymap = pprocess.pmap |
paulb@85 | 113 | results = [] |
paulb@85 | 114 | |
paulb@85 | 115 | print "Calculating..." |
paulb@85 | 116 | for i in range(0, N): |
paulb@85 | 117 | sequence = [] |
paulb@85 | 118 | for j in range(0, N): |
paulb@85 | 119 | sequence.append((i, j)) |
paulb@85 | 120 | results.append(mymap(calculate2, sequence)) |
paulb@85 | 121 | print i |
paulb@85 | 122 | |
paulb@85 | 123 | print |
paulb@85 | 124 | for row in results: |
paulb@85 | 125 | print row |
paulb@85 | 126 | |
paulb@64 | 127 | # vim: tabstop=4 expandtab shiftwidth=4 |