# HG changeset patch # User paulb # Date 1129050431 0 # Node ID 5d5cb645ac9776311d7ce51874b9996280ed2b0c # Parent b269890969d041dc0d4a4db886e92dc7aea51bc5 [project @ 2005-10-11 17:06:58 by paulb] Changed the name of the module from parallel to pprocess and the name of the package from parallel-module to parallel-pprocess. diff -r b269890969d0 -r 5d5cb645ac97 PKG-INFO --- a/PKG-INFO Thu Jun 19 21:42:43 2008 +0200 +++ b/PKG-INFO Tue Oct 11 17:07:11 2005 +0000 @@ -1,18 +1,18 @@ Metadata-Version: 1.1 Name: parallel -Version: 0.1 +Version: 0.2 Author: Paul Boddie Author-email: paul at boddie org uk Maintainer: Paul Boddie Maintainer-email: paul at boddie org uk -Download-url: http://www.boddie.org.uk/python/downloads/parallel-0.1.tar.gz +Download-url: http://www.boddie.org.uk/python/downloads/parallel-0.2.tar.gz Summary: Elementary parallel programming for Python License: LGPL -Description: The parallel module provides elementary support for parallel +Description: The pprocess module provides elementary support for parallel programming in Python using a fork-based process creation model in conjunction with a channel-based communications model implemented using socketpair and poll. -Keywords: parallel fork socketpair socket channel +Keywords: parallel pprocess fork socketpair socket channel Platform: Any Classifier: Development Status :: 3 - Alpha Classifier: License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL) diff -r b269890969d0 -r 5d5cb645ac97 README.txt --- a/README.txt Thu Jun 19 21:42:43 2008 +0200 +++ b/README.txt Tue Oct 11 17:07:11 2005 +0000 @@ -1,7 +1,7 @@ Introduction ------------ -The parallel module provides elementary support for parallel programming in +The pprocess module provides elementary support for parallel programming in Python using a fork-based process creation model in conjunction with a channel-based communications model implemented using socketpair and poll. @@ -41,10 +41,16 @@ This software depends on standard library features which are stated as being available only on "UNIX"; it has only been tested on a GNU/Linux system. +New in parallel 0.2 (Changes since parallel 0.1) +------------------------------------------------ + + * Changed the name of the included module from parallel to pprocess in order + to avoid naming conflicts with PyParallel. + Release Procedures ------------------ -Update the parallel __version__ attribute. +Update the pprocess __version__ attribute. Change the version number and package filename/directory in the documentation. Update the release notes (see above). Check the release information in the PKG-INFO file. @@ -61,7 +67,7 @@ 2. Make a symbolic link in the distribution's root directory to keep the Debian tools happy: - ln -s packages/debian/python2.4-parallel-module/debian/ + ln -s packages/debian/python2.4-parallel-pprocess/debian/ 3. Run the package builder: diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-module/debian/changelog --- a/packages/debian/python2.4-parallel-module/debian/changelog Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,5 +0,0 @@ -parallel-module (0.1-1) unstable; urgency=low - - * New upstream release. - - -- Paul Boddie Wed, 28 Sep 2005 19:18:47 +0200 diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-module/debian/control --- a/packages/debian/python2.4-parallel-module/debian/control Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -Source: parallel-module -Priority: optional -Maintainer: Paul Boddie -Build-Depends: debhelper (>> 3.0.0) -Standards-Version: 3.5.8 -Section: python - -Package: python2.4-parallel-module -Section: python -Architecture: all -Depends: python2.4 -Description: Elementary parallel programming for Python - The parallel module provides elementary support for parallel - programming in Python using a fork-based process creation model in - conjunction with a channel-based communications model implemented - using socketpair and poll. - . - This package is built for Python 2.4.x. diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-module/debian/copyright --- a/packages/debian/python2.4-parallel-module/debian/copyright Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,32 +0,0 @@ -Package creator: - - Paul Boddie - -The origin of the package is: - - http://www.python.org/pypi/parallel - -Package author: - - Paul Boddie - -Copyright: - -Licence Agreement for parallel ------------------------------- - -Copyright (C) 2005 Paul Boddie - -This library is free software; you can redistribute it and/or -modify it under the terms of the GNU Lesser General Public -License as published by the Free Software Foundation; either -version 2.1 of the License, or (at your option) any later version. - -This library is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -Lesser General Public License for more details. - -You should have received a copy of the GNU Lesser General Public -License along with this library; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-module/debian/docs --- a/packages/debian/python2.4-parallel-module/debian/docs Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,2 +0,0 @@ -README.txt -docs diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-module/debian/postinst --- a/packages/debian/python2.4-parallel-module/debian/postinst Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,46 +0,0 @@ -#! /bin/sh -# postinst script for parallel -# -# see: dh_installdeb(1) - -set -e - -# summary of how this script can be called: -# * `configure' -# * `abort-upgrade' -# * `abort-remove' `in-favour' -# -# * `abort-deconfigure' `in-favour' -# `removing' -# -# for details, see http://www.debian.org/doc/debian-policy/ or -# the debian-policy package -# -# quoting from the policy: -# Any necessary prompting should almost always be confined to the -# post-installation script, and should be protected with a conditional -# so that unnecessary prompting doesn't happen if a package's -# installation fails and the `postinst' is called with `abort-upgrade', -# `abort-remove' or `abort-deconfigure'. - -PACKAGE=python2.4-parallel-module -VERSION=2.4 -LIB="/usr/lib/python$VERSION" - -case "$1" in - configure|abort-upgrade|abort-remove|abort-deconfigure) - /usr/bin/python$VERSION -O -c "import parallel" - /usr/bin/python$VERSION -c "import parallel" - ;; - - *) - echo "postinst called with unknown argument \`$1'" >&2 - exit 1 - ;; -esac - - - -exit 0 - - diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-module/debian/prerm --- a/packages/debian/python2.4-parallel-module/debian/prerm Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,24 +0,0 @@ -#! /bin/sh -# prerm script for parallel - -set -e - -PACKAGE=python2.4-parallel-module -VERSION=2.4 -LIB="/usr/lib/python$VERSION" -DIR="$LIB/site-packages" - -case "$1" in - remove|upgrade|failed-upgrade) - rm ${DIR}/parallel.py[co] - ;; - - *) - echo "prerm called with unknown argument \`$1'" >&2 - exit 1 - ;; -esac - - - -exit 0 diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-module/debian/rules --- a/packages/debian/python2.4-parallel-module/debian/rules Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,49 +0,0 @@ -#!/usr/bin/make -f -# Sample debian/rules that uses debhelper. -# GNU copyright 1997 to 1999 by Joey Hess. - -# Uncomment this to turn on verbose mode. -#export DH_VERBOSE=1 - -# This is the debhelper compatibility version to use. -export DH_COMPAT=4 - - - -build: build-stamp - /usr/bin/python2.4 setup.py build -build-stamp: - touch build-stamp - -configure: - # Do nothing - -clean: - dh_testdir - dh_testroot - rm -f build-stamp - - -rm -rf build - - dh_clean - -install: build - dh_testdir - dh_testroot - dh_clean -k - /usr/bin/python2.4 setup.py install --no-compile --prefix=$(CURDIR)/debian/python2.4-parallel-module/usr - -# Build architecture-independent files here. -binary-indep: install - dh_testdir - dh_testroot - - dh_installdocs - dh_installdeb - dh_gencontrol - dh_md5sums - dh_builddeb -# We have nothing to do by default. - -binary: binary-indep -.PHONY: build clean binary-indep binary install diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-pprocess/debian/changelog --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/packages/debian/python2.4-parallel-pprocess/debian/changelog Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,11 @@ +parallel-pprocess (0.2-0ubuntu1) unstable; urgency=low + + * Changed the parallel module to pprocess. + + -- Paul Boddie Tue, 11 Oct 2005 18:50:24 +0200 + +parallel-module (0.1-0ubuntu1) unstable; urgency=low + + * New upstream release. + + -- Paul Boddie Wed, 28 Sep 2005 19:18:47 +0200 diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-pprocess/debian/control --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/packages/debian/python2.4-parallel-pprocess/debian/control Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,18 @@ +Source: parallel-pprocess +Priority: optional +Maintainer: Paul Boddie +Build-Depends: debhelper (>> 3.0.0) +Standards-Version: 3.5.8 +Section: python + +Package: python2.4-parallel-pprocess +Section: python +Architecture: all +Depends: python2.4 +Description: Elementary parallel programming for Python + The pprocess module provides elementary support for parallel + programming in Python using a fork-based process creation model in + conjunction with a channel-based communications model implemented + using socketpair and poll. + . + This package is built for Python 2.4.x. diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-pprocess/debian/copyright --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/packages/debian/python2.4-parallel-pprocess/debian/copyright Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,32 @@ +Package creator: + + Paul Boddie + +The origin of the package is: + + http://www.python.org/pypi/parallel + +Package author: + + Paul Boddie + +Copyright: + +Licence Agreement for parallel +------------------------------ + +Copyright (C) 2005 Paul Boddie + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-pprocess/debian/docs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/packages/debian/python2.4-parallel-pprocess/debian/docs Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,2 @@ +README.txt +docs diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-pprocess/debian/postinst --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/packages/debian/python2.4-parallel-pprocess/debian/postinst Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,46 @@ +#! /bin/sh +# postinst script for parallel +# +# see: dh_installdeb(1) + +set -e + +# summary of how this script can be called: +# * `configure' +# * `abort-upgrade' +# * `abort-remove' `in-favour' +# +# * `abort-deconfigure' `in-favour' +# `removing' +# +# for details, see http://www.debian.org/doc/debian-policy/ or +# the debian-policy package +# +# quoting from the policy: +# Any necessary prompting should almost always be confined to the +# post-installation script, and should be protected with a conditional +# so that unnecessary prompting doesn't happen if a package's +# installation fails and the `postinst' is called with `abort-upgrade', +# `abort-remove' or `abort-deconfigure'. + +PACKAGE=python2.4-parallel-pprocess +VERSION=2.4 +LIB="/usr/lib/python$VERSION" + +case "$1" in + configure|abort-upgrade|abort-remove|abort-deconfigure) + /usr/bin/python$VERSION -O -c "import pprocess" + /usr/bin/python$VERSION -c "import pprocess" + ;; + + *) + echo "postinst called with unknown argument \`$1'" >&2 + exit 1 + ;; +esac + + + +exit 0 + + diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-pprocess/debian/prerm --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/packages/debian/python2.4-parallel-pprocess/debian/prerm Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,24 @@ +#! /bin/sh +# prerm script for parallel + +set -e + +PACKAGE=python2.4-parallel-pprocess +VERSION=2.4 +LIB="/usr/lib/python$VERSION" +DIR="$LIB/site-packages" + +case "$1" in + remove|upgrade|failed-upgrade) + rm ${DIR}/pprocess.py[co] + ;; + + *) + echo "prerm called with unknown argument \`$1'" >&2 + exit 1 + ;; +esac + + + +exit 0 diff -r b269890969d0 -r 5d5cb645ac97 packages/debian/python2.4-parallel-pprocess/debian/rules --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/packages/debian/python2.4-parallel-pprocess/debian/rules Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,49 @@ +#!/usr/bin/make -f +# Sample debian/rules that uses debhelper. +# GNU copyright 1997 to 1999 by Joey Hess. + +# Uncomment this to turn on verbose mode. +#export DH_VERBOSE=1 + +# This is the debhelper compatibility version to use. +export DH_COMPAT=4 + + + +build: build-stamp + /usr/bin/python2.4 setup.py build +build-stamp: + touch build-stamp + +configure: + # Do nothing + +clean: + dh_testdir + dh_testroot + rm -f build-stamp + + -rm -rf build + + dh_clean + +install: build + dh_testdir + dh_testroot + dh_clean -k + /usr/bin/python2.4 setup.py install --no-compile --prefix=$(CURDIR)/debian/python2.4-parallel-pprocess/usr + +# Build architecture-independent files here. +binary-indep: install + dh_testdir + dh_testroot + + dh_installdocs + dh_installdeb + dh_gencontrol + dh_md5sums + dh_builddeb +# We have nothing to do by default. + +binary: binary-indep +.PHONY: build clean binary-indep binary install diff -r b269890969d0 -r 5d5cb645ac97 parallel.py --- a/parallel.py Thu Jun 19 21:42:43 2008 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,335 +0,0 @@ -#!/usr/bin/env python - -""" -A simple parallel processing API for Python, inspired somewhat by the thread -module, slightly less by pypar, and slightly less still by pypvm. - -Thread-style Processing ------------------------ - -To create new processes to run a function or any callable object, specify the -"callable" and any arguments as follows: - -channel = start(fn, arg1, arg2, named1=value1, named2=value2) - -This returns a channel which can then be used to communicate with the created -process. Meanwhile, in the created process, the given callable will be invoked -with another channel as its first argument followed by the specified arguments: - -def fn(channel, arg1, arg2, named1, named2): - # Read from and write to the channel. - # Return value is ignored. - ... - -Fork-style Processing ---------------------- - -To create new processes in a similar way to that employed when using os.fork -(ie. the fork system call on various operating systems), use the following -method: - -channel = create() -if channel.pid == 0: - # This code is run by the created process. - # Read from and write to the channel to communicate with the - # creating/calling process. - # An explicit exit of the process may be desirable to prevent the process - # from running code which is intended for the creating/calling process. - ... -else: - # This code is run by the creating/calling process. - # Read from and write to the channel to communicate with the created - # process. - ... - -Message Exchanges ------------------ - -When creating many processes, each providing results for the consumption of the -main process, the collection of those results in an efficient fashion can be -problematic: if some processes take longer than others, and if we decide to read -from those processes when they are not ready instead of other processes which -are ready, the whole activity will take much longer than necessary. - -One solution to the problem of knowing when to read from channels is to create -an Exchange object, optionally initialising it with a list of channels through -which data is expected to arrive: - -exchange = Exchange() # populate the exchange later -exchange = Exchange(channels) # populate the exchange with channels - -We can add channels to the exchange using the add method: - -exchange.add(channel) - -To test whether an exchange is active - that is, whether it is actually -monitoring any channels - we can use the active method which returns all -channels being monitored by the exchange: - -channels = exchange.active() - -We may then check the exchange to see whether any data is ready to be received; -for example: - -for channel in exchange.ready(): - # Read from and write to the channel. - ... - -If we do not wish to wait indefinitely for a list of channels, we can set a -timeout value as an argument to the ready method (as a floating point number -specifying the timeout in seconds, where 0 means a non-blocking poll as stated -in the select module's select function documentation). - -Signals and Waiting -------------------- - -When created/child processes terminate, one would typically want to be informed -of such conditions using a signal handler. Unfortunately, Python seems to have -issues with restartable reads from file descriptors when interrupted by signals: - -http://mail.python.org/pipermail/python-dev/2002-September/028572.html -http://twistedmatrix.com/bugs/issue733 - -Select and Poll ---------------- - -The exact combination of conditions indicating closed pipes remains relatively -obscure. Here is a message/thread describing them (in the context of another -topic): - -http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html - -It would seem, from using sockets and from studying the asycore module, that -sockets are more predictable than pipes. -""" - -__version__ = "0.1" - -import os -import sys -import select -import socket - -try: - import cPickle as pickle -except ImportError: - import pickle - -class AcknowledgementError(Exception): - pass - -class Channel: - - "A communications channel." - - def __init__(self, pid, read_pipe, write_pipe): - - """ - Initialise the channel with a process identifier 'pid', a 'read_pipe' - from which messages will be received, and a 'write_pipe' into which - messages will be sent. - """ - - self.pid = pid - self.read_pipe = read_pipe - self.write_pipe = write_pipe - self.closed = 0 - - def __del__(self): - - # Since signals don't work well with I/O, we close pipes and wait for - # created processes upon finalisation. - - self.close() - - def close(self): - - "Explicitly close the channel." - - if not self.closed: - self.closed = 1 - self.read_pipe.close() - self.write_pipe.close() - #self.wait(os.WNOHANG) - - def wait(self, options=0): - - "Wait for the created process, if any, to exit." - - if self.pid != 0: - try: - os.waitpid(self.pid, options) - except OSError: - pass - - def _send(self, obj): - - "Send the given object 'obj' through the channel." - - pickle.dump(obj, self.write_pipe) - self.write_pipe.flush() - - def send(self, obj): - - """ - Send the given object 'obj' through the channel. Then wait for an - acknowledgement. (The acknowledgement makes the caller wait, thus - preventing processes from exiting and disrupting the communications - channel and losing data.) - """ - - self._send(obj) - if self._receive() != "OK": - raise AcknowledgementError, obj - - def _receive(self): - - "Receive an object through the channel, returning the object." - - obj = pickle.load(self.read_pipe) - if isinstance(obj, Exception): - raise obj - else: - return obj - - def receive(self): - - """ - Receive an object through the channel, returning the object. Send an - acknowledgement of receipt. (The acknowledgement makes the sender wait, - thus preventing processes from exiting and disrupting the communications - channel and losing data.) - """ - - try: - obj = self._receive() - return obj - finally: - self._send("OK") - -class Exchange: - - """ - A communications exchange that can be used to detect channels which are - ready to communicate. - """ - - def __init__(self, channels=None, autoclose=1): - - """ - Initialise the exchange with an optional list of 'channels'. If the - optional 'autoclose' parameter is set to a false value, channels will - not be closed automatically when they are removed from the exchange - by - default they are closed when removed. - """ - - self.autoclose = autoclose - self.readables = {} - self.poller = select.poll() - for channel in channels or []: - self.add(channel) - - def add(self, channel): - - "Add the given 'channel' to the exchange." - - self.readables[channel.read_pipe.fileno()] = channel - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) - - def active(self): - - "Return a list of active channels." - - return self.readables.values() - - def ready(self, timeout=None): - - """ - Wait for a period of time specified by the optional 'timeout' (or until - communication is possible) and return a list of channels which are ready - to be read from. - """ - - fds = self.poller.poll(timeout) - readables = [] - for fd, status in fds: - channel = self.readables[fd] - - # Remove ended/error channels. - - if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): - self.remove(channel) - - # Record readable channels. - - elif status & select.POLLIN: - readables.append(channel) - - return readables - - def remove(self, channel): - - """ - Remove the given 'channel' from the exchange. - """ - - del self.readables[channel.read_pipe.fileno()] - self.poller.unregister(channel.read_pipe.fileno()) - if self.autoclose: - channel.close() - channel.wait() - -def create(): - - """ - Create a new process, returning a communications channel to both the - creating process and the created process. - """ - - parent, child = socket.socketpair() - for s in [parent, child]: - s.setblocking(1) - - pid = os.fork() - if pid == 0: - parent.close() - return Channel(pid, child.makefile("r"), child.makefile("w")) - else: - child.close() - return Channel(pid, parent.makefile("r"), parent.makefile("w")) - -def start(callable, *args, **kwargs): - - """ - Create a new process which shall start running in the given 'callable'. - Return a communications channel to the creating process, and supply such a - channel to the created process as the 'channel' parameter in the given - 'callable'. Additional arguments to the 'callable' can be given as - additional arguments to this function. - """ - - channel = create() - if channel.pid == 0: - try: - try: - callable(channel, *args, **kwargs) - except: - exc_type, exc_value, exc_traceback = sys.exc_info() - channel.send(exc_value) - finally: - channel.close() - sys.exit(0) - else: - return channel - -def waitall(): - - "Wait for all created processes to terminate." - - try: - while 1: - os.wait() - except OSError: - pass - -# vim: tabstop=4 expandtab shiftwidth=4 diff -r b269890969d0 -r 5d5cb645ac97 pprocess.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pprocess.py Tue Oct 11 17:07:11 2005 +0000 @@ -0,0 +1,335 @@ +#!/usr/bin/env python + +""" +A simple parallel processing API for Python, inspired somewhat by the thread +module, slightly less by pypar, and slightly less still by pypvm. + +Thread-style Processing +----------------------- + +To create new processes to run a function or any callable object, specify the +"callable" and any arguments as follows: + +channel = start(fn, arg1, arg2, named1=value1, named2=value2) + +This returns a channel which can then be used to communicate with the created +process. Meanwhile, in the created process, the given callable will be invoked +with another channel as its first argument followed by the specified arguments: + +def fn(channel, arg1, arg2, named1, named2): + # Read from and write to the channel. + # Return value is ignored. + ... + +Fork-style Processing +--------------------- + +To create new processes in a similar way to that employed when using os.fork +(ie. the fork system call on various operating systems), use the following +method: + +channel = create() +if channel.pid == 0: + # This code is run by the created process. + # Read from and write to the channel to communicate with the + # creating/calling process. + # An explicit exit of the process may be desirable to prevent the process + # from running code which is intended for the creating/calling process. + ... +else: + # This code is run by the creating/calling process. + # Read from and write to the channel to communicate with the created + # process. + ... + +Message Exchanges +----------------- + +When creating many processes, each providing results for the consumption of the +main process, the collection of those results in an efficient fashion can be +problematic: if some processes take longer than others, and if we decide to read +from those processes when they are not ready instead of other processes which +are ready, the whole activity will take much longer than necessary. + +One solution to the problem of knowing when to read from channels is to create +an Exchange object, optionally initialising it with a list of channels through +which data is expected to arrive: + +exchange = Exchange() # populate the exchange later +exchange = Exchange(channels) # populate the exchange with channels + +We can add channels to the exchange using the add method: + +exchange.add(channel) + +To test whether an exchange is active - that is, whether it is actually +monitoring any channels - we can use the active method which returns all +channels being monitored by the exchange: + +channels = exchange.active() + +We may then check the exchange to see whether any data is ready to be received; +for example: + +for channel in exchange.ready(): + # Read from and write to the channel. + ... + +If we do not wish to wait indefinitely for a list of channels, we can set a +timeout value as an argument to the ready method (as a floating point number +specifying the timeout in seconds, where 0 means a non-blocking poll as stated +in the select module's select function documentation). + +Signals and Waiting +------------------- + +When created/child processes terminate, one would typically want to be informed +of such conditions using a signal handler. Unfortunately, Python seems to have +issues with restartable reads from file descriptors when interrupted by signals: + +http://mail.python.org/pipermail/python-dev/2002-September/028572.html +http://twistedmatrix.com/bugs/issue733 + +Select and Poll +--------------- + +The exact combination of conditions indicating closed pipes remains relatively +obscure. Here is a message/thread describing them (in the context of another +topic): + +http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html + +It would seem, from using sockets and from studying the asycore module, that +sockets are more predictable than pipes. +""" + +__version__ = "0.2" + +import os +import sys +import select +import socket + +try: + import cPickle as pickle +except ImportError: + import pickle + +class AcknowledgementError(Exception): + pass + +class Channel: + + "A communications channel." + + def __init__(self, pid, read_pipe, write_pipe): + + """ + Initialise the channel with a process identifier 'pid', a 'read_pipe' + from which messages will be received, and a 'write_pipe' into which + messages will be sent. + """ + + self.pid = pid + self.read_pipe = read_pipe + self.write_pipe = write_pipe + self.closed = 0 + + def __del__(self): + + # Since signals don't work well with I/O, we close pipes and wait for + # created processes upon finalisation. + + self.close() + + def close(self): + + "Explicitly close the channel." + + if not self.closed: + self.closed = 1 + self.read_pipe.close() + self.write_pipe.close() + #self.wait(os.WNOHANG) + + def wait(self, options=0): + + "Wait for the created process, if any, to exit." + + if self.pid != 0: + try: + os.waitpid(self.pid, options) + except OSError: + pass + + def _send(self, obj): + + "Send the given object 'obj' through the channel." + + pickle.dump(obj, self.write_pipe) + self.write_pipe.flush() + + def send(self, obj): + + """ + Send the given object 'obj' through the channel. Then wait for an + acknowledgement. (The acknowledgement makes the caller wait, thus + preventing processes from exiting and disrupting the communications + channel and losing data.) + """ + + self._send(obj) + if self._receive() != "OK": + raise AcknowledgementError, obj + + def _receive(self): + + "Receive an object through the channel, returning the object." + + obj = pickle.load(self.read_pipe) + if isinstance(obj, Exception): + raise obj + else: + return obj + + def receive(self): + + """ + Receive an object through the channel, returning the object. Send an + acknowledgement of receipt. (The acknowledgement makes the sender wait, + thus preventing processes from exiting and disrupting the communications + channel and losing data.) + """ + + try: + obj = self._receive() + return obj + finally: + self._send("OK") + +class Exchange: + + """ + A communications exchange that can be used to detect channels which are + ready to communicate. + """ + + def __init__(self, channels=None, autoclose=1): + + """ + Initialise the exchange with an optional list of 'channels'. If the + optional 'autoclose' parameter is set to a false value, channels will + not be closed automatically when they are removed from the exchange - by + default they are closed when removed. + """ + + self.autoclose = autoclose + self.readables = {} + self.poller = select.poll() + for channel in channels or []: + self.add(channel) + + def add(self, channel): + + "Add the given 'channel' to the exchange." + + self.readables[channel.read_pipe.fileno()] = channel + self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) + + def active(self): + + "Return a list of active channels." + + return self.readables.values() + + def ready(self, timeout=None): + + """ + Wait for a period of time specified by the optional 'timeout' (or until + communication is possible) and return a list of channels which are ready + to be read from. + """ + + fds = self.poller.poll(timeout) + readables = [] + for fd, status in fds: + channel = self.readables[fd] + + # Remove ended/error channels. + + if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): + self.remove(channel) + + # Record readable channels. + + elif status & select.POLLIN: + readables.append(channel) + + return readables + + def remove(self, channel): + + """ + Remove the given 'channel' from the exchange. + """ + + del self.readables[channel.read_pipe.fileno()] + self.poller.unregister(channel.read_pipe.fileno()) + if self.autoclose: + channel.close() + channel.wait() + +def create(): + + """ + Create a new process, returning a communications channel to both the + creating process and the created process. + """ + + parent, child = socket.socketpair() + for s in [parent, child]: + s.setblocking(1) + + pid = os.fork() + if pid == 0: + parent.close() + return Channel(pid, child.makefile("r"), child.makefile("w")) + else: + child.close() + return Channel(pid, parent.makefile("r"), parent.makefile("w")) + +def start(callable, *args, **kwargs): + + """ + Create a new process which shall start running in the given 'callable'. + Return a communications channel to the creating process, and supply such a + channel to the created process as the 'channel' parameter in the given + 'callable'. Additional arguments to the 'callable' can be given as + additional arguments to this function. + """ + + channel = create() + if channel.pid == 0: + try: + try: + callable(channel, *args, **kwargs) + except: + exc_type, exc_value, exc_traceback = sys.exc_info() + channel.send(exc_value) + finally: + channel.close() + sys.exit(0) + else: + return channel + +def waitall(): + + "Wait for all created processes to terminate." + + try: + while 1: + os.wait() + except OSError: + pass + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r b269890969d0 -r 5d5cb645ac97 setup.py --- a/setup.py Thu Jun 19 21:42:43 2008 +0200 +++ b/setup.py Tue Oct 11 17:07:11 2005 +0000 @@ -2,7 +2,7 @@ from distutils.core import setup -import parallel +import pprocess setup( name = "parallel", @@ -10,6 +10,6 @@ author = "Paul Boddie", author_email = "paul@boddie.org.uk", url = "http://www.python.org/pypi/parallel", - version = parallel.__version__, - py_modules = ["parallel"] + version = pprocess.__version__, + py_modules = ["pprocess"] ) diff -r b269890969d0 -r 5d5cb645ac97 tests/create_loop.py --- a/tests/create_loop.py Thu Jun 19 21:42:43 2008 +0200 +++ b/tests/create_loop.py Tue Oct 11 17:07:11 2005 +0000 @@ -1,6 +1,6 @@ #!/usr/bin/env python -from parallel import create +from pprocess import create limit = 100 channel = create() diff -r b269890969d0 -r 5d5cb645ac97 tests/start_indexer.py --- a/tests/start_indexer.py Thu Jun 19 21:42:43 2008 +0200 +++ b/tests/start_indexer.py Tue Oct 11 17:07:11 2005 +0000 @@ -2,7 +2,7 @@ "A simple text indexing activity." -from parallel import start, Exchange +from pprocess import start, Exchange from Dict import Indexer, Searcher, Parser import os diff -r b269890969d0 -r 5d5cb645ac97 tests/start_loop.py --- a/tests/start_loop.py Thu Jun 19 21:42:43 2008 +0200 +++ b/tests/start_loop.py Tue Oct 11 17:07:11 2005 +0000 @@ -1,6 +1,6 @@ #!/usr/bin/env python -from parallel import start +from pprocess import start def loop(channel, limit): print "loop to", limit