1 #!/usr/bin/env python 2 3 """ 4 OpenID redirection classes, sending unauthenticated users to the OpenID 5 initiation page. 6 7 Copyright (C) 2004, 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk> 8 9 This library is free software; you can redistribute it and/or 10 modify it under the terms of the GNU Lesser General Public 11 License as published by the Free Software Foundation; either 12 version 2.1 of the License, or (at your option) any later version. 13 14 This library is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17 Lesser General Public License for more details. 18 19 You should have received a copy of the GNU Lesser General Public 20 License along with this library; if not, write to the Free Software 21 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA 22 """ 23 24 from WebStack.Resources.LoginRedirect import LoginRedirectResource 25 from WebStack.Helpers.Auth import Verifier, check_openid_signature 26 import WebStack.Generic 27 import datetime 28 import urllib 29 30 class OpenIDRedirectResource(LoginRedirectResource): 31 32 "A resource redirecting to an OpenID initiation page." 33 34 openid_ns = "http://specs.openid.net/auth/2.0" 35 36 def respond(self, trans): 37 38 "Respond using the given transaction 'trans'." 39 40 fields_path = trans.get_fields_from_path(self.path_encoding) 41 42 # If requested, attempt to verify OpenID assertions. 43 # http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11 44 45 if fields_path.get("openid.ns", [None])[0] == self.openid_ns and \ 46 fields_path.get("openid.mode", [None])[0] == "id_res": 47 48 if self.authenticator.authenticate(trans, verify=1): 49 trans.redirect(fields_path["openid.return_to"][0]) 50 51 # Otherwise, handle the usual parameters and request details. 52 53 LoginRedirectResource.respond(self, trans) 54 55 class OpenIDRedirectAuthenticator(Verifier): 56 57 """ 58 An authenticator which verifies the credentials provided in a special login 59 cookie, accepting OpenID assertions if necessary. 60 """ 61 62 encoding = "utf-8" 63 openid_ns = "http://specs.openid.net/auth/2.0" 64 replay_limit = datetime.timedelta(0, 10) # 10s 65 66 def __init__(self, secret_key, app_url, associations=None, replay_limit=None, 67 cookie_name=None, urlencoding=None): 68 69 """ 70 Initialise the authenticator with a 'secret_key', 'app_url' and optional 71 'associations', 'replay_limit', 'cookie_name' and 'urlencoding'. 72 """ 73 74 Verifier.__init__(self, secret_key, cookie_name) 75 76 self.app_url = app_url 77 self.associations = associations or {} 78 self.replay_limit = replay_limit or self.replay_limit 79 self.urlencoding = urlencoding or self.encoding 80 81 def authenticate(self, trans, verify=0): 82 83 """ 84 Authenticate the originator of 'trans', updating the object if 85 successful and returning a true value if successful, or a false value 86 otherwise. 87 88 If the optional 'verify' parameter is specified as a true value, perform 89 verification on any 90 """ 91 92 # If requested, attempt to verify OpenID assertions. 93 94 if verify: 95 fields_path = trans.get_fields_from_path(self.urlencoding) 96 97 # NOTE: Could expose all errors. 98 99 try: 100 # Test the details of the assertion. 101 102 if self.test_url(fields_path) and \ 103 self.test_signature(fields_path) and \ 104 self.test_replay(fields_path): 105 106 self.set_token(trans, fields_path["openid.identity"][0]) 107 return 1 108 109 # Incomplete assertion. 110 111 except (KeyError, ValueError): 112 raise 113 114 # Assertion failed or was incomplete. 115 116 return 0 117 118 # Otherwise, try to authenticate with an application cookie. 119 120 else: 121 valid = Verifier.authenticate(self, trans) 122 123 # Update the transaction with the user details. 124 125 if valid: 126 username, token = self.get_username_and_token(trans) 127 trans.set_user(username) 128 return valid 129 130 def test_url(self, fields_path): 131 132 """ 133 See: 134 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.1 135 """ 136 137 # NOTE: Currently, this is not strict enough. 138 139 return fields_path["openid.return_to"][0].startswith(self.app_url) 140 141 def test_signature(self, fields_path): 142 143 """ 144 See: 145 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4 146 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.6 147 """ 148 149 handle = fields_path.get("openid.assoc_handle", [None])[0] 150 151 # With an association handle, look up the appropriate secret key and 152 # verify the signed items. 153 154 if handle is not None: 155 156 # Where an association exists, use the known secret key. 157 158 if self.associations.has_key(handle): 159 return check_openid_signature(fields_path, self.associations[handle]) 160 161 # Without an association, request verification of the signed items 162 # from the OpenID provider. 163 164 else: 165 return self.test_signature_direct(fields_path) 166 167 # Without a handle, no signature verification can occur. 168 169 return 0 170 171 def test_signature_direct(self, fields_path): 172 173 """ 174 See: 175 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4.2 176 """ 177 178 # Make a POST request using the "openid." fields. 179 180 d = {} 181 for name, values in fields_path.items(): 182 if name.startswith("openid.") and name != "openid.mode": 183 d[name] = values[0] 184 d["openid.mode"] = "check_authentication" 185 data = urllib.urlencode(d) 186 187 # Send a POST request to the OpenID provider, reading the response and 188 # testing for certain fields and values. 189 190 f = urllib.urlopen(fields_path["openid.op_endpoint"][0], data) 191 try: 192 items = [] 193 for line in f.readlines(): 194 if line[-1] == "\n": 195 line = line[:-1] 196 parts = line.split(":") 197 items.append((parts[0], ":".join(parts[1:]))) 198 fields = dict(items) 199 return fields["ns"] == self.openid_ns and fields["is_valid"] == "true" 200 finally: 201 f.close() 202 203 def test_replay(self, fields_path): 204 205 """ 206 See: 207 http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.3 208 """ 209 210 timestamp = fields_path["openid.response_nonce"][0] 211 # YYYY-MM-DDTHH:MM:SSZ... 212 year, month, day, hour, minute, second, unique = \ 213 int(timestamp[0:4]), int(timestamp[5:7]), int(timestamp[8:10]), \ 214 int(timestamp[11:13]), int(timestamp[14:16]), int(timestamp[17:19]), \ 215 timestamp[20:] 216 dt = datetime.datetime(year, month, day, hour, minute, second) 217 return -self.replay_limit < (datetime.datetime.utcnow() - dt) < self.replay_limit 218 219 def set_token(self, trans, username): 220 221 "Set an authentication token in 'trans' with the given 'username'." 222 223 Verifier.set_token(self, trans, username) 224 225 # Update the transaction with the user details. 226 227 trans.set_user(username) 228 229 # vim: tabstop=4 expandtab shiftwidth=4