imip-agent

Annotated imip_store.py

17:800c6b929859
2014-09-23 Paul Boddie Return tuples in free/busy lists for stable comparisons.
paul@2 1
#!/usr/bin/env python
paul@2 2
paul@15 3
from os.path import abspath, commonprefix, exists, join, split
paul@2 4
from os import makedirs
paul@15 5
from vCalendar import iterwrite
paul@2 6
paul@15 7
STORE_DIR = "/var/lib/imip-agent/store"
paul@2 8
paul@2 9
def check_dir(base, dir):

    """
    Return whether 'dir' is 'base' itself or a location beneath 'base'.

    Both paths are made absolute and normalised first, so that ".." components
    in 'dir' cannot be used to step outside 'base'. The comparison is made on
    whole path components: a sibling such as "/store-other" is not regarded as
    being inside "/store" merely because the names share leading characters.
    """

    base = abspath(base)
    target = abspath(dir)

    # join(base, "") gives 'base' with exactly one trailing separator, which
    # anchors the prefix test on a path component boundary.

    return target == base or target.startswith(join(base, ""))
paul@2 11
paul@15 12
def to_stream(out, fragment, objtype, encoding="utf-8"):

    """
    Write 'fragment' to the stream 'out' as an object of the given 'objtype',
    using the given 'encoding' for the writer. An empty dictionary is passed
    as the second argument of the writer's write method (presumably the
    object's attributes - confirm against vCalendar).
    """

    writer = iterwrite(out, encoding=encoding)
    writer.write(objtype, {}, fragment)
paul@15 14
paul@2 15
class FileStore:

    "A file store of tabular data."

    def __init__(self):

        "Initialise the store in STORE_DIR, creating the directory if absent."

        self.store_dir = STORE_DIR
        if not exists(self.store_dir):
            makedirs(self.store_dir)

    def get_file_object(self, base, *parts):

        """
        Return the pathname made by joining 'base' with the given 'parts', or
        None if check_dir does not accept the pathname for 'base'.

        Despite the name, a pathname string is returned, not an open file.
        """

        pathname = join(base, *parts)
        if not check_dir(base, pathname):
            return None

        # An empty pathname is reported as None, like a rejected one.

        return pathname or None

    def get_event(self, user, uid):

        """
        Get the event for the given 'user' with the given 'uid'.

        Return a file object opened for reading, which the caller is
        responsible for closing, or None if the pathname is not acceptable or
        no such file exists.
        """

        filename = self.get_file_object(self.store_dir, user, uid)
        if not filename or not exists(filename):
            return None

        return open(filename)

    def set_event(self, user, uid, details):

        """
        Set an event for 'user' having the given 'uid' and 'details'.

        The 'details' are written as a VEVENT using to_stream. Return False,
        writing nothing, if either the user directory or the event pathname
        is not acceptable; return True once the event has been written.
        """

        user_dir = self.get_file_object(self.store_dir, user)
        if not user_dir:
            return False

        filename = self.get_file_object(user_dir, uid)
        if not filename:
            return False

        if not exists(user_dir):
            makedirs(user_dir)

        with open(filename, "w") as f:
            to_stream(f, details, "VEVENT")

        return True

    def get_freebusy(self, user):

        """
        Get free/busy details for the given 'user'.

        Return a list with one tuple of strings for each line of the user's
        "freebusy" file, the line being split at tab characters, or None if
        the pathname is not acceptable or the file does not exist.
        """

        filename = self.get_file_object(self.store_dir, user, "freebusy")
        if not filename or not exists(filename):
            return None

        with open(filename) as f:

            # Remove only the line terminator: stripping all whitespace would
            # also discard leading and trailing tabs, losing empty first or
            # last fields that set_freebusy is able to write.

            return [tuple(line.rstrip("\r\n").split("\t")) for line in f]

    def set_freebusy(self, user, freebusy):

        """
        For the given 'user', set 'freebusy' details.

        Each item in 'freebusy' must be a sequence of strings and is written
        as one line of tab-separated fields, replacing any existing file.
        Return False if the pathname is not acceptable, True otherwise.
        """

        filename = self.get_file_object(self.store_dir, user, "freebusy")
        if not filename:
            return False

        parent = split(filename)[0]

        if not exists(parent):
            makedirs(parent)

        with open(filename, "w") as f:
            for item in freebusy:
                f.write("\t".join(item) + "\n")

        return True
paul@15 99
paul@2 100
# vim: tabstop=4 expandtab shiftwidth=4