#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

    channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

    def fn(channel, arg1, arg2, named1, named2):
        # Read from and write to the channel.
        # Return value is ignored.
        ...

If the callable raises an exception, that exception is sent through the
channel; when the creating process receives it, it is raised there (for
exceptions derived from Exception). The created process exits as soon as the
callable has finished.

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

    channel = create()
    if channel.pid == 0:
        # This code is run by the created process.
        # Read from and write to the channel to communicate with the
        # creating/calling process.
        # An explicit exit of the process may be desirable to prevent the process
        # from running code which is intended for the creating/calling process.
        ...
    else:
        # This code is run by the creating/calling process.
        # Read from and write to the channel to communicate with the created
        # process.
        ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, collecting those results efficiently can be problematic: if some
processes take longer than others, and if we decide to read from those
processes when they are not ready instead of other processes which are ready,
the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

    exchange = Exchange(channels)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method, which returns a list of
all channels being monitored by the exchange:

    channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

    for channel in exchange.ready():
        # Read from and write to the channel.
        ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll, as stated
in the select module's select function documentation, and None, the default,
means waiting indefinitely).

Channels whose other end has been closed are removed from the exchange once no
unread data can be detected in them, so a typical collection loop looks like
this:

    while exchange.active():
        for channel in exchange.ready():
            result = channel.receive()

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

    http://mail.python.org/pipermail/python-dev/2002-September/028572.html
    http://twistedmatrix.com/bugs/issue733

For this reason, this module installs no signal handlers. Instead, closing a
channel (which also happens when a channel is finalised) reaps its created
process if that process has already exited, without blocking. Channel.wait and
waitall can be used to wait for created processes explicitly.

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

    http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
"""

__version__ = "0.1"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

def _flush_std_streams():

    """
    Flush the standard output and error streams, ignoring any failures.

    This is used before forking, so that buffered output is not written a second
    time by the created process. It is also used before a created process exits
    via os._exit, which does not flush buffers itself.
    """

    for stream in (sys.stdout, sys.stderr):
        if stream is None:
            continue
        try:
            stream.flush()
        except Exception:
            # Best effort only: a broken or closed stream must not prevent
            # forking or exiting.
            pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of 0 denotes the channel held by a created process; any other
        value identifies the created process from the creating process's side.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and try to reap
        # created processes upon finalisation. The getattr default skips
        # instances whose __init__ did not complete.

        if not getattr(self, "closed", 1):
            self.close()

    def close(self):

        """
        Explicitly close the channel: close both pipes and reap the created
        process if it has already exited. This does not block; use 'wait' to
        wait for the process to exit.
        """

        self.closed = 1
        try:
            self.read_pipe.close()
        finally:
            try:
                # Closing the writer flushes buffered data, which can fail if
                # the other end has gone away; the process is still reaped.
                self.write_pipe.close()
            finally:
                self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid (for example, os.WNOHANG to avoid blocking). Errors, such
        as the process having already been reaped, are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def send(self, obj):

        """
        Send the given object 'obj' through the channel.

        The object is pickled completely before anything is written. An object
        that cannot be pickled therefore raises an error without leaving a
        partial message in the stream.
        """

        data = pickle.dumps(obj)
        self.write_pipe.write(data)
        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object. If the
        object is an exception (for example, one forwarded by a process created
        using 'start'), it is raised instead of being returned.

        Raises EOFError if the other end has closed the channel and no more data
        is available.
        """

        # NOTE: unpickling can execute arbitrary code; channels are only meant
        # to connect trusted processes created by this module.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

