1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/examples/simple_persistent_managed.py Sun Jun 01 14:50:46 2008 +0000
1.3 @@ -0,0 +1,132 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +"""
1.7 +A simple example of parallel computation using exchanges, managed callables and
1.8 +persistent communications.
1.9 +"""
1.10 +
1.11 +import pprocess
1.12 +import time
1.13 +#import random
1.14 +import sys
1.15 +
1.16 +# Array size and a limit on the number of processes.
1.17 +
1.18 +N = 10
1.19 +limit = 10
1.20 +delay = 1
1.21 +
1.22 +# Work function and monitoring class.
1.23 +
1.24 +def calculate(i, j):
1.25 +
1.26 + """
1.27 + A supposedly time-consuming calculation on 'i' and 'j'.
1.28 + """
1.29 +
1.30 + #time.sleep(delay * random.random())
1.31 + time.sleep(delay)
1.32 + return (i, j, i * N + j)
1.33 +
1.34 +class MyExchange(pprocess.Exchange):
1.35 +
1.36 + "Parallel convenience class containing the array assignment operation."
1.37 +
1.38 + def store_data(self, ch):
1.39 + i, j, result = ch.receive()
1.40 + self.D[i*N+j] = result
1.41 +
1.42 +class MyPersistentExchange(pprocess.PersistentExchange):
1.43 +
1.44 + "A persistent exchange."
1.45 +
1.46 + def store_data(self, ch):
1.47 + self.D[:] = ch.receive()
1.48 +
1.49 +# Main computation.
1.50 +
1.51 +def task():
1.52 +
1.53 + # Initialise the communications exchange with a limit on the number of
1.54 + # channels/processes.
1.55 +
1.56 + exchange = MyExchange(limit=limit)
1.57 +
1.58 + # Initialise an array - it is stored in the exchange to permit automatic
1.59 + # assignment of values as the data arrives.
1.60 +
1.61 + results = exchange.D = [0] * N * N
1.62 +
1.63 + # Wrap the calculate function and manage it.
1.64 +
1.65 + calc = exchange.manage(pprocess.MakeParallel(calculate))
1.66 +
1.67 + # Perform the work.
1.68 +
1.69 + for i in range(0, N):
1.70 + for j in range(0, N):
1.71 + calc(i, j)
1.72 +
1.73 + # Wait for the results.
1.74 +
1.75 + exchange.finish()
1.76 + return results
1.77 +
1.78 +# Main program.
1.79 +
1.80 +if __name__ == "__main__":
1.81 +
1.82 + t = time.time()
1.83 +
1.84 + # Initialise an array - it is stored in the exchange to permit automatic
1.85 + # assignment of values as the data arrives.
1.86 +
1.87 + results = [0] * N * N
1.88 +
1.89 + if "--reconnect" not in sys.argv:
1.90 +
1.91 + # Initialise the persistent exchange with a single process for the main
1.92 + # computation.
1.93 +
1.94 + exchange = MyPersistentExchange(limit=1)
1.95 +
1.96 + # Wrap the computation and manage it.
1.97 +
1.98 + ptask = exchange.manage("task.socket", pprocess.MakeParallel(task))
1.99 +
1.100 + # Perform the work.
1.101 +
1.102 + ptask()
1.103 +
1.104 + # Close down the exchange.
1.105 +
1.106 + del exchange
1.107 + print "Closed original exchange."
1.108 +
1.109 + if "--start" not in sys.argv:
1.110 +
1.111 + # Open another exchange.
1.112 +
1.113 + print "Opening new exchange."
1.114 + exchange = MyPersistentExchange(limit=1)
1.115 + exchange.D = results
1.116 +
1.117 + # Reconnect to the task.
1.118 +
1.119 + exchange.connect("task.socket")
1.120 +
1.121 + # Wait for the results.
1.122 +
1.123 + print "Waiting for persistent results"
1.124 + exchange.finish()
1.125 +
1.126 + # Show the results.
1.127 +
1.128 + for i in range(0, N):
1.129 + for result in results[i*N:i*N+N]:
1.130 + print result,
1.131 + print
1.132 +
1.133 + print "Time taken:", time.time() - t
1.134 +
1.135 +# vim: tabstop=4 expandtab shiftwidth=4
2.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
2.2 +++ b/examples/simple_persistent_managed_queue.py Sun Jun 01 14:50:46 2008 +0000
2.3 @@ -0,0 +1,112 @@
2.4 +#!/usr/bin/env python
2.5 +
2.6 +"""
2.7 +A simple example of parallel computation using a queue, managed callables and
2.8 +persistent communications.
2.9 +"""
2.10 +
2.11 +import pprocess
2.12 +import time
2.13 +#import random
2.14 +import sys
2.15 +
2.16 +# Array size and a limit on the number of processes.
2.17 +
2.18 +N = 10
2.19 +limit = 10
2.20 +delay = 1
2.21 +
2.22 +# Work function and monitoring class.
2.23 +
2.24 +def calculate(i, j):
2.25 +
2.26 + """
2.27 + A supposedly time-consuming calculation on 'i' and 'j'.
2.28 + """
2.29 +
2.30 + #time.sleep(delay * random.random())
2.31 + time.sleep(delay)
2.32 + return (i, j, i * N + j)
2.33 +
2.34 +# Main computation.
2.35 +
2.36 +def task():
2.37 +
2.38 + # Initialise the communications queue with a limit on the number of
2.39 + # channels/processes.
2.40 +
2.41 + queue = pprocess.Queue(limit=limit)
2.42 +
2.43 + # Initialise an array.
2.44 +
2.45 + results = [0] * N * N
2.46 +
2.47 + # Wrap the calculate function and manage it.
2.48 +
2.49 + calc = queue.manage(pprocess.MakeParallel(calculate))
2.50 +
2.51 + # Perform the work.
2.52 +
2.53 + print "Calculating..."
2.54 + for i in range(0, N):
2.55 + for j in range(0, N):
2.56 + calc(i, j)
2.57 +
2.58 + # Store the results as they arrive.
2.59 +
2.60 + print "Finishing..."
2.61 + for i, j, result in queue:
2.62 + results[i*N+j] = result
2.63 +
2.64 + return results
2.65 +
2.66 +# Main program.
2.67 +
2.68 +if __name__ == "__main__":
2.69 +
2.70 + t = time.time()
2.71 +
2.72 + if "--reconnect" not in sys.argv:
2.73 +
2.74 + queue = pprocess.PersistentQueue(limit=1)
2.75 +
2.76 + # Wrap the computation and manage it.
2.77 +
2.78 + ptask = queue.manage("task.socket", pprocess.MakeParallel(task))
2.79 +
2.80 + # Perform the work.
2.81 +
2.82 + ptask()
2.83 +
2.84 + # Close down the queue.
2.85 +
2.86 + del queue
2.87 + print "Closed original queue."
2.88 +
2.89 + if "--start" not in sys.argv:
2.90 +
2.91 + # Open another queue.
2.92 +
2.93 + print "Opening new queue."
2.94 + queue = pprocess.PersistentQueue(limit=1)
2.95 +
2.96 + # Reconnect to the task.
2.97 +
2.98 + queue.connect("task.socket")
2.99 +
2.100 + # Wait for the results.
2.101 +
2.102 + print "Waiting for persistent results"
2.103 + for results in queue:
2.104 + pass # should only be one element
2.105 +
2.106 + # Show the results.
2.107 +
2.108 + for i in range(0, N):
2.109 + for result in results[i*N:i*N+N]:
2.110 + print result,
2.111 + print
2.112 +
2.113 + print "Time taken:", time.time() - t
2.114 +
2.115 +# vim: tabstop=4 expandtab shiftwidth=4