#!/usr/bin/env python

"""
OpenID redirection classes, sending unauthenticated users to the OpenID
initiation page.

Copyright (C) 2004, 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA
"""

from WebStack.Resources.LoginRedirect import LoginRedirectResource
from WebStack.Helpers.Auth import Verifier, check_openid_signature
import WebStack.Generic
import datetime
import urllib

class OpenIDRedirectResource(LoginRedirectResource):

    "A resource redirecting to an OpenID initiation page."

    openid_ns = "http://specs.openid.net/auth/2.0"

    def respond(self, trans):

        """
        Respond using the given transaction 'trans'.

        Where the request fields carry a positive OpenID assertion (an
        "openid.mode" of "id_res"), the authenticator is asked to verify it
        and, upon success, a redirect to the "openid.return_to" URL is issued.
        In all other cases, the usual login redirect behaviour is employed.
        """

        fields = trans.get_fields(self.path_encoding)

        # If requested, attempt to verify OpenID assertions.
        # http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11

        if fields.get("openid.mode", [None])[0] == "id_res":

            # The additional condition could be used to insist on OpenID 2.0
            # conformance:
            # fields.get("openid.ns", [None])[0] == self.openid_ns

            if self.authenticator.authenticate(trans, verify=1):

                # NOTE(review): Presumably the redirect terminates the
                # NOTE(review): response; if it does not, control continues
                # NOTE(review): into the usual handling below - confirm.

                trans.redirect(fields["openid.return_to"][0])

        # Otherwise, handle the usual parameters and request details.

        LoginRedirectResource.respond(self, trans)

