1.1 --- a/examples/simple_persistent_managed_queue.py Mon Jun 02 22:43:43 2008 +0000
1.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
1.3 @@ -1,112 +0,0 @@
1.4 -#!/usr/bin/env python
1.5 -
1.6 -"""
1.7 -A simple example of parallel computation using a queue, managed callables and
1.8 -persistent communications.
1.9 -"""
1.10 -
1.11 -import pprocess
1.12 -import time
1.13 -#import random
1.14 -import sys
1.15 -
1.16 -# Array size and a limit on the number of processes.
1.17 -
1.18 -N = 10
1.19 -limit = 10
1.20 -delay = 1
1.21 -
1.22 -# Work function and monitoring class.
1.23 -
1.24 -def calculate(i, j):
1.25 -
1.26 - """
1.27 - A supposedly time-consuming calculation on 'i' and 'j'.
1.28 - """
1.29 -
1.30 - #time.sleep(delay * random.random())
1.31 - time.sleep(delay)
1.32 - return (i, j, i * N + j)
1.33 -
1.34 -# Main computation.
1.35 -
1.36 -def task():
1.37 -
1.38 - # Initialise the communications queue with a limit on the number of
1.39 - # channels/processes.
1.40 -
1.41 - queue = pprocess.Queue(limit=limit)
1.42 -
1.43 - # Initialise an array.
1.44 -
1.45 - results = [0] * N * N
1.46 -
1.47 - # Wrap the calculate function and manage it.
1.48 -
1.49 - calc = queue.manage(pprocess.MakeParallel(calculate))
1.50 -
1.51 - # Perform the work.
1.52 -
1.53 - print "Calculating..."
1.54 - for i in range(0, N):
1.55 - for j in range(0, N):
1.56 - calc(i, j)
1.57 -
1.58 - # Store the results as they arrive.
1.59 -
1.60 - print "Finishing..."
1.61 - for i, j, result in queue:
1.62 - results[i*N+j] = result
1.63 -
1.64 - return results
1.65 -
1.66 -# Main program.
1.67 -
1.68 -if __name__ == "__main__":
1.69 -
1.70 - t = time.time()
1.71 -
1.72 - if "--reconnect" not in sys.argv:
1.73 -
1.74 - queue = pprocess.PersistentQueue(limit=1)
1.75 -
1.76 - # Wrap the computation and manage it.
1.77 -
1.78 - ptask = queue.manage("task.socket", pprocess.MakeParallel(task))
1.79 -
1.80 - # Perform the work.
1.81 -
1.82 - ptask()
1.83 -
1.84 - # Close down the queue.
1.85 -
1.86 - del queue
1.87 - print "Closed original queue."
1.88 -
1.89 - if "--start" not in sys.argv:
1.90 -
1.91 - # Open another queue.
1.92 -
1.93 - print "Opening new queue."
1.94 - queue = pprocess.PersistentQueue(limit=1)
1.95 -
1.96 - # Reconnect to the task.
1.97 -
1.98 - queue.connect("task.socket")
1.99 -
1.100 - # Wait for the results.
1.101 -
1.102 - print "Waiting for persistent results"
1.103 - for results in queue:
1.104 - pass # should only be one element
1.105 -
1.106 - # Show the results.
1.107 -
1.108 - for i in range(0, N):
1.109 - for result in results[i*N:i*N+N]:
1.110 - print result,
1.111 - print
1.112 -
1.113 - print "Time taken:", time.time() - t
1.114 -
1.115 -# vim: tabstop=4 expandtab shiftwidth=4