3.1 --- a/pprocess.py Thu Jun 19 21:43:44 2008 +0200
3.2 +++ b/pprocess.py Sun Jul 15 23:09:04 2007 +0000
3.3 @@ -22,10 +22,11 @@
3.4 --------
3.5
3.6 The recommended styles of programming using pprocess involve the "Thread-style
3.7 -Processing" and "Convenient Message Exchanges" sections below, although
3.8 -developers may wish to read the "Message Exchanges" section for more details of
3.9 -the API concerned, and the "Fork-style Processing" section may be of interest to
3.10 -those with experience of large scale parallel processing systems.
3.11 +Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
3.12 +Processing" sections below, although developers may wish to read the "Message
3.13 +Exchanges" section for more details of the API concerned, and the "Fork-style
3.14 +Processing" section may be of interest to those with experience of large scale
3.15 +parallel processing systems.
3.16
3.17 Thread-style Processing
3.18 -----------------------
3.19 @@ -148,6 +149,45 @@
3.20 exchange API as described above. However, it permits much simpler and clearer
3.21 code.
3.22
3.23 +Managed Callables
3.24 +-----------------
3.25 +
3.26 +A further simplification of the above convenient use of message exchanges
3.27 +involves the creation of callables (eg. functions) which are automatically
3.28 +monitored by an exchange. We create such a callable by calling the wrap method
3.29 +on an exchange:
3.30 +
3.31 +myfn = exchange.wrap(fn)
3.32 +
3.33 +This callable can then be invoked instead of using the exchange's start method:
3.34 +
3.35 +myfn(arg1, arg2, named1=value1, named2=value2)
3.36 +
3.37 +The exchange's finish method can be used as usual to process incoming data.
3.38 +
3.39 +Map-style Processing
3.40 +--------------------
3.41 +
3.42 +In situations where a callable would normally be used in conjunction with the
3.43 +Python built-in map function, an alternative solution can be adopted by using
3.44 +the pmap function:
3.45 +
3.46 +pprocess.pmap(fn, sequence)
3.47 +
3.48 +Here, the sequence would have to contain elements that each contain the required
3.49 +parameters of the specified callable, fn.
3.50 +
3.51 +Making Existing Functions Parallel
3.52 +----------------------------------
3.53 +
3.54 +In making a program parallel, existing functions which only return results can
3.55 +be manually modified to accept and use channels to communicate results back to
3.56 +the main process. However, a simple alternative is to use the MakeParallel class
3.57 +to provide a wrapper around unmodified functions which will return the results
3.58 +from those functions in the channels provided. For example:
3.59 +
3.60 +fn = pprocess.MakeParallel(originalfn)
3.61 +
3.62 Signals and Waiting
3.63 -------------------
3.64
3.65 @@ -175,7 +215,7 @@
3.66 http://www.greenend.org.uk/rjk/2001/06/poll.html
3.67 """
3.68
3.69 -__version__ = "0.2.5"
3.70 +__version__ = "0.2.6"
3.71
3.72 import os
3.73 import sys
3.74 @@ -187,6 +227,8 @@
3.75 except ImportError:
3.76 import pickle
3.77
3.78 +# Communications.
3.79 +
3.80 class AcknowledgementError(Exception):
3.81 pass
3.82
3.83 @@ -279,6 +321,8 @@
3.84 finally:
3.85 self._send("OK")
3.86
3.87 +# Management of processes and communications.
3.88 +
3.89 class Exchange:
3.90
3.91 """
3.92 @@ -417,7 +461,7 @@
3.93
3.94 # Convenience methods.
3.95
3.96 - def start(self, callable, *args, **kwargs):
3.97 + def start(self, callable, *args, **kw):
3.98
3.99 """
3.100 Using pprocess.start, create a new process for the given 'callable'
3.101 @@ -425,7 +469,96 @@
3.102 created between this process and the created process.
3.103 """
3.104
3.105 - self.add_wait(start(callable, *args, **kwargs))
3.106 + self.add_wait(start(callable, *args, **kw))
3.107 +
3.108 + def manage(self, callable):
3.109 +
3.110 + """
3.111 + Wrap the given 'callable' in an object which can then be called in the
3.112 + same way as 'callable', but with new processes and communications
3.113 + managed automatically.
3.114 + """
3.115 +
3.116 + return ManagedCallable(callable, self)
3.117 +
3.118 +class ManagedCallable:
3.119 +
3.120 + "A callable managed by an exchange."
3.121 +
3.122 + def __init__(self, callable, exchange):
3.123 +
3.124 + """
3.125 + Wrap the given 'callable', using the given 'exchange' to monitor the
3.126 + channels created for communications between this and the created
3.127 + processes.
3.128 + """
3.129 +
3.130 + self.callable = callable
3.131 + self.exchange = exchange
3.132 +
3.133 + def __call__(self, *args, **kw):
3.134 +
3.135 + "Invoke the callable with the supplied arguments."
3.136 +
3.137 + self.exchange.start(self.callable, *args, **kw)
3.138 +
3.139 +# Abstractions and utilities.
3.140 +
3.141 +class Map(Exchange):
3.142 +
3.143 + "An exchange which can be used like the built-in 'map' function."
3.144 +
3.145 + def add(self, channel):
3.146 +
3.147 + "Add the given 'channel' to the exchange."
3.148 +
3.149 + Exchange.add(self, channel)
3.150 + self.channels.append(channel)
3.151 +
3.152 + def __call__(self, callable, sequence):
3.153 +
3.154 + "Invoke 'callable' for each element in the 'sequence'."
3.155 +
3.156 + # Remember the channel addition order to order output.
3.157 +
3.158 + self.channels = []
3.159 + self.results = {}
3.160 +
3.161 + for i in sequence:
3.162 + self.start(callable, *i)
3.163 + self.finish()
3.164 +
3.165 + # NOTE: Could use a generator instead.
3.166 +
3.167 + result = []
3.168 + for channel in self.channels:
3.169 + result.append(self.results[channel])
3.170 + return result
3.171 +
3.172 + def store_data(self, channel):
3.173 +
3.174 + "Accumulate the incoming data, associating results with channels."
3.175 +
3.176 + data = channel.receive()
3.177 + self.results[channel] = data
3.178 +
3.179 +class MakeParallel:
3.180 +
3.181 + "A wrapper around functions making them able to communicate results."
3.182 +
3.183 + def __init__(self, callable):
3.184 +
3.185 + "Initialise the wrapper with the given 'callable'."
3.186 +
3.187 + self.callable = callable
3.188 +
3.189 + def __call__(self, channel, *args, **kw):
3.190 +
3.191 + "Invoke the callable and return its result via the given 'channel'."
3.192 +
3.193 + channel.send(self.callable(*args, **kw))
3.194 +
3.195 +# Utility functions.
3.196
3.197 def create():
3.198
3.199 @@ -446,7 +579,7 @@
3.200 child.close()
3.201 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
3.202
3.203 -def start(callable, *args, **kwargs):
3.204 +def start(callable, *args, **kw):
3.205
3.206 """
3.207 Create a new process which shall start running in the given 'callable'.
3.208 @@ -460,7 +593,7 @@
3.209 if channel.pid == 0:
3.210 try:
3.211 try:
3.212 - callable(channel, *args, **kwargs)
3.213 + callable(channel, *args, **kw)
3.214 except:
3.215 exc_type, exc_value, exc_traceback = sys.exc_info()
3.216 channel.send(exc_value)
3.217 @@ -480,4 +613,11 @@
3.218 except OSError:
3.219 pass
3.220
3.221 +def pmap(callable, sequence):
3.222 +
3.223 + "A parallel version of the built-in map function."
3.224 +
3.225 + mymap = Map()
3.226 + return mymap(callable, sequence)
3.227 +
3.228 # vim: tabstop=4 expandtab shiftwidth=4