class OpenIDRedirectAuthenticator(Verifier):

    """
    An authenticator which verifies the credentials provided in a special login
    cookie, accepting OpenID assertions if necessary.
    """

    openid_ns = "http://specs.openid.net/auth/2.0"
    replay_limit = datetime.timedelta(0, 10) # 10s

    def __init__(self, secret_key, app_url, associations=None, replay_limit=None,
        cookie_name=None, urlencoding=None):

        """
        Initialise the authenticator with a 'secret_key', 'app_url' and optional
        'associations', 'replay_limit', 'cookie_name' and 'urlencoding'.

        The 'app_url' is the prefix which acceptable "openid.return_to" values
        must start with. The 'associations' should be a mapping from
        association handles to the secret keys used to check signatures; an
        empty mapping is used if none is given. The 'replay_limit' should be a
        datetime.timedelta giving the tolerated difference between the current
        time and the response nonce timestamp; the class default is used if
        none is given. The 'urlencoding' is used when obtaining request fields.
        """

        Verifier.__init__(self, secret_key, cookie_name)

        self.app_url = app_url
        self.associations = associations or {}
        self.replay_limit = replay_limit or self.replay_limit
        self.urlencoding = urlencoding

    def authenticate(self, trans, verify=0):

        """
        Authenticate the originator of 'trans', updating the object if
        successful and returning a true value if successful, or a false value
        otherwise.

        If the optional 'verify' parameter is specified as a true value, perform
        verification on any OpenID assertion found in the request fields: the
        return URL, the signature and the response nonce are tested in turn.
        Upon success, an authentication token is set for the asserted
        "openid.identity". An assertion lacking required fields or providing
        malformed values is treated as a failed assertion, not as an error.

        Without 'verify', the application cookie is checked instead.
        """

        # If requested, attempt to verify OpenID assertions.

        if verify:
            fields = trans.get_fields(self.urlencoding)

            # NOTE: Could expose all errors.

            try:
                # Test the details of the assertion.

                if self.test_url(fields) and \
                    self.test_signature(fields) and \
                    self.test_replay(fields):

                    self.set_token(trans, fields["openid.identity"][0])
                    return 1

            # Incomplete assertion (missing fields) or malformed values (such
            # as an unparseable nonce timestamp): report failure as promised
            # instead of propagating the exception to the caller.

            except (KeyError, ValueError):
                pass

            # Assertion failed or was incomplete.

            return 0

        # Otherwise, try to authenticate with an application cookie.

        else:
            valid = Verifier.authenticate(self, trans)

            # Update the transaction with the user details.

            if valid:
                username, token = self.get_username_and_token(trans)
                trans.set_user(username)
            return valid

    def test_url(self, fields):

        """
        Return whether the "openid.return_to" value in 'fields' starts with the
        configured application URL. A KeyError is raised if the field is absent.

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.1
        """

        # NOTE: Currently, this is not strict enough.
        # NOTE: A plain prefix test also accepts URLs which merely extend the
        # NOTE: application URL text (unless that URL ends with a separator).

        return fields["openid.return_to"][0].startswith(self.app_url)

    def test_signature(self, fields):

        """
        Return a true value if the signed items in 'fields' can be verified,
        either using a known association or by asking the OpenID provider
        directly. Without an "openid.assoc_handle" field, a false value is
        returned.

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.6
        """

        handle = fields.get("openid.assoc_handle", [None])[0]

        # Without a handle, no signature verification can occur.

        if handle is None:
            return 0

        # Where an association exists, use the known secret key to verify the
        # signed items.

        if handle in self.associations:
            return check_openid_signature(fields, self.associations[handle])

        # Without an association, request verification of the signed items
        # from the OpenID provider.

        return self.test_signature_direct(fields)

    def test_signature_direct(self, fields):

        """
        Ask the OpenID provider to verify the assertion in 'fields', returning
        whether the provider's response declares the OpenID 2.0 namespace and
        states that the assertion is valid. A KeyError is raised if 'fields'
        provides no "openid.op_endpoint".

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4.2
        """

        # Make a POST request using the "openid." fields, replacing the mode.

        d = {}
        for name, values in fields.items():
            if name.startswith("openid.") and name != "openid.mode":
                d[name] = values[0]
        d["openid.mode"] = "check_authentication"
        data = urllib.urlencode(d)

        # Send a POST request to the OpenID provider, reading the response and
        # testing for certain fields and values.
        # NOTE: The endpoint is taken from the request itself and is not
        # NOTE: checked against any independently discovered information.

        f = urllib.urlopen(fields["openid.op_endpoint"][0], data)
        try:
            # The response consists of "key:value" lines; only the first colon
            # separates the key from the value.

            items = []
            for line in f.readlines():
                if line[-1] == "\n":
                    line = line[:-1]
                parts = line.split(":")
                items.append((parts[0], ":".join(parts[1:])))
            response = dict(items)

            # A response lacking either field cannot confirm the assertion.

            return response.get("ns") == self.openid_ns and \
                response.get("is_valid") == "true"
        finally:
            f.close()

    def test_replay(self, fields):

        """
        Return whether the timestamp at the start of the "openid.response_nonce"
        value in 'fields' lies within the replay limit of the current UTC time,
        in either direction. A KeyError is raised if the field is absent, and a
        ValueError is raised if the timestamp cannot be interpreted.

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.3
        """

        # NOTE: Only the timestamp is tested; nonces are not recorded, so a
        # NOTE: repeated nonce within the permitted period is not detected.

        timestamp = fields["openid.response_nonce"][0]

        # YYYY-MM-DDTHH:MM:SSZ...

        year, month, day, hour, minute, second = \
            int(timestamp[0:4]), int(timestamp[5:7]), int(timestamp[8:10]), \
            int(timestamp[11:13]), int(timestamp[14:16]), int(timestamp[17:19])
        dt = datetime.datetime(year, month, day, hour, minute, second)
        return -self.replay_limit < (datetime.datetime.utcnow() - dt) < self.replay_limit

    def set_token(self, trans, username):

        "Set an authentication token in 'trans' with the given 'username'."

        Verifier.set_token(self, trans, username)

        # Update the transaction with the user details.

        trans.set_user(username)

# vim: tabstop=4 expandtab shiftwidth=4