#!/usr/bin/env python

"""
Recurrence rule calculation.

Copyright (C) 2014 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program.  If not, see <http://www.gnu.org/licenses/>.

----

References:

RFC 5545: Internet Calendaring and Scheduling Core Object Specification
          (iCalendar)
          http://tools.ietf.org/html/rfc5545

----

FREQ defines the selection resolution.
DTSTART defines the start of the selection.
INTERVAL defines the step of the selection.
COUNT defines a number of instances; UNTIL defines a limit to the selection.

BY... qualifiers select instances within each outer selection instance according
to the recurrence of instances of the next highest resolution. For example,
BYDAY selects days in weeks. Thus, if no explicit week recurrence is indicated,
all weeks are selected within the selection of the next highest explicitly
specified resolution, whether this is months or years.

BYSETPOS in conjunction with BY... qualifiers permit the selection of specific
instances.

Within the FREQ resolution, BY... qualifiers refine selected instances.

Outside the FREQ resolution, BY... qualifiers select instances at the resolution
concerned.

Thus, FREQ and BY... qualifiers need to be ordered in terms of increasing
resolution (or decreasing scope).
"""

from calendar import monthrange
from datetime import date, datetime, timedelta
import operator

# Frequency levels, specified by FREQ in iCalendar.
# The index of each name is its level; None marks levels that have no FREQ
# counterpart but are occupied in enum_levels below.

freq_levels = (
    "YEARLY",
    "MONTHLY",
    "WEEKLY",
    None,
    None,
    "DAILY",
    "HOURLY",
    "MINUTELY",
    "SECONDLY"
    )

# Enumeration levels, employed by BY... qualifiers.

enum_levels = (
    None,
    "BYMONTH",
    "BYWEEKNO",
    "BYYEARDAY",
    "BYMONTHDAY",
    "BYDAY",
    "BYHOUR",
    "BYMINUTE",
    "BYSECOND"
    )

# Map from levels to lengths of datetime tuples.

lengths = [1, 2, 3, 3, 3, 3, 4, 5, 6]
positions = [l-1 for l in lengths]

# Map from qualifiers to interval units. Here, weeks are defined as 7 days.

units = {"WEEKLY" : 7}

# Make dictionaries mapping qualifiers to levels.

freq = dict((level, i) for (i, level) in enumerate(freq_levels) if level)
enum = dict((level, i) for (i, level) in enumerate(enum_levels) if level)
weekdays = dict((weekday, i+1) for (i, weekday) in enumerate(["MO", "TU", "WE", "TH", "FR", "SA", "SU"]))

# Functions for structuring the recurrences.

def get_next(it):

    "Return the next item from iterator 'it', or None if it is exhausted."

    # The next() builtin works on both Python 2.6+ and Python 3, unlike the
    # Python 2-only it.next() method.

    return next(it, None)

def get_parameters(values):

    """
    Return parameters from the given list of 'values', each of the form
    "key=value". Only COUNT and BYSETPOS are retained, each converted to a
    single integer; entries without "=" and all other keys are ignored.

    A ValueError is raised by int() if a retained value is not an integer.
    """

    d = {}
    for value in values:
        parts = value.split("=", 1)
        if len(parts) < 2:
            continue
        key, value = parts
        if key in ("COUNT", "BYSETPOS"):
            d[key] = int(value)
    return d

def get_qualifiers(values):

    """
    Process the list of 'values' of the form "key=value", returning a list of
    qualifiers.

    Each qualifier is a (name, args) tuple. A recognised FREQ yields
    (frequency, {"interval" : n}) where n is taken from INTERVAL (default 1),
    and each BY... key found in 'enum' yields (key, {"values" : [...]}).
    Entries without "=", unknown keys and unknown FREQ values are skipped.
    """

    qualifiers = []
    frequency = None
    interval = 1

    for value in values:
        parts = value.split("=", 1)
        if len(parts) < 2:
            continue
        key, value = parts

        if key == "FREQ" and value in freq:
            qualifier = frequency = (value, {})
        elif key == "INTERVAL":
            interval = int(value)
            continue
        elif key in enum:
            qualifier = (key, {"values" : get_qualifier_values(key, value)})
        else:
            continue

        qualifiers.append(qualifier)

    # INTERVAL may appear before or after FREQ, so the interval is only
    # attached to the frequency once all values have been seen.

    if frequency:
        frequency[1]["interval"] = interval

    return qualifiers

