vContent

Annotated vRecurrence.py

45:ea1c7b3a92cc
2014-10-04 Paul Boddie Added an initial year day filtering capability, needing refinement.
paul@41 1
#!/usr/bin/env python
paul@41 2
paul@41 3
"""
paul@41 4
Recurrence rule calculation.
paul@41 5
paul@41 6
Copyright (C) 2014 Paul Boddie <paul@boddie.org.uk>
paul@41 7
paul@41 8
This program is free software; you can redistribute it and/or modify it under
paul@41 9
the terms of the GNU General Public License as published by the Free Software
paul@41 10
Foundation; either version 3 of the License, or (at your option) any later
paul@41 11
version.
paul@41 12
paul@41 13
This program is distributed in the hope that it will be useful, but WITHOUT
paul@41 14
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paul@41 15
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
paul@41 16
details.
paul@41 17
paul@41 18
You should have received a copy of the GNU General Public License along with
paul@41 19
this program.  If not, see <http://www.gnu.org/licenses/>.
paul@41 20
paul@41 21
----
paul@41 22
paul@41 23
References:
paul@41 24
paul@41 25
RFC 5545: Internet Calendaring and Scheduling Core Object Specification
paul@41 26
          (iCalendar)
paul@41 27
          http://tools.ietf.org/html/rfc5545
paul@41 28
paul@41 29
----
paul@41 30
paul@41 31
FREQ defines the selection resolution.
paul@41 32
DTSTART defines the start of the selection.
paul@41 33
INTERVAL defines the step of the selection.
paul@41 34
COUNT defines a number of instances; UNTIL defines a limit to the selection.
paul@41 35
paul@41 36
BY... qualifiers select instances within each outer selection instance according
paul@41 37
to the recurrence of instances of the next highest resolution. For example,
paul@41 38
BYDAY selects days in weeks. Thus, if no explicit week recurrence is indicated,
paul@41 39
all weeks are selected within the selection of the next highest explicitly
paul@41 40
specified resolution, whether this is months or years.
paul@41 41
paul@41 42
BYSETPOS in conjunction with BY... qualifiers permits the selection of specific
paul@41 43
instances.
paul@41 44
paul@41 45
Within the FREQ resolution, BY... qualifiers refine selected instances.
paul@41 46
paul@41 47
Outside the FREQ resolution, BY... qualifiers select instances at the resolution
paul@41 48
concerned.
paul@41 49
paul@41 50
Thus, FREQ and BY... qualifiers need to be ordered in terms of increasing
paul@41 51
resolution (or decreasing scope).
paul@41 52
"""
paul@41 53
paul@42 54
from calendar import monthrange
paul@41 55
from datetime import date, datetime, timedelta
paul@41 56
import operator
paul@41 57
paul@41 58
# Frequency levels, specified by FREQ in iCalendar. The index of a level in
# this tuple is the level number used by the tables below.

freq_levels = (
    "SECONDLY", "MINUTELY", "HOURLY",
    "DAILY", "WEEKLY",
    "MONTHLY", "YEARLY",
    )

# Enumeration levels, employed by BY... qualifiers. Qualifiers sharing a
# tuple share a level number.

enum_levels = (
    ("BYSECOND",), ("BYMINUTE",), ("BYHOUR",),
    ("BYDAY", "BYMONTHDAY", "BYYEARDAY"),
    ("BYWEEKNO",), ("BYMONTH",),
    )

# Map from levels to lengths of datetime tuples, and from levels to the index
# of the last element of such tuples.

lengths = [6, 5, 4, 3, 3, 2, 1]
positions = [length - 1 for length in lengths]

# Map from qualifiers to interval units. Here, weeks are defined as 7 days.

units = {"WEEKLY" : 7}

# Map from tuple positions to base values at each resolution.

bases = [0, 1, 1, 0, 0, 0]
paul@41 93
paul@41 94
def basevalue(resolution):

    "Return the base value for the tuple position used by 'resolution'."

    position = positions[resolution]
    return bases[position]
