paul@136 | 1 | #!/usr/bin/env python |
paul@136 | 2 | |
paul@136 | 3 | """ |
paul@136 | 4 | GPG utilities derived from the MoinMessage library. |
paul@136 | 5 | |
paul@136 | 6 | Copyright (C) 2012, 2013, 2014 Paul Boddie <paul@boddie.org.uk> |
paul@136 | 7 | |
paul@136 | 8 | This program is free software; you can redistribute it and/or modify it under |
paul@136 | 9 | the terms of the GNU General Public License as published by the Free Software |
paul@136 | 10 | Foundation; either version 3 of the License, or (at your option) any later |
paul@136 | 11 | version. |
paul@136 | 12 | |
paul@136 | 13 | This program is distributed in the hope that it will be useful, but WITHOUT |
paul@136 | 14 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
paul@136 | 15 | FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
paul@136 | 16 | details. |
paul@136 | 17 | |
paul@136 | 18 | You should have received a copy of the GNU General Public License along with |
paul@136 | 19 | this program. If not, see <http://www.gnu.org/licenses/>. |
paul@136 | 20 | """ |
paul@136 | 21 | |
paul@136 | 22 | from email.encoders import encode_noop |
paul@136 | 23 | from email.generator import Generator |
paul@136 | 24 | from email.mime.multipart import MIMEMultipart |
paul@136 | 25 | from email.mime.application import MIMEApplication |
paul@136 | 26 | from email.mime.base import MIMEBase |
paul@136 | 27 | from subprocess import Popen, PIPE |
paul@136 | 28 | from tempfile import mkstemp |
paul@136 | 29 | import os |
paul@136 | 30 | |
paul@136 | 31 | try: |
paul@136 | 32 | from cStringIO import StringIO |
paul@136 | 33 | except ImportError: |
paul@136 | 34 | from StringIO import StringIO |
paul@136 | 35 | |
paul@136 | 36 | class GPGError(Exception): |
paul@136 | 37 | pass |
paul@136 | 38 | |
paul@136 | 39 | class GPGDecodingError(Exception): |
paul@136 | 40 | pass |
paul@136 | 41 | |
paul@136 | 42 | class GPGMissingPart(GPGDecodingError): |
paul@136 | 43 | pass |
paul@136 | 44 | |
paul@136 | 45 | class GPGBadContent(GPGDecodingError): |
paul@136 | 46 | pass |
paul@136 | 47 | |
paul@136 | 48 | class GPG: |
paul@136 | 49 | |
paul@136 | 50 | "A wrapper around the gpg command using a particular configuration." |
paul@136 | 51 | |
paul@136 | 52 | def __init__(self, homedir=None): |
paul@136 | 53 | self.conf_args = [] |
paul@136 | 54 | |
paul@136 | 55 | if homedir: |
paul@136 | 56 | self.conf_args += ["--homedir", homedir] |
paul@136 | 57 | |
paul@136 | 58 | self.errors = None |
paul@136 | 59 | |
paul@136 | 60 | def run(self, args, text=None): |
paul@136 | 61 | |
paul@136 | 62 | """ |
paul@136 | 63 | Invoke gpg with the given 'args', supplying the given 'text' to the |
paul@136 | 64 | command directly or, if 'text' is omitted, using a file provided as part |
paul@136 | 65 | of the 'args' if appropriate. |
paul@136 | 66 | |
paul@136 | 67 | Failure to complete the operation will result in a GPGError being |
paul@136 | 68 | raised. |
paul@136 | 69 | """ |
paul@136 | 70 | |
paul@136 | 71 | cmd = Popen(["gpg"] + self.conf_args + list(args), stdin=PIPE, stdout=PIPE, stderr=PIPE) |
paul@136 | 72 | |
paul@136 | 73 | # Attempt to write input to the command and to read output from the |
paul@136 | 74 | # command. |
paul@136 | 75 | |
paul@136 | 76 | text, self.errors = cmd.communicate(text) |
paul@136 | 77 | |
paul@136 | 78 | # Test for a zero result. |
paul@136 | 79 | |
paul@136 | 80 | if not cmd.returncode: |
paul@136 | 81 | return text |
paul@136 | 82 | else: |
paul@136 | 83 | raise GPGError, self.errors |
paul@136 | 84 | |
paul@136 | 85 | def verifyMessageText(self, signature, content): |
paul@136 | 86 | |
paul@136 | 87 | "Using the given 'signature', verify the given message 'content'." |
paul@136 | 88 | |
paul@136 | 89 | # Write the detached signature and content to files. |
paul@136 | 90 | |
paul@136 | 91 | signature_fd, signature_filename = mkstemp() |
paul@136 | 92 | content_fd, content_filename = mkstemp() |
paul@136 | 93 | |
paul@136 | 94 | try: |
paul@136 | 95 | signature_fp = os.fdopen(signature_fd, "w") |
paul@136 | 96 | content_fp = os.fdopen(content_fd, "w") |
paul@136 | 97 | try: |
paul@136 | 98 | signature_fp.write(signature) |
paul@136 | 99 | content_fp.write(content) |
paul@136 | 100 | finally: |
paul@136 | 101 | signature_fp.close() |
paul@136 | 102 | content_fp.close() |
paul@136 | 103 | |
paul@136 | 104 | # Verify the message text. |
paul@136 | 105 | |
paul@136 | 106 | text = self.run(["--status-fd", "1", "--verify", signature_filename, content_filename]) |
paul@136 | 107 | |
paul@136 | 108 | # Return the details of the signing key. |
paul@136 | 109 | |
paul@136 | 110 | identity = None |
paul@136 | 111 | fingerprint = None |
paul@136 | 112 | |
paul@136 | 113 | for line in text.split("\n"): |
paul@136 | 114 | try: |
paul@136 | 115 | prefix, msgtype, digest, details = line.strip().split(" ", 3) |
paul@136 | 116 | except ValueError: |
paul@136 | 117 | continue |
paul@136 | 118 | |
paul@136 | 119 | # Return the fingerprint and identity details. |
paul@136 | 120 | |
paul@136 | 121 | if msgtype == "GOODSIG": |
paul@136 | 122 | identity = details |
paul@136 | 123 | elif msgtype == "VALIDSIG": |
paul@136 | 124 | fingerprint = digest |
paul@136 | 125 | |
paul@136 | 126 | if identity and fingerprint: |
paul@136 | 127 | return fingerprint, identity |
paul@136 | 128 | |
paul@136 | 129 | return None |
paul@136 | 130 | |
paul@136 | 131 | finally: |
paul@136 | 132 | os.remove(signature_filename) |
paul@136 | 133 | os.remove(content_filename) |
paul@136 | 134 | |
paul@136 | 135 | def verifyMessage(self, message): |
paul@136 | 136 | |
paul@136 | 137 | """ |
paul@136 | 138 | Verify the given RFC 3156 'message', returning a tuple of the form |
paul@136 | 139 | (fingerprint, identity, content). |
paul@136 | 140 | """ |
paul@136 | 141 | |
paul@136 | 142 | content, signature = getContentAndSignature(message) |
paul@136 | 143 | |
paul@136 | 144 | # Verify the message format. |
paul@136 | 145 | |
paul@136 | 146 | if signature.get_content_type() != "application/pgp-signature": |
paul@136 | 147 | raise GPGBadContent |
paul@136 | 148 | |
paul@136 | 149 | # Verify the message. |
paul@136 | 150 | |
paul@136 | 151 | fingerprint, identity = self.verifyMessageText(signature.get_payload(decode=True), as_string(content)) |
paul@136 | 152 | return fingerprint, identity, content |
paul@136 | 153 | |
paul@136 | 154 | def signMessage(self, message, keyid): |
paul@136 | 155 | |
paul@136 | 156 | """ |
paul@136 | 157 | Return a signed version of 'message' using the given 'keyid'. |
paul@136 | 158 | """ |
paul@136 | 159 | |
paul@136 | 160 | # Sign the container's representation. |
paul@136 | 161 | |
paul@136 | 162 | signature = self.run(["--armor", "-u", keyid, "--detach-sig"], as_string(message)) |
paul@136 | 163 | |
paul@136 | 164 | # Make the container for the message. |
paul@136 | 165 | |
paul@136 | 166 | signed_message = MIMEMultipart("signed", protocol="application/pgp-signature") |
paul@136 | 167 | signed_message.attach(message) |
paul@136 | 168 | |
paul@136 | 169 | signature_part = MIMEBase("application", "pgp-signature") |
paul@136 | 170 | signature_part.set_payload(signature) |
paul@136 | 171 | signed_message.attach(signature_part) |
paul@136 | 172 | |
paul@136 | 173 | return signed_message |
paul@136 | 174 | |
paul@136 | 175 | def decryptMessageText(self, message): |
paul@136 | 176 | |
paul@136 | 177 | "Return a decrypted version of 'message'." |
paul@136 | 178 | |
paul@136 | 179 | return self.run(["--decrypt"], message) |
paul@136 | 180 | |
paul@136 | 181 | def decryptMessage(self, message): |
paul@136 | 182 | |
paul@136 | 183 | """ |
paul@136 | 184 | Decrypt the given RFC 3156 'message', returning the message text. |
paul@136 | 185 | """ |
paul@136 | 186 | |
paul@136 | 187 | try: |
paul@136 | 188 | declaration, content = message.get_payload() |
paul@136 | 189 | except ValueError: |
paul@136 | 190 | raise GPGMissingPart |
paul@136 | 191 | |
paul@136 | 192 | # Verify the message format. |
paul@136 | 193 | |
paul@136 | 194 | if content.get_content_type() != "application/octet-stream": |
paul@136 | 195 | raise GPGBadContent |
paul@136 | 196 | |
paul@136 | 197 | # Return the decrypted message text. |
paul@136 | 198 | |
paul@136 | 199 | return self.decryptMessageText(content.get_payload(decode=True)) |
paul@136 | 200 | |
paul@136 | 201 | def encryptMessage(self, message, keyid): |
paul@136 | 202 | |
paul@136 | 203 | """ |
paul@136 | 204 | Return an encrypted version of 'message' using the given 'keyid'. |
paul@136 | 205 | """ |
paul@136 | 206 | |
paul@136 | 207 | text = as_string(message) |
paul@136 | 208 | encrypted = self.run(["--armor", "-r", keyid, "--encrypt", "--trust-model", "always"], text) |
paul@136 | 209 | |
paul@136 | 210 | # Make the container for the message. |
paul@136 | 211 | |
paul@136 | 212 | encrypted_message = MIMEMultipart("encrypted", protocol="application/pgp-encrypted") |
paul@136 | 213 | |
paul@136 | 214 | # For encrypted content, add the declaration and content. |
paul@136 | 215 | |
paul@136 | 216 | declaration = MIMEBase("application", "pgp-encrypted") |
paul@136 | 217 | declaration.set_payload("Version: 1") |
paul@136 | 218 | encrypted_message.attach(declaration) |
paul@136 | 219 | |
paul@136 | 220 | content = MIMEApplication(encrypted, "octet-stream", encode_noop) |
paul@136 | 221 | encrypted_message.attach(content) |
paul@136 | 222 | |
paul@136 | 223 | return encrypted_message |
paul@136 | 224 | |
paul@136 | 225 | def importKeys(self, text): |
paul@136 | 226 | |
paul@136 | 227 | """ |
paul@136 | 228 | Import the keys provided by the given 'text'. |
paul@136 | 229 | """ |
paul@136 | 230 | |
paul@136 | 231 | self.run(["--import"], text) |
paul@136 | 232 | |
paul@136 | 233 | def exportKey(self, keyid): |
paul@136 | 234 | |
paul@136 | 235 | """ |
paul@136 | 236 | Return the "armoured" public key text for 'keyid' as a message part with |
paul@136 | 237 | a suitable media type. |
paul@136 | 238 | See: https://tools.ietf.org/html/rfc3156#section-7 |
paul@136 | 239 | """ |
paul@136 | 240 | |
paul@136 | 241 | text = self.run(["--armor", "--export", keyid]) |
paul@136 | 242 | return MIMEApplication(text, "pgp-keys", encode_noop) |
paul@136 | 243 | |
paul@136 | 244 | def listKeys(self, keyid=None): |
paul@136 | 245 | |
paul@136 | 246 | """ |
paul@136 | 247 | Return a list of key details for keys on the keychain, selecting only |
paul@136 | 248 | one specific key if 'keyid' is specified. |
paul@136 | 249 | """ |
paul@136 | 250 | |
paul@136 | 251 | text = self.run(["--list-keys", "--with-colons", "--with-fingerprint"] + |
paul@136 | 252 | (keyid and ["0x%s" % keyid] or [])) |
paul@136 | 253 | return self._getKeysFromResult(text) |
paul@136 | 254 | |
paul@136 | 255 | def listSignatures(self, keyid=None): |
paul@136 | 256 | |
paul@136 | 257 | """ |
paul@136 | 258 | Return a list of key and signature details for keys on the keychain, |
paul@136 | 259 | selecting only one specific key if 'keyid' is specified. |
paul@136 | 260 | """ |
paul@136 | 261 | |
paul@136 | 262 | text = self.run(["--list-sigs", "--with-colons", "--with-fingerprint"] + |
paul@136 | 263 | (keyid and ["0x%s" % keyid] or [])) |
paul@136 | 264 | return self._getKeysFromResult(text) |
paul@136 | 265 | |
paul@136 | 266 | def getKeysFromMessagePart(self, part): |
paul@136 | 267 | |
paul@136 | 268 | """ |
paul@136 | 269 | Process an application/pgp-keys message 'part', returning a list of |
paul@136 | 270 | key details. |
paul@136 | 271 | """ |
paul@136 | 272 | |
paul@136 | 273 | return self.getKeysFromString(part.get_payload(decode=True)) |
paul@136 | 274 | |
paul@136 | 275 | def getKeysFromString(self, s): |
paul@136 | 276 | |
paul@136 | 277 | """ |
paul@136 | 278 | Return a list of key details extracted from the given key block string |
paul@136 | 279 | 's'. Signature information is also included through the use of the gpg |
paul@136 | 280 | verbose option. |
paul@136 | 281 | """ |
paul@136 | 282 | |
paul@136 | 283 | text = self.run(["--with-colons", "--with-fingerprint", "-v"], s) |
paul@136 | 284 | return self._getKeysFromResult(text) |
paul@136 | 285 | |
paul@136 | 286 | def _getKeysFromResult(self, text): |
paul@136 | 287 | |
paul@136 | 288 | """ |
paul@136 | 289 | Return a list of key details extracted from the given command result |
paul@136 | 290 | 'text'. |
paul@136 | 291 | """ |
paul@136 | 292 | |
paul@136 | 293 | keys = [] |
paul@136 | 294 | for line in text.split("\n"): |
paul@136 | 295 | try: |
paul@136 | 296 | recordtype, trust, keylength, algorithm, keyid, cdate, expdate, serial, ownertrust, _rest = line.split(":", 9) |
paul@136 | 297 | except ValueError: |
paul@136 | 298 | continue |
paul@136 | 299 | |
paul@136 | 300 | if recordtype == "pub": |
paul@136 | 301 | userid, _rest = _rest.split(":", 1) |
paul@136 | 302 | keys.append({ |
paul@136 | 303 | "type" : recordtype, "trust" : trust, "keylength" : keylength, |
paul@136 | 304 | "algorithm" : algorithm, "keyid" : keyid, "cdate" : cdate, |
paul@136 | 305 | "expdate" : expdate, "userid" : userid, "ownertrust" : ownertrust, |
paul@136 | 306 | "fingerprint" : None, "subkeys" : [], "signatures" : [] |
paul@136 | 307 | }) |
paul@136 | 308 | elif recordtype == "sub" and keys: |
paul@136 | 309 | keys[-1]["subkeys"].append({ |
paul@136 | 310 | "trust" : trust, "keylength" : keylength, "algorithm" : algorithm, |
paul@136 | 311 | "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, |
paul@136 | 312 | "ownertrust" : ownertrust |
paul@136 | 313 | }) |
paul@136 | 314 | elif recordtype == "fpr" and keys: |
paul@136 | 315 | fingerprint, _rest = _rest.split(":", 1) |
paul@136 | 316 | keys[-1]["fingerprint"] = fingerprint |
paul@136 | 317 | elif recordtype == "sig" and keys: |
paul@136 | 318 | userid, _rest = _rest.split(":", 1) |
paul@136 | 319 | keys[-1]["signatures"].append({ |
paul@136 | 320 | "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, |
paul@136 | 321 | "userid" : userid |
paul@136 | 322 | }) |
paul@136 | 323 | |
paul@136 | 324 | return keys |
paul@136 | 325 | |
paul@136 | 326 | # Message serialisation functions, working around email module problems. |
paul@136 | 327 | |
paul@136 | 328 | def as_string(message): |
paul@136 | 329 | |
paul@136 | 330 | """ |
paul@136 | 331 | Return the string representation of 'message', attempting to preserve the |
paul@136 | 332 | precise original formatting. |
paul@136 | 333 | """ |
paul@136 | 334 | |
paul@136 | 335 | out = StringIO() |
paul@136 | 336 | generator = Generator(out, False, 0) # disable reformatting measures |
paul@136 | 337 | generator.flatten(message) |
paul@136 | 338 | return out.getvalue() |
paul@136 | 339 | |
paul@136 | 340 | # Message decoding functions. |
paul@136 | 341 | |
paul@136 | 342 | # Detect PGP/GPG-encoded payloads. |
paul@136 | 343 | # See: http://tools.ietf.org/html/rfc3156 |
paul@136 | 344 | |
paul@136 | 345 | def is_signed(message): |
paul@136 | 346 | mimetype = message.get_content_type() |
paul@136 | 347 | encoding = message.get_content_charset() |
paul@136 | 348 | |
paul@136 | 349 | return mimetype == "multipart/signed" and \ |
paul@136 | 350 | message.get_param("protocol") == "application/pgp-signature" |
paul@136 | 351 | |
paul@136 | 352 | def is_encrypted(message): |
paul@136 | 353 | mimetype = message.get_content_type() |
paul@136 | 354 | encoding = message.get_content_charset() |
paul@136 | 355 | |
paul@136 | 356 | return mimetype == "multipart/encrypted" and \ |
paul@136 | 357 | message.get_param("protocol") == "application/pgp-encrypted" |
paul@136 | 358 | |
paul@136 | 359 | def getContentAndSignature(message): |
paul@136 | 360 | |
paul@136 | 361 | """ |
paul@136 | 362 | Return the content and signature parts of the given RFC 3156 'message'. |
paul@136 | 363 | |
paul@136 | 364 | NOTE: RFC 3156 states that signed messages should employ a detached |
paul@136 | 365 | NOTE: signature but then shows "BEGIN PGP MESSAGE" for signatures |
paul@136 | 366 | NOTE: instead of "BEGIN PGP SIGNATURE". |
paul@136 | 367 | NOTE: The "micalg" parameter is currently not supported. |
paul@136 | 368 | """ |
paul@136 | 369 | |
paul@136 | 370 | try: |
paul@136 | 371 | content, signature = message.get_payload() |
paul@136 | 372 | return content, signature |
paul@136 | 373 | except ValueError: |
paul@136 | 374 | raise GPGMissingPart |
paul@136 | 375 | |
paul@136 | 376 | # vim: tabstop=4 expandtab shiftwidth=4 |