#!/usr/bin/env python

"""
OpenID redirection classes, sending unauthenticated users to the OpenID
initiation page.

Copyright (C) 2004, 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA
"""

from WebStack.Resources.LoginRedirect import LoginRedirectResource
from WebStack.Helpers.Auth import Verifier, check_openid_signature
import WebStack.Generic
import datetime
import urllib

class OpenIDRedirectResource(LoginRedirectResource):

    "A resource redirecting to an OpenID initiation page."

    # Namespace URI identifying OpenID 2.0 messages.

    openid_ns = "http://specs.openid.net/auth/2.0"

    def respond(self, trans):

        """
        Respond using the given transaction 'trans'.

        Where the request carries a positive OpenID assertion (the
        "openid.mode" field is "id_res"), the authenticator is asked to verify
        it, and upon success the client is redirected to the "openid.return_to"
        URL without its query string. In all other cases, the usual login
        redirection behaviour of the superclass is invoked.
        """

        fields = trans.get_fields(self.path_encoding)

        # If requested, attempt to verify OpenID assertions.
        # http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11

        if fields.get("openid.mode", [None])[0] == "id_res":

            # The additional condition could be used to insist on OpenID 2.0
            # conformance:
            # fields.get("openid.ns", [None])[0] == self.openid_ns

            # NOTE(review): This presumably relies on the redirect ending the
            # NOTE(review): response; otherwise the superclass handling below
            # NOTE(review): would also run - confirm.

            if self.authenticator.authenticate(trans, verify=1):
                trans.redirect(trans.encode_url_without_query(fields["openid.return_to"][0]))

        # Otherwise, handle the usual parameters and request details.

        LoginRedirectResource.respond(self, trans)

