# HG changeset patch # User Paul Boddie # Date 1242926646 -7200 # Node ID a560bcdaca9601ef45c495c7c139d043c2bed497 # Parent 970c7d8a5b1b8f09db8a039b57b0c33e2f2c52bd Added incremental access to Map instance results, along with an iteration interface. Added an example of pmap and iteration. Updated release notes. diff -r 970c7d8a5b1b -r a560bcdaca96 README.txt --- a/README.txt Sun May 17 23:29:54 2009 +0200 +++ b/README.txt Thu May 21 19:24:06 2009 +0200 @@ -42,11 +42,13 @@ simple_managed_queue Yes Yes Yes simple_managed Yes Yes Yes simple_pmap Yes +simple_pmap_iter Yes simple_start_queue Yes Yes Yes simple_start Yes Yes -The simplest parallel program is simple_pmap.py which employs the pmap -function resembling the built-in map function in Python. +The simplest parallel programs are simple_pmap.py and simple_pmap_iter.py +which employ the pmap function resembling the built-in map function in +Python. Other simple programs are those employing the Queue class, together with those using the manage method which associates functions or callables with Queue or @@ -143,13 +145,18 @@ ------------ This software depends on standard library features which are stated as being -available only on "UNIX"; it has only been tested on a GNU/Linux system. +available only on "UNIX"; it has only been tested repeatedly on a GNU/Linux +system, and occasionally on systems running OpenSolaris. New in pprocess 0.4.1 (Changes since pprocess 0.4) -------------------------------------------------- * Fixed the get_number_of_cores function to work with /proc/cpuinfo where the "physical id" field is missing. + * Changed the Map class to permit incremental access to received results + from completed parts of the sequence of inputs, also adding an iteration + interface. + * Added an example, simple_pmap_iter.py, to demonstrate iteration over maps. New in pprocess 0.4 (Changes since pprocess 0.3.1) -------------------------------------------------- diff -r 970c7d8a5b1b -r a560bcdaca96 examples/simple_pmap_iter.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/simple_pmap_iter.py Thu May 21 19:24:06 2009 +0200 @@ -0,0 +1,54 @@ +#!/usr/bin/env python + +""" +A simple example of parallel computation using map-style processing. +""" + +import pprocess +import time +#import random + +# Array size and a limit on the number of processes. + +N = 10 +limit = 10 +delay = 1 + +# Work function. + +def calculate(t): + + "A supposedly time-consuming calculation on 't'." + + i, j = t + #time.sleep(delay * random.random()) + time.sleep(delay) + return i * N + j + +# Main program. + +if __name__ == "__main__": + + t = time.time() + + # Initialise an array. + + sequence = [] + for i in range(0, N): + for j in range(0, N): + sequence.append((i, j)) + + # Perform the work. + + results = pprocess.pmap(calculate, sequence, limit=limit) + + # Show the results. + + for i, result in enumerate(results): + print result, + if i % N == N - 1: + print + + print "Time taken:", time.time() - t + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r 970c7d8a5b1b -r a560bcdaca96 packages/ubuntu-feisty/python-pprocess/debian/changelog --- a/packages/ubuntu-feisty/python-pprocess/debian/changelog Sun May 17 23:29:54 2009 +0200 +++ b/packages/ubuntu-feisty/python-pprocess/debian/changelog Thu May 21 19:24:06 2009 +0200 @@ -2,8 +2,13 @@ * Fixed the get_number_of_cores function to work with /proc/cpuinfo where the "physical id" field is missing. + * Changed the Map class to permit incremental access to + received results from completed parts of the sequence of + inputs, also adding an iteration interface. + * Added an example, simple_pmap_iter.py, to demonstrate + iteration over maps. - -- Paul Boddie Sun, 17 May 2009 23:28:27 +0200 + -- Paul Boddie Thu, 21 May 2009 19:13:47 +0200 pprocess (0.4-0ubuntu1) feisty; urgency=low diff -r 970c7d8a5b1b -r a560bcdaca96 packages/ubuntu-hoary/python2.4-parallel-pprocess/debian/changelog --- a/packages/ubuntu-hoary/python2.4-parallel-pprocess/debian/changelog Sun May 17 23:29:54 2009 +0200 +++ b/packages/ubuntu-hoary/python2.4-parallel-pprocess/debian/changelog Thu May 21 19:24:06 2009 +0200 @@ -2,8 +2,13 @@ * Fixed the get_number_of_cores function to work with /proc/cpuinfo where the "physical id" field is missing. + * Changed the Map class to permit incremental access to + received results from completed parts of the sequence of + inputs, also adding an iteration interface. + * Added an example, simple_pmap_iter.py, to demonstrate + iteration over maps. - -- Paul Boddie Sun, 17 May 2009 23:28:59 +0200 + -- Paul Boddie Thu, 21 May 2009 19:13:32 +0200 parallel-pprocess (0.4-0ubuntu1) hoary; urgency=low diff -r 970c7d8a5b1b -r a560bcdaca96 pprocess.py --- a/pprocess.py Sun May 17 23:29:54 2009 +0200 +++ b/pprocess.py Thu May 21 19:24:06 2009 +0200 @@ -38,6 +38,10 @@ except NameError: from sets import Set as set +# Special values. + +class Undefined: pass + # Communications. class AcknowledgementError(Exception): @@ -631,6 +635,7 @@ self.channel_number = 0 self.channels = {} self.results = [] + self.current_index = 0 def add(self, channel): @@ -648,7 +653,7 @@ process and the created process. """ - self.results.append(None) # placeholder + self.results.append(Undefined) # placeholder Exchange.start(self, callable, *args, **kw) def create(self): @@ -660,7 +665,7 @@ this exchange. """ - self.results.append(None) # placeholder + self.results.append(Undefined) # placeholder return Exchange.create(self) def __call__(self, callable, sequence): @@ -683,14 +688,6 @@ return self - def __getitem__(self, i): - self.finish() - return self.results[i] - - def __iter__(self): - self.finish() - return iter(self.results) - def store_data(self, channel): "Accumulate the incoming data, associating results with channels." @@ -699,6 +696,58 @@ self.results[self.channels[channel]] = data del self.channels[channel] + def __iter__(self): + return self + + def next(self): + + "Return the next element in the map." + + try: + return self._next() + except IndexError: + pass + + while self.active(): + self.store() + try: + return self._next() + except IndexError: + pass + else: + raise StopIteration + + def __getitem__(self, i): + + "Return element 'i' from the map." + + try: + return self._get(i) + except IndexError: + pass + + while self.active(): + self.store() + try: + return self._get(i) + except IndexError: + pass + else: + raise IndexError, i + + # Helper methods for the above access methods. + + def _next(self): + result = self._get(self.current_index) + self.current_index += 1 + return result + + def _get(self, i): + result = self.results[i] + if result is Undefined or isinstance(i, slice) and Undefined in result: + raise IndexError, i + return result + class Queue(Exchange): """