1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/examples/simple_background_queue.py Mon Jun 02 22:44:33 2008 +0000
1.3 @@ -0,0 +1,106 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +"""
1.7 +A simple example of parallel computation using a queue, background callables and
1.8 +persistent communications.
1.9 +"""
1.10 +
1.11 +import pprocess
1.12 +import time
1.13 +#import random
1.14 +import sys
1.15 +
1.16 +# Array size and a limit on the number of processes.
1.17 +
1.18 +N = 10
1.19 +limit = 10
1.20 +delay = 1
1.21 +
1.22 +# Work function and monitoring class.
1.23 +
1.24 +def calculate(i, j):
1.25 +
1.26 + """
1.27 + A supposedly time-consuming calculation on 'i' and 'j'.
1.28 + """
1.29 +
1.30 + #time.sleep(delay * random.random())
1.31 + time.sleep(delay)
1.32 + return (i, j, i * N + j)
1.33 +
1.34 +# Main computation.
1.35 +
1.36 +def task():
1.37 +
1.38 + # Initialise the communications queue with a limit on the number of
1.39 + # channels/processes.
1.40 +
1.41 + queue = pprocess.Queue(limit=limit)
1.42 +
1.43 + # Initialise an array.
1.44 +
1.45 + results = [0] * N * N
1.46 +
1.47 + # Wrap the calculate function and manage it.
1.48 +
1.49 + calc = queue.manage(pprocess.MakeParallel(calculate))
1.50 +
1.51 + # Perform the work.
1.52 +
1.53 + print "Calculating..."
1.54 + for i in range(0, N):
1.55 + for j in range(0, N):
1.56 + calc(i, j)
1.57 +
1.58 + # Store the results as they arrive.
1.59 +
1.60 + print "Finishing..."
1.61 + for i, j, result in queue:
1.62 + results[i*N+j] = result
1.63 +
1.64 + return results
1.65 +
1.66 +# Main program.
1.67 +
1.68 +if __name__ == "__main__":
1.69 +
1.70 + t = time.time()
1.71 +
1.72 + if "--reconnect" not in sys.argv:
1.73 +
1.74 + # Wrap the computation and manage it.
1.75 +
1.76 + ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))
1.77 +
1.78 + # Perform the work.
1.79 +
1.80 + ptask()
1.81 +
1.82 + # Discard the callable.
1.83 +
1.84 + del ptask
1.85 + print "Discarded the callable."
1.86 +
1.87 + if "--start" not in sys.argv:
1.88 +
1.89 + # Open a queue and reconnect to the task.
1.90 +
1.91 + print "Opening a queue."
1.92 + queue = pprocess.BackgroundQueue("task.socket")
1.93 +
1.94 + # Wait for the results.
1.95 +
1.96 + print "Waiting for persistent results"
1.97 + for results in queue:
1.98 + pass # should only be one element
1.99 +
1.100 + # Show the results.
1.101 +
1.102 + for i in range(0, N):
1.103 + for result in results[i*N:i*N+N]:
1.104 + print result,
1.105 + print
1.106 +
1.107 + print "Time taken:", time.time() - t
1.108 +
1.109 +# vim: tabstop=4 expandtab shiftwidth=4
2.1 --- a/examples/simple_persistent_managed_queue.py Mon Jun 02 22:43:43 2008 +0000
2.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
2.3 @@ -1,112 +0,0 @@
2.4 -#!/usr/bin/env python
2.5 -
2.6 -"""
2.7 -A simple example of parallel computation using a queue, managed callables and
2.8 -persistent communications.
2.9 -"""
2.10 -
2.11 -import pprocess
2.12 -import time
2.13 -#import random
2.14 -import sys
2.15 -
2.16 -# Array size and a limit on the number of processes.
2.17 -
2.18 -N = 10
2.19 -limit = 10
2.20 -delay = 1
2.21 -
2.22 -# Work function and monitoring class.
2.23 -
2.24 -def calculate(i, j):
2.25 -
2.26 - """
2.27 - A supposedly time-consuming calculation on 'i' and 'j'.
2.28 - """
2.29 -
2.30 - #time.sleep(delay * random.random())
2.31 - time.sleep(delay)
2.32 - return (i, j, i * N + j)
2.33 -
2.34 -# Main computation.
2.35 -
2.36 -def task():
2.37 -
2.38 - # Initialise the communications queue with a limit on the number of
2.39 - # channels/processes.
2.40 -
2.41 - queue = pprocess.Queue(limit=limit)
2.42 -
2.43 - # Initialise an array.
2.44 -
2.45 - results = [0] * N * N
2.46 -
2.47 - # Wrap the calculate function and manage it.
2.48 -
2.49 - calc = queue.manage(pprocess.MakeParallel(calculate))
2.50 -
2.51 - # Perform the work.
2.52 -
2.53 - print "Calculating..."
2.54 - for i in range(0, N):
2.55 - for j in range(0, N):
2.56 - calc(i, j)
2.57 -
2.58 - # Store the results as they arrive.
2.59 -
2.60 - print "Finishing..."
2.61 - for i, j, result in queue:
2.62 - results[i*N+j] = result
2.63 -
2.64 - return results
2.65 -
2.66 -# Main program.
2.67 -
2.68 -if __name__ == "__main__":
2.69 -
2.70 - t = time.time()
2.71 -
2.72 - if "--reconnect" not in sys.argv:
2.73 -
2.74 - queue = pprocess.PersistentQueue(limit=1)
2.75 -
2.76 - # Wrap the computation and manage it.
2.77 -
2.78 - ptask = queue.manage("task.socket", pprocess.MakeParallel(task))
2.79 -
2.80 - # Perform the work.
2.81 -
2.82 - ptask()
2.83 -
2.84 - # Close down the queue.
2.85 -
2.86 - del queue
2.87 - print "Closed original queue."
2.88 -
2.89 - if "--start" not in sys.argv:
2.90 -
2.91 - # Open another queue.
2.92 -
2.93 - print "Opening new queue."
2.94 - queue = pprocess.PersistentQueue(limit=1)
2.95 -
2.96 - # Reconnect to the task.
2.97 -
2.98 - queue.connect("task.socket")
2.99 -
2.100 - # Wait for the results.
2.101 -
2.102 - print "Waiting for persistent results"
2.103 - for results in queue:
2.104 - pass # should only be one element
2.105 -
2.106 - # Show the results.
2.107 -
2.108 - for i in range(0, N):
2.109 - for result in results[i*N:i*N+N]:
2.110 - print result,
2.111 - print
2.112 -
2.113 - print "Time taken:", time.time() - t
2.114 -
2.115 -# vim: tabstop=4 expandtab shiftwidth=4
3.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
3.2 +++ b/examples/simple_persistent_queue.py Mon Jun 02 22:44:33 2008 +0000
3.3 @@ -0,0 +1,117 @@
3.4 +#!/usr/bin/env python
3.5 +
3.6 +"""
3.7 +A simple example of parallel computation using persistent queues and
3.8 +communications.
3.9 +"""
3.10 +
3.11 +import pprocess
3.12 +import time
3.13 +#import random
3.14 +import sys
3.15 +
3.16 +# Array size and a limit on the number of processes.
3.17 +
3.18 +N = 10
3.19 +limit = 2 # since N background processes will be used, this is reduced
3.20 +delay = 1
3.21 +
3.22 +# Work function and monitoring class.
3.23 +
3.24 +def calculate(i, j):
3.25 +
3.26 + """
3.27 + A supposedly time-consuming calculation on 'i' and 'j'.
3.28 + """
3.29 +
3.30 + #time.sleep(delay * random.random())
3.31 + time.sleep(delay)
3.32 + return (i, j, i * N + j)
3.33 +
3.34 +# Background computation.
3.35 +
3.36 +def task(i):
3.37 +
3.38 + # Initialise the communications queue with a limit on the number of
3.39 + # channels/processes.
3.40 +
3.41 + queue = pprocess.Queue(limit=limit)
3.42 +
3.43 + # Initialise an array.
3.44 +
3.45 + results = [0] * N
3.46 +
3.47 + # Wrap the calculate function and manage it.
3.48 +
3.49 + calc = queue.manage(pprocess.MakeParallel(calculate))
3.50 +
3.51 + # Perform the work.
3.52 +
3.53 + print "Calculating..."
3.54 + for j in range(0, N):
3.55 + calc(i, j)
3.56 +
3.57 + # Store the results as they arrive.
3.58 +
3.59 + print "Finishing..."
3.60 + for i, j, result in queue:
3.61 + results[j] = result
3.62 +
3.63 + return i, results
3.64 +
3.65 +# Main program.
3.66 +
3.67 +if __name__ == "__main__":
3.68 +
3.69 + t = time.time()
3.70 +
3.71 + if "--reconnect" not in sys.argv:
3.72 +
3.73 + # Wrap the computation and manage it.
3.74 +
3.75 + ptask = pprocess.MakeParallel(task)
3.76 +
3.77 + for i in range(0, N):
3.78 +
3.79 + # Make a distinct callable for each part of the computation.
3.80 +
3.81 + ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)
3.82 +
3.83 + # Perform the work.
3.84 +
3.85 + ptask_i(i)
3.86 +
3.87 + # Discard the callable.
3.88 +
3.89 + del ptask
3.90 + print "Discarded the callable."
3.91 +
3.92 + if "--start" not in sys.argv:
3.93 +
3.94 + # Open a queue and reconnect to the task.
3.95 +
3.96 + print "Opening a queue."
3.97 + queue = pprocess.PersistentQueue()
3.98 + for i in range(0, N):
3.99 + queue.connect("task-%d.socket" % i)
3.100 +
3.101 + # Initialise an array.
3.102 +
3.103 + results = [0] * N
3.104 +
3.105 + # Wait for the results.
3.106 +
3.107 + print "Waiting for persistent results"
3.108 + for i, result in queue:
3.109 + results[i] = result
3.110 +
3.111 + # Show the results.
3.112 +
3.113 + for i in range(0, N):
3.114 + for result in results[i]:
3.115 + print result,
3.116 + print
3.117 +
3.118 + print "Time taken:", time.time() - t
3.119 +
3.120 +# vim: tabstop=4 expandtab shiftwidth=4