1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/GPGUtils.py Tue Dec 16 15:07:35 2014 +0100
1.3 @@ -0,0 +1,376 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +"""
1.7 +GPG utilities derived from the MoinMessage library.
1.8 +
1.9 +Copyright (C) 2012, 2013, 2014 Paul Boddie <paul@boddie.org.uk>
1.10 +
1.11 +This program is free software; you can redistribute it and/or modify it under
1.12 +the terms of the GNU General Public License as published by the Free Software
1.13 +Foundation; either version 3 of the License, or (at your option) any later
1.14 +version.
1.15 +
1.16 +This program is distributed in the hope that it will be useful, but WITHOUT
1.17 +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
1.18 +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
1.19 +details.
1.20 +
1.21 +You should have received a copy of the GNU General Public License along with
1.22 +this program. If not, see <http://www.gnu.org/licenses/>.
1.23 +"""
1.24 +
1.25 +from email.encoders import encode_noop
1.26 +from email.generator import Generator
1.27 +from email.mime.multipart import MIMEMultipart
1.28 +from email.mime.application import MIMEApplication
1.29 +from email.mime.base import MIMEBase
1.30 +from subprocess import Popen, PIPE
1.31 +from tempfile import mkstemp
1.32 +import os
1.33 +
1.34 +try:
1.35 + from cStringIO import StringIO
1.36 +except ImportError:
1.37 + from StringIO import StringIO
1.38 +
1.39 +class GPGError(Exception):
1.40 + pass
1.41 +
1.42 +class GPGDecodingError(Exception):
1.43 + pass
1.44 +
1.45 +class GPGMissingPart(GPGDecodingError):
1.46 + pass
1.47 +
1.48 +class GPGBadContent(GPGDecodingError):
1.49 + pass
1.50 +
1.51 +class GPG:
1.52 +
1.53 + "A wrapper around the gpg command using a particular configuration."
1.54 +
1.55 + def __init__(self, homedir=None):
1.56 + self.conf_args = []
1.57 +
1.58 + if homedir:
1.59 + self.conf_args += ["--homedir", homedir]
1.60 +
1.61 + self.errors = None
1.62 +
1.63 + def run(self, args, text=None):
1.64 +
1.65 + """
1.66 + Invoke gpg with the given 'args', supplying the given 'text' to the
1.67 + command directly or, if 'text' is omitted, using a file provided as part
1.68 + of the 'args' if appropriate.
1.69 +
1.70 + Failure to complete the operation will result in a GPGError being
1.71 + raised.
1.72 + """
1.73 +
1.74 + cmd = Popen(["gpg"] + self.conf_args + list(args), stdin=PIPE, stdout=PIPE, stderr=PIPE)
1.75 +
1.76 + # Attempt to write input to the command and to read output from the
1.77 + # command.
1.78 +
1.79 + text, self.errors = cmd.communicate(text)
1.80 +
1.81 + # Test for a zero result.
1.82 +
1.83 + if not cmd.returncode:
1.84 + return text
1.85 + else:
1.86 + raise GPGError, self.errors
1.87 +
1.88 + def verifyMessageText(self, signature, content):
1.89 +
1.90 + "Using the given 'signature', verify the given message 'content'."
1.91 +
1.92 + # Write the detached signature and content to files.
1.93 +
1.94 + signature_fd, signature_filename = mkstemp()
1.95 + content_fd, content_filename = mkstemp()
1.96 +
1.97 + try:
1.98 + signature_fp = os.fdopen(signature_fd, "w")
1.99 + content_fp = os.fdopen(content_fd, "w")
1.100 + try:
1.101 + signature_fp.write(signature)
1.102 + content_fp.write(content)
1.103 + finally:
1.104 + signature_fp.close()
1.105 + content_fp.close()
1.106 +
1.107 + # Verify the message text.
1.108 +
1.109 + text = self.run(["--status-fd", "1", "--verify", signature_filename, content_filename])
1.110 +
1.111 + # Return the details of the signing key.
1.112 +
1.113 + identity = None
1.114 + fingerprint = None
1.115 +
1.116 + for line in text.split("\n"):
1.117 + try:
1.118 + prefix, msgtype, digest, details = line.strip().split(" ", 3)
1.119 + except ValueError:
1.120 + continue
1.121 +
1.122 + # Return the fingerprint and identity details.
1.123 +
1.124 + if msgtype == "GOODSIG":
1.125 + identity = details
1.126 + elif msgtype == "VALIDSIG":
1.127 + fingerprint = digest
1.128 +
1.129 + if identity and fingerprint:
1.130 + return fingerprint, identity
1.131 +
1.132 + return None
1.133 +
1.134 + finally:
1.135 + os.remove(signature_filename)
1.136 + os.remove(content_filename)
1.137 +
1.138 + def verifyMessage(self, message):
1.139 +
1.140 + """
1.141 + Verify the given RFC 3156 'message', returning a tuple of the form
1.142 + (fingerprint, identity, content).
1.143 + """
1.144 +
1.145 + content, signature = getContentAndSignature(message)
1.146 +
1.147 + # Verify the message format.
1.148 +
1.149 + if signature.get_content_type() != "application/pgp-signature":
1.150 + raise GPGBadContent
1.151 +
1.152 + # Verify the message.
1.153 +
1.154 + fingerprint, identity = self.verifyMessageText(signature.get_payload(decode=True), as_string(content))
1.155 + return fingerprint, identity, content
1.156 +
1.157 + def signMessage(self, message, keyid):
1.158 +
1.159 + """
1.160 + Return a signed version of 'message' using the given 'keyid'.
1.161 + """
1.162 +
1.163 + # Sign the container's representation.
1.164 +
1.165 + signature = self.run(["--armor", "-u", keyid, "--detach-sig"], as_string(message))
1.166 +
1.167 + # Make the container for the message.
1.168 +
1.169 + signed_message = MIMEMultipart("signed", protocol="application/pgp-signature")
1.170 + signed_message.attach(message)
1.171 +
1.172 + signature_part = MIMEBase("application", "pgp-signature")
1.173 + signature_part.set_payload(signature)
1.174 + signed_message.attach(signature_part)
1.175 +
1.176 + return signed_message
1.177 +
1.178 + def decryptMessageText(self, message):
1.179 +
1.180 + "Return a decrypted version of 'message'."
1.181 +
1.182 + return self.run(["--decrypt"], message)
1.183 +
1.184 + def decryptMessage(self, message):
1.185 +
1.186 + """
1.187 + Decrypt the given RFC 3156 'message', returning the message text.
1.188 + """
1.189 +
1.190 + try:
1.191 + declaration, content = message.get_payload()
1.192 + except ValueError:
1.193 + raise GPGMissingPart
1.194 +
1.195 + # Verify the message format.
1.196 +
1.197 + if content.get_content_type() != "application/octet-stream":
1.198 + raise GPGBadContent
1.199 +
1.200 + # Return the decrypted message text.
1.201 +
1.202 + return self.decryptMessageText(content.get_payload(decode=True))
1.203 +
1.204 + def encryptMessage(self, message, keyid):
1.205 +
1.206 + """
1.207 + Return an encrypted version of 'message' using the given 'keyid'.
1.208 + """
1.209 +
1.210 + text = as_string(message)
1.211 + encrypted = self.run(["--armor", "-r", keyid, "--encrypt", "--trust-model", "always"], text)
1.212 +
1.213 + # Make the container for the message.
1.214 +
1.215 + encrypted_message = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")
1.216 +
1.217 + # For encrypted content, add the declaration and content.
1.218 +
1.219 + declaration = MIMEBase("application", "pgp-encrypted")
1.220 + declaration.set_payload("Version: 1")
1.221 + encrypted_message.attach(declaration)
1.222 +
1.223 + content = MIMEApplication(encrypted, "octet-stream", encode_noop)
1.224 + encrypted_message.attach(content)
1.225 +
1.226 + return encrypted_message
1.227 +
1.228 + def importKeys(self, text):
1.229 +
1.230 + """
1.231 + Import the keys provided by the given 'text'.
1.232 + """
1.233 +
1.234 + self.run(["--import"], text)
1.235 +
1.236 + def exportKey(self, keyid):
1.237 +
1.238 + """
1.239 + Return the "armoured" public key text for 'keyid' as a message part with
1.240 + a suitable media type.
1.241 + See: https://tools.ietf.org/html/rfc3156#section-7
1.242 + """
1.243 +
1.244 + text = self.run(["--armor", "--export", keyid])
1.245 + return MIMEApplication(text, "pgp-keys", encode_noop)
1.246 +
1.247 + def listKeys(self, keyid=None):
1.248 +
1.249 + """
1.250 + Return a list of key details for keys on the keychain, selecting only
1.251 + one specific key if 'keyid' is specified.
1.252 + """
1.253 +
1.254 + text = self.run(["--list-keys", "--with-colons", "--with-fingerprint"] +
1.255 + (keyid and ["0x%s" % keyid] or []))
1.256 + return self._getKeysFromResult(text)
1.257 +
1.258 + def listSignatures(self, keyid=None):
1.259 +
1.260 + """
1.261 + Return a list of key and signature details for keys on the keychain,
1.262 + selecting only one specific key if 'keyid' is specified.
1.263 + """
1.264 +
1.265 + text = self.run(["--list-sigs", "--with-colons", "--with-fingerprint"] +
1.266 + (keyid and ["0x%s" % keyid] or []))
1.267 + return self._getKeysFromResult(text)
1.268 +
1.269 + def getKeysFromMessagePart(self, part):
1.270 +
1.271 + """
1.272 + Process an application/pgp-keys message 'part', returning a list of
1.273 + key details.
1.274 + """
1.275 +
1.276 + return self.getKeysFromString(part.get_payload(decode=True))
1.277 +
1.278 + def getKeysFromString(self, s):
1.279 +
1.280 + """
1.281 + Return a list of key details extracted from the given key block string
1.282 + 's'. Signature information is also included through the use of the gpg
1.283 + verbose option.
1.284 + """
1.285 +
1.286 + text = self.run(["--with-colons", "--with-fingerprint", "-v"], s)
1.287 + return self._getKeysFromResult(text)
1.288 +
1.289 + def _getKeysFromResult(self, text):
1.290 +
1.291 + """
1.292 + Return a list of key details extracted from the given command result
1.293 + 'text'.
1.294 + """
1.295 +
1.296 + keys = []
1.297 + for line in text.split("\n"):
1.298 + try:
1.299 + recordtype, trust, keylength, algorithm, keyid, cdate, expdate, serial, ownertrust, _rest = line.split(":", 9)
1.300 + except ValueError:
1.301 + continue
1.302 +
1.303 + if recordtype == "pub":
1.304 + userid, _rest = _rest.split(":", 1)
1.305 + keys.append({
1.306 + "type" : recordtype, "trust" : trust, "keylength" : keylength,
1.307 + "algorithm" : algorithm, "keyid" : keyid, "cdate" : cdate,
1.308 + "expdate" : expdate, "userid" : userid, "ownertrust" : ownertrust,
1.309 + "fingerprint" : None, "subkeys" : [], "signatures" : []
1.310 + })
1.311 + elif recordtype == "sub" and keys:
1.312 + keys[-1]["subkeys"].append({
1.313 + "trust" : trust, "keylength" : keylength, "algorithm" : algorithm,
1.314 + "keyid" : keyid, "cdate" : cdate, "expdate" : expdate,
1.315 + "ownertrust" : ownertrust
1.316 + })
1.317 + elif recordtype == "fpr" and keys:
1.318 + fingerprint, _rest = _rest.split(":", 1)
1.319 + keys[-1]["fingerprint"] = fingerprint
1.320 + elif recordtype == "sig" and keys:
1.321 + userid, _rest = _rest.split(":", 1)
1.322 + keys[-1]["signatures"].append({
1.323 + "keyid" : keyid, "cdate" : cdate, "expdate" : expdate,
1.324 + "userid" : userid
1.325 + })
1.326 +
1.327 + return keys
1.328 +
1.329 +# Message serialisation functions, working around email module problems.
1.330 +
1.331 +def as_string(message):
1.332 +
1.333 + """
1.334 + Return the string representation of 'message', attempting to preserve the
1.335 + precise original formatting.
1.336 + """
1.337 +
1.338 + out = StringIO()
1.339 + generator = Generator(out, False, 0) # disable reformatting measures
1.340 + generator.flatten(message)
1.341 + return out.getvalue()
1.342 +
1.343 +# Message decoding functions.
1.344 +
1.345 +# Detect PGP/GPG-encoded payloads.
1.346 +# See: http://tools.ietf.org/html/rfc3156
1.347 +
1.348 +def is_signed(message):
1.349 + mimetype = message.get_content_type()
1.350 + encoding = message.get_content_charset()
1.351 +
1.352 + return mimetype == "multipart/signed" and \
1.353 + message.get_param("protocol") == "application/pgp-signature"
1.354 +
1.355 +def is_encrypted(message):
1.356 + mimetype = message.get_content_type()
1.357 + encoding = message.get_content_charset()
1.358 +
1.359 + return mimetype == "multipart/encrypted" and \
1.360 + message.get_param("protocol") == "application/pgp-encrypted"
1.361 +
1.362 +def getContentAndSignature(message):
1.363 +
1.364 + """
1.365 + Return the content and signature parts of the given RFC 3156 'message'.
1.366 +
1.367 + NOTE: RFC 3156 states that signed messages should employ a detached
1.368 + NOTE: signature but then shows "BEGIN PGP MESSAGE" for signatures
1.369 + NOTE: instead of "BEGIN PGP SIGNATURE".
1.370 + NOTE: The "micalg" parameter is currently not supported.
1.371 + """
1.372 +
1.373 + try:
1.374 + content, signature = message.get_payload()
1.375 + return content, signature
1.376 + except ValueError:
1.377 + raise GPGMissingPart
1.378 +
1.379 +# vim: tabstop=4 expandtab shiftwidth=4