imip-agent

vRecurrence.py

1178:3252084e9608
2016-05-13 Paul Boddie Renamed the quota tool and updated the documentation to illustrate the way it currently works.
     1 #!/usr/bin/env python     2      3 """     4 Recurrence rule calculation.     5      6 Copyright (C) 2014, 2015 Paul Boddie <paul@boddie.org.uk>     7      8 This program is free software; you can redistribute it and/or modify it under     9 the terms of the GNU General Public License as published by the Free Software    10 Foundation; either version 3 of the License, or (at your option) any later    11 version.    12     13 This program is distributed in the hope that it will be useful, but WITHOUT    14 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS    15 FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more    16 details.    17     18 You should have received a copy of the GNU General Public License along with    19 this program.  If not, see <http://www.gnu.org/licenses/>.    20     21 ----    22     23 References:    24     25 RFC 5545: Internet Calendaring and Scheduling Core Object Specification    26           (iCalendar)    27           http://tools.ietf.org/html/rfc5545    28     29 ----    30     31 FREQ defines the selection resolution.    32 DTSTART defines the start of the selection.    33 INTERVAL defines the step of the selection.    34 COUNT defines a number of instances    35 UNTIL defines a limit to the selection.    36     37 BY... qualifiers select instances within each outer selection instance according    38 to the recurrence of instances of the next highest resolution. For example,    39 BYDAY selects days in weeks. Thus, if no explicit week recurrence is indicated,    40 all weeks are selected within the selection of the next highest explicitly    41 specified resolution, whether this is months or years.    42     43 BYSETPOS in conjunction with BY... qualifiers permit the selection of specific    44 instances.    45     46 Within the FREQ resolution, BY... qualifiers refine selected instances.    47     48 Outside the FREQ resolution, BY... qualifiers select instances at the resolution    49 concerned.    50     51 Thus, FREQ and BY... qualifiers need to be ordered in terms of increasing    52 resolution (or decreasing scope).    53 """    54     55 from calendar import monthrange    56 from datetime import date, datetime, timedelta    57 import operator    58     59 # Frequency levels, specified by FREQ in iCalendar.    60     61 freq_levels = (    62     "YEARLY",    63     "MONTHLY",    64     "WEEKLY",    65     None,    66     None,    67     "DAILY",    68     "HOURLY",    69     "MINUTELY",    70     "SECONDLY"    71     )    72     73 # Enumeration levels, employed by BY... qualifiers.    74     75 enum_levels = (    76     None,    77     "BYMONTH",    78     "BYWEEKNO",    79     "BYYEARDAY",    80     "BYMONTHDAY",    81     "BYDAY",    82     "BYHOUR",    83     "BYMINUTE",    84     "BYSECOND"    85     )    86     87 # Map from levels to lengths of datetime tuples.    88     89 lengths = [1, 2, 3, 3, 3, 3, 4, 5, 6]    90 positions = [l-1 for l in lengths]    91     92 # Map from qualifiers to interval units. Here, weeks are defined as 7 days.    93     94 units = {"WEEKLY" : 7}    95     96 # Make dictionaries mapping qualifiers to levels.    97     98 freq = dict([(level, i) for (i, level) in enumerate(freq_levels) if level])    99 enum = dict([(level, i) for (i, level) in enumerate(enum_levels) if level])   100 weekdays = dict([(weekday, i+1) for (i, weekday) in enumerate(["MO", "TU", "WE", "TH", "FR", "SA", "SU"])])   101    102 # Functions for structuring the recurrences.   103    104 def get_next(it):   105     try:   106         return it.next()   107     except StopIteration:   108         return None   109    110 def get_parameters(values):   111    112     "Return parameters from the given list of 'values'."   113    114     d = {}   115     for value in values:   116         parts = value.split("=", 1)   117         if len(parts) < 2:   118             continue   119         key, value = parts   120         if key in ("COUNT", "BYSETPOS"):   121             d[key] = int(value)   122         else:   123             d[key] = value   124     return d   125    126 def get_qualifiers(values):   127    128     """   129     Process the list of 'values' of the form "key=value", returning a list of   130     qualifiers of the form (qualifier name, args).   131     """   132    133     qualifiers = []   134     frequency = None   135     interval = 1   136    137     for value in values:   138         parts = value.split("=", 1)   139         if len(parts) < 2:   140             continue   141         key, value = parts   142         if key == "FREQ" and freq.has_key(value):   143             qualifier = frequency = (value, {})   144         elif key == "INTERVAL":   145             interval = int(value)   146             continue   147         elif enum.has_key(key):   148             qualifier = (key, {"values" : get_qualifier_values(key, value)})   149         else:   150             continue   151    152         qualifiers.append(qualifier)   153    154     if frequency:   155         frequency[1]["interval"] = interval   156    157     return qualifiers   158    159 def get_qualifier_values(qualifier, value):   160    161     """   162     For the given 'qualifier', process the 'value' string, returning a list of   163     suitable values.   164     """   165    166     if qualifier != "BYDAY":   167         return map(int, value.split(","))   168    169     values = []   170     for part in value.split(","):   171         weekday = weekdays.get(part[-2:])   172         if not weekday:   173             continue   174         index = part[:-2]   175         if index:   176             index = int(index)   177         else:   178             index = None   179         values.append((weekday, index))   180    181     return values   182    183 def order_qualifiers(qualifiers):   184    185     "Return the 'qualifiers' in order of increasing resolution."   186    187     l = []   188    189     for qualifier, args in qualifiers:   190         if enum.has_key(qualifier):   191             level = enum[qualifier]   192             if special_enum_levels.has_key(qualifier):   193                 args["interval"] = 1   194                 selector = special_enum_levels[qualifier]   195             else:   196                 selector = Enum   197         else:   198             level = freq[qualifier]   199             selector = Pattern   200    201         l.append(selector(level, args, qualifier))   202    203     l.sort(key=lambda x: x.level)   204     return l   205    206 def get_datetime_structure(datetime):   207    208     "Return the structure of 'datetime' for recurrence production."   209    210     l = []   211     offset = 0   212     for level, value in enumerate(datetime):   213         if level == 2:   214             offset = 3   215         l.append(Enum(level + offset, {"values" : [value]}, "DT"))   216     return l   217    218 def combine_datetime_with_qualifiers(datetime, qualifiers):   219    220     """   221     Combine 'datetime' with 'qualifiers' to produce a structure for recurrence   222     production.   223     """   224    225     iter_dt = iter(get_datetime_structure(datetime))   226     iter_q = iter(order_qualifiers(qualifiers))   227    228     l = []   229    230     from_dt = get_next(iter_dt)   231     from_q = get_next(iter_q)   232    233     have_q = False   234     context = []   235     context.append(from_dt.args["values"][0])   236    237     # Consume from both lists, merging entries.   238    239     while from_dt and from_q:   240         _level = from_dt.level   241         level = from_q.level   242    243         # Datetime value at wider resolution.   244    245         if _level < level:   246             from_dt = get_next(iter_dt)   247             context.append(from_dt.args["values"][0])   248    249         # Qualifier at wider or same resolution as datetime value.   250    251         else:   252             if not have_q:   253                 if isinstance(from_q, Enum) and level > 0:   254                     repeat = Pattern(level - 1, {"interval" : 1}, None)   255                     repeat.context = tuple(context)   256                     l.append(repeat)   257                 have_q = True   258    259             from_q.context = tuple(context)   260             l.append(from_q)   261             from_q = get_next(iter_q)   262    263             if _level == level:   264                 context.append(from_dt.args["values"][0])   265                 from_dt = get_next(iter_dt)   266    267     # Complete the list.   268    269     while from_dt:   270         l.append(from_dt)   271         from_dt = get_next(iter_dt)   272    273     while from_q:   274         if not have_q:   275             if isinstance(from_q, Enum) and level > 0:   276                 repeat = Pattern(level - 1, {"interval" : 1}, None)   277                 repeat.context = tuple(context)   278                 l.append(repeat)   279             have_q = True   280    281         from_q.context = tuple(context)   282         l.append(from_q)   283         from_q = get_next(iter_q)   284    285     return l   286    287 # Datetime arithmetic.   288    289 def combine(t1, t2):   290    291     """   292     Combine tuples 't1' and 't2', returning a copy of 't1' filled with values   293     from 't2' in positions where 0 appeared in 't1'.   294     """   295    296     return tuple(map(lambda x, y: x or y, t1, t2))   297    298 def scale(interval, pos):   299    300     """   301     Scale the given 'interval' value to the indicated position 'pos', returning   302     a tuple with leading zero elements and 'interval' at the stated position.   303     """   304    305     return (0,) * pos + (interval,)   306    307 def get_seconds(t):   308    309     "Convert the sub-day part of 't' into seconds."   310    311     t = t + (0,) * (6 - len(t))   312     return (t[3] * 60 + t[4]) * 60 + t[5]   313    314 def update(t, step):   315    316     "Update 't' by 'step' at the resolution of 'step'."   317    318     i = len(step)   319    320     # Years only.   321    322     if i == 1:   323         return (t[0] + step[0],) + tuple(t[1:])   324    325     # Years and months.   326    327     elif i == 2:   328         month = t[1] + step[1]   329         return (t[0] + step[0] + (month - 1) / 12, (month - 1) % 12 + 1) + tuple(t[2:])   330    331     # Dates and datetimes.   332    333     else:   334         updated_for_months = update(t, step[:2])   335    336         # Dates only.   337    338         if i == 3:   339             d = datetime(*updated_for_months)   340             s = timedelta(step[2])   341    342         # Datetimes.   343    344         else:   345             d = datetime(*updated_for_months)   346             s = timedelta(step[2], get_seconds(step))   347    348         return to_tuple(d + s, len(t))   349    350 def to_tuple(d, n=None):   351    352     "Return 'd' as a tuple, optionally trimming the result to 'n' positions."   353    354     if not isinstance(d, date):   355         return d   356     if n is None:   357         if isinstance(d, datetime):   358             n = 6   359         else:   360             n = 3   361     return d.timetuple()[:n]   362    363 def get_first_day(first_day, weekday):   364    365     "Return the first occurrence at or after 'first_day' of 'weekday'."   366    367     first_day = date(*first_day)   368     first_weekday = first_day.isoweekday()   369     if first_weekday > weekday:   370         return first_day + timedelta(7 - first_weekday + weekday)   371     else:   372         return first_day + timedelta(weekday - first_weekday)   373    374 def get_last_day(last_day, weekday):   375    376     "Return the last occurrence at or before 'last_day' of 'weekday'."   377    378     last_day = date(*last_day)   379     last_weekday = last_day.isoweekday()   380     if last_weekday < weekday:   381         return last_day - timedelta(last_weekday + 7 - weekday)   382     else:   383         return last_day - timedelta(last_weekday - weekday)   384    385 # Classes for producing instances from recurrence structures.   386    387 class Selector:   388    389     "A generic selector."   390    391     def __init__(self, level, args, qualifier, selecting=None):   392    393         """   394         Initialise at the given 'level' a selector employing the given 'args'   395         defined in the interpretation of recurrence rule qualifiers, with the   396         'qualifier' being the name of the rule qualifier, and 'selecting' being   397         an optional selector used to find more specific instances from those   398         found by this selector.   399         """   400    401         self.level = level   402         self.pos = positions[level]   403         self.args = args   404         self.qualifier = qualifier   405         self.context = ()   406         self.selecting = selecting   407    408     def __repr__(self):   409         return "%s(%r, %r, %r, %r)" % (self.__class__.__name__, self.level, self.args, self.qualifier, self.context)   410    411     def materialise(self, start, end, count=None, setpos=None, inclusive=False):   412    413         """   414         Starting at 'start', materialise instances up to but not including any   415         at 'end' or later, returning at most 'count' if specified, and returning   416         only the occurrences indicated by 'setpos' if specified. A list of   417         instances is returned.   418    419         If 'inclusive' is specified, the selection of instances will include the   420         end of the search period if present in the results.   421         """   422    423         start = to_tuple(start)   424         end = to_tuple(end)   425         counter = count and [0, count]   426         results = self.materialise_items(self.context, start, end, counter, setpos, inclusive)   427         results.sort()   428         return results[:count]   429    430     def materialise_item(self, current, earliest, next, counter, setpos=None, inclusive=False):   431    432         """   433         Given the 'current' instance, the 'earliest' acceptable instance, the   434         'next' instance, an instance 'counter', and the optional 'setpos'   435         criteria, return a list of result items. Where no selection within the   436         current instance occurs, the current instance will be returned as a   437         result if the same or later than the earliest acceptable instance.   438         """   439    440         if self.selecting:   441             return self.selecting.materialise_items(current, earliest, next, counter, setpos, inclusive)   442         elif earliest <= current:   443             return [current]   444         else:   445             return []   446    447     def convert_positions(self, setpos):   448    449         "Convert 'setpos' to 0-based indexes."   450    451         l = []   452         for pos in setpos:   453             lower = pos < 0 and pos or pos - 1   454             upper = pos > 0 and pos or pos < -1 and pos + 1 or None   455             l.append((lower, upper))   456         return l   457    458     def select_positions(self, results, setpos):   459    460         "Select in 'results' the 1-based positions given by 'setpos'."   461    462         results.sort()   463         l = []   464         for lower, upper in self.convert_positions(setpos):   465             l += results[lower:upper]   466         return l   467    468     def filter_by_period(self, results, start, end, inclusive):   469    470         """   471         Filter 'results' so that only those at or after 'start' and before 'end'   472         are returned.   473    474         If 'inclusive' is specified, the selection of instances will include the   475         end of the search period if present in the results.   476         """   477    478         l = []   479         for result in results:   480             if start <= result and (inclusive and result <= end or result < end):   481                 l.append(result)   482         return l   483    484 class Pattern(Selector):   485    486     "A selector of instances according to a repeating pattern."   487    488     def materialise_items(self, context, start, end, counter, setpos=None, inclusive=False):   489         first = scale(self.context[self.pos], self.pos)   490    491         # Define the step between items.   492    493         interval = self.args.get("interval", 1) * units.get(self.qualifier, 1)   494         step = scale(interval, self.pos)   495    496         # Define the scale of a single item.   497    498         unit_interval = units.get(self.qualifier, 1)   499         unit_step = scale(unit_interval, self.pos)   500    501         current = combine(context, first)   502         results = []   503    504         while (inclusive and current <= end or current < end) and (counter is None or counter[0] < counter[1]):   505             next = update(current, step)   506             current_end = update(current, unit_step)   507             interval_results = self.materialise_item(current, max(current, start), min(current_end, end), counter, setpos, inclusive)   508             if counter is not None:   509                 counter[0] += len(interval_results)   510             results += interval_results   511             current = next   512    513         return results   514    515 class WeekDayFilter(Selector):   516    517     "A selector of instances specified in terms of day numbers."   518    519     def materialise_items(self, context, start, end, counter, setpos=None, inclusive=False):   520         step = scale(1, 2)   521         results = []   522    523         # Get weekdays in the year.   524    525         if len(context) == 1:   526             first_day = (context[0], 1, 1)   527             last_day = (context[0], 12, 31)   528    529         # Get weekdays in the month.   530    531         elif len(context) == 2:   532             first_day = (context[0], context[1], 1)   533             last_day = update((context[0], context[1], 1), (0, 1, 0))   534             last_day = update(last_day, (0, 0, -1))   535    536         # Get weekdays in the week.   537    538         else:   539             current = context   540             values = [value for (value, index) in self.args["values"]]   541    542             while (inclusive and current <= end or current < end):   543                 next = update(current, step)   544                 if date(*current).isoweekday() in values:   545                     results += self.materialise_item(current, max(current, start), min(next, end), counter, inclusive=inclusive)   546                 current = next   547    548             if setpos:   549                 return self.select_positions(results, setpos)   550             else:   551                 return results   552    553         # Find each of the given days.   554    555         for value, index in self.args["values"]:   556             if index is not None:   557                 offset = timedelta(7 * (abs(index) - 1))   558    559                 if index < 0:   560                     current = to_tuple(get_last_day(last_day, value) - offset, 3)   561                 else:   562                     current = to_tuple(get_first_day(first_day, value) + offset, 3)   563    564                 next = update(current, step)   565    566                 # To support setpos, only current and next bound the search, not   567                 # the period in addition.   568    569                 results += self.materialise_item(current, current, next, counter, inclusive=inclusive)   570    571             else:   572                 if index < 0:   573                     current = to_tuple(get_last_day(last_day, value), 3)   574                     direction = operator.sub   575                 else:   576                     current = to_tuple(get_first_day(first_day, value), 3)   577                     direction = operator.add   578    579                 while first_day <= current <= last_day:   580                     next = update(current, step)   581    582                     # To support setpos, only current and next bound the search, not   583                     # the period in addition.   584    585                     results += self.materialise_item(current, current, next, counter, inclusive=inclusive)   586                     current = to_tuple(direction(date(*current), timedelta(7)), 3)   587    588         # Extract selected positions and remove out-of-period instances.   589    590         if setpos:   591             results = self.select_positions(results, setpos)   592    593         return self.filter_by_period(results, start, end, inclusive)   594    595 class Enum(Selector):   596     def materialise_items(self, context, start, end, counter, setpos=None, inclusive=False):   597         step = scale(1, self.pos)   598         results = []   599         for value in self.args["values"]:   600             current = combine(context, scale(value, self.pos))   601             next = update(current, step)   602    603             # To support setpos, only current and next bound the search, not   604             # the period in addition.   605    606             results += self.materialise_item(current, current, next, counter, setpos, inclusive)   607    608         # Extract selected positions and remove out-of-period instances.   609    610         if setpos:   611             results = self.select_positions(results, setpos)   612    613         return self.filter_by_period(results, start, end, inclusive)   614    615 class MonthDayFilter(Enum):   616     def materialise_items(self, context, start, end, counter, setpos=None, inclusive=False):   617         last_day = monthrange(context[0], context[1])[1]   618         step = scale(1, self.pos)   619         results = []   620         for value in self.args["values"]:   621             if value < 0:   622                 value = last_day + 1 + value   623             current = combine(context, scale(value, self.pos))   624             next = update(current, step)   625    626             # To support setpos, only current and next bound the search, not   627             # the period in addition.   628    629             results += self.materialise_item(current, current, next, counter, inclusive=inclusive)   630    631         # Extract selected positions and remove out-of-period instances.   632    633         if setpos:   634             results = self.select_positions(results, setpos)   635    636         return self.filter_by_period(results, start, end, inclusive)   637    638 class YearDayFilter(Enum):   639     def materialise_items(self, context, start, end, counter, setpos=None, inclusive=False):   640         first_day = date(context[0], 1, 1)   641         next_first_day = date(context[0] + 1, 1, 1)   642         year_length = (next_first_day - first_day).days   643         step = scale(1, self.pos)   644         results = []   645         for value in self.args["values"]:   646             if value < 0:   647                 value = year_length + 1 + value   648             current = to_tuple(first_day + timedelta(value - 1), 3)   649             next = update(current, step)   650    651             # To support setpos, only current and next bound the search, not   652             # the period in addition.   653    654             results += self.materialise_item(current, current, next, counter, inclusive=inclusive)   655    656         # Extract selected positions and remove out-of-period instances.   657    658         if setpos:   659             results = self.select_positions(results, setpos)   660    661         return self.filter_by_period(results, start, end, inclusive)   662    663 special_enum_levels = {   664     "BYDAY" : WeekDayFilter,   665     "BYMONTHDAY" : MonthDayFilter,   666     "BYYEARDAY" : YearDayFilter,   667     }   668    669 # Public functions.   670    671 def connect_selectors(selectors):   672    673     """   674     Make the 'selectors' reference each other in a hierarchy so that   675     materialising the principal selector causes the more specific ones to be   676     employed in the operation.   677     """   678    679     current = selectors[0]   680     for selector in selectors[1:]:   681         current.selecting = selector   682         current = selector   683     return selectors[0]   684    685 def get_selector(dt, qualifiers):   686    687     """   688     Combine the initial datetime 'dt' with the given 'qualifiers', returning an   689     object that can be used to select recurrences described by the 'qualifiers'.   690     """   691    692     dt = to_tuple(dt)   693     return connect_selectors(combine_datetime_with_qualifiers(dt, qualifiers))   694    695 def get_rule(dt, rule):   696    697     """   698     Using the given initial datetime 'dt', interpret the 'rule' (a semicolon-   699     separated collection of "key=value" strings), and return the resulting   700     selector object.   701     """   702    703     if not isinstance(rule, tuple):   704         rule = rule.split(";")   705     qualifiers = get_qualifiers(rule)   706     return get_selector(dt, qualifiers)   707    708 # vim: tabstop=4 expandtab shiftwidth=4