1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/examples/simple_persistent_queue.py Mon Jun 02 22:44:33 2008 +0000
1.3 @@ -0,0 +1,117 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +"""
1.7 +A simple example of parallel computation using persistent queues and
1.8 +communications.
1.9 +"""
1.10 +
1.11 +import pprocess
1.12 +import time
1.13 +#import random
1.14 +import sys
1.15 +
1.16 +# Array size and a limit on the number of processes.
1.17 +
1.18 +N = 10
1.19 +limit = 2 # since N background processes will be used, this is reduced
1.20 +delay = 1
1.21 +
1.22 +# Work function and monitoring class.
1.23 +
1.24 +def calculate(i, j):
1.25 +
1.26 + """
1.27 + A supposedly time-consuming calculation on 'i' and 'j'.
1.28 + """
1.29 +
1.30 + #time.sleep(delay * random.random())
1.31 + time.sleep(delay)
1.32 + return (i, j, i * N + j)
1.33 +
1.34 +# Background computation.
1.35 +
1.36 +def task(i):
1.37 +
1.38 + # Initialise the communications queue with a limit on the number of
1.39 + # channels/processes.
1.40 +
1.41 + queue = pprocess.Queue(limit=limit)
1.42 +
1.43 + # Initialise an array.
1.44 +
1.45 + results = [0] * N
1.46 +
1.47 + # Wrap the calculate function and manage it.
1.48 +
1.49 + calc = queue.manage(pprocess.MakeParallel(calculate))
1.50 +
1.51 + # Perform the work.
1.52 +
1.53 + print "Calculating..."
1.54 + for j in range(0, N):
1.55 + calc(i, j)
1.56 +
1.57 + # Store the results as they arrive.
1.58 +
1.59 + print "Finishing..."
1.60 + for i, j, result in queue:
1.61 + results[j] = result
1.62 +
1.63 + return i, results
1.64 +
1.65 +# Main program.
1.66 +
1.67 +if __name__ == "__main__":
1.68 +
1.69 + t = time.time()
1.70 +
1.71 + if "--reconnect" not in sys.argv:
1.72 +
1.73 + # Wrap the computation and manage it.
1.74 +
1.75 + ptask = pprocess.MakeParallel(task)
1.76 +
1.77 + for i in range(0, N):
1.78 +
1.79 + # Make a distinct callable for each part of the computation.
1.80 +
1.81 + ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)
1.82 +
1.83 + # Perform the work.
1.84 +
1.85 + ptask_i(i)
1.86 +
1.87 + # Discard the callable.
1.88 +
1.89 + del ptask
1.90 + print "Discarded the callable."
1.91 +
1.92 + if "--start" not in sys.argv:
1.93 +
1.94 + # Open a queue and reconnect to the task.
1.95 +
1.96 + print "Opening a queue."
1.97 + queue = pprocess.PersistentQueue()
1.98 + for i in range(0, N):
1.99 + queue.connect("task-%d.socket" % i)
1.100 +
1.101 + # Initialise an array.
1.102 +
1.103 + results = [0] * N
1.104 +
1.105 + # Wait for the results.
1.106 +
1.107 + print "Waiting for persistent results"
1.108 + for i, result in queue:
1.109 + results[i] = result
1.110 +
1.111 + # Show the results.
1.112 +
1.113 + for i in range(0, N):
1.114 + for result in results[i]:
1.115 + print result,
1.116 + print
1.117 +
1.118 + print "Time taken:", time.time() - t
1.119 +
1.120 +# vim: tabstop=4 expandtab shiftwidth=4