# HG changeset patch # User paulb # Date 1212331846 0 # Node ID 7ffca924cff4746943b2d92cc48f5b1ec722c0d8 # Parent 60e9dd6158393aee60a49dcf6cb47728ffa9ab08 [project @ 2008-06-01 14:50:46 by paulb] Added examples for the persistent/background process support. diff -r 60e9dd615839 -r 7ffca924cff4 examples/simple_persistent_managed.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/simple_persistent_managed.py Sun Jun 01 14:50:46 2008 +0000 @@ -0,0 +1,132 @@ +#!/usr/bin/env python + +""" +A simple example of parallel computation using exchanges, managed callables and +persistent communications. +""" + +import pprocess +import time +#import random +import sys + +# Array size and a limit on the number of processes. + +N = 10 +limit = 10 +delay = 1 + +# Work function and monitoring class. + +def calculate(i, j): + + """ + A supposedly time-consuming calculation on 'i' and 'j'. + """ + + #time.sleep(delay * random.random()) + time.sleep(delay) + return (i, j, i * N + j) + +class MyExchange(pprocess.Exchange): + + "Parallel convenience class containing the array assignment operation." + + def store_data(self, ch): + i, j, result = ch.receive() + self.D[i*N+j] = result + +class MyPersistentExchange(pprocess.PersistentExchange): + + "A persistent exchange." + + def store_data(self, ch): + self.D[:] = ch.receive() + +# Main computation. + +def task(): + + # Initialise the communications exchange with a limit on the number of + # channels/processes. + + exchange = MyExchange(limit=limit) + + # Initialise an array - it is stored in the exchange to permit automatic + # assignment of values as the data arrives. + + results = exchange.D = [0] * N * N + + # Wrap the calculate function and manage it. + + calc = exchange.manage(pprocess.MakeParallel(calculate)) + + # Perform the work. + + for i in range(0, N): + for j in range(0, N): + calc(i, j) + + # Wait for the results. + + exchange.finish() + return results + +# Main program. + +if __name__ == "__main__": + + t = time.time() + + # Initialise an array - it is stored in the exchange to permit automatic + # assignment of values as the data arrives. + + results = [0] * N * N + + if "--reconnect" not in sys.argv: + + # Initialise the persistent exchange with a single process for the main + # computation. + + exchange = MyPersistentExchange(limit=1) + + # Wrap the computation and manage it. + + ptask = exchange.manage("task.socket", pprocess.MakeParallel(task)) + + # Perform the work. + + ptask() + + # Close down the exchange. + + del exchange + print "Closed original exchange." + + if "--start" not in sys.argv: + + # Open another exchange. + + print "Opening new exchange." + exchange = MyPersistentExchange(limit=1) + exchange.D = results + + # Reconnect to the task. + + exchange.connect("task.socket") + + # Wait for the results. + + print "Waiting for persistent results" + exchange.finish() + + # Show the results. + + for i in range(0, N): + for result in results[i*N:i*N+N]: + print result, + print + + print "Time taken:", time.time() - t + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r 60e9dd615839 -r 7ffca924cff4 examples/simple_persistent_managed_queue.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/simple_persistent_managed_queue.py Sun Jun 01 14:50:46 2008 +0000 @@ -0,0 +1,112 @@ +#!/usr/bin/env python + +""" +A simple example of parallel computation using a queue, managed callables and +persistent communications. +""" + +import pprocess +import time +#import random +import sys + +# Array size and a limit on the number of processes. + +N = 10 +limit = 10 +delay = 1 + +# Work function and monitoring class. + +def calculate(i, j): + + """ + A supposedly time-consuming calculation on 'i' and 'j'. + """ + + #time.sleep(delay * random.random()) + time.sleep(delay) + return (i, j, i * N + j) + +# Main computation. + +def task(): + + # Initialise the communications queue with a limit on the number of + # channels/processes. + + queue = pprocess.Queue(limit=limit) + + # Initialise an array. + + results = [0] * N * N + + # Wrap the calculate function and manage it. + + calc = queue.manage(pprocess.MakeParallel(calculate)) + + # Perform the work. + + print "Calculating..." + for i in range(0, N): + for j in range(0, N): + calc(i, j) + + # Store the results as they arrive. + + print "Finishing..." + for i, j, result in queue: + results[i*N+j] = result + + return results + +# Main program. + +if __name__ == "__main__": + + t = time.time() + + if "--reconnect" not in sys.argv: + + queue = pprocess.PersistentQueue(limit=1) + + # Wrap the computation and manage it. + + ptask = queue.manage("task.socket", pprocess.MakeParallel(task)) + + # Perform the work. + + ptask() + + # Close down the queue. + + del queue + print "Closed original queue." + + if "--start" not in sys.argv: + + # Open another queue. + + print "Opening new queue." + queue = pprocess.PersistentQueue(limit=1) + + # Reconnect to the task. + + queue.connect("task.socket") + + # Wait for the results. + + print "Waiting for persistent results" + for results in queue: + pass # should only be one element + + # Show the results. + + for i in range(0, N): + for result in results[i*N:i*N+N]: + print result, + print + + print "Time taken:", time.time() - t + +# vim: tabstop=4 expandtab shiftwidth=4