#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

This module relies on os.fork, os.pipe and select.poll and is therefore only
usable on platforms which provide them.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

exchange = Exchange(channels)

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method. The value is handed directly
to the poll method of a select.poll object and is therefore a number of
milliseconds, where 0 means a non-blocking poll and None (the default) means
waiting until at least one channel has something to report.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
"""

import os
import sys
import select
#from signal import signal, SIGCHLD

try:
    import cPickle as pickle
except ImportError:
    import pickle

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Both pipes are file objects; since pickled data is exchanged, they must
        be usable with pickle.dump and pickle.load (binary mode on Python 3).
        A 'pid' of 0 denotes the channel held by the created process itself.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = False

    def __del__(self):

        # NOTE: Hack until the signals vs. pipes behaviour is fixed.
        # The getattr default treats an object whose __init__ never completed
        # as already closed, since it has no pipes to release.

        if not getattr(self, "closed", True):
            self.close()

    def close(self):

        "Explicitly close the channel."

        self.closed = True

        # Make sure that a failure to close one pipe neither leaks the other
        # nor skips the attempt to reap the created process.

        try:
            self.read_pipe.close()
        finally:
            try:
                self.write_pipe.close()
            finally:
                self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid; errors from that call (such as the process having been
        waited for already) are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)

        # Flush immediately so that the receiving end is not left waiting for
        # data sitting in this process's write buffer.

        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception instance, it is raised instead of being
        returned.
        """

        # NOTE: Unpickling can execute arbitrary code. The pipes produced by
        # NOTE: this module connect only to processes forked from this one; do
        # NOTE: not wrap descriptors fed by untrusted peers in a Channel.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    # Conditions causing a channel to be dropped from the exchange.
    # NOTE(review): POLLOUT is retained from the historical event mask even
    # NOTE(review): though only read descriptors are registered - confirm
    # NOTE(review): whether it is still wanted.

    _END_EVENTS = select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR

    # Everything the poller is asked to report for each channel.

    _ALL_EVENTS = select.POLLIN | _END_EVENTS

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        # Mapping from read file descriptors to their channels.

        self.readables = {}
        self.poller = select.poll()
        for channel in channels:
            fd = channel.read_pipe.fileno()
            self.readables[fd] = channel
            self.poller.register(fd, self._ALL_EVENTS)

    def active(self):

        "Return the number of active channels."

        return len(self.readables)

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is passed unchanged to the poll method of the underlying
        select.poll object: it is a number of milliseconds, with None meaning
        no time limit and 0 meaning a non-blocking check.

        Channels reported as ended or in error are removed from the exchange. A
        channel which still has data pending at that moment is both returned
        and removed.
        """

        readables = []
        for fd, status in self.poller.poll(timeout):
            channel = self.readables[fd]

            # Record readable channels.

            if status & select.POLLIN:
                readables.append(channel)

            # Remove ended/error channels. The descriptor reported by the poll
            # is used directly because asking an already closed pipe for its
            # descriptor (the POLLNVAL case) raises ValueError.

            if status & self._END_EVENTS:
                self._remove_fd(fd)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.

        A KeyError is raised if the channel is not part of the exchange, and a
        ValueError if its read pipe has already been closed.
        """

        self._remove_fd(channel.read_pipe.fileno())

    def _remove_fd(self, fd):

        "Forget the channel registered for the file descriptor 'fd'."

        del self.readables[fd]
        self.poller.unregister(fd)

def _close_fds(*fds):

    "Close each of the given file descriptors 'fds', ignoring failures."

    for fd in fds:
        try:
            os.close(fd)
        except OSError:
            pass

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process the returned channel has a 'pid' of 0; in the
    creating process it carries the process identifier of the created process.
    An OSError from os.pipe or os.fork is propagated after any pipe
    descriptors already opened have been closed.
    """

    parent_read_fd, child_write_fd = os.pipe()
    try:
        child_read_fd, parent_write_fd = os.pipe()
    except OSError:
        _close_fds(parent_read_fd, child_write_fd)
        raise

    try:
        pid = os.fork()
    except OSError:
        _close_fds(parent_read_fd, child_write_fd, child_read_fd, parent_write_fd)
        raise

    # Each process keeps one end of each pipe and closes the ends belonging to
    # the other process. The pipes are opened in binary mode because they carry
    # pickled data.

    if pid == 0:
        os.close(parent_read_fd)
        os.close(parent_write_fd)
        return Channel(pid, os.fdopen(child_read_fd, "rb"), os.fdopen(child_write_fd, "wb"))
    else:
        os.close(child_read_fd)
        os.close(child_write_fd)
        return Channel(pid, os.fdopen(parent_read_fd, "rb"), os.fdopen(parent_write_fd, "wb"))

def _exit_child(channel):

    """
    Close the 'channel' and terminate the created process without returning.

    The process is ended using os._exit so that no exception travels up through
    the copy of the creating process's call stack which the created process
    inherited, where it could trigger handlers meant only for the creating
    process. Since os._exit does not flush the standard streams, that is done
    explicitly beforehand.
    """

    try:
        channel.close()
        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()
            except Exception:

                # Best effort only: the stream may be absent or closed.

                pass
    finally:
        os._exit(0)

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    Anything raised by the 'callable' is sent through the channel, so that a
    receive call in the creating process raises it there if it is an exception
    instance. The created process never returns from this function: it exits
    with a status of 0 once the 'callable' has finished.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Only the created process gets this far.

    try:
        try:
            callable(channel, *args, **kwargs)
        except:

            # Deliberately catch everything: whatever was raised is forwarded
            # to the creating process rather than being lost with this one.

            channel.send(sys.exc_info()[1])
    finally:
        _exit_child(channel)

# Define and install a handler which waits for terminated child processes.

#def handler(number, frame):
#    os.wait()

#signal(SIGCHLD, handler)

# vim: tabstop=4 expandtab shiftwidth=4