class OpenIDRedirectAuthenticator(Verifier):

    """
    An authenticator which verifies the credentials provided in a special login
    cookie, accepting OpenID assertions if necessary.
    """

    # Namespace URI expected in direct verification responses.

    openid_ns = "http://specs.openid.net/auth/2.0"

    # Default permitted difference between the nonce timestamp and the current
    # time, applied in both directions.

    replay_limit = datetime.timedelta(0, 10) # 10s

    def __init__(self, secret_key, app_url, associations=None, replay_limit=None,
        cookie_name=None, urlencoding=None):

        """
        Initialise the authenticator with a 'secret_key', 'app_url' and optional
        'associations', 'replay_limit', 'cookie_name' and 'urlencoding'. The
        'app_url' should be the "bare" reference using a protocol, host and
        port, not including any path information.

        The 'associations' object maps association handles to the secret keys
        used to check signatures; if omitted, a new empty dictionary is used.
        The 'replay_limit' is a timedelta overriding the class default. The
        'urlencoding' is used when obtaining request fields.
        """

        Verifier.__init__(self, secret_key, cookie_name)

        self.app_url = app_url

        # Test against None so that an empty mapping supplied by the caller is
        # retained (and may be populated later) instead of being replaced.

        if associations is None:
            associations = {}
        self.associations = associations

        self.replay_limit = replay_limit or self.replay_limit
        self.urlencoding = urlencoding

    def authenticate(self, trans, verify=0):

        """
        Authenticate the originator of 'trans', updating the object if
        successful and returning a true value if successful, or a false value
        otherwise.

        If the optional 'verify' parameter is specified as a true value, perform
        verification on any incoming OpenID credentials. An assertion lacking
        required fields or providing malformed values is treated as a failed
        assertion, yielding a false value. Errors arising when contacting an
        OpenID provider are not handled here.
        """

        # If requested, attempt to verify OpenID assertions.

        if verify:
            fields = trans.get_fields(self.urlencoding)

            # Test the details of the assertion. Since the fields are supplied
            # by the client, missing or malformed values must result in a
            # failure, not an unhandled exception.

            try:
                if not (self.test_url(fields) and
                    self.test_signature(trans, fields) and
                    self.test_replay(fields)):

                    return 0

                claimed_id = fields["openid.claimed_id"][0]

            # Incomplete or malformed assertion.

            except (KeyError, IndexError, ValueError):
                return 0

            # The token is set outside the guarded section so that errors in
            # token handling are not mistaken for a bad assertion.

            self.set_token(trans, claimed_id)
            return 1

        # Otherwise, try to authenticate with an application cookie.

        else:
            valid = Verifier.authenticate(self, trans)

            # Update the transaction with the user details.

            if valid:
                username, token = self.get_username_and_token(trans)
                trans.set_user(username)
            return valid

    def test_url(self, fields):

        """
        Return whether the "openid.return_to" value in 'fields' refers to this
        application: it must start with the configured application URL, with
        that prefix ending at a URL component boundary.

        A KeyError is raised if the field is absent.

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.1
        """

        # NOTE: Currently, this is not strict enough: the path and query are
        # NOTE: not compared with those of the current request.

        return_to = fields["openid.return_to"][0]

        if not return_to.startswith(self.app_url):
            return False

        # A bare prefix test would accept "http://host.example.org/" or
        # "http://host:8080/" for an application URL of "http://host", so
        # insist that the prefix is followed by nothing or by a delimiter.

        if self.app_url.endswith("/"):
            return True

        remainder = return_to[len(self.app_url):]
        return remainder == "" or remainder[0] in "/?#"

    def test_signature(self, trans, fields):

        """
        Return a true value if the signature on the assertion in 'fields' can
        be verified, using a stored association where the handle is known, or
        by asking the OpenID provider directly otherwise. Without a handle, a
        false value is returned.

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.6
        """

        handle = fields.get("openid.assoc_handle", [None])[0]

        # Without a handle, no signature verification can occur.

        if handle is None:
            return 0

        # Where an association exists, use the known secret key to verify the
        # signed items.

        if handle in self.associations:
            return check_openid_signature(fields, self.associations[handle])

        # Without an association, request verification of the signed items
        # from the OpenID provider.

        return self.test_signature_direct(trans, fields)

    def test_signature_direct(self, trans, fields):

        """
        Ask the OpenID provider given by "openid.op_endpoint" in 'fields' to
        verify the assertion, returning a true value only if the provider's
        response has the expected namespace and declares the assertion valid.

        A KeyError is raised if the endpoint field is absent. Errors raised
        when opening the connection are not handled here.

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.4.2
        """

        endpoint = fields["openid.op_endpoint"][0]

        # The endpoint is supplied by the client: only accept HTTP(S) URLs,
        # since urllib.urlopen would otherwise also open local files.

        # NOTE(review): The endpoint is not checked against information
        # NOTE(review): discovered for the claimed identifier (section 11.2),
        # NOTE(review): so an endpoint chosen by the client would be trusted
        # NOTE(review): to validate its own assertion - confirm and address.

        scheme = endpoint.split(":", 1)[0].lower()
        if scheme not in ("http", "https"):
            return 0

        # Make a POST request using the "openid." fields, replacing the mode.

        d = [
            "%s=%s" % (name, trans.encode_path(values[0]))
            for name, values in fields.items()
            if name.startswith("openid.") and name != "openid.mode"
            ]
        d.append("%s=%s" % ("openid.mode", "check_authentication"))
        data = "&".join(d)

        # Send a POST request to the OpenID provider, reading the response and
        # testing for certain fields and values.

        f = urllib.urlopen(endpoint, data)
        try:
            response = {}
            for line in f.readlines():
                if line[-1] == "\n":
                    line = line[:-1]

                # Each line has the form "name:value", where the value may
                # itself contain colons; a line without a colon yields an
                # empty value.

                name, value = (line.split(":", 1) + [""])[:2]
                response[name] = value

            # Absent fields count as a failed verification.

            return response.get("ns") == self.openid_ns and \
                response.get("is_valid") == "true"
        finally:
            f.close()

    def test_replay(self, fields):

        """
        Return whether the timestamp at the start of "openid.response_nonce" in
        'fields' is within the replay limit of the current UTC time, in either
        direction.

        Only the timestamp is tested: the unique part of the nonce is not
        recorded, so an assertion repeated within the time window is not
        detected by this method.

        A KeyError is raised if the field is absent and a ValueError if the
        timestamp is malformed.

        See:
        http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.11.3
        """

        timestamp = fields["openid.response_nonce"][0]

        # YYYY-MM-DDTHH:MM:SSZ...

        year, month, day, hour, minute, second = \
            int(timestamp[0:4]), int(timestamp[5:7]), int(timestamp[8:10]), \
            int(timestamp[11:13]), int(timestamp[14:16]), int(timestamp[17:19])

        dt = datetime.datetime(year, month, day, hour, minute, second)
        return -self.replay_limit < (datetime.datetime.utcnow() - dt) < self.replay_limit

    def set_token(self, trans, username):

        """
        Set an authentication token in 'trans' with the given 'username', also
        recording the username as the user of the transaction.
        """

        Verifier.set_token(self, trans, username)

        # Update the transaction with the user details.

        trans.set_user(username)

# vim: tabstop=4 expandtab shiftwidth=4