# HG changeset patch # User Paul Boddie # Date 1475532969 -7200 # Node ID 45aa2c728c7af909466b9c331674d0d81d760d69 # Parent ede16df6ab1d20cde02cac10a90ce3350fcc2e4a Attempt to handle temporary resource unavailability by queuing processes, sleeping if no processes have been created already. diff -r ede16df6ab1d -r 45aa2c728c7a pprocess.py --- a/pprocess.py Tue Oct 20 23:16:46 2015 +0200 +++ b/pprocess.py Tue Oct 04 00:16:09 2016 +0200 @@ -4,7 +4,8 @@ A simple parallel processing API for Python, inspired somewhat by the thread module, slightly less by pypar, and slightly less still by pypvm. -Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013 Paul Boddie +Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013, 2014, + 2016 Paul Boddie Copyright (C) 2013 Yaroslav Halchenko This program is free software; you can redistribute it and/or modify it under @@ -28,7 +29,9 @@ import select import socket import platform +import errno +from time import sleep from warnings import warn try: @@ -402,16 +405,22 @@ try: self.store_data(channel) self.start_waiting(channel) - except IOError, exc: + except (IOError, OSError), exc: self.remove(channel) - warn("Removed channel %r due to IOError: %s" % (channel, exc)) + warn("Removed channel %r due to exception: %s" % (channel, exc)) # Or schedule new processes and channels. else: while self.waiting and not self.busy(): - callable, args, kw = self.waiting.pop() - self.start(callable, *args, **kw) + details = self.waiting.pop() + + # Stop actively scheduling if resources are exhausted. + + if not self.start_new_waiting(details): + if not self.active(): + sleep(1) + break def store_data(self, channel): @@ -449,6 +458,8 @@ self.add(channel) channel.send((args, kw)) + # Return the details for a new channel. + else: return callable, args, kw @@ -496,9 +507,31 @@ """ details = self._get_waiting(channel) + if details is not None: - callable, args, kw = details - self.add(start(callable, *args, **kw)) + self.start_new_waiting(details) + + def start_new_waiting(self, details): + + """ + Start a waiting process with the given 'details', obtaining a new + channel. + """ + + callable, args, kw = details + channel = self._start(callable, *args, **kw) + + # Monitor any newly-created process. + + if channel: + self.add(channel) + return True + + # Push the details back onto the end of the waiting list. + + else: + self.waiting.append(details) + return False # Convenience methods. @@ -511,9 +544,21 @@ """ if self._set_waiting(callable, args, kw): - return + return False + + channel = self._start(callable, *args, **kw) + + # Monitor any newly-created process. - self.add_wait(start(callable, *args, **kw)) + if channel: + self.add_wait(channel) + return True + + # Otherwise, add the details to the waiting list unconditionally. + + else: + self.waiting.insert(0, (callable, args, kw)) + return False def create(self): @@ -537,6 +582,22 @@ return ManagedCallable(callable, self) + def _start(self, callable, *args, **kw): + + """ + Create a new process for the given 'callable' using any additional + arguments provided. Return any successfully created channel or None if + no process could be created at the present time. + """ + + try: + return start(callable, *args, **kw) + except OSError, exc: + if exc.errno != errno.EAGAIN: + raise + else: + return None + class Persistent: """