#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

If the callable raises an exception, the exception is sent through the channel
where possible; when the creating process receives an exception derived from
Exception, the channel's receive method raises it.

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e., the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

exchange = Exchange(channels)

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can pass a
timeout to the ready method, given in seconds as an integer or floating point
number. A timeout of 0 performs a non-blocking check; None (the default) waits
until an event occurs on at least one of the channels.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

For this reason, no SIGCHLD handler is installed. Instead, a channel makes a
non-blocking attempt to collect its created process when the channel is closed.
"""

import math
import os
import sys
import select
#from signal import signal, SIGCHLD

try:
    import cPickle as pickle
except ImportError:
    import pickle

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        As with os.fork, 'pid' is 0 in the created process and is the created
        process's identifier in the creating process. Messages are pickled, so
        both pipes must be binary-mode file objects.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # NOTE: Hack until the signals vs. pipes behaviour is fixed: closing
        # the channel here is also what collects the created process.
        # getattr() guards against an __init__ that failed before 'closed'
        # was assigned.

        if not getattr(self, "closed", 1):
            self.close()

    def close(self):

        """
        Explicitly close the channel. Both pipes are closed and a
        non-blocking attempt is then made to collect the created process (see
        wait). These steps happen even if closing an earlier pipe fails.
        """

        self.closed = 1
        try:
            self.read_pipe.close()
        finally:
            try:
                self.write_pipe.close()
            finally:
                self.wait()

    def wait(self):

        """
        Collect the created process, if any, if it has already exited.

        This call does not block: os.WNOHANG is used, so a process that is
        still running is not waited for. OSError is ignored; for example, it
        is raised when the process has already been collected.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, os.WNOHANG)
            except OSError:
                pass

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. The object is pickled
        into the write pipe, and the pipe is flushed so that the other process
        can receive it immediately.
        """

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object.

        If the object is an instance of Exception, it is raised instead of
        being returned. pickle.load raises EOFError once the other end has
        closed the pipe and no messages remain.

        NOTE: unpickling can execute arbitrary code, so only use channels
        whose other end is trusted (here, a process forked from this one).
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        self.readables = {}
        self.poller = select.poll()
        for channel in channels:
            fd = channel.read_pipe.fileno()
            self.readables[fd] = channel

            # Only input-side events make sense on a read pipe; POLLOUT is
            # deliberately not requested. poll() reports hang-ups, errors and
            # invalid descriptors regardless of the mask; they are listed here
            # to make the intent explicit.

            self.poller.register(
                fd,
                select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return the number of active channels."

        return len(self.readables)

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is given in seconds: 0 performs a non-blocking check and
        None waits indefinitely.

        A channel on which a hang-up, error or invalid descriptor is reported
        is removed from the exchange, even if it is also returned as readable
        by this call. Callers expecting several messages from such a channel
        should read all of them when it is returned.

        NOTE: poll only sees data still in the pipe. Data already read ahead
        into a channel's file object buffer is not reported.
        """

        if timeout is not None:

            # poll() takes milliseconds. Round up so that a small positive
            # timeout does not degrade into a non-blocking check.

            timeout = int(math.ceil(timeout * 1000))

        readables = []
        for fd, status in self.poller.poll(timeout):
            channel = self.readables[fd]

            # Record readable channels.

            if status & select.POLLIN:
                readables.append(channel)

            # Remove ended/error channels. Use the descriptor reported by poll:
            # after POLLNVAL the channel's file object is typically closed and
            # can no longer supply its fileno().

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self._discard(fd)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.
        """

        self._discard(channel.read_pipe.fileno())

    def _discard(self, fd):

        "Stop monitoring the channel registered under the descriptor 'fd'."

        del self.readables[fd]
        self.poller.unregister(fd)

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process the channel's pid is 0. In the creating process it
    is the created process's identifier.
    """

    parent_read_fd, child_write_fd = os.pipe()
    child_read_fd, parent_write_fd = os.pipe()

    try:
        pid = os.fork()
    except OSError:
        for fd in (parent_read_fd, child_write_fd, child_read_fd, parent_write_fd):
            os.close(fd)
        raise

    # Each process closes the pipe ends it does not use. Otherwise it would
    # hold the other side's write end open, and reading would block forever
    # rather than reaching end-of-file once the other process goes away.
    # Binary mode is required because messages are pickled.

    if pid == 0:
        os.close(parent_read_fd)
        os.close(parent_write_fd)
        return Channel(pid, os.fdopen(child_read_fd, "rb"), os.fdopen(child_write_fd, "wb"))
    else:
        os.close(child_read_fd)
        os.close(child_write_fd)
        return Channel(pid, os.fdopen(parent_read_fd, "rb"), os.fdopen(parent_write_fd, "wb"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    In the created process this function never returns. Any exception raised
    by the callable is sent to the creating process if possible. The channel
    is then closed, and SystemExit is raised.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kwargs)

            # Deliberately catch everything: the failure should be reported,
            # and the created process must not fall back into the caller's code.

            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                try:
                    channel.send(exc_value)
                except Exception:

                    # Best effort only: the exception may not be picklable, or
                    # the creating process may have gone away. The process
                    # must still exit below.

                    pass
        finally:
            channel.close()
        raise SystemExit
    else:
        return channel

# Define and install a handler which waits for terminated child processes.

#def handler(number, frame):
#    os.wait()

#signal(SIGCHLD, handler)

# vim: tabstop=4 expandtab shiftwidth=4