# HG changeset patch # User paulb # Date 1127922996 0 # Node ID c34dad3cdba9b78a0d0c6dcc420068d448cddab7 # Parent 2062a7f628ed02059be1d21ac9a48652b74c0805 [project @ 2005-09-28 15:56:36 by paulb] Changed pipes to socket pairs. diff -r 2062a7f628ed -r c34dad3cdba9 parallel.py --- a/parallel.py Wed Sep 28 15:44:53 2005 +0000 +++ b/parallel.py Wed Sep 28 15:56:36 2005 +0000 @@ -92,6 +92,7 @@ import os import sys import select +import socket #from signal import signal, SIGCHLD try: @@ -175,7 +176,7 @@ self.poller = select.poll() for channel in channels: self.readables[channel.read_pipe.fileno()] = channel - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR) + self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) def active(self): @@ -203,7 +204,7 @@ # Remove ended/error channels. - if status & (select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR): + if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): self.remove(channel) return readables @@ -224,18 +225,20 @@ creating process and the created process. """ - parent_read_fd, child_write_fd = os.pipe() - child_read_fd, parent_write_fd = os.pipe() + parent_read, child_write = socket.socketpair() + child_read, parent_write = socket.socketpair() + for s in [parent_read, child_write, child_read, parent_write]: + s.setblocking(1) pid = os.fork() if pid == 0: - os.close(parent_read_fd) - os.close(parent_write_fd) - return Channel(pid, os.fdopen(child_read_fd, "r"), os.fdopen(child_write_fd, "w")) + parent_read.close() + parent_write.close() + return Channel(pid, child_read.makefile("r"), child_write.makefile("w")) else: - os.close(child_read_fd) - os.close(child_write_fd) - return Channel(pid, os.fdopen(parent_read_fd, "r"), os.fdopen(parent_write_fd, "w")) + child_read.close() + child_write.close() + return Channel(pid, parent_read.makefile("r"), parent_write.makefile("w")) def start(callable, *args, **kwargs):