paul@41 96
paul@41 97
# Make dictionaries mapping qualifiers to levels: each FREQ name maps to its
# index in freq_levels, each BY... name to the index of its enum_levels group.

freq = dict(zip(freq_levels, range(len(freq_levels))))

enum = {}
for i, levels in enumerate(enum_levels):
    for level in levels:
        enum[level] = i
paul@41 101
paul@41 102
# Functions for structuring the recurrences.
paul@41 103
paul@41 104
def get_next(it):

    """
    Return the next item from the iterator 'it', or None if the iterator is
    exhausted.
    """

    # The next builtin works with both Python 2.6+ and Python 3 iterators,
    # whereas the next method only exists on Python 2 iterators.

    return next(it, None)
paul@41 109
paul@41 110
def order_qualifiers(qualifiers):

    """
    Return the 'qualifiers' in order of increasing resolution.

    Each item in 'qualifiers' is a (qualifier, args) pair. A selector object is
    made for each pair and the selectors are returned as a list sorted by tuple
    position. A qualifier found in neither the enumeration nor the frequency
    tables causes a KeyError.
    """

    l = []

    for qualifier, args in qualifiers:

        # BY... qualifiers enumerate values or filter candidates.

        if qualifier in enum:
            level = enum[qualifier]
            if qualifier in special_enum_levels:

                # Note that this modifies the caller's args dictionary.

                args["interval"] = 1
                selector = special_enum_levels[qualifier]
            else:
                selector = Enum

        # Anything else must be a FREQ level.

        else:
            level = freq[qualifier]
            selector = Pattern

        pos = positions[level]
        l.append(selector(pos, args, qualifier))

    # The sort is stable, so selectors sharing a position keep their order.

    l.sort(key=lambda x: x.pos)
    return l
paul@41 133
paul@41 134
def get_datetime_structure(datetime):

    """
    Return the structure of 'datetime' for recurrence production: a list with
    one single-valued enumeration per element of 'datetime', positioned by the
    index of that element.
    """

    return [
        Enum(index, {"values" : [element]}, "DT")
        for index, element in enumerate(datetime)
        ]
paul@41 142
paul@41 143
def _introduce_first_qualifier(from_q, context, l):

    """
    Prepare the result list 'l' for 'from_q', the first qualifier to be merged,
    using the 'context' of datetime values collected so far.

    An enumeration below the top position gets a "REPEAT" pattern appended
    ahead of it at the preceding position, seeded with the last context value
    and given the remaining context. Any other qualifier is given the whole
    context itself.
    """

    pos = from_q.pos

    if isinstance(from_q, Enum) and pos > 0:
        repeat = Pattern(pos - 1, {"interval" : 1, "values" : [context[-1]]}, "REPEAT")
        repeat.context = tuple(context[:-1])
        l.append(repeat)
    else:
        from_q.context = tuple(context)

def combine_datetime_with_qualifiers(datetime, qualifiers):

    """
    Combine 'datetime' with 'qualifiers' to produce a structure for recurrence
    production.

    The result is a list of selectors ordered by position. Datetime values
    preceding the first qualifier are not added to the list but are instead
    collected as that qualifier's context.
    """

    iter_dt = iter(get_datetime_structure(datetime))
    iter_q = iter(order_qualifiers(qualifiers))

    l = []

    from_dt = get_next(iter_dt)
    from_q = get_next(iter_q)

    have_q = False
    context = []

    # Consume from both lists, merging entries.

    while from_dt and from_q:
        _pos = from_dt.pos
        pos = from_q.pos

        # Datetime value at wider resolution.

        if _pos < pos:
            if have_q:
                l.append(from_dt)
            else:
                context.append(from_dt.args["values"][0])
            from_dt = get_next(iter_dt)

        # Qualifier at wider or same resolution as datetime value.

        else:
            if not have_q:
                _introduce_first_qualifier(from_q, context, l)
                have_q = True

            # Either introduce the qualifier first.

            if _pos > pos:
                l.append(from_q)

            # Or combine the qualifier and value details.

            else:
                l.append(combine_value_with_qualifier(from_dt, from_q))
                from_dt = get_next(iter_dt)

            from_q = get_next(iter_q)

    # Complete the list.

    while from_dt:
        l.append(from_dt)
        from_dt = get_next(iter_dt)

    while from_q:

        # The position is taken from the qualifier itself here, not from
        # whatever the merging loop last assigned.

        if not have_q:
            _introduce_first_qualifier(from_q, context, l)
            have_q = True
        l.append(from_q)
        from_q = get_next(iter_q)

    return l
