2.1 --- a/pprocess.py Thu Jun 19 21:44:53 2008 +0200
2.2 +++ b/pprocess.py Sat Nov 24 00:00:15 2007 +0000
2.3 @@ -18,289 +18,9 @@
2.4
2.5 You should have received a copy of the GNU Lesser General Public License along
2.6 with this program. If not, see <http://www.gnu.org/licenses/>.
2.7 -
2.8 ---------
2.9 -
2.10 -Thread-style Processing
2.11 ------------------------
2.12 -
2.13 -To create new processes to run a function or any callable object, specify the
2.14 -"callable" and any arguments as follows:
2.15 -
2.16 -channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
2.17 -
2.18 -This returns a channel which can then be used to communicate with the created
2.19 -process. Meanwhile, in the created process, the given callable will be invoked
2.20 -with another channel as its first argument followed by the specified arguments:
2.21 -
2.22 -def fn(channel, arg1, arg2, named1, named2):
2.23 - # Read from and write to the channel.
2.24 - # Return value is ignored.
2.25 - ...
2.26 -
2.27 -Fork-style Processing
2.28 ----------------------
2.29 -
2.30 -To create new processes in a similar way to that employed when using os.fork
2.31 -(ie. the fork system call on various operating systems), use the following
2.32 -method:
2.33 -
2.34 -channel = pprocess.create()
2.35 -if channel.pid == 0:
2.36 - # This code is run by the created process.
2.37 - # Read from and write to the channel to communicate with the
2.38 - # creating/calling process.
2.39 - # An explicit exit of the process may be desirable to prevent the process
2.40 - # from running code which is intended for the creating/calling process.
2.41 - ...
2.42 - pprocess.exit(channel)
2.43 -else:
2.44 - # This code is run by the creating/calling process.
2.45 - # Read from and write to the channel to communicate with the created
2.46 - # process.
2.47 - ...
2.48 -
2.49 -Message Exchanges
2.50 ------------------
2.51 -
2.52 -When creating many processes, each providing results for the consumption of the
2.53 -main process, the collection of those results in an efficient fashion can be
2.54 -problematic: if some processes take longer than others, and if we decide to read
2.55 -from those processes when they are not ready instead of other processes which
2.56 -are ready, the whole activity will take much longer than necessary.
2.57 -
2.58 -One solution to the problem of knowing when to read from channels is to create
2.59 -an Exchange object, optionally initialising it with a list of channels through
2.60 -which data is expected to arrive:
2.61 -
2.62 -exchange = pprocess.Exchange() # populate the exchange later
2.63 -exchange = pprocess.Exchange(channels) # populate the exchange with channels
2.64 -
2.65 -We can add channels to the exchange using the add method:
2.66 -
2.67 -exchange.add(channel)
2.68 -
2.69 -To test whether an exchange is active - that is, whether it is actually
2.70 -monitoring any channels - we can use the active method which returns all
2.71 -channels being monitored by the exchange:
2.72 -
2.73 -channels = exchange.active()
2.74 -
2.75 -We may then check the exchange to see whether any data is ready to be received;
2.76 -for example:
2.77 -
2.78 -for channel in exchange.ready():
2.79 - # Read from and write to the channel.
2.80 - ...
2.81 -
2.82 -If we do not wish to wait indefinitely for a list of channels, we can set a
2.83 -timeout value as an argument to the ready method (as a floating point number
2.84 -specifying the timeout in seconds, where 0 means a non-blocking poll as stated
2.85 -in the select module's select function documentation).
2.86 -
2.87 -Convenient Message Exchanges
2.88 -----------------------------
2.89 -
2.90 -A convenient form of message exchanges can be adopted by defining a subclass of
2.91 -the Exchange class and defining a particular method:
2.92 -
2.93 -class MyExchange(pprocess.Exchange):
2.94 - def store_data(self, channel):
2.95 - data = channel.receive()
2.96 - # Do something with data here.
2.97 -
2.98 -The exact operations performed on the received data might be as simple as
2.99 -storing it on an instance attribute. To make use of the exchange, we would
2.100 -instantiate it as usual:
2.101 -
2.102 -exchange = MyExchange() # populate the exchange later
2.103 -exchange = MyExchange(limit=10) # set a limit for later population
2.104 -
2.105 -The exchange can now be used in a simpler fashion than that shown above. We can
2.106 -add channels as before using the add method, or we can choose to only add
2.107 -channels if the specified limit of channels is not exceeded:
2.108 -
2.109 -exchange.add(channel) # add a channel as normal
2.110 -exchange.add_wait(channel) # add a channel, waiting if the limit would be
2.111 - # exceeded
2.112 -
2.113 -Or we can request that the exchange create a channel on our behalf:
2.114 -
2.115 -channel = exchange.create()
2.116 -
2.117 -We can even start processes and monitor channels without ever handling the
2.118 -channel ourselves:
2.119 -
2.120 -exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
2.121 -
2.122 -We can explicitly wait for "free space" for channels by calling the wait method,
2.123 -although the start and add_wait methods make this less interesting:
2.124 -
2.125 -exchange.wait()
2.126 -
2.127 -Finally, when finishing the computation, we can choose to merely call the finish
2.128 -method and have the remaining data processed automatically:
2.129 -
2.130 -exchange.finish()
2.131 -
2.132 -Clearly, this approach is less flexible but more convenient than the raw message
2.133 -exchange API as described above. However, it permits much simpler and clearer
2.134 -code.
2.135 -
2.136 -Exchanges as Queues
2.137 --------------------
2.138 -
2.139 -Instead of having to subclass the pprocess.Exchange class and to define the
2.140 -store_data method, it might be more desirable to let the exchange manage the
2.141 -communications between created and creating processes and to let the creating
2.142 -process just consume received data as it arrives, without particular regard for
2.143 -the order of the received data - perhaps the creating process has its own way of
2.144 -managing such issues.
2.145 -
2.146 -For such situations, the Queue class may be instantiated and channels added to
2.147 -the queue using the various methods provided:
2.148 -
2.149 -queue = pprocess.Queue(limit=10)
2.150 -channel = queue.create()
2.151 -if channel:
2.152 - # Do some computation.
2.153 - pprocess.exit(channel)
2.154 -
2.155 -The results can then be consumed by treating the queue like an iterator:
2.156 -
2.157 -for result in queue:
2.158 - # Capture each result.
2.159 -
2.160 -This approach does not, of course, require the direct handling of channels. One
2.161 -could instead use the start method on the queue to create processes and to
2.162 -initiate computations (since a queue is merely an enhanced exchange with a
2.163 -specific implementation of the store_data method).
2.164 -
2.165 -Exchanges as Maps
2.166 ------------------
2.167 -
2.168 -Where the above Queue class appears like an attractive solution for the
2.169 -management of the results of computations, but where the order of their
2.170 -consumption by the creating process remains important, the Map class may offer a
2.171 -suitable way of collecting and accessing results:
2.172 -
2.173 -results = pprocess.Map(limit=10)
2.174 -for value in inputs:
2.175 - results.start(fn, args)
2.176 -
2.177 -The results can then be consumed in an order corresponding to the order of the
2.178 -computations which produced them:
2.179 -
2.180 -for result in results:
2.181 - # Process each result.
2.182 -
2.183 -Internally, the Map object records a particular ordering of channels, ensuring
2.184 -that the received results can be mapped to this ordering, and that the results
2.185 -can be made available with this ordering preserved.
2.186 -
2.187 -Managed Callables
2.188 ------------------
2.189 -
2.190 -A further simplification of the above convenient use of message exchanges
2.191 -involves the creation of callables (eg. functions) which are automatically
2.192 -monitored by an exchange. We create such a callable by calling the manage method
2.193 -on an exchange:
2.194 -
2.195 -myfn = exchange.manage(fn)
2.196 -
2.197 -This callable can then be invoked instead of using the exchange's start method:
2.198 -
2.199 -myfn(arg1, arg2, named1=value1, named2=value2)
2.200 -
2.201 -The exchange's finish method can be used as usual to process incoming data.
2.202 -
2.203 -Making Existing Functions Parallel
2.204 -----------------------------------
2.205 -
2.206 -In making a program parallel, existing functions which only return results can
2.207 -be manually modified to accept and use channels to communicate results back to
2.208 -the main process. However, a simple alternative is to use the MakeParallel class
2.209 -to provide a wrapper around unmodified functions which will return the results
2.210 -from those functions in the channels provided. For example:
2.211 -
2.212 -fn = pprocess.MakeParallel(originalfn)
2.213 -
2.214 -Map-style Processing
2.215 ---------------------
2.216 -
2.217 -In situations where a callable would normally be used in conjunction with the
2.218 -Python built-in map function, an alternative solution can be adopted by using
2.219 -the pmap function:
2.220 -
2.221 -pprocess.pmap(fn, sequence)
2.222 -
2.223 -Here, the sequence would have to contain elements that each contain the required
2.224 -parameters of the specified callable, fn. Note that the callable does not need
2.225 -to be a parallel-aware function which has a channel argument: the pmap function
2.226 -automatically wraps the given callable internally.
2.227 -
2.228 -Reusing Processes and Channels
2.229 -------------------------------
2.230 -
2.231 -So far, all parallel computations have been done with newly-created processes.
2.232 -However, this can seem somewhat inefficient, especially if processes are being
2.233 -continually created and destroyed (although if this happens too often, the
2.234 -amount of work done by each process may be too little, anyway). One solution is
2.235 -to retain processes after they have done their work and request that they
2.236 -perform more work for each new parallel task or invocation. To enable the reuse
2.237 -of processes in this way, a special keyword argument may be specified when
2.238 -creating Exchange objects (and subclasses such as Map and Queue). For example:
2.239 -
2.240 -exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
2.241 -
2.242 -Code invoked through such exchanges must be aware of channels and be constructed
2.243 -in such a way that it does not terminate after sending a result back to the
2.244 -creating process. Instead, it should repeatedly wait for subsequent sets of
2.245 -parameters (compatible with those either in the signature of a callable or with
2.246 -the original values read from the channel). Reusable code is terminated when the
2.247 -special value of None is sent from the creating process to the created process,
2.248 -indicating that no more parameters will be sent; this should cause the code to
2.249 -terminate.
2.250 -
2.251 -Making Existing Functions Parallel and Reusable
2.252 ------------------------------------------------
2.253 -
2.254 -An easier way of making reusable code sections for parallel use is to employ the
2.255 -MakeReusable class to wrap an existing callable:
2.256 -
2.257 -fn = pprocess.MakeReusable(originalfn)
2.258 -
2.259 -This wraps the callable in a similar fashion to MakeParallel, but provides the
2.260 -necessary mechanisms described above for reusable code.
2.261 -
2.262 -Signals and Waiting
2.263 --------------------
2.264 -
2.265 -When created/child processes terminate, one would typically want to be informed
2.266 -of such conditions using a signal handler. Unfortunately, Python seems to have
2.267 -issues with restartable reads from file descriptors when interrupted by signals:
2.268 -
2.269 -http://mail.python.org/pipermail/python-dev/2002-September/028572.html
2.270 -http://twistedmatrix.com/bugs/issue733
2.271 -
2.272 -Select and Poll
2.273 ----------------
2.274 -
2.275 -The exact combination of conditions indicating closed pipes remains relatively
2.276 -obscure. Here is a message/thread describing them (in the context of another
2.277 -topic):
2.278 -
2.279 -http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
2.280 -
2.281 -It would seem, from using sockets and from studying the asyncore module, that
2.282 -sockets are more predictable than pipes.
2.283 -
2.284 -Notes about poll implementations can be found here:
2.285 -
2.286 -http://www.greenend.org.uk/rjk/2001/06/poll.html
2.287 """
2.288
2.289 -__version__ = "0.3"
2.290 +__version__ = "0.3.1"
2.291
2.292 import os
2.293 import sys