1 #!/usr/bin/env python 2 3 """ 4 OpenID redirection classes, sending unauthenticated users to the OpenID 5 initiation page. 6 7 Copyright (C) 2004, 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk> 8 9 This library is free software; you can redistribute it and/or 10 modify it under the terms of the GNU Lesser General Public 11 License as published by the Free Software Foundation; either 12 version 2.1 of the License, or (at your option) any later version. 13 14 This library is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17 Lesser General Public License for more details. 18 19 You should have received a copy of the GNU Lesser General Public 20 License along with this library; if not, write to the Free Software 21 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA 22 """ 23 24 from WebStack.Resources.LoginRedirect import LoginRedirectResource 25 from WebStack.Helpers.Auth import Verifier, check_openid_signature 26 import WebStack.Generic 27 import datetime 28 import urllib 29 30 class OpenIDRedirectResource(LoginRedirectResource): 31 32 "A resource redirecting to an OpenID initiation page." 33 34 openid_ns = "http://specs.openid.net/auth/2.0" 35 36 def respond(self, trans): 37 38 "Respond using the given transaction 'trans'." 39 40 fields = trans.get_fields(self.path_encoding) 41 42 # If requested, attempt to verify OpenID assertions. 43 # http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11 44 45 if fields.get("openid.mode", [None])[0] == "id_res": 46 47 # The additional condition could be used to insist on OpenID 2.0 48 # conformance: 49 # fields.get("openid.ns", [None])[0] == self.openid_ns 50 51 if self.authenticator.authenticate(trans, verify=1): 52 trans.redirect(trans.encode_url_without_query(fields["openid.return_to"][0])) 53 54 # Otherwise, handle the usual parameters and request details. 55 56 LoginRedirectResource.respond(self, trans) 57 58 class OpenIDRedirectAuthenticator(Verifier): 59 60 """ 61 An authenticator which verifies the credentials provided in a special login 62 cookie, accepting OpenID assertions if necessary. 63 """ 64 65 openid_ns = "http://specs.openid.net/auth/2.0" 66 replay_limit = datetime.timedelta(0, 10) # 10s 67 68 def __init__(self, secret_key, app_url, associations=None, replay_limit=None, 69 cookie_name=None, urlencoding=None): 70 71 """ 72 Initialise the authenticator with a 'secret_key', 'app_url' and optional 73 'associations', 'replay_limit', 'cookie_name' and 'urlencoding'. The 74 'app_url' should be the "bare" reference using a protocol, host and 75 port, not including any path information. 76 """ 77 78 Verifier.__init__(self, secret_key, cookie_name) 79 80 self.app_url = app_url 81 self.associations = associations or {} 82 self.replay_limit = replay_limit or self.replay_limit 83 self.urlencoding = urlencoding 84 85 def authenticate(self, trans, verify=0): 86 87 """ 88 Authenticate the originator of 'trans', updating the object if 89 successful and returning a true value if successful, or a false value 90 otherwise. 91 92 If the optional 'verify' parameter is specified as a true value, perform 93 verification on any 94 """ 95 96 # If requested, attempt to verify OpenID assertions. 97 98 if verify: 99 fields = trans.get_fields(self.urlencoding) 100 101 # NOTE: Could expose all errors. 102 103 try: 104 # Test the details of the assertion. 105 106 if self.test_url(fields) and \ 107 self.test_signature(trans, fields) and \ 108 self.test_replay(fields): 109 110 self.set_token(trans, fields["openid.identity"][0]) 111 return 1 112 113 # Incomplete assertion. 114 115 except (KeyError, ValueError): 116 raise 117 118 # Assertion failed or was incomplete. 119 120 return 0 121 122 # Otherwise, try to authenticate with an application cookie. 123 124 else: 125 valid = Verifier.authenticate(self, trans) 126 127 # Update the transaction with the user details. 128 129 if valid: 130 username, token = self.get_username_and_token(trans) 131 trans.set_user(username) 132 return valid 133 134 def test_url(self, fields): 135 136 """ 137 See: 138 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.1 139 """ 140 141 # NOTE: Currently, this is not strict enough. 142 143 return fields["openid.return_to"][0].startswith(self.app_url) 144 145 def test_signature(self, trans, fields): 146 147 """ 148 See: 149 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4 150 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.6 151 """ 152 153 handle = fields.get("openid.assoc_handle", [None])[0] 154 155 # With an association handle, look up the appropriate secret key and 156 # verify the signed items. 157 158 if handle is not None: 159 160 # Where an association exists, use the known secret key. 161 162 if self.associations.has_key(handle): 163 return check_openid_signature(fields, self.associations[handle]) 164 165 # Without an association, request verification of the signed items 166 # from the OpenID provider. 167 168 else: 169 return self.test_signature_direct(trans, fields) 170 171 # Without a handle, no signature verification can occur. 172 173 return 0 174 175 def test_signature_direct(self, trans, fields): 176 177 """ 178 See: 179 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4.2 180 """ 181 182 # Make a POST request using the "openid." fields. 183 184 d = [] 185 for name, values in fields.items(): 186 if name.startswith("openid.") and name != "openid.mode": 187 d.append("%s=%s" % (name, trans.encode_path(values[0]))) 188 d.append("%s=%s" % ("openid.mode", "check_authentication")) 189 data = "&".join(d) 190 191 # Send a POST request to the OpenID provider, reading the response and 192 # testing for certain fields and values. 193 194 f = urllib.urlopen(fields["openid.op_endpoint"][0], data) 195 try: 196 items = [] 197 for line in f.readlines(): 198 if line[-1] == "\n": 199 line = line[:-1] 200 parts = line.split(":") 201 items.append((parts[0], ":".join(parts[1:]))) 202 fields = dict(items) 203 return fields["ns"] == self.openid_ns and fields["is_valid"] == "true" 204 finally: 205 f.close() 206 207 def test_replay(self, fields): 208 209 """ 210 See: 211 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.3 212 """ 213 214 timestamp = fields["openid.response_nonce"][0] 215 # YYYY-MM-DDTHH:MM:SSZ... 216 year, month, day, hour, minute, second, unique = \ 217 int(timestamp[0:4]), int(timestamp[5:7]), int(timestamp[8:10]), \ 218 int(timestamp[11:13]), int(timestamp[14:16]), int(timestamp[17:19]), \ 219 timestamp[20:] 220 dt = datetime.datetime(year, month, day, hour, minute, second) 221 return -self.replay_limit < (datetime.datetime.utcnow() - dt) < self.replay_limit 222 223 def set_token(self, trans, username): 224 225 "Set an authentication token in 'trans' with the given 'username'." 226 227 Verifier.set_token(self, trans, username) 228 229 # Update the transaction with the user details. 230 231 trans.set_user(username) 232 233 # vim: tabstop=4 expandtab shiftwidth=4