#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

If the callable raises an exception, the exception object is sent through the
channel and raised in the creating process by the channel's receive method.

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

exchange = Exchange(channels)

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll as stated
in the select module's select function documentation).
"""

import os
import sys
import select
from signal import signal, SIGCHLD

try:
    import cPickle as pickle
except ImportError:
    import pickle

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of 0 marks the channel held by the created process; any other
        value is the identifier of the created process, as held by its creator.
        Both pipes must be binary file objects since pickled data is exchanged.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # NOTE: Hack until the signals vs. pipes behaviour is fixed.

        if self.pid != 0:

            # Close our ends first so that a child blocked on reading from or
            # writing to this channel is released instead of waiting forever
            # while we wait for it.

            for pipe in (self.write_pipe, self.read_pipe):
                try:
                    pipe.close()
                except (IOError, OSError, ValueError):
                    pass

            # Reap this channel's own child only: a plain os.wait() would
            # collect whichever child happened to terminate first.

            try:
                os.waitpid(self.pid, 0)
            except OSError:
                pass

    def send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)

        # Flush so that the message is visible to the peer immediately.

        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception instance, it is raised instead of being
        returned. EOFError is raised by the unpickler if the peer has closed its
        end of the channel without sending anything.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        # Map each channel's read pipe to the channel so that the results of
        # select can be translated back into channels.

        self.readables = {}
        for channel in channels:
            self.readables[channel.read_pipe] = channel

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from. A 'timeout' of None waits indefinitely. If select
        reports an error, an empty list is returned.
        """

        # NOTE(review): readiness is tested on the underlying descriptors;
        # NOTE(review): data already held in a pipe's own buffer is presumably
        # NOTE(review): not reported here - confirm for back-to-back messages.

        try:
            readable_fds = select.select(list(self.readables), [], [], timeout)[0]
        except select.error:
            return []

        return [self.readables[fd] for fd in readable_fds]

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process, the
    returned channel has a 'pid' of 0; in the creating process, it has the
    process identifier of the created process.
    """

    parent_read_fd, child_write_fd = os.pipe()
    child_read_fd, parent_write_fd = os.pipe()

    try:
        pid = os.fork()
    except OSError:

        # Do not leak the descriptors if no process could be created.

        for fd in (parent_read_fd, child_write_fd, child_read_fd, parent_write_fd):
            os.close(fd)
        raise

    # Each process closes the ends belonging to the other process: otherwise a
    # reader can never see end-of-file when its peer exits, since it would
    # itself still hold the peer's writing end open.

    if pid == 0:
        os.close(parent_read_fd)
        os.close(parent_write_fd)
        read_fd, write_fd = child_read_fd, child_write_fd
    else:
        os.close(child_read_fd)
        os.close(child_write_fd)
        read_fd, write_fd = parent_read_fd, parent_write_fd

    # Pickled data is binary, so the pipes are opened in binary mode.

    return Channel(pid, os.fdopen(read_fd, "rb"), os.fdopen(write_fd, "wb"))

def _exit_child(channel):

    """
    Terminate the created process holding 'channel' immediately, after flushing
    the channel and the standard streams on a best-effort basis.
    """

    for stream in (channel.write_pipe, sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:

            # Nothing useful can be done about a stream which cannot be flushed
            # when the process is about to terminate anyway.

            pass

    # Leave without raising SystemExit: that exception would otherwise unwind
    # through the creating process's frames, which were copied by the fork, and
    # run cleanup code intended for the creating process.

    os._exit(0)

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    The return value of the 'callable' is ignored. Should it raise an exception,
    the exception object is sent through the channel. The created process always
    terminates with an exit status of 0 and never returns from this function.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Only the created process gets this far.

    try:
        try:
            callable(channel, *args, **kwargs)
        except Exception:
            channel.send(sys.exc_info()[1])
    finally:
        _exit_child(channel)

# Define and install a handler which waits for terminated child processes.

#def handler(number, frame):
#    os.wait()

#signal(SIGCHLD, handler)

# vim: tabstop=4 expandtab shiftwidth=4