# HG changeset patch # User paulb # Date 1202068803 0 # Node ID 4d146f48689104f3a87d40318bf49674b0349494 # Parent 139b1412b9c3d71049e89a32bf349c12b58306fd [project @ 2008-02-03 20:00:03 by paulb] Attempted to improve the reusability of the OpenID functionality. Attempted to make the mechanisms tolerate non-ASCII values and to handle such values in URLs correctly. diff -r 139b1412b9c3 -r 4d146f486891 WebStack/Resources/OpenIDInitiation.py --- a/WebStack/Resources/OpenIDInitiation.py Sun Feb 03 19:58:01 2008 +0000 +++ b/WebStack/Resources/OpenIDInitiation.py Sun Feb 03 20:00:03 2008 +0000 @@ -3,7 +3,7 @@ """ OpenID initiation resources which redirect clients to an OpenID provider. -Copyright (C) 2004, 2005, 2006, 2007 Paul Boddie +Copyright (C) 2004, 2005, 2006, 2007, 2008 Paul Boddie This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -24,12 +24,83 @@ import libxml2dom import cgi # for escape -class OpenIDInitiationResource: +class OpenIDInitiationUtils: + + "Utilities for OpenID initiation screens which may be inherited." + + openid_ns = "http://specs.openid.net/auth/2.0" + + def __init__(self, openid_mode=None, use_redirect=1, urlencoding=None): + + """ + Initialise the resource. + + The optional 'openid_mode' parameter may be set to "checkid_immediate" + or "checkid_setup" (the default). + + If the optional 'use_redirect' flag is set to a false value (which is + not the default), a confirmation screen is given instead of immediately + redirecting the user to the OpenID provider. + + The optional 'urlencoding' parameter allows a special encoding to be + used in producing the redirection path. + """ + + self.openid_mode = openid_mode or "checkid_setup" + self.use_redirect = use_redirect + self.urlencoding = urlencoding + + def get_redirect_url(self, trans, app, claimed_identifier, provider, local_identifier): + + # NOTE: Should consider the special "select" mode for identity. + + return "%s?openid.ns=%s&openid.mode=%s&openid.return_to=%s&openid.claimed_id=%s&openid.identity=%s" % ( + trans.encode_url_without_query(provider, self.urlencoding), + trans.encode_path(self.openid_ns, self.urlencoding), + trans.encode_path(self.openid_mode, self.urlencoding), + trans.encode_path(app, self.urlencoding), + trans.encode_path(claimed_identifier, self.urlencoding), + trans.encode_path(local_identifier, self.urlencoding) + ) + + def get_provider_url(self, trans, identity): + + """ + Return the claimed identifier, provider URL and local identifier for the + authenticating user using the given 'trans' and 'identity'. + + See: + http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.7.3 + """ + + if identity.startswith("xri://"): + identity = openid[6:] + + # NOTE: Not yet discovering XRI providers. + + if identity[0] in ("=", "@", "+", "$", "!", "("): + pass + else: + if not identity.startswith("http"): + identity = "http://" + identity + + # Obtain a provider url from a resource at the stated URL. + + doc = libxml2dom.parseURI(trans.encode_url_without_query(identity), html=1) + provider_links = doc.xpath("/html/head/link[contains(@rel, 'openid2.provider')]/@href") + local_ids = doc.xpath("/html/head/link[contains(@rel, 'openid2.local_id')]/@href") + if provider_links: + if local_ids: + return identity, provider_links[0].nodeValue, local_ids[0].nodeValue + else: + return identity, provider_links[0].nodeValue, None + + return identity, None, None + +class OpenIDInitiationResource(OpenIDInitiationUtils): "A resource providing an OpenID initiation screen." - openid_ns = "http://specs.openid.net/auth/2.0" - def __init__(self, openid_mode=None, use_redirect=1, urlencoding=None, encoding=None): """ @@ -53,9 +124,7 @@ a subclass, or override the 'show_initiation' and 'show_success' methods. """ - self.openid_mode = openid_mode or "checkid_setup" - self.use_redirect = use_redirect - self.urlencoding = urlencoding + OpenIDInitiationUtils.__init__(self, openid_mode, use_redirect, urlencoding) self.encoding = encoding def respond(self, trans): @@ -69,16 +138,25 @@ fields_body = trans.get_fields_from_body(self.encoding) if fields_body.has_key("initiate") and fields_body.has_key("identity"): - claimed_identifier, provider, local_identifier = self.get_provider_url(fields_body["identity"][0]) - if provider is not None: - self._redirect(trans, app, claimed_identifier, provider, local_identifier) - # The above method does not return. + self.check_identity(trans, fields_body, app) + # The above method does not return. # Otherwise, show the initiation form. self.show_initiation(trans, app) - def _redirect(self, trans, app, claimed_identifier, provider, local_identifier): + def check_identity(self, trans, fields, app): + + """ + Check the identity found through 'trans' and 'fields', using 'app' and + discovered information about the identity to redirect to the provider. + """ + + claimed_identifier, provider, local_identifier = self.get_provider_url(trans, fields["identity"][0]) + if provider is not None: + self.redirect_to_provider(trans, app, claimed_identifier, provider, local_identifier) + + def redirect_to_provider(self, trans, app, claimed_identifier, provider, local_identifier): """ Redirect the client using 'trans' and the given 'app', @@ -89,16 +167,7 @@ http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.9 """ - # NOTE: Should consider the special "select" mode for identity. - - url = "%s?openid.ns=%s&openid.mode=%s&openid.return_to=%s&openid.claimed_id=%s&openid.identity=%s" % ( - provider, - trans.encode_path(self.openid_ns, self.urlencoding), - trans.encode_path(self.openid_mode, self.urlencoding), - trans.encode_path(app, self.urlencoding), - trans.encode_path(claimed_identifier, self.urlencoding), - trans.encode_path(local_identifier, self.urlencoding) - ) + url = self.get_redirect_url(trans, app, claimed_identifier, provider, local_identifier) # Show the success page anyway. # Offer a POST-based form for redirection. @@ -112,40 +181,6 @@ else: raise WebStack.Generic.EndOfResponse - def get_provider_url(self, identity): - - """ - Return the claimed identifier, provider URL and local identifier for the - authenticating user using the given 'identity'. - - See: - http://openid.net/specs/openid-authentication-2_0-12.html#rfc.section.7.3 - """ - - if identity.startswith("xri://"): - identity = openid[6:] - - # NOTE: Not yet discovering XRI providers. - - if identity[0] in ("=", "@", "+", "$", "!", "("): - pass - else: - if not identity.startswith("http"): - identity = "http://" + identity - - # Obtain a provider url from a resource at the stated URL. - - doc = libxml2dom.parseURI(identity, html=1) - provider_links = doc.xpath("/html/head/link[contains(@rel, 'openid2.provider')]/@href") - local_ids = doc.xpath("/html/head/link[contains(@rel, 'openid2.local_id')]/@href") - if provider_links: - if local_ids: - return identity, provider_links[0].nodeValue, local_ids[0].nodeValue - else: - return identity, provider_links[0].nodeValue, None - - return identity, None, None - def show_initiation(self, trans, app): """ @@ -153,7 +188,7 @@ of the 'app' which the client was attempting to access. """ - trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding)) + trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding or trans.default_charset)) out = trans.get_response_stream() out.write(self.initiation_page % cgi.escape(app)) @@ -165,7 +200,7 @@ 'local_identifier'. """ - trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding)) + trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding or trans.default_charset)) out = trans.get_response_stream() out.write(self.success_page % tuple(map(cgi.escape, ( provider, self.openid_ns, self.openid_mode, app, claimed_identifier, local_identifier) diff -r 139b1412b9c3 -r 4d146f486891 WebStack/Resources/OpenIDLogin.py --- a/WebStack/Resources/OpenIDLogin.py Sun Feb 03 19:58:01 2008 +0000 +++ b/WebStack/Resources/OpenIDLogin.py Sun Feb 03 20:00:03 2008 +0000 @@ -4,7 +4,7 @@ OpenID provider login resources which redirect clients back to the application ("relying party"). -Copyright (C) 2004, 2005, 2006, 2007 Paul Boddie +Copyright (C) 2004, 2005, 2006, 2007, 2008 Paul Boddie This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -27,6 +27,7 @@ import time import random import cgi # for escape +import urlparse # for urlsplit class OpenIDLoginUtils: @@ -35,15 +36,28 @@ openid_ns = "http://specs.openid.net/auth/2.0" signed_names = ["op_endpoint", "return_to", "response_nonce", "assoc_handle", "claimed_id", "identity"] - def __init__(self, associations=None, use_redirect=1): + def __init__(self, app_url, authenticator, associations=None, use_redirect=1, urlencoding=None): + + """ + Initialise the resource with the application URL 'app_url' and an + 'authenticator'. + + The optional 'associations' is a mapping from association handles to + secret keys. + + If the optional 'use_redirect' flag is set to a false value (which is + not the default), a confirmation screen is given instead of immediately + redirecting the user back to the original application. + + The optional 'urlencoding' parameter allows a special encoding to be + used in producing the redirection path. + """ + + self.app_url = app_url + self.authenticator = authenticator self.associations = associations or {} self.use_redirect = use_redirect - - def urlencode(self, trans, value): - if not trans.default_charset: - return trans.encode_path(value, self.urlencoding) - else: - return trans.encode_path(value) + self.urlencoding = urlencoding def get_openid_fields(self, trans, claimed_id, local_id, username, return_to, endpoint): @@ -81,13 +95,13 @@ # Build an URL for returning to the application. - url = "%s?" % fields["openid.return_to"][0] + url = trans.encode_url_without_query(fields["openid.return_to"][0]) + "?" first = 1 for name, value in fields.items(): if not first: url += "&" - url += "%s=%s" % (name, self.urlencode(trans, value[0])) + url += "%s=%s" % (name, trans.encode_path(value[0])) first = 0 return url @@ -147,26 +161,6 @@ self.show_verification(trans, valid) - def check_login(self, trans, fields): - - "Check the login details supplied in 'trans' and 'fields'." - - return_to = fields.get("openid.return_to", [""])[0] - claimed_id = fields.get("openid.claimed_id", [""])[0] - local_id = fields.get("openid.identity", [""])[0] - - # Check a combination of local identifier and username together with - # the password. - - username = fields.get("username", [""])[0] - password = fields.get("password", [""])[0] - - # NOTE: Permit flexibility in the credentials. - - if self.authenticator.authenticate(trans, (local_id, username), password): - endpoint = self.app_url + trans.get_path_without_query(self.urlencoding) - self.redirect_to_application(trans, claimed_id, local_id, username, return_to, endpoint) - class OpenIDLoginResource(OpenIDLoginUtils): "A resource providing a login screen." @@ -195,10 +189,7 @@ a subclass, or override the 'show_login' and 'show_success' methods. """ - OpenIDLoginUtils.__init__(self, associations, use_redirect) - self.app_url = app_url - self.authenticator = authenticator - self.urlencoding = urlencoding + OpenIDLoginUtils.__init__(self, app_url, authenticator, associations, use_redirect, urlencoding) self.encoding = encoding def respond(self, trans): @@ -224,6 +215,26 @@ self.show_login(trans, fields) + def check_login(self, trans, fields): + + "Check the login details supplied in 'trans' and 'fields'." + + return_to = fields.get("openid.return_to", [""])[0] + claimed_id = fields.get("openid.claimed_id", [""])[0] + local_id = fields.get("openid.identity", [""])[0] + + # Check a combination of local identifier and username together with + # the password. + + username = fields.get("username", [""])[0] + password = fields.get("password", [""])[0] + + # NOTE: Permit flexibility in the credentials. + + if self.authenticator.authenticate(trans, (local_id, username), password): + endpoint = self.app_url + trans.get_path_without_query(self.urlencoding) + self.redirect_to_application(trans, claimed_id, local_id, username, return_to, endpoint) + def show_login(self, trans, fields): """ @@ -234,7 +245,7 @@ claimed_id = fields.get("openid.claimed_id", [""])[0] local_id = fields.get("openid.identity", [""])[0] - trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding)) + trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding or trans.default_charset)) out = trans.get_response_stream() out.write(self.login_page % tuple(map(cgi.escape, (return_to, claimed_id, local_id)))) @@ -245,7 +256,7 @@ dictionary of 'fields' providing details of the transaction. """ - trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding)) + trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding or trans.default_charset)) out = trans.get_response_stream() l = [] for name, values in fields.items(): diff -r 139b1412b9c3 -r 4d146f486891 WebStack/Resources/OpenIDRedirect.py --- a/WebStack/Resources/OpenIDRedirect.py Sun Feb 03 19:58:01 2008 +0000 +++ b/WebStack/Resources/OpenIDRedirect.py Sun Feb 03 20:00:03 2008 +0000 @@ -49,7 +49,7 @@ # fields.get("openid.ns", [None])[0] == self.openid_ns if self.authenticator.authenticate(trans, verify=1): - trans.redirect(fields["openid.return_to"][0]) + trans.redirect(trans.encode_url_without_query(fields["openid.return_to"][0])) # Otherwise, handle the usual parameters and request details. @@ -102,7 +102,7 @@ # Test the details of the assertion. if self.test_url(fields) and \ - self.test_signature(fields) and \ + self.test_signature(trans, fields) and \ self.test_replay(fields): self.set_token(trans, fields["openid.identity"][0]) @@ -140,7 +140,7 @@ return fields["openid.return_to"][0].startswith(self.app_url) - def test_signature(self, fields): + def test_signature(self, trans, fields): """ See: @@ -164,13 +164,13 @@ # from the OpenID provider. else: - return self.test_signature_direct(fields) + return self.test_signature_direct(trans, fields) # Without a handle, no signature verification can occur. return 0 - def test_signature_direct(self, fields): + def test_signature_direct(self, trans, fields): """ See: @@ -179,12 +179,12 @@ # Make a POST request using the "openid." fields. - d = {} + d = [] for name, values in fields.items(): if name.startswith("openid.") and name != "openid.mode": - d[name] = values[0] - d["openid.mode"] = "check_authentication" - data = urllib.urlencode(d) + d.append("%s=%s" % (name, trans.encode_path(values[0]))) + d.append("%s=%s" % ("openid.mode", "check_authentication")) + data = "&".join(d) # Send a POST request to the OpenID provider, reading the response and # testing for certain fields and values.