1.1 --- a/PKG-INFO Thu Jun 19 21:42:43 2008 +0200
1.2 +++ b/PKG-INFO Tue Oct 11 17:07:11 2005 +0000
1.3 @@ -1,18 +1,18 @@
1.4 Metadata-Version: 1.1
1.5 Name: parallel
1.6 -Version: 0.1
1.7 +Version: 0.2
1.8 Author: Paul Boddie
1.9 Author-email: paul at boddie org uk
1.10 Maintainer: Paul Boddie
1.11 Maintainer-email: paul at boddie org uk
1.12 -Download-url: http://www.boddie.org.uk/python/downloads/parallel-0.1.tar.gz
1.13 +Download-url: http://www.boddie.org.uk/python/downloads/parallel-0.2.tar.gz
1.14 Summary: Elementary parallel programming for Python
1.15 License: LGPL
1.16 -Description: The parallel module provides elementary support for parallel
1.17 +Description: The pprocess module provides elementary support for parallel
1.18 programming in Python using a fork-based process creation model in
1.19 conjunction with a channel-based communications model implemented
1.20 using socketpair and poll.
1.21 -Keywords: parallel fork socketpair socket channel
1.22 +Keywords: parallel pprocess fork socketpair socket channel
1.23 Platform: Any
1.24 Classifier: Development Status :: 3 - Alpha
1.25 Classifier: License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)
2.1 --- a/README.txt Thu Jun 19 21:42:43 2008 +0200
2.2 +++ b/README.txt Tue Oct 11 17:07:11 2005 +0000
2.3 @@ -1,7 +1,7 @@
2.4 Introduction
2.5 ------------
2.6
2.7 -The parallel module provides elementary support for parallel programming in
2.8 +The pprocess module provides elementary support for parallel programming in
2.9 Python using a fork-based process creation model in conjunction with a
2.10 channel-based communications model implemented using socketpair and poll.
2.11
2.12 @@ -41,10 +41,16 @@
2.13 This software depends on standard library features which are stated as being
2.14 available only on "UNIX"; it has only been tested on a GNU/Linux system.
2.15
2.16 +New in parallel 0.2 (Changes since parallel 0.1)
2.17 +------------------------------------------------
2.18 +
2.19 + * Changed the name of the included module from parallel to pprocess in order
2.20 + to avoid naming conflicts with PyParallel.
2.21 +
2.22 Release Procedures
2.23 ------------------
2.24
2.25 -Update the parallel __version__ attribute.
2.26 +Update the pprocess __version__ attribute.
2.27 Change the version number and package filename/directory in the documentation.
2.28 Update the release notes (see above).
2.29 Check the release information in the PKG-INFO file.
2.30 @@ -61,7 +67,7 @@
2.31 2. Make a symbolic link in the distribution's root directory to keep the
2.32 Debian tools happy:
2.33
2.34 - ln -s packages/debian/python2.4-parallel-module/debian/
2.35 + ln -s packages/debian/python2.4-parallel-pprocess/debian/
2.36
2.37 3. Run the package builder:
2.38
3.1 --- a/packages/debian/python2.4-parallel-module/debian/changelog Thu Jun 19 21:42:43 2008 +0200
3.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
3.3 @@ -1,5 +0,0 @@
3.4 -parallel-module (0.1-1) unstable; urgency=low
3.5 -
3.6 - * New upstream release.
3.7 -
3.8 - -- Paul Boddie <paul@boddie.org.uk> Wed, 28 Sep 2005 19:18:47 +0200
4.1 --- a/packages/debian/python2.4-parallel-module/debian/control Thu Jun 19 21:42:43 2008 +0200
4.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
4.3 @@ -1,18 +0,0 @@
4.4 -Source: parallel-module
4.5 -Priority: optional
4.6 -Maintainer: Paul Boddie <paul@boddie.org.uk>
4.7 -Build-Depends: debhelper (>> 3.0.0)
4.8 -Standards-Version: 3.5.8
4.9 -Section: python
4.10 -
4.11 -Package: python2.4-parallel-module
4.12 -Section: python
4.13 -Architecture: all
4.14 -Depends: python2.4
4.15 -Description: Elementary parallel programming for Python
4.16 - The parallel module provides elementary support for parallel
4.17 - programming in Python using a fork-based process creation model in
4.18 - conjunction with a channel-based communications model implemented
4.19 - using socketpair and poll.
4.20 - .
4.21 - This package is built for Python 2.4.x.
5.1 --- a/packages/debian/python2.4-parallel-module/debian/copyright Thu Jun 19 21:42:43 2008 +0200
5.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
5.3 @@ -1,32 +0,0 @@
5.4 -Package creator:
5.5 -
5.6 - Paul Boddie <paul@boddie.org.uk>
5.7 -
5.8 -The origin of the package is:
5.9 -
5.10 - http://www.python.org/pypi/parallel
5.11 -
5.12 -Package author:
5.13 -
5.14 - Paul Boddie <paul@boddie.org.uk>
5.15 -
5.16 -Copyright:
5.17 -
5.18 -Licence Agreement for parallel
5.19 -------------------------------
5.20 -
5.21 -Copyright (C) 2005 Paul Boddie <paul@boddie.org.uk>
5.22 -
5.23 -This library is free software; you can redistribute it and/or
5.24 -modify it under the terms of the GNU Lesser General Public
5.25 -License as published by the Free Software Foundation; either
5.26 -version 2.1 of the License, or (at your option) any later version.
5.27 -
5.28 -This library is distributed in the hope that it will be useful,
5.29 -but WITHOUT ANY WARRANTY; without even the implied warranty of
5.30 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
5.31 -Lesser General Public License for more details.
5.32 -
5.33 -You should have received a copy of the GNU Lesser General Public
5.34 -License along with this library; if not, write to the Free Software
5.35 -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
6.1 --- a/packages/debian/python2.4-parallel-module/debian/docs Thu Jun 19 21:42:43 2008 +0200
6.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
6.3 @@ -1,2 +0,0 @@
6.4 -README.txt
6.5 -docs
7.1 --- a/packages/debian/python2.4-parallel-module/debian/postinst Thu Jun 19 21:42:43 2008 +0200
7.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
7.3 @@ -1,46 +0,0 @@
7.4 -#! /bin/sh
7.5 -# postinst script for parallel
7.6 -#
7.7 -# see: dh_installdeb(1)
7.8 -
7.9 -set -e
7.10 -
7.11 -# summary of how this script can be called:
7.12 -# * <postinst> `configure' <most-recently-configured-version>
7.13 -# * <old-postinst> `abort-upgrade' <new version>
7.14 -# * <conflictor's-postinst> `abort-remove' `in-favour' <package>
7.15 -# <new-version>
7.16 -# * <deconfigured's-postinst> `abort-deconfigure' `in-favour'
7.17 -# <failed-install-package> <version> `removing'
7.18 -# <conflicting-package> <version>
7.19 -# for details, see http://www.debian.org/doc/debian-policy/ or
7.20 -# the debian-policy package
7.21 -#
7.22 -# quoting from the policy:
7.23 -# Any necessary prompting should almost always be confined to the
7.24 -# post-installation script, and should be protected with a conditional
7.25 -# so that unnecessary prompting doesn't happen if a package's
7.26 -# installation fails and the `postinst' is called with `abort-upgrade',
7.27 -# `abort-remove' or `abort-deconfigure'.
7.28 -
7.29 -PACKAGE=python2.4-parallel-module
7.30 -VERSION=2.4
7.31 -LIB="/usr/lib/python$VERSION"
7.32 -
7.33 -case "$1" in
7.34 - configure|abort-upgrade|abort-remove|abort-deconfigure)
7.35 - /usr/bin/python$VERSION -O -c "import parallel"
7.36 - /usr/bin/python$VERSION -c "import parallel"
7.37 - ;;
7.38 -
7.39 - *)
7.40 - echo "postinst called with unknown argument \`$1'" >&2
7.41 - exit 1
7.42 - ;;
7.43 -esac
7.44 -
7.45 -
7.46 -
7.47 -exit 0
7.48 -
7.49 -
8.1 --- a/packages/debian/python2.4-parallel-module/debian/prerm Thu Jun 19 21:42:43 2008 +0200
8.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
8.3 @@ -1,24 +0,0 @@
8.4 -#! /bin/sh
8.5 -# prerm script for parallel
8.6 -
8.7 -set -e
8.8 -
8.9 -PACKAGE=python2.4-parallel-module
8.10 -VERSION=2.4
8.11 -LIB="/usr/lib/python$VERSION"
8.12 -DIR="$LIB/site-packages"
8.13 -
8.14 -case "$1" in
8.15 - remove|upgrade|failed-upgrade)
8.16 - rm ${DIR}/parallel.py[co]
8.17 - ;;
8.18 -
8.19 - *)
8.20 - echo "prerm called with unknown argument \`$1'" >&2
8.21 - exit 1
8.22 - ;;
8.23 -esac
8.24 -
8.25 -
8.26 -
8.27 -exit 0
9.1 --- a/packages/debian/python2.4-parallel-module/debian/rules Thu Jun 19 21:42:43 2008 +0200
9.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
9.3 @@ -1,49 +0,0 @@
9.4 -#!/usr/bin/make -f
9.5 -# Sample debian/rules that uses debhelper.
9.6 -# GNU copyright 1997 to 1999 by Joey Hess.
9.7 -
9.8 -# Uncomment this to turn on verbose mode.
9.9 -#export DH_VERBOSE=1
9.10 -
9.11 -# This is the debhelper compatibility version to use.
9.12 -export DH_COMPAT=4
9.13 -
9.14 -
9.15 -
9.16 -build: build-stamp
9.17 - /usr/bin/python2.4 setup.py build
9.18 -build-stamp:
9.19 - touch build-stamp
9.20 -
9.21 -configure:
9.22 - # Do nothing
9.23 -
9.24 -clean:
9.25 - dh_testdir
9.26 - dh_testroot
9.27 - rm -f build-stamp
9.28 -
9.29 - -rm -rf build
9.30 -
9.31 - dh_clean
9.32 -
9.33 -install: build
9.34 - dh_testdir
9.35 - dh_testroot
9.36 - dh_clean -k
9.37 - /usr/bin/python2.4 setup.py install --no-compile --prefix=$(CURDIR)/debian/python2.4-parallel-module/usr
9.38 -
9.39 -# Build architecture-independent files here.
9.40 -binary-indep: install
9.41 - dh_testdir
9.42 - dh_testroot
9.43 -
9.44 - dh_installdocs
9.45 - dh_installdeb
9.46 - dh_gencontrol
9.47 - dh_md5sums
9.48 - dh_builddeb
9.49 -# We have nothing to do by default.
9.50 -
9.51 -binary: binary-indep
9.52 -.PHONY: build clean binary-indep binary install
10.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
10.2 +++ b/packages/debian/python2.4-parallel-pprocess/debian/changelog Tue Oct 11 17:07:11 2005 +0000
10.3 @@ -0,0 +1,11 @@
10.4 +parallel-pprocess (0.2-0ubuntu1) unstable; urgency=low
10.5 +
10.6 + * Changed the parallel module to pprocess.
10.7 +
10.8 + -- Paul Boddie <paulb@boddie.org.uk> Tue, 11 Oct 2005 18:50:24 +0200
10.9 +
10.10 +parallel-module (0.1-0ubuntu1) unstable; urgency=low
10.11 +
10.12 + * New upstream release.
10.13 +
10.14 + -- Paul Boddie <paul@boddie.org.uk> Wed, 28 Sep 2005 19:18:47 +0200
11.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
11.2 +++ b/packages/debian/python2.4-parallel-pprocess/debian/control Tue Oct 11 17:07:11 2005 +0000
11.3 @@ -0,0 +1,18 @@
11.4 +Source: parallel-pprocess
11.5 +Priority: optional
11.6 +Maintainer: Paul Boddie <paul@boddie.org.uk>
11.7 +Build-Depends: debhelper (>> 3.0.0)
11.8 +Standards-Version: 3.5.8
11.9 +Section: python
11.10 +
11.11 +Package: python2.4-parallel-pprocess
11.12 +Section: python
11.13 +Architecture: all
11.14 +Depends: python2.4
11.15 +Description: Elementary parallel programming for Python
11.16 + The pprocess module provides elementary support for parallel
11.17 + programming in Python using a fork-based process creation model in
11.18 + conjunction with a channel-based communications model implemented
11.19 + using socketpair and poll.
11.20 + .
11.21 + This package is built for Python 2.4.x.
12.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
12.2 +++ b/packages/debian/python2.4-parallel-pprocess/debian/copyright Tue Oct 11 17:07:11 2005 +0000
12.3 @@ -0,0 +1,32 @@
12.4 +Package creator:
12.5 +
12.6 + Paul Boddie <paul@boddie.org.uk>
12.7 +
12.8 +The origin of the package is:
12.9 +
12.10 + http://www.python.org/pypi/parallel
12.11 +
12.12 +Package author:
12.13 +
12.14 + Paul Boddie <paul@boddie.org.uk>
12.15 +
12.16 +Copyright:
12.17 +
12.18 +Licence Agreement for parallel
12.19 +------------------------------
12.20 +
12.21 +Copyright (C) 2005 Paul Boddie <paul@boddie.org.uk>
12.22 +
12.23 +This library is free software; you can redistribute it and/or
12.24 +modify it under the terms of the GNU Lesser General Public
12.25 +License as published by the Free Software Foundation; either
12.26 +version 2.1 of the License, or (at your option) any later version.
12.27 +
12.28 +This library is distributed in the hope that it will be useful,
12.29 +but WITHOUT ANY WARRANTY; without even the implied warranty of
12.30 +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12.31 +Lesser General Public License for more details.
12.32 +
12.33 +You should have received a copy of the GNU Lesser General Public
12.34 +License along with this library; if not, write to the Free Software
12.35 +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
13.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
13.2 +++ b/packages/debian/python2.4-parallel-pprocess/debian/docs Tue Oct 11 17:07:11 2005 +0000
13.3 @@ -0,0 +1,2 @@
13.4 +README.txt
13.5 +docs
14.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
14.2 +++ b/packages/debian/python2.4-parallel-pprocess/debian/postinst Tue Oct 11 17:07:11 2005 +0000
14.3 @@ -0,0 +1,46 @@
14.4 +#! /bin/sh
14.5 +# postinst script for parallel
14.6 +#
14.7 +# see: dh_installdeb(1)
14.8 +
14.9 +set -e
14.10 +
14.11 +# summary of how this script can be called:
14.12 +# * <postinst> `configure' <most-recently-configured-version>
14.13 +# * <old-postinst> `abort-upgrade' <new version>
14.14 +# * <conflictor's-postinst> `abort-remove' `in-favour' <package>
14.15 +# <new-version>
14.16 +# * <deconfigured's-postinst> `abort-deconfigure' `in-favour'
14.17 +# <failed-install-package> <version> `removing'
14.18 +# <conflicting-package> <version>
14.19 +# for details, see http://www.debian.org/doc/debian-policy/ or
14.20 +# the debian-policy package
14.21 +#
14.22 +# quoting from the policy:
14.23 +# Any necessary prompting should almost always be confined to the
14.24 +# post-installation script, and should be protected with a conditional
14.25 +# so that unnecessary prompting doesn't happen if a package's
14.26 +# installation fails and the `postinst' is called with `abort-upgrade',
14.27 +# `abort-remove' or `abort-deconfigure'.
14.28 +
14.29 +PACKAGE=python2.4-parallel-pprocess
14.30 +VERSION=2.4
14.31 +LIB="/usr/lib/python$VERSION"
14.32 +
14.33 +case "$1" in
14.34 + configure|abort-upgrade|abort-remove|abort-deconfigure)
14.35 + /usr/bin/python$VERSION -O -c "import pprocess"
14.36 + /usr/bin/python$VERSION -c "import pprocess"
14.37 + ;;
14.38 +
14.39 + *)
14.40 + echo "postinst called with unknown argument \`$1'" >&2
14.41 + exit 1
14.42 + ;;
14.43 +esac
14.44 +
14.45 +
14.46 +
14.47 +exit 0
14.48 +
14.49 +
15.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
15.2 +++ b/packages/debian/python2.4-parallel-pprocess/debian/prerm Tue Oct 11 17:07:11 2005 +0000
15.3 @@ -0,0 +1,24 @@
15.4 +#! /bin/sh
15.5 +# prerm script for parallel
15.6 +
15.7 +set -e
15.8 +
15.9 +PACKAGE=python2.4-parallel-pprocess
15.10 +VERSION=2.4
15.11 +LIB="/usr/lib/python$VERSION"
15.12 +DIR="$LIB/site-packages"
15.13 +
15.14 +case "$1" in
15.15 + remove|upgrade|failed-upgrade)
15.16 + rm ${DIR}/pprocess.py[co]
15.17 + ;;
15.18 +
15.19 + *)
15.20 + echo "prerm called with unknown argument \`$1'" >&2
15.21 + exit 1
15.22 + ;;
15.23 +esac
15.24 +
15.25 +
15.26 +
15.27 +exit 0
16.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
16.2 +++ b/packages/debian/python2.4-parallel-pprocess/debian/rules Tue Oct 11 17:07:11 2005 +0000
16.3 @@ -0,0 +1,49 @@
16.4 +#!/usr/bin/make -f
16.5 +# Sample debian/rules that uses debhelper.
16.6 +# GNU copyright 1997 to 1999 by Joey Hess.
16.7 +
16.8 +# Uncomment this to turn on verbose mode.
16.9 +#export DH_VERBOSE=1
16.10 +
16.11 +# This is the debhelper compatibility version to use.
16.12 +export DH_COMPAT=4
16.13 +
16.14 +
16.15 +
16.16 +build: build-stamp
16.17 + /usr/bin/python2.4 setup.py build
16.18 +build-stamp:
16.19 + touch build-stamp
16.20 +
16.21 +configure:
16.22 + # Do nothing
16.23 +
16.24 +clean:
16.25 + dh_testdir
16.26 + dh_testroot
16.27 + rm -f build-stamp
16.28 +
16.29 + -rm -rf build
16.30 +
16.31 + dh_clean
16.32 +
16.33 +install: build
16.34 + dh_testdir
16.35 + dh_testroot
16.36 + dh_clean -k
16.37 + /usr/bin/python2.4 setup.py install --no-compile --prefix=$(CURDIR)/debian/python2.4-parallel-pprocess/usr
16.38 +
16.39 +# Build architecture-independent files here.
16.40 +binary-indep: install
16.41 + dh_testdir
16.42 + dh_testroot
16.43 +
16.44 + dh_installdocs
16.45 + dh_installdeb
16.46 + dh_gencontrol
16.47 + dh_md5sums
16.48 + dh_builddeb
16.49 +# We have nothing to do by default.
16.50 +
16.51 +binary: binary-indep
16.52 +.PHONY: build clean binary-indep binary install
17.1 --- a/parallel.py Thu Jun 19 21:42:43 2008 +0200
17.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
17.3 @@ -1,335 +0,0 @@
17.4 -#!/usr/bin/env python
17.5 -
17.6 -"""
17.7 -A simple parallel processing API for Python, inspired somewhat by the thread
17.8 -module, slightly less by pypar, and slightly less still by pypvm.
17.9 -
17.10 -Thread-style Processing
17.11 ------------------------
17.12 -
17.13 -To create new processes to run a function or any callable object, specify the
17.14 -"callable" and any arguments as follows:
17.15 -
17.16 -channel = start(fn, arg1, arg2, named1=value1, named2=value2)
17.17 -
17.18 -This returns a channel which can then be used to communicate with the created
17.19 -process. Meanwhile, in the created process, the given callable will be invoked
17.20 -with another channel as its first argument followed by the specified arguments:
17.21 -
17.22 -def fn(channel, arg1, arg2, named1, named2):
17.23 - # Read from and write to the channel.
17.24 - # Return value is ignored.
17.25 - ...
17.26 -
17.27 -Fork-style Processing
17.28 ----------------------
17.29 -
17.30 -To create new processes in a similar way to that employed when using os.fork
17.31 -(ie. the fork system call on various operating systems), use the following
17.32 -method:
17.33 -
17.34 -channel = create()
17.35 -if channel.pid == 0:
17.36 - # This code is run by the created process.
17.37 - # Read from and write to the channel to communicate with the
17.38 - # creating/calling process.
17.39 - # An explicit exit of the process may be desirable to prevent the process
17.40 - # from running code which is intended for the creating/calling process.
17.41 - ...
17.42 -else:
17.43 - # This code is run by the creating/calling process.
17.44 - # Read from and write to the channel to communicate with the created
17.45 - # process.
17.46 - ...
17.47 -
17.48 -Message Exchanges
17.49 ------------------
17.50 -
17.51 -When creating many processes, each providing results for the consumption of the
17.52 -main process, the collection of those results in an efficient fashion can be
17.53 -problematic: if some processes take longer than others, and if we decide to read
17.54 -from those processes when they are not ready instead of other processes which
17.55 -are ready, the whole activity will take much longer than necessary.
17.56 -
17.57 -One solution to the problem of knowing when to read from channels is to create
17.58 -an Exchange object, optionally initialising it with a list of channels through
17.59 -which data is expected to arrive:
17.60 -
17.61 -exchange = Exchange() # populate the exchange later
17.62 -exchange = Exchange(channels) # populate the exchange with channels
17.63 -
17.64 -We can add channels to the exchange using the add method:
17.65 -
17.66 -exchange.add(channel)
17.67 -
17.68 -To test whether an exchange is active - that is, whether it is actually
17.69 -monitoring any channels - we can use the active method which returns all
17.70 -channels being monitored by the exchange:
17.71 -
17.72 -channels = exchange.active()
17.73 -
17.74 -We may then check the exchange to see whether any data is ready to be received;
17.75 -for example:
17.76 -
17.77 -for channel in exchange.ready():
17.78 - # Read from and write to the channel.
17.79 - ...
17.80 -
17.81 -If we do not wish to wait indefinitely for a list of channels, we can set a
17.82 -timeout value as an argument to the ready method (as a floating point number
17.83 -specifying the timeout in seconds, where 0 means a non-blocking poll as stated
17.84 -in the select module's select function documentation).
17.85 -
17.86 -Signals and Waiting
17.87 --------------------
17.88 -
17.89 -When created/child processes terminate, one would typically want to be informed
17.90 -of such conditions using a signal handler. Unfortunately, Python seems to have
17.91 -issues with restartable reads from file descriptors when interrupted by signals:
17.92 -
17.93 -http://mail.python.org/pipermail/python-dev/2002-September/028572.html
17.94 -http://twistedmatrix.com/bugs/issue733
17.95 -
17.96 -Select and Poll
17.97 ----------------
17.98 -
17.99 -The exact combination of conditions indicating closed pipes remains relatively
17.100 -obscure. Here is a message/thread describing them (in the context of another
17.101 -topic):
17.102 -
17.103 -http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
17.104 -
17.105 -It would seem, from using sockets and from studying the asycore module, that
17.106 -sockets are more predictable than pipes.
17.107 -"""
17.108 -
17.109 -__version__ = "0.1"
17.110 -
17.111 -import os
17.112 -import sys
17.113 -import select
17.114 -import socket
17.115 -
17.116 -try:
17.117 - import cPickle as pickle
17.118 -except ImportError:
17.119 - import pickle
17.120 -
17.121 -class AcknowledgementError(Exception):
17.122 - pass
17.123 -
17.124 -class Channel:
17.125 -
17.126 - "A communications channel."
17.127 -
17.128 - def __init__(self, pid, read_pipe, write_pipe):
17.129 -
17.130 - """
17.131 - Initialise the channel with a process identifier 'pid', a 'read_pipe'
17.132 - from which messages will be received, and a 'write_pipe' into which
17.133 - messages will be sent.
17.134 - """
17.135 -
17.136 - self.pid = pid
17.137 - self.read_pipe = read_pipe
17.138 - self.write_pipe = write_pipe
17.139 - self.closed = 0
17.140 -
17.141 - def __del__(self):
17.142 -
17.143 - # Since signals don't work well with I/O, we close pipes and wait for
17.144 - # created processes upon finalisation.
17.145 -
17.146 - self.close()
17.147 -
17.148 - def close(self):
17.149 -
17.150 - "Explicitly close the channel."
17.151 -
17.152 - if not self.closed:
17.153 - self.closed = 1
17.154 - self.read_pipe.close()
17.155 - self.write_pipe.close()
17.156 - #self.wait(os.WNOHANG)
17.157 -
17.158 - def wait(self, options=0):
17.159 -
17.160 - "Wait for the created process, if any, to exit."
17.161 -
17.162 - if self.pid != 0:
17.163 - try:
17.164 - os.waitpid(self.pid, options)
17.165 - except OSError:
17.166 - pass
17.167 -
17.168 - def _send(self, obj):
17.169 -
17.170 - "Send the given object 'obj' through the channel."
17.171 -
17.172 - pickle.dump(obj, self.write_pipe)
17.173 - self.write_pipe.flush()
17.174 -
17.175 - def send(self, obj):
17.176 -
17.177 - """
17.178 - Send the given object 'obj' through the channel. Then wait for an
17.179 - acknowledgement. (The acknowledgement makes the caller wait, thus
17.180 - preventing processes from exiting and disrupting the communications
17.181 - channel and losing data.)
17.182 - """
17.183 -
17.184 - self._send(obj)
17.185 - if self._receive() != "OK":
17.186 - raise AcknowledgementError, obj
17.187 -
17.188 - def _receive(self):
17.189 -
17.190 - "Receive an object through the channel, returning the object."
17.191 -
17.192 - obj = pickle.load(self.read_pipe)
17.193 - if isinstance(obj, Exception):
17.194 - raise obj
17.195 - else:
17.196 - return obj
17.197 -
17.198 - def receive(self):
17.199 -
17.200 - """
17.201 - Receive an object through the channel, returning the object. Send an
17.202 - acknowledgement of receipt. (The acknowledgement makes the sender wait,
17.203 - thus preventing processes from exiting and disrupting the communications
17.204 - channel and losing data.)
17.205 - """
17.206 -
17.207 - try:
17.208 - obj = self._receive()
17.209 - return obj
17.210 - finally:
17.211 - self._send("OK")
17.212 -
17.213 -class Exchange:
17.214 -
17.215 - """
17.216 - A communications exchange that can be used to detect channels which are
17.217 - ready to communicate.
17.218 - """
17.219 -
17.220 - def __init__(self, channels=None, autoclose=1):
17.221 -
17.222 - """
17.223 - Initialise the exchange with an optional list of 'channels'. If the
17.224 - optional 'autoclose' parameter is set to a false value, channels will
17.225 - not be closed automatically when they are removed from the exchange - by
17.226 - default they are closed when removed.
17.227 - """
17.228 -
17.229 - self.autoclose = autoclose
17.230 - self.readables = {}
17.231 - self.poller = select.poll()
17.232 - for channel in channels or []:
17.233 - self.add(channel)
17.234 -
17.235 - def add(self, channel):
17.236 -
17.237 - "Add the given 'channel' to the exchange."
17.238 -
17.239 - self.readables[channel.read_pipe.fileno()] = channel
17.240 - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
17.241 -
17.242 - def active(self):
17.243 -
17.244 - "Return a list of active channels."
17.245 -
17.246 - return self.readables.values()
17.247 -
17.248 - def ready(self, timeout=None):
17.249 -
17.250 - """
17.251 - Wait for a period of time specified by the optional 'timeout' (or until
17.252 - communication is possible) and return a list of channels which are ready
17.253 - to be read from.
17.254 - """
17.255 -
17.256 - fds = self.poller.poll(timeout)
17.257 - readables = []
17.258 - for fd, status in fds:
17.259 - channel = self.readables[fd]
17.260 -
17.261 - # Remove ended/error channels.
17.262 -
17.263 - if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
17.264 - self.remove(channel)
17.265 -
17.266 - # Record readable channels.
17.267 -
17.268 - elif status & select.POLLIN:
17.269 - readables.append(channel)
17.270 -
17.271 - return readables
17.272 -
17.273 - def remove(self, channel):
17.274 -
17.275 - """
17.276 - Remove the given 'channel' from the exchange.
17.277 - """
17.278 -
17.279 - del self.readables[channel.read_pipe.fileno()]
17.280 - self.poller.unregister(channel.read_pipe.fileno())
17.281 - if self.autoclose:
17.282 - channel.close()
17.283 - channel.wait()
17.284 -
17.285 -def create():
17.286 -
17.287 - """
17.288 - Create a new process, returning a communications channel to both the
17.289 - creating process and the created process.
17.290 - """
17.291 -
17.292 - parent, child = socket.socketpair()
17.293 - for s in [parent, child]:
17.294 - s.setblocking(1)
17.295 -
17.296 - pid = os.fork()
17.297 - if pid == 0:
17.298 - parent.close()
17.299 - return Channel(pid, child.makefile("r"), child.makefile("w"))
17.300 - else:
17.301 - child.close()
17.302 - return Channel(pid, parent.makefile("r"), parent.makefile("w"))
17.303 -
17.304 -def start(callable, *args, **kwargs):
17.305 -
17.306 - """
17.307 - Create a new process which shall start running in the given 'callable'.
17.308 - Return a communications channel to the creating process, and supply such a
17.309 - channel to the created process as the 'channel' parameter in the given
17.310 - 'callable'. Additional arguments to the 'callable' can be given as
17.311 - additional arguments to this function.
17.312 - """
17.313 -
17.314 - channel = create()
17.315 - if channel.pid == 0:
17.316 - try:
17.317 - try:
17.318 - callable(channel, *args, **kwargs)
17.319 - except:
17.320 - exc_type, exc_value, exc_traceback = sys.exc_info()
17.321 - channel.send(exc_value)
17.322 - finally:
17.323 - channel.close()
17.324 - sys.exit(0)
17.325 - else:
17.326 - return channel
17.327 -
17.328 -def waitall():
17.329 -
17.330 - "Wait for all created processes to terminate."
17.331 -
17.332 - try:
17.333 - while 1:
17.334 - os.wait()
17.335 - except OSError:
17.336 - pass
17.337 -
17.338 -# vim: tabstop=4 expandtab shiftwidth=4
18.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
18.2 +++ b/pprocess.py Tue Oct 11 17:07:11 2005 +0000
18.3 @@ -0,0 +1,335 @@
18.4 +#!/usr/bin/env python
18.5 +
18.6 +"""
18.7 +A simple parallel processing API for Python, inspired somewhat by the thread
18.8 +module, slightly less by pypar, and slightly less still by pypvm.
18.9 +
18.10 +Thread-style Processing
18.11 +-----------------------
18.12 +
18.13 +To create new processes to run a function or any callable object, specify the
18.14 +"callable" and any arguments as follows:
18.15 +
18.16 +channel = start(fn, arg1, arg2, named1=value1, named2=value2)
18.17 +
18.18 +This returns a channel which can then be used to communicate with the created
18.19 +process. Meanwhile, in the created process, the given callable will be invoked
18.20 +with another channel as its first argument followed by the specified arguments:
18.21 +
18.22 +def fn(channel, arg1, arg2, named1, named2):
18.23 + # Read from and write to the channel.
18.24 + # Return value is ignored.
18.25 + ...
18.26 +
18.27 +Fork-style Processing
18.28 +---------------------
18.29 +
18.30 +To create new processes in a similar way to that employed when using os.fork
18.31 +(ie. the fork system call on various operating systems), use the following
18.32 +method:
18.33 +
18.34 +channel = create()
18.35 +if channel.pid == 0:
18.36 + # This code is run by the created process.
18.37 + # Read from and write to the channel to communicate with the
18.38 + # creating/calling process.
18.39 + # An explicit exit of the process may be desirable to prevent the process
18.40 + # from running code which is intended for the creating/calling process.
18.41 + ...
18.42 +else:
18.43 + # This code is run by the creating/calling process.
18.44 + # Read from and write to the channel to communicate with the created
18.45 + # process.
18.46 + ...
18.47 +
18.48 +Message Exchanges
18.49 +-----------------
18.50 +
18.51 +When creating many processes, each providing results for the consumption of the
18.52 +main process, the collection of those results in an efficient fashion can be
18.53 +problematic: if some processes take longer than others, and if we decide to read
18.54 +from those processes when they are not ready instead of other processes which
18.55 +are ready, the whole activity will take much longer than necessary.
18.56 +
18.57 +One solution to the problem of knowing when to read from channels is to create
18.58 +an Exchange object, optionally initialising it with a list of channels through
18.59 +which data is expected to arrive:
18.60 +
18.61 +exchange = Exchange() # populate the exchange later
18.62 +exchange = Exchange(channels) # populate the exchange with channels
18.63 +
18.64 +We can add channels to the exchange using the add method:
18.65 +
18.66 +exchange.add(channel)
18.67 +
18.68 +To test whether an exchange is active - that is, whether it is actually
18.69 +monitoring any channels - we can use the active method which returns all
18.70 +channels being monitored by the exchange:
18.71 +
18.72 +channels = exchange.active()
18.73 +
18.74 +We may then check the exchange to see whether any data is ready to be received;
18.75 +for example:
18.76 +
18.77 +for channel in exchange.ready():
18.78 + # Read from and write to the channel.
18.79 + ...
18.80 +
18.81 +If we do not wish to wait indefinitely for a list of channels, we can set a
18.82 +timeout value as an argument to the ready method (as a floating point number
18.83 +specifying the timeout in seconds, where 0 means a non-blocking poll as stated
18.84 +in the select module's select function documentation).
18.85 +
18.86 +Signals and Waiting
18.87 +-------------------
18.88 +
18.89 +When created/child processes terminate, one would typically want to be informed
18.90 +of such conditions using a signal handler. Unfortunately, Python seems to have
18.91 +issues with restartable reads from file descriptors when interrupted by signals:
18.92 +
18.93 +http://mail.python.org/pipermail/python-dev/2002-September/028572.html
18.94 +http://twistedmatrix.com/bugs/issue733
18.95 +
18.96 +Select and Poll
18.97 +---------------
18.98 +
18.99 +The exact combination of conditions indicating closed pipes remains relatively
18.100 +obscure. Here is a message/thread describing them (in the context of another
18.101 +topic):
18.102 +
18.103 +http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
18.104 +
18.105 +It would seem, from using sockets and from studying the asycore module, that
18.106 +sockets are more predictable than pipes.
18.107 +"""
18.108 +
18.109 +__version__ = "0.2"
18.110 +
18.111 +import os
18.112 +import sys
18.113 +import select
18.114 +import socket
18.115 +
18.116 +try:
18.117 + import cPickle as pickle
18.118 +except ImportError:
18.119 + import pickle
18.120 +
18.121 +class AcknowledgementError(Exception):
18.122 + pass
18.123 +
18.124 +class Channel:
18.125 +
18.126 + "A communications channel."
18.127 +
18.128 + def __init__(self, pid, read_pipe, write_pipe):
18.129 +
18.130 + """
18.131 + Initialise the channel with a process identifier 'pid', a 'read_pipe'
18.132 + from which messages will be received, and a 'write_pipe' into which
18.133 + messages will be sent.
18.134 + """
18.135 +
18.136 + self.pid = pid
18.137 + self.read_pipe = read_pipe
18.138 + self.write_pipe = write_pipe
18.139 + self.closed = 0
18.140 +
18.141 + def __del__(self):
18.142 +
18.143 + # Since signals don't work well with I/O, we close pipes and wait for
18.144 + # created processes upon finalisation.
18.145 +
18.146 + self.close()
18.147 +
18.148 + def close(self):
18.149 +
18.150 + "Explicitly close the channel."
18.151 +
18.152 + if not self.closed:
18.153 + self.closed = 1
18.154 + self.read_pipe.close()
18.155 + self.write_pipe.close()
18.156 + #self.wait(os.WNOHANG)
18.157 +
18.158 + def wait(self, options=0):
18.159 +
18.160 + "Wait for the created process, if any, to exit."
18.161 +
18.162 + if self.pid != 0:
18.163 + try:
18.164 + os.waitpid(self.pid, options)
18.165 + except OSError:
18.166 + pass
18.167 +
18.168 + def _send(self, obj):
18.169 +
18.170 + "Send the given object 'obj' through the channel."
18.171 +
18.172 + pickle.dump(obj, self.write_pipe)
18.173 + self.write_pipe.flush()
18.174 +
18.175 + def send(self, obj):
18.176 +
18.177 + """
18.178 + Send the given object 'obj' through the channel. Then wait for an
18.179 + acknowledgement. (The acknowledgement makes the caller wait, thus
18.180 + preventing processes from exiting and disrupting the communications
18.181 + channel and losing data.)
18.182 + """
18.183 +
18.184 + self._send(obj)
18.185 + if self._receive() != "OK":
18.186 + raise AcknowledgementError, obj
18.187 +
18.188 + def _receive(self):
18.189 +
18.190 + "Receive an object through the channel, returning the object."
18.191 +
18.192 + obj = pickle.load(self.read_pipe)
18.193 + if isinstance(obj, Exception):
18.194 + raise obj
18.195 + else:
18.196 + return obj
18.197 +
18.198 + def receive(self):
18.199 +
18.200 + """
18.201 + Receive an object through the channel, returning the object. Send an
18.202 + acknowledgement of receipt. (The acknowledgement makes the sender wait,
18.203 + thus preventing processes from exiting and disrupting the communications
18.204 + channel and losing data.)
18.205 + """
18.206 +
18.207 + try:
18.208 + obj = self._receive()
18.209 + return obj
18.210 + finally:
18.211 + self._send("OK")
18.212 +
18.213 +class Exchange:
18.214 +
18.215 + """
18.216 + A communications exchange that can be used to detect channels which are
18.217 + ready to communicate.
18.218 + """
18.219 +
18.220 + def __init__(self, channels=None, autoclose=1):
18.221 +
18.222 + """
18.223 + Initialise the exchange with an optional list of 'channels'. If the
18.224 + optional 'autoclose' parameter is set to a false value, channels will
18.225 + not be closed automatically when they are removed from the exchange - by
18.226 + default they are closed when removed.
18.227 + """
18.228 +
18.229 + self.autoclose = autoclose
18.230 + self.readables = {}
18.231 + self.poller = select.poll()
18.232 + for channel in channels or []:
18.233 + self.add(channel)
18.234 +
18.235 + def add(self, channel):
18.236 +
18.237 + "Add the given 'channel' to the exchange."
18.238 +
18.239 + self.readables[channel.read_pipe.fileno()] = channel
18.240 + self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
18.241 +
18.242 + def active(self):
18.243 +
18.244 + "Return a list of active channels."
18.245 +
18.246 + return self.readables.values()
18.247 +
18.248 + def ready(self, timeout=None):
18.249 +
18.250 + """
18.251 + Wait for a period of time specified by the optional 'timeout' (or until
18.252 + communication is possible) and return a list of channels which are ready
18.253 + to be read from.
18.254 + """
18.255 +
18.256 + fds = self.poller.poll(timeout)
18.257 + readables = []
18.258 + for fd, status in fds:
18.259 + channel = self.readables[fd]
18.260 +
18.261 + # Remove ended/error channels.
18.262 +
18.263 + if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
18.264 + self.remove(channel)
18.265 +
18.266 + # Record readable channels.
18.267 +
18.268 + elif status & select.POLLIN:
18.269 + readables.append(channel)
18.270 +
18.271 + return readables
18.272 +
18.273 + def remove(self, channel):
18.274 +
18.275 + """
18.276 + Remove the given 'channel' from the exchange.
18.277 + """
18.278 +
18.279 + del self.readables[channel.read_pipe.fileno()]
18.280 + self.poller.unregister(channel.read_pipe.fileno())
18.281 + if self.autoclose:
18.282 + channel.close()
18.283 + channel.wait()
18.284 +
18.285 +def create():
18.286 +
18.287 + """
18.288 + Create a new process, returning a communications channel to both the
18.289 + creating process and the created process.
18.290 + """
18.291 +
18.292 + parent, child = socket.socketpair()
18.293 + for s in [parent, child]:
18.294 + s.setblocking(1)
18.295 +
18.296 + pid = os.fork()
18.297 + if pid == 0:
18.298 + parent.close()
18.299 + return Channel(pid, child.makefile("r"), child.makefile("w"))
18.300 + else:
18.301 + child.close()
18.302 + return Channel(pid, parent.makefile("r"), parent.makefile("w"))
18.303 +
18.304 +def start(callable, *args, **kwargs):
18.305 +
18.306 + """
18.307 + Create a new process which shall start running in the given 'callable'.
18.308 + Return a communications channel to the creating process, and supply such a
18.309 + channel to the created process as the 'channel' parameter in the given
18.310 + 'callable'. Additional arguments to the 'callable' can be given as
18.311 + additional arguments to this function.
18.312 + """
18.313 +
18.314 + channel = create()
18.315 + if channel.pid == 0:
18.316 + try:
18.317 + try:
18.318 + callable(channel, *args, **kwargs)
18.319 + except:
18.320 + exc_type, exc_value, exc_traceback = sys.exc_info()
18.321 + channel.send(exc_value)
18.322 + finally:
18.323 + channel.close()
18.324 + sys.exit(0)
18.325 + else:
18.326 + return channel
18.327 +
18.328 +def waitall():
18.329 +
18.330 + "Wait for all created processes to terminate."
18.331 +
18.332 + try:
18.333 + while 1:
18.334 + os.wait()
18.335 + except OSError:
18.336 + pass
18.337 +
18.338 +# vim: tabstop=4 expandtab shiftwidth=4
19.1 --- a/setup.py Thu Jun 19 21:42:43 2008 +0200
19.2 +++ b/setup.py Tue Oct 11 17:07:11 2005 +0000
19.3 @@ -2,7 +2,7 @@
19.4
19.5 from distutils.core import setup
19.6
19.7 -import parallel
19.8 +import pprocess
19.9
19.10 setup(
19.11 name = "parallel",
19.12 @@ -10,6 +10,6 @@
19.13 author = "Paul Boddie",
19.14 author_email = "paul@boddie.org.uk",
19.15 url = "http://www.python.org/pypi/parallel",
19.16 - version = parallel.__version__,
19.17 - py_modules = ["parallel"]
19.18 + version = pprocess.__version__,
19.19 + py_modules = ["pprocess"]
19.20 )
20.1 --- a/tests/create_loop.py Thu Jun 19 21:42:43 2008 +0200
20.2 +++ b/tests/create_loop.py Tue Oct 11 17:07:11 2005 +0000
20.3 @@ -1,6 +1,6 @@
20.4 #!/usr/bin/env python
20.5
20.6 -from parallel import create
20.7 +from pprocess import create
20.8
20.9 limit = 100
20.10 channel = create()
21.1 --- a/tests/start_indexer.py Thu Jun 19 21:42:43 2008 +0200
21.2 +++ b/tests/start_indexer.py Tue Oct 11 17:07:11 2005 +0000
21.3 @@ -2,7 +2,7 @@
21.4
21.5 "A simple text indexing activity."
21.6
21.7 -from parallel import start, Exchange
21.8 +from pprocess import start, Exchange
21.9 from Dict import Indexer, Searcher, Parser
21.10 import os
21.11
22.1 --- a/tests/start_loop.py Thu Jun 19 21:42:43 2008 +0200
22.2 +++ b/tests/start_loop.py Tue Oct 11 17:07:11 2005 +0000
22.3 @@ -1,6 +1,6 @@
22.4 #!/usr/bin/env python
22.5
22.6 -from parallel import start
22.7 +from pprocess import start
22.8
22.9 def loop(channel, limit):
22.10 print "loop to", limit