1.1 --- a/pprocess.py Thu Jun 19 21:43:44 2008 +0200
1.2 +++ b/pprocess.py Sun Jul 15 23:09:04 2007 +0000
1.3 @@ -22,10 +22,11 @@
1.4 --------
1.5
1.6 The recommended styles of programming using pprocess involve the "Thread-style
1.7 -Processing" and "Convenient Message Exchanges" sections below, although
1.8 -developers may wish to read the "Message Exchanges" section for more details of
1.9 -the API concerned, and the "Fork-style Processing" section may be of interest to
1.10 -those with experience of large scale parallel processing systems.
1.11 +Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
1.12 +Processing" sections below, although developers may wish to read the "Message
1.13 +Exchanges" section for more details of the API concerned, and the "Fork-style
1.14 +Processing" section may be of interest to those with experience of large scale
1.15 +parallel processing systems.
1.16
1.17 Thread-style Processing
1.18 -----------------------
1.19 @@ -148,6 +149,45 @@
1.20 exchange API as described above. However, it permits much simpler and clearer
1.21 code.
1.22
1.23 +Managed Callables
1.24 +-----------------
1.25 +
1.26 +A further simplification of the above convenient use of message exchanges
1.27 +involves the creation of callables (e.g. functions) which are automatically
1.28 +monitored by an exchange. We create such a callable by calling the manage
1.29 +method on an exchange:
1.30 +
1.31 +myfn = exchange.manage(fn)
1.32 +
1.33 +This callable can then be invoked instead of using the exchange's start method:
1.34 +
1.35 +myfn(arg1, arg2, named1=value1, named2=value2)
1.36 +
1.37 +The exchange's finish method can be used as usual to process incoming data.
1.38 +
1.39 +Map-style Processing
1.40 +--------------------
1.41 +
1.42 +In situations where a callable would normally be used in conjunction with the
1.43 +Python built-in map function, an alternative solution can be adopted by using
1.44 +the pmap function:
1.45 +
1.46 +pprocess.pmap(fn, sequence)
1.47 +
1.48 +Here, each element of the sequence must itself be a sequence (such as a tuple)
1.49 +of the positional arguments required by the specified callable, fn.
1.50 +
1.51 +Making Existing Functions Parallel
1.52 +----------------------------------
1.53 +
1.54 +In making a program parallel, existing functions which only return results can
1.55 +be manually modified to accept and use channels to communicate results back to
1.56 +the main process. However, a simple alternative is to use the MakeParallel class
1.57 +to provide a wrapper around unmodified functions which will return the results
1.58 +from those functions in the channels provided. For example:
1.59 +
1.60 +fn = pprocess.MakeParallel(originalfn)
1.61 +
1.62 Signals and Waiting
1.63 -------------------
1.64
1.65 @@ -175,7 +215,7 @@
1.66 http://www.greenend.org.uk/rjk/2001/06/poll.html
1.67 """
1.68
1.69 -__version__ = "0.2.5"
1.70 +__version__ = "0.2.6"
1.71
1.72 import os
1.73 import sys
1.74 @@ -187,6 +227,8 @@
1.75 except ImportError:
1.76 import pickle
1.77
1.78 +# Communications.
1.79 +
1.80 class AcknowledgementError(Exception):
1.81 pass
1.82
1.83 @@ -279,6 +321,8 @@
1.84 finally:
1.85 self._send("OK")
1.86
1.87 +# Management of processes and communications.
1.88 +
1.89 class Exchange:
1.90
1.91 """
1.92 @@ -417,7 +461,7 @@
1.93
1.94 # Convenience methods.
1.95
1.96 - def start(self, callable, *args, **kwargs):
1.97 + def start(self, callable, *args, **kw):
1.98
1.99 """
1.100 Using pprocess.start, create a new process for the given 'callable'
1.101 @@ -425,7 +469,96 @@
1.102 created between this process and the created process.
1.103 """
1.104
1.105 - self.add_wait(start(callable, *args, **kwargs))
1.106 + self.add_wait(start(callable, *args, **kw))
1.107 +
1.108 + def manage(self, callable):
1.109 +
1.110 + """
1.111 + Wrap the given 'callable' in an object which can then be called in the
1.112 + same way as 'callable', but with new processes and communications
1.113 + managed automatically.
1.114 + """
1.115 +
1.116 + return ManagedCallable(callable, self)
1.117 +
1.118 +class ManagedCallable:
1.119 +
1.120 + "A callable managed by an exchange."
1.121 +
1.122 + def __init__(self, callable, exchange):
1.123 +
1.124 + """
1.125 + Wrap the given 'callable', using the given 'exchange' to monitor the
1.126 + channels created for communications between this and the created
1.127 + processes.
1.128 + """
1.129 +
1.130 + self.callable = callable
1.131 + self.exchange = exchange
1.132 +
1.133 + def __call__(self, *args, **kw):
1.134 +
1.135 + "Invoke the callable with the supplied arguments."
1.136 +
1.137 + self.exchange.start(self.callable, *args, **kw)
1.138 +
1.139 +# Abstractions and utilities.
1.140 +
1.141 +class Map(Exchange):
1.142 +
1.143 + "An exchange which can be used like the built-in 'map' function."
1.144 +
1.145 + def add(self, channel):
1.146 +
1.147 + "Add the given 'channel' to the exchange."
1.148 +
1.149 + Exchange.add(self, channel)
1.150 + self.channels.append(channel)
1.151 +
1.152 + def __call__(self, callable, sequence):
1.153 +
1.154 + "Invoke 'callable' for each element in the 'sequence'."
1.155 +
1.156 + # Remember the channel addition order to order output.
1.157 +
1.158 + self.channels = []
1.159 + self.results = {}
1.160 +
1.161 + for i in sequence:
1.162 + self.start(callable, *i)
1.163 + self.finish()
1.164 +
1.165 + # NOTE: Could use a generator instead.
1.166 +
1.167 + result = []
1.168 + for channel in self.channels:
1.169 + result.append(self.results[channel])
1.170 + return result
1.171 +
1.172 + def store_data(self, channel):
1.173 +
1.174 + "Accumulate the incoming data, associating results with channels."
1.175 +
1.176 + data = channel.receive()
1.177 + self.results[channel] = data
1.178 +
1.179 +class MakeParallel:
1.180 +
1.181 + "A wrapper around functions making them able to communicate results."
1.182 +
1.183 + def __init__(self, callable):
1.184 +
1.185 + "Initialise the wrapper with the given 'callable'."
1.186 +
1.187 + self.callable = callable
1.188 +
1.189 + def __call__(self, channel, *args, **kw):
1.190 +
1.191 + "Invoke the callable and return its result via the given 'channel'."
1.192 +
1.193 + channel.send(self.callable(*args, **kw))
1.194 +
1.195 +# Utility functions.
1.196
1.197 def create():
1.198
1.199 @@ -446,7 +579,7 @@
1.200 child.close()
1.201 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1.202
1.203 -def start(callable, *args, **kwargs):
1.204 +def start(callable, *args, **kw):
1.205
1.206 """
1.207 Create a new process which shall start running in the given 'callable'.
1.208 @@ -460,7 +593,7 @@
1.209 if channel.pid == 0:
1.210 try:
1.211 try:
1.212 - callable(channel, *args, **kwargs)
1.213 + callable(channel, *args, **kw)
1.214 except:
1.215 exc_type, exc_value, exc_traceback = sys.exc_info()
1.216 channel.send(exc_value)
1.217 @@ -480,4 +613,11 @@
1.218 except OSError:
1.219 pass
1.220
1.221 +def pmap(callable, sequence):
1.222 +
1.223 + "A parallel version of the built-in map function."
1.224 +
1.225 + mymap = Map()
1.226 + return mymap(callable, sequence)
1.227 +
1.228 # vim: tabstop=4 expandtab shiftwidth=4