# HG changeset patch # User Paul Boddie # Date 1428333829 -7200 # Node ID 76ce54fdaa5c97117934041df38f1311565d96d9 # Parent 9ff6f746f5e9aaf596117db9041178c7fefda4c1 Removed the parts of MoinMessage now provided by GPGUtils. diff -r 9ff6f746f5e9 -r 76ce54fdaa5c MoinMessage.py --- a/MoinMessage.py Mon Apr 06 17:17:41 2015 +0200 +++ b/MoinMessage.py Mon Apr 06 17:23:49 2015 +0200 @@ -2,7 +2,7 @@ """ MoinMoin - MoinMessage library - @copyright: 2012, 2013, 2014 by Paul Boddie + @copyright: 2012, 2013, 2014, 2015 by Paul Boddie @license: GNU GPL (v2 or later), see COPYING.txt for details. """ @@ -20,6 +20,8 @@ from tempfile import mkstemp from urlparse import urlsplit from DateSupport import getDateTimeFromRFC2822 +from GPGUtils import GPG, GPGError, GPGDecodingError, GPGMissingPart, GPGBadContent, \ + as_string, is_signed, is_encrypted, getContentAndSignature import httplib import os @@ -139,351 +141,10 @@ return message -class MoinMessageError(Exception): - pass - -class MoinMessageDecodingError(Exception): - pass - -class MoinMessageMissingPart(MoinMessageDecodingError): - pass - -class MoinMessageBadContent(MoinMessageDecodingError): - pass - -class MoinMessageTransferError(MoinMessageError): - def __init__(self, code, message, body): - MoinMessageError.__init__(self, message) - self.code = code - self.body = body - -class GPG: - - "A wrapper around the gpg command using a particular configuration." - - def __init__(self, homedir=None): - self.conf_args = [] - - if homedir: - self.conf_args += ["--homedir", homedir] - - self.errors = None - - def run(self, args, text=None): - - """ - Invoke gpg with the given 'args', supplying the given 'text' to the - command directly or, if 'text' is omitted, using a file provided as part - of the 'args' if appropriate. - - Failure to complete the operation will result in a MoinMessageError - being raised. - """ - - cmd = Popen(["gpg"] + self.conf_args + list(args), stdin=PIPE, stdout=PIPE, stderr=PIPE) - - # Attempt to write input to the command and to read output from the - # command. - - text, self.errors = cmd.communicate(text) - - # Test for a zero result. - - if not cmd.returncode: - return text - else: - raise MoinMessageError, self.errors - - def verifyMessageText(self, signature, content): - - "Using the given 'signature', verify the given message 'content'." - - # Write the detached signature and content to files. - - signature_fd, signature_filename = mkstemp() - content_fd, content_filename = mkstemp() - - try: - signature_fp = os.fdopen(signature_fd, "w") - content_fp = os.fdopen(content_fd, "w") - try: - signature_fp.write(signature) - content_fp.write(content) - finally: - signature_fp.close() - content_fp.close() - - # Verify the message text. - - text = self.run(["--status-fd", "1", "--verify", signature_filename, content_filename]) - - # Return the details of the signing key. - - identity = None - fingerprint = None - - for line in text.split("\n"): - try: - prefix, msgtype, digest, details = line.strip().split(" ", 3) - except ValueError: - continue - - # Return the fingerprint and identity details. - - if msgtype == "GOODSIG": - identity = details - elif msgtype == "VALIDSIG": - fingerprint = digest - - if identity and fingerprint: - return fingerprint, identity - - return None - - finally: - os.remove(signature_filename) - os.remove(content_filename) - - def verifyMessage(self, message): - - """ - Verify the given RFC 3156 'message', returning a tuple of the form - (fingerprint, identity, content). - """ - - content, signature = getContentAndSignature(message) - - # Verify the message format. - - if signature.get_content_type() != "application/pgp-signature": - raise MoinMessageBadContent - - # Verify the message. - - fingerprint, identity = self.verifyMessageText(signature.get_payload(decode=True), as_string(content)) - return fingerprint, identity, content - - def signMessage(self, message, keyid): - - """ - Return a signed version of 'message' using the given 'keyid'. - """ - - # Sign the container's representation. - - signature = self.run(["--armor", "-u", keyid, "--detach-sig"], as_string(message)) - - # Make the container for the message. - - signed_message = MIMEMultipart("signed", protocol="application/pgp-signature") - signed_message.attach(message) - - signature_part = MIMEBase("application", "pgp-signature") - signature_part.set_payload(signature) - signed_message.attach(signature_part) - - return signed_message - - def decryptMessageText(self, message): - - "Return a decrypted version of 'message'." - - return self.run(["--decrypt"], message) - - def decryptMessage(self, message): - - """ - Decrypt the given RFC 3156 'message', returning the message text. - """ - - try: - declaration, content = message.get_payload() - except ValueError: - raise MoinMessageMissingPart - - # Verify the message format. - - if content.get_content_type() != "application/octet-stream": - raise MoinMessageBadContent - - # Return the decrypted message text. - - return self.decryptMessageText(content.get_payload(decode=True)) - - def encryptMessage(self, message, keyid): - - """ - Return an encrypted version of 'message' using the given 'keyid'. - """ - - text = as_string(message) - encrypted = self.run(["--armor", "-r", keyid, "--encrypt", "--trust-model", "always"], text) - - # Make the container for the message. - - encrypted_message = MIMEMultipart("encrypted", protocol="application/pgp-encrypted") - - # For encrypted content, add the declaration and content. - - declaration = MIMEBase("application", "pgp-encrypted") - declaration.set_payload("Version: 1") - encrypted_message.attach(declaration) - - content = MIMEApplication(encrypted, "octet-stream", encode_noop) - encrypted_message.attach(content) - - return encrypted_message - - def importKeys(self, text): - - """ - Import the keys provided by the given 'text'. - """ - - self.run(["--import"], text) - - def exportKey(self, keyid): - - """ - Return the "armoured" public key text for 'keyid' as a message part with - a suitable media type. - See: https://tools.ietf.org/html/rfc3156#section-7 - """ - - text = self.run(["--armor", "--export", keyid]) - return MIMEApplication(text, "pgp-keys", encode_noop) - - def listKeys(self, keyid=None): - - """ - Return a list of key details for keys on the keychain, selecting only - one specific key if 'keyid' is specified. - """ - - text = self.run(["--list-keys", "--with-colons", "--with-fingerprint"] + - (keyid and ["0x%s" % keyid] or [])) - return self._getKeysFromResult(text) - - def listSignatures(self, keyid=None): - - """ - Return a list of key and signature details for keys on the keychain, - selecting only one specific key if 'keyid' is specified. - """ - - text = self.run(["--list-sigs", "--with-colons", "--with-fingerprint"] + - (keyid and ["0x%s" % keyid] or [])) - return self._getKeysFromResult(text) - - def getKeysFromMessagePart(self, part): - - """ - Process an application/pgp-keys message 'part', returning a list of - key details. - """ - - return self.getKeysFromString(part.get_payload(decode=True)) - - def getKeysFromString(self, s): - - """ - Return a list of key details extracted from the given key block string - 's'. Signature information is also included through the use of the gpg - verbose option. - """ - - text = self.run(["--with-colons", "--with-fingerprint", "-v"], s) - return self._getKeysFromResult(text) - - def _getKeysFromResult(self, text): - - """ - Return a list of key details extracted from the given command result - 'text'. - """ - - keys = [] - for line in text.split("\n"): - try: - recordtype, trust, keylength, algorithm, keyid, cdate, expdate, serial, ownertrust, _rest = line.split(":", 9) - except ValueError: - continue - - if recordtype == "pub": - userid, _rest = _rest.split(":", 1) - keys.append({ - "type" : recordtype, "trust" : trust, "keylength" : keylength, - "algorithm" : algorithm, "keyid" : keyid, "cdate" : cdate, - "expdate" : expdate, "userid" : userid, "ownertrust" : ownertrust, - "fingerprint" : None, "subkeys" : [], "signatures" : [] - }) - elif recordtype == "sub" and keys: - keys[-1]["subkeys"].append({ - "trust" : trust, "keylength" : keylength, "algorithm" : algorithm, - "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, - "ownertrust" : ownertrust - }) - elif recordtype == "fpr" and keys: - fingerprint, _rest = _rest.split(":", 1) - keys[-1]["fingerprint"] = fingerprint - elif recordtype == "sig" and keys: - userid, _rest = _rest.split(":", 1) - keys[-1]["signatures"].append({ - "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, - "userid" : userid - }) - - return keys - -# Message serialisation functions, working around email module problems. - -def as_string(message): - - """ - Return the string representation of 'message', attempting to preserve the - precise original formatting. - """ - - out = StringIO() - generator = Generator(out, False, 0) # disable reformatting measures - generator.flatten(message) - return out.getvalue() - -# Message decoding functions. - -# Detect PGP/GPG-encoded payloads. -# See: http://tools.ietf.org/html/rfc3156 - -def is_signed(message): - mimetype = message.get_content_type() - encoding = message.get_content_charset() - - return mimetype == "multipart/signed" and \ - message.get_param("protocol") == "application/pgp-signature" - -def is_encrypted(message): - mimetype = message.get_content_type() - encoding = message.get_content_charset() - - return mimetype == "multipart/encrypted" and \ - message.get_param("protocol") == "application/pgp-encrypted" - -def getContentAndSignature(message): - - """ - Return the content and signature parts of the given RFC 3156 'message'. - - NOTE: RFC 3156 states that signed messages should employ a detached - NOTE: signature but then shows "BEGIN PGP MESSAGE" for signatures - NOTE: instead of "BEGIN PGP SIGNATURE". - NOTE: The "micalg" parameter is currently not supported. - """ - - try: - content, signature = message.get_payload() - return content, signature - except ValueError: - raise MoinMessageMissingPart +MoinMessageError = GPGError +MoinMessageDecodingError = GPGDecodingError +MoinMessageMissingPart = GPGMissingPart +MoinMessageBadContent = GPGBadContent # Communications functions.