1.1 --- a/pprocess.py Fri Sep 14 23:16:24 2007 +0000
1.2 +++ b/pprocess.py Sat Sep 15 00:11:23 2007 +0000
1.3 @@ -52,7 +52,7 @@
1.4 (ie. the fork system call on various operating systems), use the following
1.5 method:
1.6
1.7 -channel = create()
1.8 +channel = pprocess.create()
1.9 if channel.pid == 0:
1.10 # This code is run by the created process.
1.11 # Read from and write to the channel to communicate with the
1.12 @@ -60,6 +60,7 @@
1.13 # An explicit exit of the process may be desirable to prevent the process
1.14 # from running code which is intended for the creating/calling process.
1.15 ...
1.16 + pprocess.exit(channel)
1.17 else:
1.18 # This code is run by the creating/calling process.
1.19 # Read from and write to the channel to communicate with the created
1.20 @@ -130,6 +131,10 @@
1.21 exchange.add_wait(channel) # add a channel, waiting if the limit would be
1.22 # exceeded
1.23
1.24 +Or we can request that the exchange create a channel on our behalf:
1.25 +
1.26 +channel = exchange.create()
1.27 +
1.28 We can even start processes and monitor channels without ever handling the
1.29 channel ourselves:
1.30
1.31 @@ -149,6 +154,35 @@
1.32 exchange API as described above. However, it permits much simpler and clearer
1.33 code.
1.34
1.35 +Exchanges as Queues
1.36 +-------------------
1.37 +
1.38 +Instead of having to subclass the pprocess.Exchange class and to define the
1.39 +store_data method, it might be more desirable to let the exchange manage the
1.40 +communications between created and creating processes and to let the creating
1.41 +process just consume received data as it arrives, without particular regard for
1.42 +the order of the received data - perhaps the creating process has its own way of
1.43 +managing such issues.
1.44 +
1.45 +For such situations, the Queue class may be instantiated and channels added to
1.46 +the queue using the various methods provided:
1.47 +
1.48 +queue = pprocess.Queue(limit=10)
1.49 +channel = queue.create()
1.50 +if channel:
1.51 + # Do some computation.
1.52 + pprocess.exit(channel)
1.53 +
1.54 +The results can then be consumed by treating the queue like an iterator:
1.55 +
1.56 +for result in queue:
1.57 + # Capture each result.
1.58 +
1.59 +This approach does not, of course, require the direct handling of channels. One
1.60 +could instead use the start method on the queue to create processes and to
1.61 +initiate computations (since a queue is merely an enhanced exchange with a
1.62 +specific implementation of the store_data method).
1.63 +
1.64 Managed Callables
1.65 -----------------
1.66
1.67 @@ -473,6 +507,22 @@
1.68
1.69 self.add_wait(start(callable, *args, **kw))
1.70
1.71 + def create(self):
1.72 +
1.73 + """
1.74 + Using pprocess.create, create a new process and return the created
1.75 + communications channel to the created process. In the creating process,
1.76 + return None - the channel receiving data from the created process will
1.77 + be automatically managed by this exchange.
1.78 + """
1.79 +
1.80 + channel = create()
1.81 + if channel.pid == 0:
1.82 + return channel
1.83 + else:
1.84 + self.add_wait(channel)
1.85 + return None
1.86 +
1.87 def manage(self, callable):
1.88
1.89 """
1.90 @@ -553,6 +603,40 @@
1.91 self.results[self.channels[channel]] = data
1.92 del self.channels[channel]
1.93
1.94 +class Queue(Exchange):
1.95 +
1.96 + """
1.97 + An exchange acting as a queue, making data from created processes available
1.98 + in the order in which it is received.
1.99 + """
1.100 +
1.101 + def __init__(self, *args, **kw):
1.102 + Exchange.__init__(self, *args, **kw)
1.103 + self.queue = []
1.104 +
1.105 + def store_data(self, channel):
1.106 +
1.107 + "Accumulate the incoming data, associating results with channels."
1.108 +
1.109 + data = channel.receive()
1.110 + self.queue.insert(0, data)
1.111 +
1.112 + def __iter__(self):
1.113 + return self
1.114 +
1.115 + def next(self):
1.116 +
1.117 + "Return the next element in the queue."
1.118 +
1.119 + if self.queue:
1.120 + return self.queue.pop()
1.121 + while self.active():
1.122 + self.store()
1.123 + if self.queue:
1.124 + return self.queue.pop()
1.125 + else:
1.126 + raise StopIteration
1.127 +
1.128 class MakeParallel:
1.129
1.130 "A wrapper around functions making them able to communicate results."
1.131 @@ -595,6 +679,15 @@
1.132 child.close()
1.133 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1.134
1.135 +def exit(channel):
1.136 +
1.137 + """
1.138 + Terminate a created process, closing the given 'channel'.
1.139 + """
1.140 +
1.141 + channel.close()
1.142 + os._exit(0)
1.143 +
1.144 def start(callable, *args, **kw):
1.145
1.146 """
1.147 @@ -616,8 +709,7 @@
1.148 exc_type, exc_value, exc_traceback = sys.exc_info()
1.149 channel.send(exc_value)
1.150 finally:
1.151 - channel.close()
1.152 - os._exit(0)
1.153 + pprocess.exit(channel)
1.154 else:
1.155 return channel
1.156