# HG changeset patch # User Paul Boddie # Date 1441644894 -7200 # Node ID ceebd7a6e5229bdccf8076c1e4daa14043a14769 # Parent 73b632900c42751e999e7df80c4879d0b52f0583 Added a compound locking mechanism to avoid update inconsistencies upon concurrent modification of data. diff -r 73b632900c42 -r ceebd7a6e522 imip_store.py --- a/imip_store.py Sun Sep 06 23:20:02 2015 +0200 +++ b/imip_store.py Mon Sep 07 18:54:54 2015 +0200 @@ -64,19 +64,32 @@ value. """ + f = codecs.open(filename, "rb", encoding="utf-8") + try: + l = [] + for line in f.readlines(): + t = line.strip(" \r\n").split("\t") + if empty_defaults: + t = self._set_defaults(t, empty_defaults) + l.append(tuple(t)) + return l + finally: + f.close() + + def _get_table_atomic(self, user, filename, empty_defaults=None): + + """ + From the file for the given 'user' having the given 'filename', return + a list of tuples representing the file's contents. + + The 'empty_defaults' is a list of (index, value) tuples indicating the + default value where a column either does not exist or provides an empty + value. + """ + self.acquire_lock(user) try: - f = codecs.open(filename, "rb", encoding="utf-8") - try: - l = [] - for line in f.readlines(): - t = line.strip(" \r\n").split("\t") - if empty_defaults: - t = self._set_defaults(t, empty_defaults) - l.append(tuple(t)) - return l - finally: - f.close() + return self._get_table(user, filename, empty_defaults) finally: self.release_lock(user) @@ -91,17 +104,30 @@ value. """ + f = codecs.open(filename, "wb", encoding="utf-8") + try: + for item in items: + if empty_defaults: + item = self._set_defaults(list(item), empty_defaults) + f.write("\t".join(item) + "\n") + finally: + f.close() + fix_permissions(filename) + + def _set_table_atomic(self, user, filename, items, empty_defaults=None): + + """ + For the given 'user', write to the file having the given 'filename' the + 'items'. + + The 'empty_defaults' is a list of (index, value) tuples indicating the + default value where a column either does not exist or provides an empty + value. + """ + self.acquire_lock(user) try: - f = codecs.open(filename, "wb", encoding="utf-8") - try: - for item in items: - if empty_defaults: - item = self._set_defaults(list(item), empty_defaults) - f.write("\t".join(item) + "\n") - finally: - f.close() - fix_permissions(filename) + self._set_table(user, filename, items, empty_defaults) finally: self.release_lock(user) @@ -420,7 +446,7 @@ # Attempt to read providers, with a declaration of the datetime # from which such providers are considered as still being active. - t = self._get_table(user, filename, [(1, None)]) + t = self._get_table_atomic(user, filename, [(1, None)]) try: dt_string = t[0][0] except IndexError: @@ -469,7 +495,7 @@ return False t.insert(0, (dt_string,)) - self._set_table(user, filename, t, [(1, "")]) + self._set_table_atomic(user, filename, t, [(1, "")]) return True def set_freebusy_providers(self, user, dt, providers): @@ -517,17 +543,28 @@ # Free/busy period access. - def get_freebusy(self, user): + def get_freebusy(self, user, name=None, get_table=None): "Get free/busy details for the given 'user'." - filename = self.get_object_in_store(user, "freebusy") + filename = self.get_object_in_store(user, name or "freebusy") if not filename or not exists(filename): return [] else: - return map(lambda t: FreeBusyPeriod(*t), self._get_table(user, filename, [(4, None)])) + return map(lambda t: FreeBusyPeriod(*t), + (get_table or self._get_table_atomic)(user, filename, [(4, None)])) + + def get_freebusy_for_update(self, user, name=None): - def get_freebusy_for_other(self, user, other): + """ + Get free/busy details for the given 'user', locking the table. Dependent + code must release this lock regardless of it completing successfully. + """ + + self.acquire_lock(user) + return self.get_freebusy(user, name, self._get_table) + + def get_freebusy_for_other(self, user, other, get_table=None): "For the given 'user', get free/busy details for the 'other' user." @@ -535,20 +572,39 @@ if not filename or not exists(filename): return [] else: - return map(lambda t: FreeBusyPeriod(*t), self._get_table(user, filename, [(4, None)])) + return map(lambda t: FreeBusyPeriod(*t), + (get_table or self._get_table_atomic)(user, filename, [(4, None)])) + + def get_freebusy_for_other_for_update(self, user, other): - def set_freebusy(self, user, freebusy): + """ + For the given 'user', get free/busy details for the 'other' user, + locking the table. Dependent code must release this lock regardless of + it completing successfully. + """ + + self.acquire_lock(user) + return self.get_freebusy_for_other(user, other, self._get_table) + + def set_freebusy(self, user, freebusy, name=None, set_table=None): "For the given 'user', set 'freebusy' details." - filename = self.get_object_in_store(user, "freebusy") + filename = self.get_object_in_store(user, name or "freebusy") if not filename: return False - self._set_table(user, filename, map(lambda fb: fb.as_tuple(strings_only=True), freebusy)) + (set_table or self._set_table_atomic)(user, filename, + map(lambda fb: fb.as_tuple(strings_only=True), freebusy)) return True - def set_freebusy_for_other(self, user, freebusy, other): + def set_freebusy_in_update(self, user, freebusy, name=None): + + "For the given 'user', set 'freebusy' details during a compound update." + + return self.set_freebusy(user, freebusy, name, self._set_table) + + def set_freebusy_for_other(self, user, freebusy, other, set_table=None): "For the given 'user', set 'freebusy' details for the 'other' user." @@ -556,9 +612,23 @@ if not filename: return False - self._set_table(user, filename, map(lambda fb: fb.as_tuple(strings_only=True), freebusy)) + (set_table or self._set_table_atomic)(user, filename, + map(lambda fb: fb.as_tuple(strings_only=True), freebusy)) return True + def set_freebusy_for_other_in_update(self, user, freebusy, other): + + """ + For the given 'user', set 'freebusy' details for the 'other' user during + a compound update. + """ + + return self.set_freebusy_for_other(user, freebusy, other, self._set_table) + + # Release methods. + + release_freebusy = release_lock + # Object status details access. def _get_requests(self, user, queue): @@ -569,7 +639,7 @@ if not filename or not exists(filename): return None - return self._get_table(user, filename, [(1, None)]) + return self._get_table_atomic(user, filename, [(1, None)]) def get_requests(self, user): diff -r 73b632900c42 -r ceebd7a6e522 imiptools/client.py --- a/imiptools/client.py Sun Sep 06 23:20:02 2015 +0200 +++ b/imiptools/client.py Mon Sep 07 18:54:54 2015 +0200 @@ -611,13 +611,17 @@ if user == self.user: return - freebusy = self.store.get_freebusy_for_other(self.user, user) - fn(freebusy, user, for_organiser, True) + freebusy = self.store.get_freebusy_for_other_for_update(self.user, user) + try: + fn(freebusy, user, for_organiser, True) + + # Tidy up any obsolete recurrences. - # Tidy up any obsolete recurrences. + self.remove_freebusy_for_recurrences(freebusy, self.store.get_recurrences(self.user, self.uid)) + self.store.set_freebusy_for_other_in_update(self.user, freebusy, user) - self.remove_freebusy_for_recurrences(freebusy, self.store.get_recurrences(self.user, self.uid)) - self.store.set_freebusy_for_other(self.user, freebusy, user) + finally: + self.store.release_freebusy(self.user) def update_freebusy_from_organiser(self, organiser): diff -r 73b632900c42 -r ceebd7a6e522 imiptools/filesys.py --- a/imiptools/filesys.py Sun Sep 06 23:20:02 2015 +0200 +++ b/imiptools/filesys.py Mon Sep 07 18:54:54 2015 +0200 @@ -22,7 +22,7 @@ import errno from imiptools.config import DEFAULT_PERMISSIONS, DEFAULT_DIR_PERMISSIONS from os.path import abspath, commonprefix, exists, join, split -from os import chmod, makedirs, mkdir, rename, rmdir +from os import chmod, getpid, makedirs, mkdir, rename, rmdir from time import sleep, time def check_dir(base, filename): @@ -149,13 +149,34 @@ # This uses the directory creation method exploited by MoinMoin.util.lock. # However, a simple single lock type mechanism is employed here. - def get_lock_dir(self, *parts): + def make_lock_dir(self, *parts): - "Return the lock directory defined by the given 'parts'." + "Make the lock directory defined by the given 'parts'." parts = parts and list(parts) or [] parts.append(self.lock_name) - return self.get_object_in_store(*parts) + parts.append(str(getpid())) + makedirs(self.get_object_in_store(*parts)) + + def remove_lock_dir(self, *parts): + + "Remove the lock directory defined by the given 'parts'." + + parts = parts and list(parts) or [] + parts.append(self.lock_name) + parts.append(str(getpid())) + rmdir(self.get_object_in_store(*parts)) + parts.pop() + rmdir(self.get_object_in_store(*parts)) + + def owning_lock_dir(self, *parts): + + "Return whether this process owns the lock directory." + + parts = parts and list(parts) or [] + parts.append(self.lock_name) + parts.append(str(getpid())) + return exists(self.get_object_in_store(*parts)) def acquire_lock(self, timeout=None, *parts): @@ -168,11 +189,13 @@ while not timeout or now - start < timeout: try: - mkdir(self.get_lock_dir(*parts)) + self.make_lock_dir(*parts) break except OSError, exc: if exc.errno != errno.EEXIST: raise + elif self.owning_lock_dir(*parts): + break sleep(1) now = time() @@ -184,7 +207,7 @@ """ try: - rmdir(self.get_lock_dir(*parts)) + self.remove_lock_dir(*parts) except OSError, exc: if exc.errno != errno.ENOENT: raise diff -r 73b632900c42 -r ceebd7a6e522 imiptools/handlers/common.py --- a/imiptools/handlers/common.py Sun Sep 06 23:20:02 2015 +0200 +++ b/imiptools/handlers/common.py Mon Sep 07 18:54:54 2015 +0200 @@ -77,26 +77,29 @@ organiser of an event if 'for_organiser' is set to a true value. """ - freebusy = self.store.get_freebusy(self.user) + freebusy = self.store.get_freebusy_for_update(self.user) + try: + # Obtain the attendance attributes for this user, if available. - # Obtain the attendance attributes for this user, if available. + self.update_freebusy_for_participant(freebusy, self.user, for_organiser) - self.update_freebusy_for_participant(freebusy, self.user, for_organiser) + # Remove original recurrence details replaced by additional + # recurrences, as well as obsolete additional recurrences. - # Remove original recurrence details replaced by additional - # recurrences, as well as obsolete additional recurrences. + self.remove_freebusy_for_recurrences(freebusy, self.store.get_recurrences(self.user, self.uid)) + self.store.set_freebusy_in_update(self.user, freebusy) - self.remove_freebusy_for_recurrences(freebusy, self.store.get_recurrences(self.user, self.uid)) - self.store.set_freebusy(self.user, freebusy) + if self.publisher and self.is_sharing(): + self.publisher.set_freebusy(self.user, freebusy) - if self.publisher and self.is_sharing(): - self.publisher.set_freebusy(self.user, freebusy) + # Update free/busy provider information if the event may recur + # indefinitely. - # Update free/busy provider information if the event may recur - # indefinitely. + if self.possibly_recurring_indefinitely(): + self.store.append_freebusy_provider(self.user, self.obj) - if self.possibly_recurring_indefinitely(): - self.store.append_freebusy_provider(self.user, self.obj) + finally: + self.store.release_freebusy(self.user) return True @@ -104,18 +107,22 @@ "Remove free/busy information when handling an object." - freebusy = self.store.get_freebusy(self.user) - self.remove_from_freebusy(freebusy) - self.remove_freebusy_for_recurrences(freebusy) - self.store.set_freebusy(self.user, freebusy) + freebusy = self.store.get_freebusy_for_update(self.user) + try: + self.remove_from_freebusy(freebusy) + self.remove_freebusy_for_recurrences(freebusy) + self.store.set_freebusy_in_update(self.user, freebusy) + + if self.publisher and self.is_sharing(): + self.publisher.set_freebusy(self.user, freebusy) - if self.publisher and self.is_sharing(): - self.publisher.set_freebusy(self.user, freebusy) + # Update free/busy provider information if the event may recur + # indefinitely. - # Update free/busy provider information if the event may recur - # indefinitely. + if self.possibly_recurring_indefinitely(): + self.store.remove_freebusy_provider(self.user, self.obj) - if self.possibly_recurring_indefinitely(): - self.store.remove_freebusy_provider(self.user, self.obj) + finally: + self.store.release_freebusy(self.user) # vim: tabstop=4 expandtab shiftwidth=4 diff -r 73b632900c42 -r ceebd7a6e522 imiptools/handlers/person.py --- a/imiptools/handlers/person.py Sun Sep 06 23:20:02 2015 +0200 +++ b/imiptools/handlers/person.py Mon Sep 07 18:54:54 2015 +0200 @@ -167,9 +167,12 @@ period = Period(dtstart, dtend, self.get_tzid()) for sender, sender_attr in senders: - stored_freebusy = self.store.get_freebusy_for_other(self.user, sender) - replace_overlapping(stored_freebusy, period, freebusy) - self.store.set_freebusy_for_other(self.user, stored_freebusy, sender) + stored_freebusy = self.store.get_freebusy_for_other_for_update(self.user, sender) + try: + replace_overlapping(stored_freebusy, period, freebusy) + self.store.set_freebusy_for_other_in_update(self.user, stored_freebusy, sender) + finally: + self.store.release_freebusy(self.user) def _refresh(self):