# HG changeset patch # User paulb # Date 1184540944 0 # Node ID 2b9b685985afaecbe26d1a8266fe78c9383d91df # Parent 37cca64518fbecb097e8e13e8d00d4caa0d2aeb8 [project @ 2007-07-15 23:09:04 by paulb] Added support for wrapping and managing callables automatically, for map-style processing, and for making existing functions parallel-aware. Updated release information. diff -r 37cca64518fb -r 2b9b685985af PKG-INFO --- a/PKG-INFO Thu Jun 19 21:43:44 2008 +0200 +++ b/PKG-INFO Sun Jul 15 23:09:04 2007 +0000 @@ -1,11 +1,11 @@ Metadata-Version: 1.1 Name: parallel -Version: 0.2.5 +Version: 0.2.6 Author: Paul Boddie Author-email: paul at boddie org uk Maintainer: Paul Boddie Maintainer-email: paul at boddie org uk -Download-url: http://www.boddie.org.uk/python/downloads/parallel-0.2.5.tar.gz +Download-url: http://www.boddie.org.uk/python/downloads/parallel-0.2.6.tar.gz Summary: Elementary parallel programming for Python License: LGPL (version 3 or later) Description: The pprocess module provides elementary support for parallel diff -r 37cca64518fb -r 2b9b685985af README.txt --- a/README.txt Thu Jun 19 21:43:44 2008 +0200 +++ b/README.txt Sun Jul 15 23:09:04 2007 +0000 @@ -63,6 +63,16 @@ This software depends on standard library features which are stated as being available only on "UNIX"; it has only been tested on a GNU/Linux system. +New in parallel 0.2.6 (Changes since parallel 0.2.5) +---------------------------------------------------- + + * Added managed callables: wrappers around callables which cause them to be + automatically managed by the exchange from which they were acquired. + * Added MakeParallel: a wrapper instantiated around a normal function which + sends the result of that function over the supplied channel when invoked. + * Added a Map class which attempts to emulate the built-in map function. + * Extended and updated the examples. + New in parallel 0.2.5 (Changes since parallel 0.2.4) ---------------------------------------------------- diff -r 37cca64518fb -r 2b9b685985af pprocess.py --- a/pprocess.py Thu Jun 19 21:43:44 2008 +0200 +++ b/pprocess.py Sun Jul 15 23:09:04 2007 +0000 @@ -22,10 +22,11 @@ -------- The recommended styles of programming using pprocess involve the "Thread-style -Processing" and "Convenient Message Exchanges" sections below, although -developers may wish to read the "Message Exchanges" section for more details of -the API concerned, and the "Fork-style Processing" section may be of interest to -those with experience of large scale parallel processing systems. +Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style +Processing" sections below, although developers may wish to read the "Message +Exchanges" section for more details of the API concerned, and the "Fork-style +Processing" section may be of interest to those with experience of large scale +parallel processing systems. Thread-style Processing ----------------------- @@ -148,6 +149,45 @@ exchange API as described above. However, it permits much simpler and clearer code. +Managed Callables +----------------- + +A further simplification of the above convenient use of message exchanges +involves the creation of callables (eg. functions) which are automatically +monitored by an exchange. We create such a callable by calling the wrap method +on an exchange: + +myfn = exchange.wrap(fn) + +This callable can then be invoked instead of using the exchange's start method: + +myfn(arg1, arg2, named1=value1, named2=value2) + +The exchange's finish method can be used as usual to process incoming data. + +Map-style Processing +-------------------- + +In situations where a callable would normally be used in conjunction with the +Python built-in map function, an alternative solution can be adopted by using +the pmap function: + +pprocess.pmap(fn, sequence) + +Here, the sequence would have to contain elements that each contain the required +parameters of the specified callable, fn. + +Making Existing Functions Parallel +---------------------------------- + +In making a program parallel, existing functions which only return results can +be manually modified to accept and use channels to communicate results back to +the main process. However, a simple alternative is to use the MakeParallel class +to provide a wrapper around unmodified functions which will return the results +from those functions in the channels provided. For example: + +fn = pprocess.MakeParallel(originalfn) + Signals and Waiting ------------------- @@ -175,7 +215,7 @@ http://www.greenend.org.uk/rjk/2001/06/poll.html """ -__version__ = "0.2.5" +__version__ = "0.2.6" import os import sys @@ -187,6 +227,8 @@ except ImportError: import pickle +# Communications. + class AcknowledgementError(Exception): pass @@ -279,6 +321,8 @@ finally: self._send("OK") +# Management of processes and communications. + class Exchange: """ @@ -417,7 +461,7 @@ # Convenience methods. - def start(self, callable, *args, **kwargs): + def start(self, callable, *args, **kw): """ Using pprocess.start, create a new process for the given 'callable' @@ -425,7 +469,96 @@ created between this process and the created process. """ - self.add_wait(start(callable, *args, **kwargs)) + self.add_wait(start(callable, *args, **kw)) + + def manage(self, callable): + + """ + Wrap the given 'callable' in an object which can then be called in the + same way as 'callable', but with new processes and communications + managed automatically. + """ + + return ManagedCallable(callable, self) + +class ManagedCallable: + + "A callable managed by an exchange." + + def __init__(self, callable, exchange): + + """ + Wrap the given 'callable', using the given 'exchange' to monitor the + channels created for communications between this and the created + processes. + """ + + self.callable = callable + self.exchange = exchange + + def __call__(self, *args, **kw): + + "Invoke the callable with the supplied arguments." + + self.exchange.start(self.callable, *args, **kw) + +# Abstractions and utilities. + +class Map(Exchange): + + "An exchange which can be used like the built-in 'map' function." + + def add(self, channel): + + "Add the given 'channel' to the exchange." + + Exchange.add(self, channel) + self.channels.append(channel) + + def __call__(self, callable, sequence): + + "Invoke 'callable' for each element in the 'sequence'." + + # Remember the channel addition order to order output. + + self.channels = [] + self.results = {} + + for i in sequence: + self.start(callable, *i) + self.finish() + + # NOTE: Could use a generator instead. + + result = [] + for channel in self.channels: + result.append(self.results[channel]) + return result + + def store_data(self, channel): + + "Accumulate the incoming data, associating results with channels." + + data = channel.receive() + self.results[channel] = data + +class MakeParallel: + + "A wrapper around functions making them able to communicate results." + + def __init__(self, callable): + + "Initialise the wrapper with the given 'callable'." + + self.callable = callable + + def __call__(self, channel, *args, **kw): + + "Invoke the callable and return its result via the given 'channel'." + + channel.send(self.callable(*args, **kw)) + +# Utility functions. def create(): @@ -446,7 +579,7 @@ child.close() return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) -def start(callable, *args, **kwargs): +def start(callable, *args, **kw): """ Create a new process which shall start running in the given 'callable'. @@ -460,7 +593,7 @@ if channel.pid == 0: try: try: - callable(channel, *args, **kwargs) + callable(channel, *args, **kw) except: exc_type, exc_value, exc_traceback = sys.exc_info() channel.send(exc_value) @@ -480,4 +613,11 @@ except OSError: pass +def pmap(callable, sequence): + + "A parallel version of the built-in map function." + + mymap = Map() + return mymap(callable, sequence) + # vim: tabstop=4 expandtab shiftwidth=4