imip-agent

Annotated imiptools/handlers/scheduling/access.py

1031:fc1efc973849
2016-01-31 Paul Boddie Introduced the access control list scheduling function plus argument support. Moved scheduling function lookup into the scheduling module as part of the revised function invocation parsing activity, employing a new text processing module in imiptools. Added tests of the access control list functionality.
paul@1028 1
#!/usr/bin/env python
paul@1028 2
paul@1028 3
"""
paul@1028 4
Access-control-related scheduling functionality.
paul@1028 5
paul@1028 6
Copyright (C) 2016 Paul Boddie <paul@boddie.org.uk>
paul@1028 7
paul@1028 8
This program is free software; you can redistribute it and/or modify it under
paul@1028 9
the terms of the GNU General Public License as published by the Free Software
paul@1028 10
Foundation; either version 3 of the License, or (at your option) any later
paul@1028 11
version.
paul@1028 12
paul@1028 13
This program is distributed in the hope that it will be useful, but WITHOUT
paul@1028 14
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paul@1028 15
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
paul@1028 16
details.
paul@1028 17
paul@1028 18
You should have received a copy of the GNU General Public License along with
paul@1028 19
this program.  If not, see <http://www.gnu.org/licenses/>.
paul@1028 20
"""
paul@1028 21
paul@1031 22
from imiptools.data import get_address, get_addresses
paul@1031 23
from imiptools.text import parse_line
paul@1031 24
paul@1031 25
def access_control_list(handler, args):
paul@1031 26
paul@1031 27
    """
paul@1031 28
    Attempt to schedule the current object of the given 'handler' using an
paul@1031 29
    access control list provided in the given 'args', applying it to the
paul@1031 30
    organiser.
paul@1031 31
    """
paul@1031 32
paul@1031 33
    # Obtain either a file from the user's preferences directory...
paul@1031 34
paul@1031 35
    if not args:
paul@1031 36
        acl = handler.get_preferences().get("acl")
paul@1031 37
        lines = acl.strip().split("\n")
paul@1031 38
paul@1031 39
    # Or obtain the contents of a specific file.
paul@1031 40
paul@1031 41
    else:
paul@1031 42
        try:
paul@1031 43
            f = open(args[0])
paul@1031 44
        except IOError:
paul@1031 45
            return None
paul@1031 46
        try:
paul@1031 47
            lines = f.readlines()
paul@1031 48
        finally:
paul@1031 49
            f.close()
paul@1031 50
paul@1031 51
    # Use the current object's identities with the ACL rules.
paul@1031 52
paul@1031 53
    organiser = get_address(handler.obj.get_value("ORGANIZER"))
paul@1031 54
    attendees = get_addresses(handler.obj.get_values("ATTENDEE"))
paul@1031 55
paul@1031 56
    response = None
paul@1031 57
paul@1031 58
    for line in lines:
paul@1031 59
        parts = parse_line(line.strip())
paul@1031 60
paul@1031 61
        # Skip empty lines.
paul@1031 62
paul@1031 63
        if not parts:
paul@1031 64
            continue
paul@1028 65
paul@1031 66
        # Accept either a single word with an action or a rule.
paul@1031 67
        # NOTE: Should signal an error with the format.
paul@1031 68
paul@1031 69
        if len(parts) == 1:
paul@1031 70
            action = parts[0]
paul@1031 71
        elif len(parts) >= 3:
paul@1031 72
            action, role, identities = parts[0], parts[1], map(get_address, parts[2:])
paul@1031 73
        else:
paul@1031 74
            return None
paul@1031 75
paul@1031 76
        if action.lower() == "accept":
paul@1031 77
            result = "ACCEPTED"
paul@1031 78
        elif action.lower() in ["decline", "reject"]:
paul@1031 79
            result = "DECLINED"
paul@1031 80
        else:
paul@1031 81
            return None
paul@1031 82
paul@1031 83
        # With only an action, prepare a default response in case none of
paul@1031 84
        # the rules match.
paul@1031 85
paul@1031 86
        if len(parts) == 1:
paul@1031 87
            response = result
paul@1031 88
            continue
paul@1031 89
paul@1031 90
        # Where no default has been set, use an implicit default based on
paul@1031 91
        # the action appearing in a rule.
paul@1031 92
paul@1031 93
        elif not response:
paul@1031 94
            response = result == "ACCEPTED" and "DECLINED" or "ACCEPTED"
paul@1031 95
paul@1031 96
        # Interpret a rule, attempting to match identities to properties.
paul@1031 97
paul@1031 98
        if role.lower() in ["organiser", "organizer"]:
paul@1031 99
            match = organiser in identities
paul@1031 100
        elif role.lower() in ["attendee", "attendees"]:
paul@1031 101
            match = set(attendees).intersection(identities)
paul@1031 102
        else:
paul@1031 103
            return None
paul@1031 104
paul@1031 105
        # Use the result of any match.
paul@1031 106
paul@1031 107
        if match:
paul@1031 108
            response = result
paul@1031 109
paul@1031 110
    return response
paul@1031 111
paul@1031 112
def same_domain_only(handler, args):
paul@1028 113
paul@1028 114
    """
paul@1028 115
    Attempt to schedule the current object of the given 'handler' if the
paul@1028 116
    organiser employs an address in the same domain as the resource.
paul@1028 117
    """
paul@1028 118
paul@1028 119
    organiser = get_address(handler.obj.get_value("ORGANIZER"))
paul@1028 120
    user = get_address(handler.user)
paul@1028 121
paul@1028 122
    organiser_domain = organiser.rsplit("@", 1)[-1]
paul@1028 123
    user_domain = user.rsplit("@", 1)[-1]
paul@1028 124
    
paul@1028 125
    return organiser_domain == user_domain and "ACCEPTED" or "DECLINED"
paul@1028 126
paul@1028 127
# Registry of scheduling functions.
paul@1028 128
paul@1028 129
scheduling_functions = {
paul@1031 130
    "access_control_list" : access_control_list,
paul@1028 131
    "same_domain_only" : same_domain_only,
paul@1028 132
    }
paul@1028 133
paul@1028 134
# vim: tabstop=4 expandtab shiftwidth=4