#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

    channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

    def fn(channel, arg1, arg2, named1, named2):
        # Read from and write to the channel.
        # Return value is ignored.
        ...

If the callable raises an exception, that exception is sent through the
channel, and the creating process will see it raised by channel.receive().

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

    channel = create()
    if channel.pid == 0:
        # This code is run by the created process.
        # Read from and write to the channel to communicate with the
        # creating/calling process.
        # An explicit exit of the process may be desirable to prevent the process
        # from running code which is intended for the creating/calling process.
        ...
    else:
        # This code is run by the creating/calling process.
        # Read from and write to the channel to communicate with the created
        # process.
        ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

    exchange = Exchange(channels)

We may then check the exchange to see whether any data is ready to be received;
for example:

    for channel in exchange.ready():
        # Read from and write to the channel.
        ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll, as
described in the select module's select function documentation).

A channel whose process has hung up is removed from the exchange. If it still
holds unread data, it is reported as ready one last time. Receiving from a
channel whose process ended without sending anything further raises EOFError.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

    http://mail.python.org/pipermail/python-dev/2002-September/028572.html
    http://twistedmatrix.com/bugs/issue733

Consequently, no signal handler is installed. Instead, a channel reaps its
created process (if that process has already exited) when the channel is
closed or finalised, and the waitall function can be used to wait for all
remaining child processes.

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

    http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html
"""

import os
import sys
import math
import errno
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent. As with os.fork, 'pid' is 0 in the created
        process and the created process's identifier in the creating process.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation. getattr tolerates instances
        # whose __init__ never ran.

        if not getattr(self, "closed", 1):
            self.close()

    def close(self):

        """
        Explicitly close the channel, then reap the created process without
        blocking if it has already exited. Both pipes are closed and the reap
        is attempted even if closing one of the pipes raises.
        """

        self.closed = 1
        try:
            self.read_pipe.close()
        finally:
            try:
                self.write_pipe.close()
            finally:
                self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are
        passed to os.waitpid (for example, os.WNOHANG for a non-blocking
        check). OSError, such as when the process was already reaped, is
        deliberately ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def send(self, obj):

        "Send the given picklable object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object. If the
        object is an exception (as sent by start when the callable fails), it
        is raised instead. Raises EOFError when the other end has closed the
        channel without sending anything further.
        """

        # Unpickling is only acceptable because the peer is a process forked
        # from this program; never connect a channel to an untrusted peer.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        self.readables = {}
        self.poller = select.poll()
        events = select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR
        for channel in channels:
            fd = channel.read_pipe.fileno()
            self.readables[fd] = channel
            self.poller.register(fd, events)

    def active(self):

        "Return the number of active channels."

        return len(self.readables)

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        seconds (or until communication is possible) and return a list of
        channels which are ready to be read from. A 'timeout' of None waits
        indefinitely; 0 performs a non-blocking check.

        Channels whose other end has hung up (or which report an error) are
        removed from the exchange. If such a channel still holds unread data,
        it is also included in the returned list this one final time.
        """

        # select.poll expects milliseconds, whereas the documented unit is
        # seconds. Round up so that a small positive timeout does not become
        # a non-blocking poll.

        if timeout is not None:
            timeout = int(math.ceil(timeout * 1000))

        readables = []
        for fd, status in self.poller.poll(timeout):
            channel = self.readables[fd]

            # Stop polling ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)

            # POLLIN may accompany POLLHUP when a process sent data and then
            # exited, so the pending data must still be offered to the caller.

            if status & select.POLLIN:
                readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Raises KeyError if the
        channel is not part of the exchange.
        """

        # Search by identity rather than calling read_pipe.fileno(), which
        # raises on a channel that has already been closed.

        for fd, registered in list(self.readables.items()):
            if registered is channel:
                del self.readables[fd]
                self.poller.unregister(fd)
                return
        raise KeyError(channel)

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. The channel's pid is 0 in the
    created process and the created process's identifier in the creator.
    """

    parent, child = socket.socketpair()
    for sock in (parent, child):
        sock.setblocking(1)

    pid = os.fork()

    # Binary streams are required: pickle reads and writes bytes.

    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb"), child.makefile("wb"))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb"), parent.makefile("wb"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the first argument to the given
    'callable'. Additional positional and keyword arguments to the 'callable'
    can be given as additional arguments to this function.

    If the callable raises an exception (other than SystemExit or
    KeyboardInterrupt), that exception is sent through the channel. The
    created process then closes its channel and exits with status 0; it never
    returns into the caller's code.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Created process: control must never return to the caller, whose code is
    # meant for the creating process only. This is why the bare excepts
    # contain every exception here.

    try:
        callable(channel, *args, **kwargs)
    except:
        exc_value = sys.exc_info()[1]
        if isinstance(exc_value, Exception):
            try:
                channel.send(exc_value)
            except:
                # Deliberate best effort: the exception could not be pickled
                # or the channel is broken, so the creating process will
                # observe end-of-file instead.
                pass

    try:
        channel.close()
    except (IOError, OSError, socket.error):
        # The creating process may already have closed its end.
        pass

    sys.exit(0)

def waitall():

    """
    Wait for all child processes of this process (including any not created
    by this module, since os.wait reaps any child) to terminate.
    """

    while 1:
        try:
            os.wait()
        except OSError:
            # ECHILD means no children remain. EINTR means a signal interrupted
            # the wait (possible before Python 3.5), so keep waiting.
            if sys.exc_info()[1].errno != errno.EINTR:
                break

# vim: tabstop=4 expandtab shiftwidth=4