# HG changeset patch # User paulb # Date 1127830608 0 # Node ID 97103996bad14495f473c256c1a0dc66556357a8 # Parent 93f4b77f294f83130abeebd20edec9d481889f09 [project @ 2005-09-27 14:16:48 by paulb] Introduced explicit closing of channels, along with specific waiting for created/child processes. Added active and remove methods to the Exchange class so that the channels being monitored can be queried and manipulated directly. diff -r 93f4b77f294f -r 97103996bad1 parallel.py --- a/parallel.py Mon Sep 26 22:27:07 2005 +0000 +++ b/parallel.py Tue Sep 27 14:16:48 2005 +0000 @@ -68,12 +68,21 @@ timeout value as an argument to the ready method (as a floating point number specifying the timeout in seconds, where 0 means a non-blocking poll as stated in the select module's select function documentation). + +Signals and Waiting +------------------- + +When created/child processes terminate, one would typically want to be informed +of such conditions using a signal handler. Unfortunately, Python seems to have +issues with restartable reads from file descriptors when interrupted by signals: + +http://mail.python.org/pipermail/python-dev/2002-September/028572.html """ import os import sys import select -from signal import signal, SIGCHLD +#from signal import signal, SIGCHLD try: import cPickle as pickle @@ -95,14 +104,25 @@ self.pid = pid self.read_pipe = read_pipe self.write_pipe = write_pipe + self.closed = 0 def __del__(self): # NOTE: Hack until the signals vs. pipes behaviour is fixed. + if not self.closed: + self.close() + + def close(self): + + "Explicitly close the channel." + + self.closed = 1 + self.read_pipe.close() + self.write_pipe.close() if self.pid != 0: try: - os.wait() + os.waitpid(self.pid, os.WNOHANG) except OSError: pass @@ -138,6 +158,12 @@ for channel in channels: self.readables[channel.read_pipe] = channel + def active(self): + + "Return the number of active channels." + + return len(self.readables.keys()) + def ready(self, timeout=None): """ @@ -158,6 +184,14 @@ readable = [self.readables[fd] for fd in readable_fds] return readable + def remove(self, channel): + + """ + Remove the given 'channel' from the exchange. + """ + + del self.readables[channel.read_pipe] + def create(): """