#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

    channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

    def fn(channel, arg1, arg2, named1, named2):
        # Read from and write to the channel.
        # Return value is ignored.
        ...

If the callable raises an exception, that exception is sent through the channel
and raised again in the creating process by its next call to receive.

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

    channel = create()
    if channel.pid == 0:
        # This code is run by the created process.
        # Read from and write to the channel to communicate with the
        # creating/calling process.
        # An explicit exit of the process may be desirable to prevent the process
        # from running code which is intended for the creating/calling process.
        ...
    else:
        # This code is run by the creating/calling process.
        # Read from and write to the channel to communicate with the created
        # process.
        ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

    exchange = Exchange()           # populate the exchange later
    exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

    exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

    channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

    for channel in exchange.ready():
        # Read from and write to the channel.
        ...

If we do not wish to wait indefinitely for a list of channels, we can pass a
timeout to the ready method. The timeout is given in seconds; a floating point
number may be used for fractions of a second. A timeout of 0 means a
non-blocking check.

Channels whose connections have been closed by the other end (or have failed)
are removed from the exchange by the ready method. By default, they are also
closed and their processes waited for.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

    http://mail.python.org/pipermail/python-dev/2002-September/028572.html
    http://twistedmatrix.com/bugs/issue733

This module therefore installs no signal handlers. Instead, created processes
are waited for explicitly, using Channel.wait, the waitall function, or an
Exchange (which waits for the processes of the channels it removes unless its
autoclose option is disabled).

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

    http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.
"""

__version__ = "0.1"

import errno
import math
import os
import select
import socket
import sys

try:
    import cPickle as pickle
except ImportError:
    import pickle

# NOTE: messages are exchanged using pickle, and loading a pickle can execute
# arbitrary code. This assumes both ends of a channel are trusted, as with the
# processes created by create and start.


class AcknowledgementError(Exception):

    "Raised by Channel.send when a message is acknowledged with anything but OK."


class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Following the os.fork convention used by create, 'pid' is the created
        process's identifier in the creating process, and 0 in the created
        process itself.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = False

    def __del__(self):

        # Release the pipes when the channel is finalised. The created process
        # is not waited for here; use wait, waitall or an autoclosing Exchange.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing a closed channel does nothing."

        if self.closed:
            return
        self.closed = True
        try:
            self.read_pipe.close()
        finally:
            # Close the write side even if closing the read side failed.
            self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid (for example, os.WNOHANG to avoid blocking). Waiting is
        best-effort: errors such as the process having already been reaped are
        ignored.
        """

        if self.pid == 0:
            return
        while True:
            try:
                os.waitpid(self.pid, options)
            except OSError as exc:
                # Python versions before 3.5 (PEP 475) do not retry a call
                # interrupted by a signal, so retry it here instead of giving up.
                if exc.errno == errno.EINTR:
                    continue
            return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raises AcknowledgementError if the reply is not the expected "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. An
        exception object received from the other end is raised instead.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        If the sender transmitted an exception, it is acknowledged and then
        raised here. If the other end has closed the channel, EOFError is
        raised and no acknowledgement is attempted.
        """

        try:
            obj = pickle.load(self.read_pipe)
        except EOFError:
            # Nobody is left to acknowledge; trying would only replace this
            # error with a broken-pipe error.
            raise
        except Exception:
            # For other failures (e.g. a message that could not be unpickled),
            # still acknowledge so that a waiting sender is not left blocked.
            self._send("OK")
            raise

        self._send("OK")
        if isinstance(obj, Exception):
            raise obj
        return obj


class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'. If the
        optional 'autoclose' parameter is set to a false value, channels will
        not be closed automatically when they are removed from the exchange - by
        default they are closed (and their processes waited for) when removed.
        """

        self.autoclose = autoclose
        self.readables = {}     # read-pipe file descriptor -> channel
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is in seconds (fractions allowed); 0 polls without
        blocking and None (the default) waits indefinitely. Channels reporting
        hang-up, error or invalid-descriptor conditions are removed from the
        exchange rather than returned.
        """

        if timeout is not None:
            # poll() expects milliseconds. Round up so that a small positive
            # timeout does not degrade into a non-blocking poll.
            timeout = int(math.ceil(timeout * 1000))

        readables = []
        for fd, status in self.poller.poll(timeout):
            channel = self.readables[fd]

            # Remove ended/error channels. This goes by descriptor, because the
            # channel's own pipe may already be closed (the POLLNVAL case).

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self._discard(fd)

            # Record readable channels.

            elif status & select.POLLIN:
                readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Raises KeyError if the
        channel is not being monitored by this exchange.
        """

        # Search by identity rather than asking the channel for its descriptor,
        # which fails once the channel's pipes have been closed.
        for fd, candidate in list(self.readables.items()):
            if candidate is channel:
                self._discard(fd)
                return
        raise KeyError(channel)

    def _discard(self, fd):

        "Stop monitoring 'fd'; close and wait for its channel if autoclosing."

        channel = self.readables.pop(fd)
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()


def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. As with os.fork, the channel's
    pid is 0 in the created process and the created process's identifier in
    the creating process.
    """

    parent, child = socket.socketpair()
    for s in (parent, child):
        # The pickle-based protocol relies on blocking reads and writes.
        s.setblocking(1)

    try:
        pid = os.fork()
    except OSError:
        parent.close()
        child.close()
        raise

    if pid == 0:
        parent.close()
        own = child
    else:
        child.close()
        own = parent

    try:
        # Binary modes: pickle reads and writes bytes.
        return Channel(pid, own.makefile("rb"), own.makefile("wb"))
    finally:
        # The file objects keep the connection open; closing the socket object
        # itself just drops this extra reference, so the connection is really
        # closed once the channel's files are.
        own.close()


def _report_exception(channel, exc):

    "Send 'exc' to the creating process through 'channel', if at all possible."

    try:
        channel.send(exc)
    except Exception:
        # The exception may be unpicklable, or the creating process may already
        # have closed its end; the created process is exiting regardless.
        pass


def _exit_child(status):

    "Flush the standard streams and terminate the created process immediately."

    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:
            pass
    # os._exit skips the cleanup handlers and atexit hooks the created process
    # inherited from the creating process.
    os._exit(status)


def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    If the 'callable' raises an exception (an Exception subclass), it is sent
    through the channel so that the creating process's next receive call
    raises it. The created process never returns from this function: it exits
    once the 'callable' has finished.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Only the created process gets here. It must not return into, or unwind
    # through, the caller's frames, which belong to the creating process.

    try:
        try:
            callable(channel, *args, **kwargs)
        except Exception as exc:
            _report_exception(channel, exc)
    finally:
        try:
            channel.close()
        finally:
            _exit_child(0)


def waitall():

    """
    Wait for all created processes to terminate. Note that this waits for every
    child of the current process, including any not created by this module.
    """

    while True:
        try:
            os.wait()
        except OSError as exc:
            # EINTR (not retried automatically before Python 3.5): try again.
            # Anything else, typically ECHILD, means no children remain.
            if exc.errno != errno.EINTR:
                return

# vim: tabstop=4 expandtab shiftwidth=4