#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

    channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

    def fn(channel, arg1, arg2, named1, named2):
        # Read from and write to the channel.
        # Return value is ignored.
        ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

    channel = create()
    if channel.pid == 0:
        # This code is run by the created process.
        # Read from and write to the channel to communicate with the
        # creating/calling process.
        # An explicit exit of the process may be desirable to prevent the process
        # from running code which is intended for the creating/calling process.
        ...
    else:
        # This code is run by the creating/calling process.
        # Read from and write to the channel to communicate with the created
        # process.
        ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

    exchange = Exchange(channels)

We may then check the exchange to see whether any data is ready to be received;
for example:

    for channel in exchange.ready():
        # Read from and write to the channel.
        ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll as stated
in the select module's select function documentation).

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
"""

import os
import sys
import select
#from signal import signal, SIGCHLD

try:
    import cPickle as pickle
except ImportError:
    import pickle


class Channel:

    """
    A communications channel.

    A channel pairs a process identifier with two file objects: one from which
    pickled messages are read and one into which pickled messages are written.
    """

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # NOTE: Hack until the signals vs. pipes behaviour is fixed.

        # If __init__ never completed there is no 'closed' attribute and
        # nothing to release, so treat the channel as already closed.

        if not getattr(self, "closed", 1):
            self.close()

    def close(self):

        """
        Explicitly close the channel.

        Both pipes are closed and, when this channel refers to a created
        process (its 'pid' is not 0), a non-blocking attempt is made to collect
        that process's exit status.
        """

        self.closed = 1
        try:
            try:
                self.read_pipe.close()
            finally:
                # Close the write side even if closing the read side failed.
                self.write_pipe.close()
        finally:
            if self.pid != 0:
                try:
                    # WNOHANG: never block here; only reap the process if it
                    # has already terminated.
                    os.waitpid(self.pid, os.WNOHANG)
                except OSError:
                    # Nothing to wait for (for example, already reaped).
                    pass

    def send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        # Flush immediately so that the message is not held back in the file
        # object's buffer.
        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object.

        If the received object is an Exception instance, it is raised instead
        of being returned.
        """

        # NOTE: Unpickling is only appropriate because the other end of the
        # NOTE: pipe is a process created by this module.
        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj


class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        # Map each channel's read pipe to the channel so that the results of
        # select can be translated back into channels.
        self.readables = {}
        for channel in channels:
            self.readables[channel.read_pipe] = channel

    def active(self):

        "Return the number of active channels."

        return len(self.readables)

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        A 'timeout' of None means waiting without a time limit. If select
        itself fails with select.error, an empty list is returned.
        """

        # NOTE(review): readiness is decided from the underlying descriptors;
        # NOTE(review): whether data already held in a pipe object's own buffer
        # NOTE(review): can be missed here should be confirmed.

        try:
            readable_fds, writable_fds, exceptional_fds = select.select(
                list(self.readables), [], [], timeout)
        except select.error:
            return []

        return [self.readables[fd] for fd in readable_fds]

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.

        A KeyError is raised if the channel is not part of the exchange.
        """

        del self.readables[channel.read_pipe]


def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process the returned channel has a 'pid' of 0; in the
    creating process the channel's 'pid' is that of the created process.
    """

    parent_read_fd, child_write_fd = os.pipe()
    child_read_fd, parent_write_fd = os.pipe()
    all_fds = (parent_read_fd, child_write_fd, child_read_fd, parent_write_fd)

    try:
        pid = os.fork()
    except OSError:
        # No process was created: do not leak the four descriptors.
        for fd in all_fds:
            os.close(fd)
        raise

    # Each process keeps only its own two pipe ends. Closing the other
    # process's ends avoids leaking descriptors and lets a reader see
    # end-of-file once the process at the other end has gone away.

    if pid == 0:
        os.close(parent_read_fd)
        os.close(parent_write_fd)
        read_fd, write_fd = child_read_fd, child_write_fd
    else:
        os.close(child_read_fd)
        os.close(child_write_fd)
        read_fd, write_fd = parent_read_fd, parent_write_fd

    # Pickled data is binary, so the pipes are opened in binary mode.

    return Channel(pid, os.fdopen(read_fd, "rb"), os.fdopen(write_fd, "wb"))


def _exit_child(channel):

    """
    Terminate the created process immediately with exit status 0, after
    flushing the write pipe of 'channel' and the standard output streams.
    """

    for stream in (channel.write_pipe, sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:
            # Best effort only: a closed or missing stream must not prevent
            # the process from exiting.
            pass

    # os._exit does not raise SystemExit, so code belonging to the creating
    # process (enclosing finally/except clauses, exit handlers) is never run
    # in the created process.

    os._exit(0)


def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    In the created process, the return value of the 'callable' is ignored; if
    the 'callable' raises an exception, the exception value is sent through the
    channel. The created process then exits with status 0 and never returns
    from this function.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # From here on, this is the created process.

    try:
        try:
            callable(channel, *args, **kwargs)
        except:
            # Deliberately catch everything: any failure in the callable is
            # forwarded to the creating process.
            channel.send(sys.exc_info()[1])
    finally:
        _exit_child(channel)

# Define and install a handler which waits for terminated child processes.

#def handler(number, frame):
#    os.wait()

#signal(SIGCHLD, handler)

# vim: tabstop=4 expandtab shiftwidth=4