# HG changeset patch # User paulb # Date 1127771884 0 # Node ID d5bb6c75e31fdce6ceecc313859a2cf53464d190 # Parent da2bd6447bc067ba374c1790690be51970c2e617 [project @ 2005-09-26 21:58:04 by paulb] Added message exchange support for efficient polling of channels. Added signal support for proper tidying of child processes. Added more documentation. diff -r da2bd6447bc0 -r d5bb6c75e31f parallel.py --- a/parallel.py Thu Jun 19 21:42:02 2008 +0200 +++ b/parallel.py Mon Sep 26 21:58:04 2005 +0000 @@ -4,6 +4,9 @@ A simple parallel processing API for Python, inspired somewhat by the thread module, slightly less by pypar, and slightly less still by pypvm. +Thread-style Processing +----------------------- + To create new processes to run a function or any callable object, specify the "callable" and any arguments as follows: @@ -14,10 +17,13 @@ with another channel as its first argument followed by the specified arguments: def fn(channel, arg1, arg2, named1, named2): - # Read to and write from the channel. + # Read from and write to the channel. # Return value is ignored. ... +Fork-style Processing +--------------------- + To create new processes in a similar way to that employed when using os.fork (ie. the fork system call on various operating systems), use the following method: @@ -25,20 +31,49 @@ channel = create() if channel.pid == 0: # This code is run by the created process. - # Read to and write from the channel to communicate with the + # Read from and write to the channel to communicate with the # creating/calling process. # An explicit exit of the process may be desirable to prevent the process # from running code which is intended for the creating/calling process. ... else: # This code is run by the creating/calling process. - # Read to and write from the channel to communicate with the created + # Read from and write to the channel to communicate with the created # process. ... + +Message Exchanges +----------------- + +When creating many processes, each providing results for the consumption of the +main process, the collection of those results in an efficient fashion can be +problematic: if some processes take longer than others, and if we decide to read +from those processes when they are not ready instead of other processes which +are ready, the whole activity will take much longer than necessary. + +One solution to the problem of knowing when to read from channels is to create +an Exchange object, initialising it with a list of channels through which data +is expected to arrive: + +exchange = Exchange(channels) + +We may then check the exchange to see whether any data is ready to be received; +for example: + +for channel in exchange.ready(): + # Read from and write to the channel. + ... + +If we do not wish to wait indefinitely for a list of channels, we can set a +timeout value as an argument to the ready method (as a floating point number +specifying the timeout in seconds, where 0 means a non-blocking poll as stated +in the select module's select function documentation). """ import os import sys +from select import select +from signal import signal, SIGCHLD try: import cPickle as pickle @@ -78,6 +113,38 @@ else: return obj +class Exchange: + + """ + A communications exchange that can be used to detect channels which are + ready to communicate. + """ + + def __init__(self, channels): + + "Initialise the exchange with the given 'channels'." + + self.readables = {} + for channel in channels: + self.readables[channel.read_pipe] = channel + + def ready(self, timeout=None): + + """ + Wait for a period of time specified by the optional 'timeout' (or until + communication is possible) and return a list of channels which are ready + to be read from. + """ + + if timeout is not None: + t = select(self.readables.keys(), [], [], timeout) + else: + t = select(self.readables.keys(), [], []) + + readable_fds, writable_fds, exceptional_fds = t + readable = [self.readables[fd] for fd in readable_fds] + return readable + def create(): """ @@ -117,4 +184,11 @@ else: return channel +# Define and install a handler which waits for terminated child processes. + +def handler(number, frame): + os.wait() + +signal(SIGCHLD, handler) + # vim: tabstop=4 expandtab shiftwidth=4