def _has_pending_data(channel):

    """
    Return whether the read pipe of 'channel' still holds unread data. This is
    only called once poll has reported a hang-up or error for the channel,
    because reading at that point cannot block.
    """

    peek = getattr(channel.read_pipe, "peek", None)
    if peek is None:
        # Without peek (e.g. Python 2 socket file objects), pending data cannot
        # be told apart from end-of-file, so report nothing pending.
        return False
    try:
        return len(peek(1)) > 0
    except (OSError, ValueError):
        # The pipe failed or has been closed: nothing more can be read.
        return False

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    # Poll conditions indicating that a channel has ended or is unusable.

    _ENDED = select.POLLHUP | select.POLLNVAL | select.POLLERR

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        self.readables = {}
        self.poller = select.poll()
        for channel in channels:
            fd = channel.read_pipe.fileno()
            self.readables[fd] = channel
            self.poller.register(fd, select.POLLIN | self._ENDED)

    def active(self):

        "Return a list of the channels currently monitored by the exchange."

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout', in
        seconds (or until communication is possible), and return a list of
        channels which are ready to be read from. A 'timeout' of None (the
        default) waits indefinitely; 0 polls without blocking.

        Channels whose other end has hung up, or which have failed or been
        closed, are removed from the exchange once no unread data can be
        detected in them.
        """

        # poll expects milliseconds, whereas this method accepts seconds.

        if timeout is not None:
            timeout = timeout * 1000.0

        # NOTE(review): poll only sees data still held by the operating system.
        # A message that was already read ahead into a read pipe's buffer is not
        # reported here until more data arrives or the other end hangs up.
        # Confirm whether this matters for callers.

        readables = []
        for fd, status in self.poller.poll(timeout):
            channel = self.readables[fd]

            if status & self._ENDED:

                # A hang-up may be reported together with data sent just before
                # the other end closed. Keep reporting such a channel until that
                # data has been received, then remove it. Removal uses the fd
                # directly, since a closed channel (POLLNVAL) has no fileno.

                if status & select.POLLIN and _has_pending_data(channel):
                    readables.append(channel)
                else:
                    self._remove_fd(fd)

            elif status & select.POLLIN:
                readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange, even if the channel has
        already been closed. Raises KeyError if the channel is not monitored by
        the exchange.
        """

        for fd, candidate in list(self.readables.items()):
            if candidate is channel:
                self._remove_fd(fd)
                return
        raise KeyError(channel)

    def _remove_fd(self, fd):

        "Stop monitoring the channel registered under the file descriptor 'fd'."

        del self.readables[fd]
        self.poller.unregister(fd)

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process, the
    channel's 'pid' is 0; in the creating process, it is the process identifier
    of the created process.
    """

    parent, child = socket.socketpair()
    for s in (parent, child):
        s.setblocking(1)

    # Flush buffered output now so that the created process does not write it
    # out a second time.

    _flush_std_streams()

    pid = os.fork()
    if pid == 0:
        parent.close()
        sock = child
    else:
        child.close()
        sock = parent

    # Pickled messages are bytes, so the files are opened in binary mode.
    # Closing the socket object does not close the connection yet: it stays
    # open until both file objects have also been closed.

    channel = Channel(pid, sock.makefile("rb"), sock.makefile("wb"))
    sock.close()
    return channel

def _send_exception(channel, exc):

    """
    Send the exception 'exc' through 'channel'. If 'exc' cannot be sent, fall
    back to a plain Exception describing it. Any remaining failure is ignored,
    since the created process is about to exit.
    """

    try:
        channel.send(exc)
    except Exception:
        try:
            channel.send(Exception(repr(exc)))
        except Exception:
            pass

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the first argument to the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    Any exception raised by the 'callable' is sent through the channel (see
    Channel.receive). The created process exits when the 'callable' finishes;
    it never returns into the code that called this function.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Only the created process gets here.

    try:
        try:
            callable(channel, *args, **kwargs)
        except BaseException:
            _send_exception(channel, sys.exc_info()[1])
    finally:
        try:
            channel.close()
        finally:
            # os._exit, unlike sys.exit, does not unwind into the creating
            # process's frames, so their finally-blocks, exception handlers and
            # atexit hooks do not run a second time in this copy.
            _flush_std_streams()
            os._exit(0)

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError (typically ECHILD) once no children remain.

    while 1:
        try:
            os.wait()
        except OSError:
            break

# vim: tabstop=4 expandtab shiftwidth=4