# HG changeset patch # User paulb # Date 1195431825 0 # Node ID 37ef332e67b5762627ffbc52c5b8540538a47e6e # Parent d4eda3da8f7a978b2ff51c229056f28b73e47fc9 [project @ 2007-11-19 00:23:45 by paulb] Made the OpenID resources more extensible. Reverted the redirect default parameter values. diff -r d4eda3da8f7a -r 37ef332e67b5 WebStack/Resources/OpenIDInitiation.py --- a/WebStack/Resources/OpenIDInitiation.py Mon Nov 19 00:23:06 2007 +0000 +++ b/WebStack/Resources/OpenIDInitiation.py Mon Nov 19 00:23:45 2007 +0000 @@ -31,7 +31,7 @@ encoding = "utf-8" openid_ns = "http://specs.openid.net/auth/2.0" - def __init__(self, openid_mode=None, use_redirect=0, urlencoding=None, encoding=None): + def __init__(self, openid_mode=None, use_redirect=1, urlencoding=None, encoding=None): """ Initialise the resource. diff -r d4eda3da8f7a -r 37ef332e67b5 WebStack/Resources/OpenIDLogin.py --- a/WebStack/Resources/OpenIDLogin.py Mon Nov 19 00:23:06 2007 +0000 +++ b/WebStack/Resources/OpenIDLogin.py Mon Nov 19 00:23:45 2007 +0000 @@ -28,102 +28,24 @@ import random import cgi # for escape -class OpenIDLoginResource: - - "A resource providing a login screen." - - encoding = "utf-8" - openid_ns = "http://specs.openid.net/auth/2.0" +class OpenIDLoginUtils: - def __init__(self, app_url, authenticator, associations=None, use_redirect=0, urlencoding=None, encoding=None): - - """ - Initialise the resource with the application URL 'app_url' and an - 'authenticator'. - - The optional 'associations' is a mapping from association handles to - secret keys. + "Utilities for OpenID login screens which may be inherited." - If the optional 'use_redirect' flag is set to a false value (which is - not the default), a confirmation screen is given instead of immediately - redirecting the user back to the original application. - - The optional 'urlencoding' parameter allows a special encoding to be - used in producing the redirection path. + openid_ns = "http://specs.openid.net/auth/2.0" + signed_names = ["op_endpoint", "return_to", "response_nonce", "assoc_handle", "claimed_id", "identity"] - The optional 'encoding' parameter allows a special encoding to be used - in producing the login pages. - - To change the pages employed by this resource, either redefine the - 'login_page' and 'success_page' attributes in instances of this class or - a subclass, or override the 'show_login' and 'show_success' methods. - """ - - self.app_url = app_url - self.authenticator = authenticator + def __init__(self, associations=None, use_redirect=1): self.associations = associations or {} self.use_redirect = use_redirect - self.urlencoding = urlencoding - self.encoding = encoding or self.encoding - - def respond(self, trans): - - "Respond using the transaction 'trans'." - - # Check for a submitted login form. - - fields = trans.get_fields(self.encoding) - - app = fields.get("openid.return_to", [""])[0] - claimed_id = fields.get("openid.claimed_id", [""])[0] - local_id = fields.get("openid.identity", [""])[0] - - if fields.has_key("login"): - - # Check a combination of local identifier and username together with - # the password. - - username = fields.get("username", [""])[0] - password = fields.get("password", [""])[0] - - # NOTE: Permit flexibility in the credentials. - - if self.authenticator.authenticate(trans, (local_id, username), password): - self._redirect(trans, claimed_id, local_id, username, app) - # The above method does not return. - - # Check for an OpenID signature verification request. - elif fields.get("openid.mode", [None])[0] == "check_authentication": - - # Obtain the secret key from recorded associations. - - handle = fields.get("openid.assoc_handle", [None])[0] - if handle is not None and self.associations.has_key(handle): - valid = check_openid_signature(fields, self.associations[handle]) - del self.associations[handle] - else: - valid = 0 - - # Produce a response for this request. + def urlencode(self, trans, value): + if not trans.default_charset: + return trans.encode_path(value, self.urlencoding) + else: + return trans.encode_path(value) - self.show_verification(trans, valid) - # The above method does not return. - - # NOTE: Permit association requests here. - - # Otherwise, show the login form. - - self.show_login(trans, app, claimed_id, local_id) - - def _redirect(self, trans, claimed_id, local_id, username, app): - - """ - Redirect the client using 'trans', 'claimed_id', 'local_id', 'username' - and the given 'app' details. - """ - - app_url = self.app_url + trans.get_path_without_query(self.urlencoding) + def get_openid_fields(self, trans, claimed_id, local_id, username, return_to, endpoint): # Make an association that can be used in signature verification. # NOTE: Probably need to consider the secret key a bit more. @@ -139,35 +61,51 @@ # Make a signature. - signed_names = ["op_endpoint", "return_to", "response_nonce", "assoc_handle", "claimed_id", "identity"] fields = { - "openid.op_endpoint" : [app_url], - "openid.return_to" : [app], + "openid.ns" : [self.openid_ns], + "openid.mode" : ["id_res"], + "openid.signed" : [",".join(self.signed_names)], + "openid.op_endpoint" : [endpoint], + "openid.return_to" : [return_to], "openid.response_nonce" : [timestamp], "openid.assoc_handle" : [handle], "openid.claimed_id" : [claimed_id], "openid.identity" : [local_id] } + signature = make_openid_signature(self.signed_names, fields, secret_key) + fields["openid.sig"] = [signature] - signature = make_openid_signature(signed_names, fields, secret_key) + return fields + + def get_openid_url(self, trans, fields): # Build an URL for returning to the application. - url = "%s?openid.ns=%s&openid.mode=%s&openid.signed=%s&openid.sig=%s" % ( - app, - trans.encode_path(self.openid_ns, self.urlencoding), - trans.encode_path("id_res", self.urlencoding), - trans.encode_path(",".join(signed_names), self.urlencoding), - trans.encode_path(signature, self.urlencoding) - ) + url = "%s?" % fields["openid.return_to"][0] + + first = 1 + for name, value in fields.items(): + if not first: + url += "&" + url += "%s=%s" % (name, self.urlencode(trans, value[0])) + first = 0 - for name, value in fields.items(): - url += "&%s=%s" % (name, trans.encode_path(value[0], self.urlencoding)) + return url + + def redirect_to_application(self, trans, claimed_id, local_id, username, return_to, endpoint): + + """ + Redirect the client using 'trans', 'claimed_id', 'local_id', 'username' + and the given 'return_to' and 'endpoint' details. + """ + + fields = self.get_openid_fields(trans, claimed_id, local_id, username, return_to, endpoint) + url = self.get_openid_url(trans, fields) # Show the success page anyway. # Offer a POST-based form for redirection. - self.show_success(trans, app, "id_res", signed_names, signature, fields) + self.show_success(trans, fields) if self.use_redirect: trans.redirect(url) else: @@ -192,37 +130,129 @@ out.write("ns:%s\nis_valid:%s\n" % (self.openid_ns, status_str)) raise WebStack.Generic.EndOfResponse - def show_login(self, trans, app, claimed_id, local_id): + def check_authentication(self, trans, fields): + + "Check the authentication details supplied in 'trans' and 'fields'." + + # Obtain the secret key from recorded associations. + + handle = fields.get("openid.assoc_handle", [None])[0] + if handle is not None and self.associations.has_key(handle): + valid = check_openid_signature(fields, self.associations[handle]) + del self.associations[handle] + else: + valid = 0 + + # Produce a response for this request. + + self.show_verification(trans, valid) + + def check_login(self, trans, fields): + + "Check the login details supplied in 'trans' and 'fields'." + + return_to = fields.get("openid.return_to", [""])[0] + claimed_id = fields.get("openid.claimed_id", [""])[0] + local_id = fields.get("openid.identity", [""])[0] + + # Check a combination of local identifier and username together with + # the password. + + username = fields.get("username", [""])[0] + password = fields.get("password", [""])[0] + + # NOTE: Permit flexibility in the credentials. + + if self.authenticator.authenticate(trans, (local_id, username), password): + endpoint = self.app_url + trans.get_path_without_query(self.urlencoding) + self.redirect_to_application(trans, claimed_id, local_id, username, return_to, endpoint) + +class OpenIDLoginResource(OpenIDLoginUtils): + + "A resource providing a login screen." + + encoding = "utf-8" + + def __init__(self, app_url, authenticator, associations=None, use_redirect=1, urlencoding=None, encoding=None): """ - Writes a login screen using the transaction 'trans', including details - of the 'app' which the client was attempting to access, along with the - 'claimed_id' and 'local_id'. + Initialise the resource with the application URL 'app_url' and an + 'authenticator'. + + The optional 'associations' is a mapping from association handles to + secret keys. + + If the optional 'use_redirect' flag is set to a false value (which is + not the default), a confirmation screen is given instead of immediately + redirecting the user back to the original application. + + The optional 'urlencoding' parameter allows a special encoding to be + used in producing the redirection path. + + The optional 'encoding' parameter allows a special encoding to be used + in producing the login pages. + + To change the pages employed by this resource, either redefine the + 'login_page' and 'success_page' attributes in instances of this class or + a subclass, or override the 'show_login' and 'show_success' methods. """ + OpenIDLoginUtils.__init__(self, associations, use_redirect) + self.app_url = app_url + self.authenticator = authenticator + self.urlencoding = urlencoding + self.encoding = encoding or self.encoding + + def respond(self, trans): + + "Respond using the transaction 'trans'." + + # Check for a submitted login form. + + fields = trans.get_fields(self.encoding) + + if fields.has_key("login"): + self.check_login(trans, fields) + # The above method may not return. + + # Check for an OpenID signature verification request. + + elif fields.get("openid.mode", [None])[0] == "check_authentication": + self.check_authentication(trans, fields) + # The above method does not return. + + # NOTE: Permit association requests here. + # Otherwise, show the login form. + + self.show_login(trans, fields) + + def show_login(self, trans, fields): + + """ + Writes a login screen using the transaction 'trans' and 'fields'. + """ + + return_to = fields.get("openid.return_to", [""])[0] + claimed_id = fields.get("openid.claimed_id", [""])[0] + local_id = fields.get("openid.identity", [""])[0] + trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding)) out = trans.get_response_stream() - out.write(self.login_page % tuple(map(cgi.escape, (app, claimed_id, local_id)))) + out.write(self.login_page % tuple(map(cgi.escape, (return_to, claimed_id, local_id)))) - def show_success(self, trans, app, mode, signed_names, signature, fields): + def show_success(self, trans, fields): """ - Writes a success screen using the transaction 'trans', including details - of the 'app' which the client was attempting to access, the - communications 'mode', the 'signed_names' indicating the fields which - are signed, the 'signature' associated with the message, and a - dictionary of 'fields' indicating other information. + Writes a success screen using the transaction 'trans', using a + dictionary of 'fields' providing details of the transaction. """ trans.set_content_type(WebStack.Generic.ContentType("text/html", self.encoding)) out = trans.get_response_stream() l = [] - for name, value in fields.items(): - l.append("""""" % (name, value[0])) - args = tuple( - map(cgi.escape, (app, self.openid_ns, mode, ",".join(signed_names), signature)) - ) + ("\n".join(l),) - out.write(self.success_page % args) + for name, values in fields.items(): + l.append("""""" % (name, cgi.escape(values[0]))) + out.write(self.success_page % (fields["openid.return_to"][0], "\n".join(l))) login_page = """ @@ -251,10 +281,6 @@

Login Successful

- - - - %s

Please proceed to the application: