# HG changeset patch # User paulb # Date 1127759158 0 # Node ID 0b6a52dd38222a16a7a14ddc41ac22f9403e631c # Parent 33269b6eee20b1dff82cbd9f244dea99007b427f [project @ 2005-09-26 18:25:58 by paulb] Initial revision diff -r 33269b6eee20 -r 0b6a52dd3822 parallel.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/parallel.py Mon Sep 26 18:25:58 2005 +0000 @@ -0,0 +1,120 @@ +#!/usr/bin/env python + +""" +A simple parallel processing API for Python, inspired somewhat by the thread +module, slightly less by pypar, and slightly less still by pypvm. + +To create new processes to run a function or any callable object, specify the +"callable" and any arguments as follows: + +channel = start(fn, arg1, arg2, named1=value1, named2=value2) + +This returns a channel which can then be used to communicate with the created +process. Meanwhile, in the created process, the given callable will be invoked +with another channel as its first argument followed by the specified arguments: + +def fn(channel, arg1, arg2, named1, named2): + # Read to and write from the channel. + # Return value is ignored. + ... + +To create new processes in a similar way to that employed when using os.fork +(ie. the fork system call on various operating systems), use the following +method: + +channel = create() +if channel.pid == 0: + # This code is run by the created process. + # Read to and write from the channel to communicate with the + # creating/calling process. + # An explicit exit of the process may be desirable to prevent the process + # from running code which is intended for the creating/calling process. + ... +else: + # This code is run by the creating/calling process. + # Read to and write from the channel to communicate with the created + # process. + ... +""" + +import os +import sys + +try: + import cPickle as pickle +except ImportError: + import pickle + +class Channel: + + "A communications channel." + + def __init__(self, pid, read_pipe, write_pipe): + + """ + Initialise the channel with a process identifier 'pid', a 'read_pipe' + from which messages will be received, and a 'write_pipe' into which + messages will be sent. + """ + + self.pid = pid + self.read_pipe = read_pipe + self.write_pipe = write_pipe + + def send(self, obj): + + "Send the given object 'obj' through the channel." + + pickle.dump(obj, self.write_pipe) + self.write_pipe.flush() + + def receive(self): + + "Receive an object through the channel, returning the object." + + obj = pickle.load(self.read_pipe) + if isinstance(obj, Exception): + raise obj + else: + return obj + +def create(): + + """ + Create a new process, returning a communications channel to both the + creating process and the created process. + """ + + parent_read_fd, child_write_fd = os.pipe() + child_read_fd, parent_write_fd = os.pipe() + + pid = os.fork() + if pid == 0: + return Channel(pid, os.fdopen(child_read_fd, "r"), os.fdopen(child_write_fd, "w")) + else: + return Channel(pid, os.fdopen(parent_read_fd, "r"), os.fdopen(parent_write_fd, "w")) + +def start(callable, *args, **kwargs): + + """ + Create a new process which shall start running in the given 'callable'. + Return a communications channel to the creating process, and supply such a + channel to the created process as the 'channel' parameter in the given + 'callable'. Additional arguments to the 'callable' can be given as + additional arguments to this function. + """ + + channel = create() + if channel.pid == 0: + try: + try: + callable(channel, *args, **kwargs) + except: + exc_type, exc_value, exc_traceback = sys.exc_info() + channel.send(exc_value) + finally: + sys.exit(0) + else: + return channel + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r 33269b6eee20 -r 0b6a52dd3822 tests/create_loop.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/create_loop.py Mon Sep 26 18:25:58 2005 +0000 @@ -0,0 +1,18 @@ +#!/usr/bin/env python + +from parallel import create + +limit = 100 +channel = create() +if channel.pid == 0: + i = channel.receive() + while i < limit: + print i + i = channel.receive() + channel.send("Done") +else: + for i in range(0, limit + 1): + channel.send(i) + print channel.receive() + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r 33269b6eee20 -r 0b6a52dd3822 tests/start_loop.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/start_loop.py Mon Sep 26 18:25:58 2005 +0000 @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +from parallel import start + +def loop(channel, limit): + print "loop to", limit + i = channel.receive() + while i < limit: + print i + i = channel.receive() + channel.send("Done") + +if __name__ == "__main__": + limit = 100 + channel = start(loop, limit) + for i in range(0, limit + 1): + channel.send(i) + print channel.receive() + +# vim: tabstop=4 expandtab shiftwidth=4