#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

    channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

    def fn(channel, arg1, arg2, named1, named2):
        # Read from and write to the channel.
        # Return value is ignored.
        ...

If the callable raises an exception, that exception is sent through the
channel; receiving it in the creating process causes it to be raised there.
The created process exits once the callable has finished.

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
function:

    channel = create()
    if channel.pid == 0:
        # This code is run by the created process.
        # Read from and write to the channel to communicate with the
        # creating/calling process.
        # An explicit exit of the process may be desirable to prevent the
        # process from running code which is intended for the
        # creating/calling process.
        ...
    else:
        # This code is run by the creating/calling process.
        # Read from and write to the channel to communicate with the created
        # process.
        ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

    exchange = Exchange(channels)

We may then check the exchange to see whether any data is ready to be received;
for example:

    for channel in exchange.ready():
        # Read from and write to the channel.
        ...

If we do not wish to wait indefinitely for a list of channels, we can pass a
timeout value to the ready method: a number of seconds (a floating point number
may be used), where 0 means a non-blocking poll, mirroring the timeout argument
of the select module's select function.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Consequently, this module does not install a SIGCHLD handler. Instead, a
channel closes its pipes and makes a non-blocking attempt to wait for its
created process when it is closed or finalised.

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
"""

import math
import os
import sys
import select
import socket
#from signal import signal, SIGCHLD

try:
    import cPickle as pickle
except ImportError:
    import pickle


class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        When created by create(), 'pid' is 0 in the created process and the
        created process's identifier in the creating process (the value
        returned by os.fork). The pipes must be binary file objects, since
        pickled data is bytes.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        if not self.closed:
            self.close()

    def close(self):

        """
        Explicitly close the channel: close both pipes, then make a
        non-blocking attempt to reap the created process (a process that is
        still running is not waited for).
        """

        self.closed = 1
        try:
            self.read_pipe.close()
        finally:
            # Release the write side even if closing the read side failed.
            self.write_pipe.close()
        self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. 'options' is passed to
        os.waitpid (e.g. os.WNOHANG). Errors from os.waitpid, such as the
        process having already been reaped, are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def send(self, obj):

        "Pickle the given object 'obj' into the channel and flush it."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an Exception instance, it is raised instead of
        returned (this is how start() reports failures in the created
        process).

        NOTE: the data is unpickled, so the peer must be trusted.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj


class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.

    NOTE(review): readiness is determined by polling each channel's underlying
    file descriptor. Data already read ahead into a channel's buffered
    read_pipe by an earlier receive() is presumably not reported — confirm
    before relying on one-message-per-ready() behaviour.
    """

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        # Maps each registered read descriptor to its channel.
        self.readables = {}
        self.poller = select.poll()
        for channel in channels:
            fd = channel.read_pipe.fileno()
            self.readables[fd] = channel
            self.poller.register(
                fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return the number of active channels."

        return len(self.readables)

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        'timeout' is given in seconds (a float may be used); 0 means a
        non-blocking poll and None means wait indefinitely. Channels reporting
        hang-up, error or an invalid descriptor are removed from the exchange
        (after being returned, if they also reported readable data).
        """

        if timeout is None:
            if not self.readables:
                # Nothing registered can ever become ready; an unbounded
                # poll would block forever.
                return []
            poll_timeout = None
        else:
            # poll() measures its timeout in milliseconds whereas this method
            # accepts seconds. Round up so that a small positive timeout still
            # waits instead of degenerating into a non-blocking poll.
            poll_timeout = int(math.ceil(timeout * 1000))

        readables = []
        for fd, status in self.poller.poll(poll_timeout):
            channel = self.readables[fd]

            # Record readable channels.

            if status & select.POLLIN:
                readables.append(channel)

            # Remove ended/error channels. Unregister by descriptor: with
            # POLLNVAL the channel's pipe may already be closed, in which case
            # asking it for its fileno() would fail.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self._unregister(fd)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. This works even if the
        channel has already been closed. Raises KeyError if 'channel' is not
        registered.
        """

        for fd, candidate in list(self.readables.items()):
            if candidate is channel:
                self._unregister(fd)
                return
        raise KeyError(channel)

    def _unregister(self, fd):

        "Forget the channel registered under the read descriptor 'fd'."

        del self.readables[fd]
        self.poller.unregister(fd)


def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process the
    channel's pid is 0; in the creating process it is the created process's
    identifier.
    """

    parent_read, child_write = socket.socketpair()
    child_read, parent_write = socket.socketpair()
    for s in (parent_read, child_write, child_read, parent_write):
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        # Created process: discard the creating process's ends.
        parent_read.close()
        parent_write.close()
        read_sock, write_sock = child_read, child_write
    else:
        # Creating process: discard the created process's ends.
        child_read.close()
        child_write.close()
        read_sock, write_sock = parent_read, parent_write

    # Pickled data is bytes, so the file objects must be binary.
    channel = Channel(pid, read_sock.makefile("rb"), write_sock.makefile("wb"))

    # The file objects keep the descriptors open; releasing the socket objects
    # now means that closing the channel really closes the descriptors.
    read_sock.close()
    write_sock.close()
    return channel


def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    If the callable raises an Exception, it is sent through the channel. The
    created process then closes its channel, flushes stdout/stderr and exits
    with status 0 via os._exit, so it never returns into (or unwinds through)
    the caller's code. Note that atexit handlers therefore do not run in the
    created process.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Only the created process reaches this point.
    try:
        try:
            callable(channel, *args, **kwargs)
        except Exception:
            channel.send(sys.exc_info()[1])
    finally:
        try:
            channel.close()
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(0)

# Define and install a handler which waits for terminated child processes.
# (Disabled: see "Signals and Waiting" in the module documentation.)

#def handler(number, frame):
#    os.wait()

#signal(SIGCHLD, handler)

# vim: tabstop=4 expandtab shiftwidth=4