# HG changeset patch # User paulb # Date 1212446673 0 # Node ID 11411cb3d9bc61fffba40dbe2c4ce6da6f9bd19c # Parent 614aa092ba4e7bcb0608e7c752fe2ab0d2eb8e4d [project @ 2008-06-02 22:44:33 by paulb] Added persistent and background examples (with some renaming). diff -r 614aa092ba4e -r 11411cb3d9bc examples/simple_background_queue.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/simple_background_queue.py Mon Jun 02 22:44:33 2008 +0000 @@ -0,0 +1,106 @@ +#!/usr/bin/env python + +""" +A simple example of parallel computation using a queue, background callables and +persistent communications. +""" + +import pprocess +import time +#import random +import sys + +# Array size and a limit on the number of processes. + +N = 10 +limit = 10 +delay = 1 + +# Work function and monitoring class. + +def calculate(i, j): + + """ + A supposedly time-consuming calculation on 'i' and 'j'. + """ + + #time.sleep(delay * random.random()) + time.sleep(delay) + return (i, j, i * N + j) + +# Main computation. + +def task(): + + # Initialise the communications queue with a limit on the number of + # channels/processes. + + queue = pprocess.Queue(limit=limit) + + # Initialise an array. + + results = [0] * N * N + + # Wrap the calculate function and manage it. + + calc = queue.manage(pprocess.MakeParallel(calculate)) + + # Perform the work. + + print "Calculating..." + for i in range(0, N): + for j in range(0, N): + calc(i, j) + + # Store the results as they arrive. + + print "Finishing..." + for i, j, result in queue: + results[i*N+j] = result + + return results + +# Main program. + +if __name__ == "__main__": + + t = time.time() + + if "--reconnect" not in sys.argv: + + # Wrap the computation and manage it. + + ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task)) + + # Perform the work. + + ptask() + + # Discard the callable. + + del ptask + print "Discarded the callable." + + if "--start" not in sys.argv: + + # Open a queue and reconnect to the task. + + print "Opening a queue." + queue = pprocess.BackgroundQueue("task.socket") + + # Wait for the results. + + print "Waiting for persistent results" + for results in queue: + pass # should only be one element + + # Show the results. + + for i in range(0, N): + for result in results[i*N:i*N+N]: + print result, + print + + print "Time taken:", time.time() - t + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r 614aa092ba4e -r 11411cb3d9bc examples/simple_persistent_managed_queue.py --- a/examples/simple_persistent_managed_queue.py Mon Jun 02 22:43:43 2008 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,112 +0,0 @@ -#!/usr/bin/env python - -""" -A simple example of parallel computation using a queue, managed callables and -persistent communications. -""" - -import pprocess -import time -#import random -import sys - -# Array size and a limit on the number of processes. - -N = 10 -limit = 10 -delay = 1 - -# Work function and monitoring class. - -def calculate(i, j): - - """ - A supposedly time-consuming calculation on 'i' and 'j'. - """ - - #time.sleep(delay * random.random()) - time.sleep(delay) - return (i, j, i * N + j) - -# Main computation. - -def task(): - - # Initialise the communications queue with a limit on the number of - # channels/processes. - - queue = pprocess.Queue(limit=limit) - - # Initialise an array. - - results = [0] * N * N - - # Wrap the calculate function and manage it. - - calc = queue.manage(pprocess.MakeParallel(calculate)) - - # Perform the work. - - print "Calculating..." - for i in range(0, N): - for j in range(0, N): - calc(i, j) - - # Store the results as they arrive. - - print "Finishing..." - for i, j, result in queue: - results[i*N+j] = result - - return results - -# Main program. - -if __name__ == "__main__": - - t = time.time() - - if "--reconnect" not in sys.argv: - - queue = pprocess.PersistentQueue(limit=1) - - # Wrap the computation and manage it. - - ptask = queue.manage("task.socket", pprocess.MakeParallel(task)) - - # Perform the work. - - ptask() - - # Close down the queue. - - del queue - print "Closed original queue." - - if "--start" not in sys.argv: - - # Open another queue. - - print "Opening new queue." - queue = pprocess.PersistentQueue(limit=1) - - # Reconnect to the task. - - queue.connect("task.socket") - - # Wait for the results. - - print "Waiting for persistent results" - for results in queue: - pass # should only be one element - - # Show the results. - - for i in range(0, N): - for result in results[i*N:i*N+N]: - print result, - print - - print "Time taken:", time.time() - t - -# vim: tabstop=4 expandtab shiftwidth=4 diff -r 614aa092ba4e -r 11411cb3d9bc examples/simple_persistent_queue.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/simple_persistent_queue.py Mon Jun 02 22:44:33 2008 +0000 @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +""" +A simple example of parallel computation using persistent queues and +communications. +""" + +import pprocess +import time +#import random +import sys + +# Array size and a limit on the number of processes. + +N = 10 +limit = 2 # since N background processes will be used, this is reduced +delay = 1 + +# Work function and monitoring class. + +def calculate(i, j): + + """ + A supposedly time-consuming calculation on 'i' and 'j'. + """ + + #time.sleep(delay * random.random()) + time.sleep(delay) + return (i, j, i * N + j) + +# Background computation. + +def task(i): + + # Initialise the communications queue with a limit on the number of + # channels/processes. + + queue = pprocess.Queue(limit=limit) + + # Initialise an array. + + results = [0] * N + + # Wrap the calculate function and manage it. + + calc = queue.manage(pprocess.MakeParallel(calculate)) + + # Perform the work. + + print "Calculating..." + for j in range(0, N): + calc(i, j) + + # Store the results as they arrive. + + print "Finishing..." + for i, j, result in queue: + results[j] = result + + return i, results + +# Main program. + +if __name__ == "__main__": + + t = time.time() + + if "--reconnect" not in sys.argv: + + # Wrap the computation and manage it. + + ptask = pprocess.MakeParallel(task) + + for i in range(0, N): + + # Make a distinct callable for each part of the computation. + + ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask) + + # Perform the work. + + ptask_i(i) + + # Discard the callable. + + del ptask + print "Discarded the callable." + + if "--start" not in sys.argv: + + # Open a queue and reconnect to the task. + + print "Opening a queue." + queue = pprocess.PersistentQueue() + for i in range(0, N): + queue.connect("task-%d.socket" % i) + + # Initialise an array. + + results = [0] * N + + # Wait for the results. + + print "Waiting for persistent results" + for i, result in queue: + results[i] = result + + # Show the results. + + for i in range(0, N): + for result in results[i]: + print result, + print + + print "Time taken:", time.time() - t + +# vim: tabstop=4 expandtab shiftwidth=4