def get_qualifier_values(qualifier, value):

    """
    For the given 'qualifier', process the 'value' string, returning a list of
    suitable values.

    For BYDAY, each comma-separated part becomes a (weekday, index) tuple where
    weekday is 1 (MO) to 7 (SU) and index is the optional signed number
    preceding the day name, or None if absent; parts not ending in a known day
    name are skipped. For all other qualifiers, a list of integers is returned.
    """

    if qualifier != "BYDAY":

        # A real list is built since map() yields an iterator on Python 3.

        return [int(part) for part in value.split(",")]

    values = []
    for part in value.split(","):
        weekday = weekdays.get(part[-2:])
        if not weekday:
            continue
        index = part[:-2]
        if index:
            index = int(index)
        else:
            index = None
        values.append((weekday, index))

    return values

def order_qualifiers(qualifiers):

    """
    Return selector objects for the 'qualifiers' in order of increasing
    resolution.

    BY... qualifiers become Enum instances, or the class registered for them in
    'special_enum_levels' (whose args are given an interval of 1); anything else
    is looked up in 'freq' and becomes a Pattern. A KeyError is raised for a
    name found in neither 'enum' nor 'freq'.
    """

    l = []

    for qualifier, args in qualifiers:
        if qualifier in enum:
            level = enum[qualifier]
            if qualifier in special_enum_levels:
                args["interval"] = 1
                selector = special_enum_levels[qualifier]
            else:
                selector = Enum
        else:
            level = freq[qualifier]
            selector = Pattern

        l.append(selector(level, args, qualifier))

    # The sort is stable, so qualifiers sharing a level keep their given order.

    l.sort(key=lambda x: x.level)
    return l

def get_datetime_structure(datetime):

    """
    Return the structure of 'datetime' for recurrence production: one
    single-valued Enum per element of the 'datetime' tuple.

    Year and month occupy levels 0 and 1; the day and any following elements
    are shifted by 3 so that they occupy levels 5 (day) to 8 (second).
    """

    l = []
    offset = 0
    for level, value in enumerate(datetime):
        if level == 2:
            offset = 3
        l.append(Enum(level + offset, {"values" : [value]}, "DT"))
    return l

def _advance_datetime(iter_dt, context):

    """
    Fetch the next datetime selector from 'iter_dt', appending its value to the
    'context' list. Return the selector, or None, leaving 'context' untouched,
    when the datetime structure is exhausted.
    """

    from_dt = get_next(iter_dt)
    if from_dt is not None:
        context.append(from_dt.args["values"][0])
    return from_dt

def _append_qualifier(l, from_q, context, have_q):

    """
    Append the qualifier selector 'from_q' to 'l', giving it a copy of the
    current 'context'. If no qualifier has been added before ('have_q' is
    false) and 'from_q' is an Enum below the top level, an implicit repeating
    Pattern at the next wider level is added first.
    """

    if not have_q and isinstance(from_q, Enum) and from_q.level > 0:
        repeat = Pattern(from_q.level - 1, {"interval" : 1}, None)
        repeat.context = tuple(context)
        l.append(repeat)

    from_q.context = tuple(context)
    l.append(from_q)

