# -*- coding: iso-8859-1 -*-
"""
    MoinMoin - MoinMessage library

    Helpers for building update messages as MIME parts, signing, verifying,
    encrypting and decrypting them by invoking the gpg command, and sending
    them to a URL using an HTTP PUT request.

    @copyright: 2012 by Paul Boddie <paul@boddie.org.uk>
    @license: GNU GPL (v2 or later), see COPYING.txt for details.
"""

from email import message_from_string
from email.encoders import encode_noop
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from subprocess import Popen, PIPE
from tempfile import mkstemp
import os

# The URL parsing and HTTP client modules were renamed in Python 3. Try the
# Python 2 names first and fall back to the newer locations so that the module
# remains importable either way; the same names are bound in both cases.

try:
    from urlparse import urlsplit
    import httplib
except ImportError:
    from urllib.parse import urlsplit
    import http.client as httplib

# Port numbers used by parseURL when the URL does not give one explicitly.

_DEFAULT_PORTS = {"http": 80, "https": 443}

class Message:

    "An update message."

    def __init__(self):
        self.updates = []

    def add_update(self, alternatives):

        """
        Add an update described by the given list of 'alternatives', each being
        a MIME part. Where more than one alternative is given, the parts are
        wrapped in a multipart container; a single alternative is stored as it
        is. An empty list causes an IndexError.
        """

        if len(alternatives) > 1:
            part = MIMEMultipart()
            for alternative in alternatives:
                part.attach(alternative)
            self.updates.append(part)
        else:
            self.updates.append(alternatives[0])

    def get_payload(self):

        """
        Return the message payload: the update itself if exactly one update has
        been added, or otherwise a multipart container bearing an "Update-Type"
        header of "collection" with each update attached.
        """

        if len(self.updates) == 1:
            message = self.updates[0]
        else:
            message = MIMEMultipart()
            message.add_header("Update-Type", "collection")
            for update in self.updates:
                message.attach(update)

        return message

class MoinMessageError(Exception):

    "An error raised when a gpg operation or a communications request fails."

    pass

def _write_temp_file(data):

    """
    Write 'data' to a new temporary file and return the filename. Should the
    file not be written successfully, it is removed before the error is
    propagated, so that the caller only ever has to remove files whose names
    it has actually received.
    """

    fd, filename = mkstemp()

    try:
        fp = os.fdopen(fd, "w")
        try:
            fp.write(data)
        finally:
            fp.close()
    except Exception:
        os.remove(filename)
        raise

    return filename