paul@41 220
paul@41 221
def combine_value_with_qualifier(from_dt, from_q):

    """
    Combine value information supplied by 'from_dt' (a datetime) and 'from_q'
    (a qualifier), imposing the datetime value information on any qualifiers.

    The values of 'from_dt' are only copied when 'from_q' has none of its own.
    The args of 'from_q' are modified in place and 'from_q' is returned.
    """

    # Membership tests replace has_key, which Python 3 dictionaries lack.

    if "values" not in from_q.args and "values" in from_dt.args:
        from_q.args["values"] = from_dt.args["values"]
    return from_q
paul@41 231
paul@41 232
# Datetime arithmetic.
paul@41 233
paul@41 234
def combine(t1, t2):

    """
    Return a tuple combining 't1' and 't2' element by element, taking each
    element from 't1' unless it is false, in which case the element from 't2'
    is used instead. The result is as long as the longer of the two inputs.
    """

    # Pad the shorter input with None explicitly. Python 2's map did this
    # implicitly, but Python 3's map stops at the end of the shorter input,
    # which would discard the extra elements.

    size = max(len(t1), len(t2))
    padded1 = tuple(t1) + (None,) * (size - len(t1))
    padded2 = tuple(t2) + (None,) * (size - len(t2))
    return tuple([x or y for x, y in zip(padded1, padded2)])
paul@41 236
paul@41 237
def scale(interval, pos):

    """
    Return a tuple with 'interval' at index 'pos' and zero in each preceding
    element.
    """

    leading = [0] * pos
    return tuple(leading + [interval])
paul@41 239
paul@41 240
def get_seconds(t):

    """
    Convert the sub-day part of 't' into seconds, treating elements 3, 4 and 5
    as hours, minutes and seconds, with any missing elements counting as zero.
    """

    missing = 6 - len(t)
    padded = t + (0,) * missing
    hours, minutes, seconds = padded[3], padded[4], padded[5]
    total_minutes = hours * 60 + minutes
    return total_minutes * 60 + seconds