def combine_datetime_with_qualifiers(datetime, qualifiers):

    """
    Combine 'datetime' with 'qualifiers' to produce a structure for recurrence
    production.

    The result is a list of selectors in order of increasing resolution. Each
    qualifier replaces the datetime element at its own level, datetime elements
    at wider levels than the first qualifier only contribute to the selectors'
    context, and datetime elements at finer levels than the last qualifier are
    appended as fixed values.
    """

    iter_dt = iter(get_datetime_structure(datetime))
    iter_q = iter(order_qualifiers(qualifiers))

    l = []
    context = []

    from_dt = _advance_datetime(iter_dt, context)
    from_q = get_next(iter_q)

    have_q = False

    # Consume from both lists, merging entries.

    while from_dt and from_q:
        _level = from_dt.level
        level = from_q.level

        # Datetime value at wider resolution.
        # The datetime may run out here, in which case from_dt becomes None
        # and the remaining qualifiers are handled below.

        if _level < level:
            from_dt = _advance_datetime(iter_dt, context)

        # Qualifier at wider or same resolution as datetime value.

        else:
            _append_qualifier(l, from_q, context, have_q)
            have_q = True
            from_q = get_next(iter_q)

            # The qualifier supersedes the datetime value at the same level.

            if _level == level:
                from_dt = _advance_datetime(iter_dt, context)

    # Complete the list.

    while from_dt:
        l.append(from_dt)
        from_dt = get_next(iter_dt)

    while from_q:
        _append_qualifier(l, from_q, context, have_q)
        have_q = True
        from_q = get_next(iter_q)

    return l

# Datetime arithmetic.

def combine(t1, t2):

    """
    Combine tuples 't1' and 't2', returning a copy of 't1' filled with values
    from 't2' in positions where 0 appeared in 't1'.

    The result is as long as the longer tuple: positions beyond the end of 't1'
    are taken from 't2', and positions beyond the end of 't2' keep the value
    from 't1'.
    """

    n1 = len(t1)
    n2 = len(t2)
    l = []

    # The padding is done explicitly because map() with several sequences pads
    # with None on Python 2 but truncates to the shortest on Python 3.

    for i in range(max(n1, n2)):
        if i >= n2:
            l.append(t1[i])
        elif i >= n1:
            l.append(t2[i])
        else:
            l.append(t1[i] or t2[i])

    return tuple(l)

def scale(interval, pos):

    """
    Scale the given 'interval' value to the indicated position 'pos', returning
    a tuple with leading zero elements and 'interval' at the stated position.
    """

    return (0,) * pos + (interval,)

def get_seconds(t):

    "Convert the sub-day part of 't' (hours, minutes, seconds) into seconds."

    # Missing trailing elements count as zero.

    t = t + (0,) * (6 - len(t))
    return (t[3] * 60 + t[4]) * 60 + t[5]

