paulb@141 | 1 | #!/usr/bin/env python |
paulb@141 | 2 | |
paulb@141 | 3 | """ |
paulb@141 | 4 | A simple example of parallel computation using exchanges, managed callables and |
paulb@141 | 5 | persistent communications. |
paulb@141 | 6 | """ |
paulb@141 | 7 | |
paulb@141 | 8 | import pprocess |
paulb@141 | 9 | import time |
paulb@141 | 10 | #import random |
paulb@141 | 11 | import sys |
paulb@141 | 12 | |
# Array size and a limit on the number of processes.

# N is the side length of the result array, which is held as a flat list of
# N * N values.
N = 10
# limit is passed to the exchange created in task() to restrict the number of
# channels/processes.
limit = 10
# delay is the time in seconds for which each calculate() call sleeps.
delay = 1
paulb@141 | 18 | |
paulb@141 | 19 | # Work function and monitoring class. |
paulb@141 | 20 | |
def calculate(i, j):

    """
    Stand in for an expensive computation on the cell at row 'i', column 'j'.

    The function sleeps for 'delay' seconds and then returns the tuple
    (i, j, value), where the value is the flattened array index i * N + j.
    """

    # A fixed pause takes the place of real work.

    time.sleep(delay)

    value = i * N + j
    return (i, j, value)
paulb@141 | 30 | |
class MyExchange(pprocess.Exchange):

    "An exchange which files each received result into its slot in the array."

    def store_data(self, ch):

        """
        Receive an (i, j, result) tuple from the channel 'ch' and store the
        result at position i * N + j of the 'D' list held by this exchange.
        """

        row, column, value = ch.receive()
        offset = row * N + column
        self.D[offset] = value
paulb@141 | 38 | |
class MyPersistentExchange(pprocess.PersistentExchange):

    "A persistent exchange which copies a received array into its own."

    def store_data(self, ch):

        """
        Receive a sequence from the channel 'ch' and overwrite the contents of
        the 'D' list in place, so that any other reference to that list sees
        the received values.
        """

        received = ch.receive()
        self.D[:] = received
paulb@141 | 45 | |
paulb@141 | 46 | # Main computation. |
paulb@141 | 47 | |
def task():

    """
    Request a calculation for every cell of the N x N array through a managed
    exchange and return the array as a flat list of N * N values.
    """

    # Set up the communications exchange, restricted to 'limit'
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Create the array and hand it to the exchange, whose store_data method
    # writes each value into it as the data arrives.

    results = [0] * N * N
    exchange.D = results

    # Make the work function parallel-capable and have the exchange manage it.

    parallel_calculate = pprocess.MakeParallel(calculate)
    calc = exchange.manage(parallel_calculate)

    # Request a calculation for every (row, column) pair.

    for row in range(N):
        for column in range(N):
            calc(row, column)

    # Wait for the outstanding results before handing back the array.

    exchange.finish()
    return results
paulb@141 | 74 | |
paulb@141 | 75 | # Main program. |
paulb@141 | 76 | |
if __name__ == "__main__":

    # Command line options, tested by simple membership in sys.argv:
    #
    #   (none)       start the task, then reconnect to it and show the results
    #   --start      start the task without waiting for its results
    #   --reconnect  skip starting the task and reconnect to an existing one
    #
    # All output is written using single-argument print(...) calls, which
    # produce the same text with the Python 2 print statement and with the
    # Python 3 print function.

    t = time.time()

    # Initialise an array - it is handed to the second exchange below so that
    # the received values are copied into it.

    results = [0] * N * N

    if "--reconnect" not in sys.argv:

        # Initialise the persistent exchange with a single process for the main
        # computation.

        exchange = MyPersistentExchange(limit=1)

        # Wrap the computation and manage it. The same "task.socket" name is
        # given to connect() below in order to reach the task again.

        ptask = exchange.manage("task.socket", pprocess.MakeParallel(task))

        # Perform the work.

        ptask()

        # Close down the exchange. Note that finish() is not called here, so
        # this exchange does not wait for the task's results.

        del exchange
        print("Closed original exchange.")

    if "--start" not in sys.argv:

        # Open another exchange.

        print("Opening new exchange.")
        exchange = MyPersistentExchange(limit=1)
        exchange.D = results

        # Reconnect to the task.

        exchange.connect("task.socket")

        # Wait for the results.

        print("Waiting for persistent results")
        exchange.finish()

        # Show the results, one row of N space-separated values per line.
        # NOTE(review): the reporting is kept inside this branch on the
        # assumption that a --start run has no results to show - confirm
        # against the original layout.

        for i in range(N):
            row = results[i * N:i * N + N]
            print(" ".join(str(value) for value in row))

        print("Time taken: %s" % (time.time() - t))
paulb@141 | 131 | |
paulb@141 | 132 | # vim: tabstop=4 expandtab shiftwidth=4 |