paul@41 246
paul@41 247
def update(t, step):

    """
    Update 't' by 'step' at the resolution of 'step'.

    Both arguments are tuples starting with the year. The length of 'step'
    decides how the addition is done: year arithmetic for one element, year and
    month arithmetic with carrying for two elements, and date or datetime
    arithmetic otherwise. The result has as many elements as 't'.
    """

    i = len(step)

    # Years only.

    if i == 1:
        return (t[0] + step[0],) + tuple(t[1:])

    # Years and months.

    elif i == 2:
        month = t[1] + step[1]

        # Floor division carries whole years and keeps the year an integer
        # regardless of the Python version in use.

        return (t[0] + step[0] + (month - 1) // 12, (month - 1) % 12 + 1) + tuple(t[2:])

    # Dates and datetimes.

    else:
        updated_for_months = update(t, step[:2])
        d = datetime(*updated_for_months)

        # Dates only.

        if i == 3:
            s = timedelta(step[2])

        # Datetimes.

        else:
            s = timedelta(step[2], get_seconds(step))

        return (d + s).timetuple()[:len(t)]
paul@41 282
paul@41 283
# Classes for producing instances from recurrence structures.
paul@41 284
paul@41 285
class Selector:

    """
    A base class for selectors producing instances at a tuple position.

    Subclasses are expected to provide a materialise_items method accepting
    context, start, end and counter arguments and returning a list.
    """

    def __init__(self, pos, args, qualifier, selecting=None):

        """
        Initialise the selector with its tuple position 'pos', the 'args'
        dictionary of the qualifier, the 'qualifier' name, and an optional
        'selecting' selector to which selected items are passed on.
        """

        self.pos = pos
        self.args = args
        self.qualifier = qualifier
        self.context = ()
        self.selecting = selecting

    def __repr__(self):
        return "%s(%r, %r, %r, %r)" % (self.__class__.__name__, self.pos, self.args, self.qualifier, self.context)

    def materialise(self, start, end, count=None):

        """
        Return the items selected between 'start' and 'end', limited to 'count'
        items if 'count' is not None.
        """

        # The counter is a mutable [produced, limit] pair shared by all
        # selectors involved. Testing against None lets a count of zero
        # produce a usable counter, not the integer 0.

        if count is None:
            counter = None
        else:
            counter = [0, count]
        return self.materialise_items(self.context, start, end, counter)

    def materialise_item(self, current, last, next, counter):

        """
        Return a list of items for the 'current' candidate, with 'last' and
        'next' giving the limits applying to it.

        If another selector is being selected, it is asked for its items using
        'current' as context. Otherwise, 'current' is itself produced if 'last'
        does not exceed it, and the 'counter' is advanced. An empty list is
        returned if the counter has already reached its limit.
        """

        if counter is None or counter[0] < counter[1]:
            if self.selecting:
                return self.selecting.materialise_items(current, last, next, counter)
            elif last <= current:
                if counter is not None:
                    counter[0] += 1
                return [current]
        return []
paul@41 309
paul@41 310
class Pattern(Selector):

    "A selector producing items at regular intervals from an initial value."

    def materialise_items(self, context, start, end, counter):

        """
        Return the items found by stepping from the first configured value,
        placed within 'context', for as long as the candidate precedes 'end'
        and the 'counter' has not reached its limit.
        """

        # The scale of a single item, and the step between items.

        unit = units.get(self.qualifier, 1)
        item_span = scale(unit, self.pos)
        stride = scale(self.args.get("interval", 1) * unit, self.pos)

        # The first candidate.

        origin = scale(self.args["values"][0], self.pos)
        candidate = combine(context, origin)

        found = []

        while candidate < end:
            if counter is not None and counter[0] >= counter[1]:
                break

            following = update(candidate, stride)
            candidate_end = update(candidate, item_span)

            # Constrain each item to the requested period.

            lower = max(candidate, start)
            upper = min(candidate_end, end)
            found.extend(self.materialise_item(candidate, lower, upper, counter))

            candidate = following

        return found
paul@41 334
paul@43 335
class WeekDayFilter(Selector):

    "A selector testing successive candidates against weekday criteria."

    def materialise_items(self, context, start, end, counter):

        """
        Return the items selected from candidates starting at the base value
        for this position within 'context', advancing one unit at a time for as
        long as the candidate precedes 'end' and the 'counter' has not reached
        its limit.
        """

        first = scale(bases[self.pos], self.pos)

        # Define the step between items to be tested.

        step = scale(1, self.pos)

        current = combine(context, first)
        results = []

        while current < end and (counter is None or counter[0] < counter[1]):
            following = update(current, step)
            results += self.materialise_item(current, max(current, start), min(following, end), counter)
            current = following

        return results

    def materialise_item(self, current, last, next, counter):

        """
        Return the items for 'current' if it satisfies one of the configured
        (weekday, index) values, where the weekday is an ISO weekday number and
        a None index accepts every occurrence in the month. An empty list is
        returned otherwise.
        """

        values = self.args["values"]

        # Select by day in week, also by occurrence in month.

        for value, index in values:
            if datetime(*current).isoweekday() == value:
                last_day = monthrange(current[0], current[1])[1]

                # Floor division keeps the occurrence numbers integral on all
                # Python versions.
                # NOTE(review): occurrences counted from the start of the
                # month are zero-based here whereas those counted from the end
                # start at -1 - confirm this matches what callers supply.

                index_from_start = (current[2] - 1) // 7
                index_from_end = -(1 + (last_day - current[2]) // 7)
                if index is None or index in (index_from_start, index_from_end):
                    return Selector.materialise_item(self, current, last, next, counter)

        return []
paul@41 367
paul@41 368
class Enum(Selector):

    "A selector producing items for each of a collection of explicit values."

    def materialise_items(self, context, start, end, counter):

        """
        Return the items for each configured value, placed within 'context',
        that precedes 'end', for as long as the 'counter' has not reached its
        limit.
        """

        unit_step = scale(1, self.pos)
        found = []

        for value in self.args["values"]:
            candidate = combine(context, scale(value, self.pos))

            if not candidate < end:
                continue
            if counter is not None and counter[0] >= counter[1]:
                continue

            following = update(candidate, unit_step)
            lower = max(candidate, start)
            upper = min(following, end)
            found.extend(self.materialise_item(candidate, lower, upper, counter))

        return found
paul@43 378
paul@43 379
class MonthDayFilter(Enum):

    "A selector producing items for explicit days of the month."

    def materialise_items(self, context, start, end, counter):

        """
        Return the items for each configured day of the month given by the year
        and month in 'context', with negative values counting back from the end
        of the month. Days that do not exist in the month are ignored.
        """

        last_day = monthrange(context[0], context[1])[1]
        step = scale(1, self.pos)
        results = []

        for value in self.args["values"]:
            if value < 0:
                value = last_day + 1 + value

            # Ignore days absent from this month, such as day 31 in a month
            # of 30 days, since these cannot be made into valid dates.

            if not 1 <= value <= last_day:
                continue

            current = combine(context, scale(value, self.pos))
            if current < end and (counter is None or counter[0] < counter[1]):
                following = update(current, step)
                results += self.materialise_item(current, max(current, start), min(following, end), counter)

        return results
paul@41 392
paul@45 393
class YearDayFilter(Enum):

    "A selector producing items for explicit days of the year."

    def materialise_items(self, context, start, end, counter):

        """
        Return the items for each configured day of the year given by the first
        element of 'context', with negative values counting back from the end
        of the year. Days that do not exist in the year are ignored.
        """

        first_day = date(context[0], 1, 1)
        next_first_day = date(context[0] + 1, 1, 1)
        year_length = (next_first_day - first_day).days
        step = scale(1, self.pos)
        results = []

        for value in self.args["values"]:
            if value < 0:
                value = year_length + 1 + value

            # Ignore days absent from this year, such as day 366 outside leap
            # years, which would otherwise yield a date in an adjacent year.

            if not 1 <= value <= year_length:
                continue

            current = (first_day + timedelta(value - 1)).timetuple()[:3]
            if current < end and (counter is None or counter[0] < counter[1]):
                following = update(current, step)
                results += self.materialise_item(current, max(current, start), min(following, end), counter)

        return results
paul@45 408
paul@41 409
def process(selectors):

    """
    Chain the given 'selectors' together, making each one select using its
    successor, and return the first selector.
    """

    for outer, inner in zip(selectors, selectors[1:]):
        outer.selecting = inner
    return selectors[0]
paul@41 415
paul@43 416
# Map from BY... qualifiers to the selector classes used for them in place of
# plain enumerations.

special_enum_levels = dict([
    ("BYDAY", WeekDayFilter),
    ("BYMONTHDAY", MonthDayFilter),
    ("BYYEARDAY", YearDayFilter),
    ])
paul@43 421
paul@41 422
# vim: tabstop=4 expandtab shiftwidth=4