#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006 Paul Boddie <paul@boddie.org.uk>

This software is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.

This software is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public
License along with this library; see the file LICENCE.txt
If not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

--------

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a number specifying the
timeout in milliseconds, where 0 means a non-blocking poll, as stated in the
select module's documentation of the poll object's poll method).

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.2.2"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a message is not \"OK\"."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes upon
        # finalisation.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Calling this more than once is harmless:
        only the first call closes the pipes.
        """

        if not self.closed:
            self.closed = 1

            # Close the write pipe even if closing the read pipe fails, so
            # that a failure does not leak the other file object.

            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()

            # NOTE: Closing does not wait for the created process; use the
            # NOTE: wait method for that.

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid. Nothing is done if the 'pid' of this channel is 0 (as it
        is inside a created process), and an OSError from os.waitpid (such as
        that raised when the process has already been waited for) is ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        """
        Send the given object 'obj' through the channel by pickling it into the
        write pipe and flushing the pipe. No acknowledgement is awaited.
        """

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        An AcknowledgementError carrying 'obj' is raised if the reply is
        anything other than "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        object is an exception instance, it is raised instead of being returned.
        No acknowledgement is sent.
        """

        # NOTE: Unpickling is only safe because the other end of the channel
        # NOTE: is expected to be a process created by this module.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when receiving raises an exception,
        since the sender will be waiting for it regardless.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'. If the
        optional 'autoclose' parameter is set to a false value, channels will
        not be closed automatically when they are removed from the exchange - by
        default they are closed when removed.
        """

        self.autoclose = autoclose

        # Monitored channels, indexed by the descriptor of their read pipe.

        self.readables = {}

        # Channels removed by the most recent call to the ready method.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # Copy the values so that callers always get a real list which is not
        # affected by later additions to, or removals from, the exchange.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is passed unchanged to the poll method of a select.poll
        object, which interprets it as a number of milliseconds; None (the
        default) means waiting without a time limit, and 0 means returning
        immediately.

        Channels reporting a hang-up, an invalid descriptor or an error are
        removed from the exchange and recorded in the 'removed' attribute, which
        is reset upon each call. Such channels are only returned as readable if
        'autoclose' is false, since they have otherwise already been closed.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = False

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = True

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If 'autoclose' is set, the
        channel is also closed and its process is waited for.
        """

        # The descriptor must be obtained before the channel is closed.

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process, the 'pid'
    attribute of the channel is 0; in the creating process, it is the process
    identifier of the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # The file objects are opened in binary mode because they carry pickled
    # data.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb"), child.makefile("wb"))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb"), parent.makefile("wb"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    Any exception raised by the 'callable' is sent through the channel to the
    creating process. The created process never returns from this function: it
    closes its channel and exits once the 'callable' has finished.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kwargs)
            except:
                # Deliberately catch everything: this is the outermost frame
                # of the created process, and the creating process is the only
                # place where the failure can be reported.

                exc_value = sys.exc_info()[1]
                channel.send(exc_value)
        finally:
            channel.close()

            # Exit immediately so that the created process does not run code
            # intended for the creating process.

            os._exit(0)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError when there are no more processes to wait for,
    # which ends the loop.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4