def update(t, step):

    """
    Update 't' by 'step' at the resolution of 'step', returning a tuple of the
    same length as 't'.

    A one-element 'step' adds years and a two-element 'step' adds years and
    months, with months carried into the year. Longer steps additionally add
    days and any hours, minutes and seconds; this requires 't' to contain at
    least year, month and day, and raises ValueError if the month arithmetic
    produces a date that does not exist.
    """

    i = len(step)

    # Years only.

    if i == 1:
        return (t[0] + step[0],) + tuple(t[1:])

    # Years and months.
    # Floor division keeps the year an integer on Python 3 as well.

    elif i == 2:
        month = t[1] + step[1]
        return (t[0] + step[0] + (month - 1) // 12, (month - 1) % 12 + 1) + tuple(t[2:])

    # Dates and datetimes.

    else:
        updated_for_months = update(t, step[:2])
        d = datetime(*updated_for_months)

        # Dates only.

        if i == 3:
            s = timedelta(step[2])

        # Datetimes.

        else:
            s = timedelta(step[2], get_seconds(step))

        return to_tuple(d + s, len(t))

def to_tuple(d, n=None):

    """
    Return 'd' as a tuple, optionally trimming the result to 'n' positions.
    Without 'n', datetimes yield 6 elements and dates yield 3. Anything that is
    not a date or datetime is returned unchanged.
    """

    if not isinstance(d, date):
        return d
    if n is None:
        if isinstance(d, datetime):
            n = 6
        else:
            n = 3
    return d.timetuple()[:n]

def get_first_day(first_day, weekday):

    """
    Return the first occurrence at or after 'first_day' of 'weekday', where
    'first_day' is a (year, month, day) tuple, 'weekday' is an ISO weekday
    number (1 for Monday to 7 for Sunday), and the result is a date.
    """

    first_day = date(*first_day)
    return first_day + timedelta((weekday - first_day.isoweekday()) % 7)

def get_last_day(last_day, weekday):

    """
    Return the last occurrence at or before 'last_day' of 'weekday', where
    'last_day' is a (year, month, day) tuple, 'weekday' is an ISO weekday
    number (1 for Monday to 7 for Sunday), and the result is a date.
    """

    last_day = date(*last_day)
    return last_day - timedelta((last_day.isoweekday() - weekday) % 7)

# Classes for producing instances from recurrence structures.

class Selector:

    """
    A node in a chain of selectors. Each selector produces values at its own
    'level' and delegates finer resolutions to the selector in 'selecting'.
    """

    def __init__(self, level, args, qualifier, selecting=None):

        """
        Initialise the selector at the given 'level' with the given 'args'
        dictionary and 'qualifier' name, optionally 'selecting' a finer
        selector. The tuple position written by the selector is obtained from
        the module-level 'positions' table.
        """

        self.level = level
        self.pos = positions[level]
        self.args = args
        self.qualifier = qualifier
        self.context = ()
        self.selecting = selecting

    def __repr__(self):
        return "%s(%r, %r, %r, %r)" % (self.__class__.__name__, self.level, self.args, self.qualifier, self.context)

    def materialise(self, start, end, count=None, setpos=None):

        """
        Return a sorted list of instance tuples produced by this selector and
        those it selects, using the period from 'start' to 'end' (dates,
        datetimes or tuples). At most 'count' instances are returned if 'count'
        is given, and 'setpos' may provide positions to select within each set
        of instances.
        """

        start = to_tuple(start)
        end = to_tuple(end)

        # A counter is only employed for a positive count: the original
        # "count and [0, count]" produced the integer 0 for a count of zero,
        # which the patterns then attempted to index.

        if count:
            counter = [0, count]
        else:
            counter = None

        results = self.materialise_items(self.context, start, end, counter, setpos)
        results.sort()
        return results[:count]

    def materialise_item(self, current, last, next, counter, setpos=None):

        """
        Return the instances for the single item 'current'. With a finer
        selector present, it is asked for the instances within 'current'
        bounded by 'last' and 'next'; otherwise 'current' itself is returned in
        a list unless it precedes 'last'.
        """

        if self.selecting:
            return self.selecting.materialise_items(current, last, next, counter, setpos)
        elif last <= current:
            return [current]
        else:
            return []

    def convert_positions(self, setpos):

        """
        Convert the 1-based, possibly negative positions in 'setpos' into a
        list of (lower, upper) slice bounds. A single integer is treated as a
        list containing that integer.
        """

        # get_parameters yields BYSETPOS as a plain integer.

        if isinstance(setpos, int):
            setpos = [setpos]

        l = []
        for pos in setpos:
            lower = pos if pos < 0 else pos - 1

            # The last position (-1) needs an open upper bound.

            if pos > 0:
                upper = pos
            elif pos < -1:
                upper = pos + 1
            else:
                upper = None

            l.append((lower, upper))
        return l

    def select_positions(self, results, setpos):

        """
        Return the items of 'results' found at the positions given by 'setpos'
        once the results have been sorted. The given list is not modified.
        """

        ordered = sorted(results)
        l = []
        for lower, upper in self.convert_positions(setpos):
            l += ordered[lower:upper]
        return l

    def filter_by_period(self, results, start, end):

        "Return the 'results' lying at or after 'start' and before 'end'."

        return [result for result in results if start <= result < end]

class Pattern(Selector):

    "A selector repeating at a fixed interval, as described by FREQ."

    def materialise_items(self, context, start, end, counter, setpos=None):

        """
        Return instances found by stepping from the selector's initial value
        within 'context' until 'end' is reached or the 'counter' (a list of
        [found, limit], or None) is exhausted. The counter is updated with the
        number of instances found.

        A ValueError is raised for an interval that is not positive, since
        such an interval would never reach 'end'.
        """

        first = scale(self.context[self.pos], self.pos)

        # Define the scale of a single item.

        unit_interval = units.get(self.qualifier, 1)
        unit_step = scale(unit_interval, self.pos)

        # Define the step between items.

        interval = self.args.get("interval", 1) * unit_interval
        if interval <= 0:
            raise ValueError("interval must be positive: %r" % (interval,))
        step = scale(interval, self.pos)

        current = combine(context, first)
        results = []

        while current < end and (counter is None or counter[0] < counter[1]):
            following = update(current, step)
            current_end = update(current, unit_step)
            interval_results = self.materialise_item(current, max(current, start), min(current_end, end), counter, setpos)
            if counter is not None:
                counter[0] += len(interval_results)
            results += interval_results
            current = following

        return results

class WeekDayFilter(Selector):

    "A selector of weekdays within years, months or weeks, as given by BYDAY."

    def materialise_items(self, context, start, end, counter, setpos=None):

        """
        Return instances for the (weekday, index) values of this selector. A
        one-element 'context' selects within that year, a two-element 'context'
        selects within that month, and a longer 'context' is treated as a day
        from which each day before 'end' is tested against the weekdays.
        """

        step = scale(1, 2)
        results = []

        # Get weekdays in the year.

        if len(context) == 1:
            first_day = (context[0], 1, 1)
            last_day = (context[0], 12, 31)

        # Get weekdays in the month.

        elif len(context) == 2:
            first_day = (context[0], context[1], 1)
            last_day = update((context[0], context[1], 1), (0, 1, 0))
            last_day = update(last_day, (0, 0, -1))

        # Get weekdays in the week.

        else:
            current = context
            values = [value for (value, index) in self.args["values"]]

            while current < end:
                following = update(current, step)
                if date(*current).isoweekday() in values:
                    results += self.materialise_item(current, max(current, start), min(following, end), counter)
                current = following

            if setpos:
                return self.select_positions(results, setpos)
            else:
                return results

        # Find each of the given days.

        for value, index in self.args["values"]:

            # Numbered days: the nth occurrence from the start or the end.

            if index is not None:
                offset = timedelta(7 * (abs(index) - 1))

                if index < 0:
                    current = to_tuple(get_last_day(last_day, value) - offset, 3)
                else:
                    current = to_tuple(get_first_day(first_day, value) + offset, 3)

                # Occurrences that do not exist in this year or month, such as
                # a fifth Monday, are ignored, consistent with the bounds
                # applied to unnumbered days below.

                if not (first_day <= current <= last_day):
                    continue

                following = update(current, step)

                # To support setpos, only current and next bound the search, not
                # the period in addition.

                results += self.materialise_item(current, current, following, counter)

            # Unnumbered days: every occurrence, in chronological order.
            # The index is None here and must not be compared with a number,
            # this being an error on Python 3.

            else:
                current = to_tuple(get_first_day(first_day, value), 3)

                while first_day <= current <= last_day:
                    following = update(current, step)

                    # To support setpos, only current and next bound the search, not
                    # the period in addition.

                    results += self.materialise_item(current, current, following, counter)
                    current = to_tuple(date(*current) + timedelta(7), 3)

        # Extract selected positions and remove out-of-period instances.

        if setpos:
            results = self.select_positions(results, setpos)

        return self.filter_by_period(results, start, end)

class Enum(Selector):

    "A selector of explicitly enumerated values at a particular level."

    def materialise_items(self, context, start, end, counter, setpos=None):

        """
        Return instances for each of the selector's values placed at the
        selector's position within 'context', retaining those from 'start' up
        to but not including 'end'.
        """

        step = scale(1, self.pos)
        results = []
        for value in self.args["values"]:
            current = combine(context, scale(value, self.pos))
            following = update(current, step)

            # To support setpos, only current and next bound the search, not
            # the period in addition.

            results += self.materialise_item(current, current, following, counter, setpos)

        # Extract selected positions and remove out-of-period instances.

        if setpos:
            results = self.select_positions(results, setpos)

        return self.filter_by_period(results, start, end)

class MonthDayFilter(Enum):

    "A selector of days within a month, as given by BYMONTHDAY."

    def materialise_items(self, context, start, end, counter, setpos=None):

        """
        Return instances for each day number of this selector within the month
        given by the first two elements of 'context'. Negative numbers count
        back from the end of the month, and days that the month does not have
        are ignored.
        """

        last_day = monthrange(context[0], context[1])[1]
        step = scale(1, self.pos)
        results = []
        for value in self.args["values"]:
            if value < 0:
                value = last_day + 1 + value

            # Without this test, a day such as the 31st in a 30-day month
            # would cause a ValueError when the date is constructed.

            if not (1 <= value <= last_day):
                continue

            current = combine(context, scale(value, self.pos))
            following = update(current, step)

            # To support setpos, only current and next bound the search, not
            # the period in addition.

            results += self.materialise_item(current, current, following, counter)

        # Extract selected positions and remove out-of-period instances.

        if setpos:
            results = self.select_positions(results, setpos)

        return self.filter_by_period(results, start, end)

class YearDayFilter(Enum):

    "A selector of days within a year, as given by BYYEARDAY."

    def materialise_items(self, context, start, end, counter, setpos=None):

        """
        Return instances for each day number of this selector within the year
        given by the first element of 'context'. Negative numbers count back
        from the end of the year, and days that the year does not have are
        ignored.
        """

        first_day = date(context[0], 1, 1)
        next_first_day = date(context[0] + 1, 1, 1)
        year_length = (next_first_day - first_day).days
        step = scale(1, self.pos)
        results = []
        for value in self.args["values"]:
            if value < 0:
                value = year_length + 1 + value

            # Without this test, day 366 of a 365-day year would select a day
            # in the following year.

            if not (1 <= value <= year_length):
                continue

            current = to_tuple(first_day + timedelta(value - 1), 3)
            following = update(current, step)

            # To support setpos, only current and next bound the search, not
            # the period in addition.

            results += self.materialise_item(current, current, following, counter)

        # Extract selected positions and remove out-of-period instances.

        if setpos:
            results = self.select_positions(results, setpos)

        return self.filter_by_period(results, start, end)

# Map from BY... qualifiers to the selector classes handling them specially.

special_enum_levels = {
    "BYDAY" : WeekDayFilter,
    "BYMONTHDAY" : MonthDayFilter,
    "BYYEARDAY" : YearDayFilter,
    }

# Public functions.

def connect_selectors(selectors):

    """
    Chain the given non-empty list of 'selectors' so that each one selects the
    next, returning the first selector.
    """

    current = selectors[0]
    for selector in selectors[1:]:
        current.selecting = selector
        current = selector
    return selectors[0]

def get_selector(dt, qualifiers):

    """
    Combine the initial datetime 'dt' with the given 'qualifiers', returning an
    object that can be used to select recurrences described by the 'qualifiers'.
    """

    dt = to_tuple(dt)
    return connect_selectors(combine_datetime_with_qualifiers(dt, qualifiers))

def get_rule(dt, rule):

    """
    Using the given initial datetime 'dt', interpret the 'rule' (a semicolon-
    separated collection of "key=value" strings), and return the resulting
    selector object.
    """

    qualifiers = get_qualifiers(rule.split(";"))
    return get_selector(dt, qualifiers)

# vim: tabstop=4 expandtab shiftwidth=4