class GPG:

    "A wrapper around the gpg command using a particular configuration."

    def __init__(self, homedir=None):

        """
        Initialise the wrapper, passing the optional 'homedir' to gpg using the
        --homedir option in every invocation.
        """

        self.conf_args = []

        if homedir:
            self.conf_args += ["--homedir", homedir]

        # The standard error output of the most recent gpg invocation.

        self.errors = None

    def run(self, args, text=None):

        """
        Invoke gpg with the given 'args', supplying the given 'text' to the
        command directly or, if 'text' is omitted, using a file provided as part
        of the 'args' if appropriate.

        Return the standard output of the command. The standard error output is
        retained in the 'errors' attribute.

        Failure to complete the operation will result in a MoinMessageError
        being raised.
        """

        cmd = Popen(["gpg"] + self.conf_args + list(args), stdin=PIPE, stdout=PIPE, stderr=PIPE)

        try:
            # Exchange data using communicate, which services all three pipes
            # together. Writing all the input before reading any output can
            # deadlock once the command fills one of its output pipes.

            try:
                output, self.errors = cmd.communicate(text)

            # I/O errors can indicate the failure of the command. No output is
            # trusted in such cases, but any error text is salvaged.

            except (IOError, OSError):
                output = None
                try:
                    self.errors = cmd.stderr.read()
                except (IOError, OSError, ValueError):
                    self.errors = None

            # Test for a zero result.

            if cmd.wait() == 0 and output is not None:
                return output

            raise MoinMessageError(self.errors)

        finally:
            # Closing is best-effort: the pipes are usually closed already, and
            # a failure here must not hide the outcome determined above.

            for pipe in (cmd.stdin, cmd.stdout, cmd.stderr):
                try:
                    pipe.close()
                except (IOError, OSError):
                    pass

    def verifyMessage(self, signature, content):

        """
        Using the given 'signature', verify the given message 'content'.

        Return a tuple of the form (fingerprint, identity) taken from the
        VALIDSIG and GOODSIG status lines produced by gpg, or None if the
        status output does not provide both details.

        A MoinMessageError is raised if gpg reports failure.
        """

        # Write the detached signature and content to files, removing each file
        # regardless of whether later steps succeed.

        signature_filename = _write_temp_file(signature)

        try:
            content_filename = _write_temp_file(content)

            try:
                # Verify the message text.

                text = self.run(["--status-fd", "1", "--verify", signature_filename, content_filename])

            finally:
                os.remove(content_filename)

        finally:
            os.remove(signature_filename)

        # Return the details of the signing key.

        identity = None
        fingerprint = None

        for line in text.split("\n"):

            # Lines with fewer than four fields cannot provide the details.

            try:
                prefix, msgtype, digest, details = line.strip().split(" ", 3)
            except ValueError:
                continue

            # Return the fingerprint and identity details.

            if msgtype == "GOODSIG":
                identity = details
            elif msgtype == "VALIDSIG":
                fingerprint = digest

            if identity and fingerprint:
                return fingerprint, identity

        return None

    def signMessage(self, message, keyid):

        """
        Return a signed version of 'message' using the given 'keyid'.

        The result is a multipart/signed container holding the original
        'message' followed by an application/pgp-signature part containing the
        armoured detached signature of the message's string form.
        """

        text = message.as_string()
        signature = self.run(["--armor", "-u", keyid, "--detach-sig"], text)

        # Make the container for the message.

        signed_message = MIMEMultipart("signed", protocol="application/pgp-signature")
        signed_message.attach(message)

        signature_part = MIMEBase("application", "pgp-signature")
        signature_part.set_payload(signature)
        signed_message.attach(signature_part)

        return signed_message

    def decryptMessage(self, message):

        """
        Return a decrypted version of 'message', this being the encrypted text
        supplied directly to gpg.
        """

        return self.run(["--decrypt"], message)

    def encryptMessage(self, message, keyid):

        """
        Return an encrypted version of 'message' using the given 'keyid'.

        The result is a multipart/encrypted container holding a version
        declaration followed by the armoured encrypted form of the message.
        The recipient key is used with the "always" trust model.
        """

        text = message.as_string()
        encrypted = self.run(["--armor", "-r", keyid, "--encrypt", "--trust-model", "always"], text)

        # Make the container for the message.

        encrypted_message = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")

        # For encrypted content, add the declaration and content.

        declaration = MIMEBase("application", "pgp-encrypted")
        declaration.set_payload("Version: 1")
        encrypted_message.attach(declaration)

        content = MIMEApplication(encrypted, "octet-stream", encode_noop)
        encrypted_message.attach(content)

        return encrypted_message

# Communications functions.

def sendMessage(message, url):

    """
    Send 'message' to the given 'url' using a PUT request and return the body
    of the response. The response status is not examined.

    A MoinMessageError is raised if the URL scheme is neither http nor https.
    """

    scheme, host, port, path = parseURL(url)
    text = message.as_string()

    if scheme == "http":
        cls = httplib.HTTPConnection
    elif scheme == "https":
        cls = httplib.HTTPSConnection
    else:
        raise MoinMessageError("Communications protocol not supported: %s" % scheme)

    req = cls(host, port)

    try:
        # A URL without a path would otherwise yield an empty request target.

        req.request("PUT", path or "/", text)
        resp = req.getresponse()
        return resp.read()

    finally:
        req.close()

def parseURL(url):

    """
    Return the scheme, host, port and path for the given 'url'.

    Any query string is appended to the path. Where the URL gives no port, the
    default for the scheme is used: 443 for https, and 80 otherwise. The host
    and port are separated at the first colon, so bracketed IPv6 addresses are
    not supported. A non-numeric port causes a ValueError.
    """

    scheme, host_port, path, query, fragment = urlsplit(url)
    host_port = host_port.split(":")

    if query:
        path += "?" + query

    host = host_port[0]

    if len(host_port) > 1:
        port = int(host_port[1])
    else:
        port = _DEFAULT_PORTS.get(scheme, 80)

    return scheme, host, port, path

# vim: tabstop=4 expandtab shiftwidth=4