#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, initialising it with a list of channels through which data
is expected to arrive:

exchange = Exchange(channels)

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll as stated
in the select module's select function documentation).
"""

import os
import sys
from select import select
from signal import signal, SIGCHLD

try:
    import cPickle as pickle
except ImportError:
    import pickle

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)

        # Flush immediately so that the message actually reaches the pipe and
        # the peer can see it.

        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception instance, it is raised instead of being
        returned. EOFError is raised by the pickle module if the other end of
        the channel has been closed without sending anything.
        """

        # NOTE: Unpickling is only safe when the peer is trusted; channels made
        # NOTE: by create() connect to a process forked from this one.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels):

        "Initialise the exchange with the given 'channels'."

        # Map each channel's read pipe to the channel so that the objects
        # returned by select can be translated back into channels.

        self.readables = {}
        for channel in channels:
            self.readables[channel.read_pipe] = channel

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.
        """

        # A timeout of None makes select block until something is readable,
        # exactly as if the argument had been omitted.
        # NOTE(review): select inspects the underlying descriptors; data already
        # NOTE(review): held in a pipe object's read buffer is presumably not
        # NOTE(review): reported - confirm if several messages may be queued.

        readable_fds, writable_fds, exceptional_fds = select(
            list(self.readables), [], [], timeout)
        return [self.readables[fd] for fd in readable_fds]

def _close_fds(*fds):

    "Close each of the given file descriptors, ignoring failures."

    for fd in fds:
        try:
            os.close(fd)
        except OSError:
            pass

def _exit_child(status=0):

    """
    Terminate a forked child process immediately with the given 'status',
    flushing the standard streams first so that buffered output is not lost.
    """

    try:
        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()
            except Exception:
                pass
    finally:

        # os._exit avoids raising SystemExit through call frames inherited
        # from the creating process and avoids running its exit handlers.

        os._exit(status)

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process the
    channel's pid attribute is 0; in the creating process it is the identifier
    of the created process.
    """

    parent_read_fd, child_write_fd = os.pipe()
    try:
        child_read_fd, parent_write_fd = os.pipe()
    except OSError:
        _close_fds(parent_read_fd, child_write_fd)
        raise

    try:
        pid = os.fork()
    except OSError:
        _close_fds(parent_read_fd, child_write_fd, child_read_fd, parent_write_fd)
        raise

    # Each process closes the pipe ends belonging to its peer. Apart from not
    # leaking descriptors, this lets a reader see end-of-file when the other
    # process exits, instead of blocking on a pipe it is itself holding open.
    # The pipes carry pickled data and are therefore opened in binary mode.

    if pid == 0:
        _close_fds(parent_read_fd, parent_write_fd)
        return Channel(pid, os.fdopen(child_read_fd, "rb"), os.fdopen(child_write_fd, "wb"))
    else:
        _close_fds(child_read_fd, child_write_fd)
        return Channel(pid, os.fdopen(parent_read_fd, "rb"), os.fdopen(parent_write_fd, "wb"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    An exception raised by the 'callable' is sent through the channel, causing
    it to be raised by the receive method in the creating process. The created
    process always terminates with an exit status of 0 once the 'callable' has
    finished, and thus never returns from this function.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # From here on, the code is run by the created process only.

    try:
        try:
            callable(channel, *args, **kwargs)
        except Exception:

            # Only ordinary exceptions are forwarded: SystemExit and
            # KeyboardInterrupt merely end the created process.

            channel.send(sys.exc_info()[1])
    finally:
        _exit_child(0)

# Define and install a handler which waits for terminated child processes.

def handler(number, frame):

    """
    Handle the SIGCHLD signal by collecting every child process which has
    terminated, without blocking if none remains to be collected.
    """

    # Several terminations may be reported by a single signal, hence the loop.

    while 1:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except OSError:

            # No child processes exist, or they were collected elsewhere.

            return
        if pid == 0:

            # Children exist, but none of them has terminated.

            return

signal(SIGCHLD, handler)

# vim: tabstop=4 expandtab shiftwidth=4