1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/emailfix/header.py Sun Apr 12 19:35:41 2015 +0200
1.3 @@ -0,0 +1,515 @@
1.4 +# Copyright (C) 2002-2006 Python Software Foundation
1.5 +# Author: Ben Gertzfield, Barry Warsaw
1.6 +# Contact: email-sig@python.org
1.7 +
1.8 +"""Header encoding and decoding functionality."""
1.9 +
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]

import re
import binascii

import email.quoprimime
import email.base64mime

from email.errors import HeaderParseError
from email.charset import Charset

# Frequently used string constants.
NL = '\n'
SPACE = ' '
USPACE = u' '           # unicode space (Python 2 literal)
SPACE8 = ' ' * 8        # a hard tab expands to 8 columns for length math
UEMPTYSTRING = u''      # join seed used by Header.__unicode__

# RFC 2822 recommended maximum line length for header folding.
MAXLINELEN = 76

# Charset singletons used by Header.append() as encoding fallbacks.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
# The three capturing groups mean re.split() on this pattern yields runs of
# [unencoded, charset, encoding, encoded, ...] parts (see decode_header).
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  (?=[ \t]|$)           # whitespace or the end of the string
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from tilde to exclamation mark.
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack.  (Name kept as-is, historical misspelling included,
# since Header.encode() refers to it.)
_embeded_header = re.compile(r'\n[^ \t]+:')



# Helpers
_max_append = email.quoprimime._max_append
1.61 +
1.62 +
1.63 +
def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (decoded_string, charset) pairs containing each of the
    decoded parts of the header.  Charset is None for non-encoded parts of the
    header, otherwise a lower-case string containing the name of the character
    set specified in the encoded string.

    An email.errors.HeaderParseError may be raised when certain decoding error
    occurs (e.g. a base64 decoding exception).
    """
    # If there are no encoded-words, just return the whole header unchanged.
    header = str(header)
    if not ecre.search(header):
        return [(header, None)]
    decoded = []
    for line in header.splitlines():
        # This line might not have an encoding in it
        if not ecre.search(line):
            decoded.append((line, None))
            continue
        # ecre has three capturing groups, so split() yields repeating runs
        # of [unencoded-text, charset, encoding, encoded-text, ...].
        parts = ecre.split(line)
        while parts:
            unenc = parts.pop(0).strip()
            if unenc:
                # Should we continue a long line?
                if decoded and decoded[-1][1] is None:
                    decoded[-1] = (decoded[-1][0] + SPACE + unenc, None)
                else:
                    decoded.append((unenc, None))
            if parts:
                charset, encoding = [s.lower() for s in parts[0:2]]
                encoded = parts[2]
                dec = None
                if encoding == 'q':
                    dec = email.quoprimime.header_decode(encoded)
                elif encoding == 'b':
                    paderr = len(encoded) % 4  # Postel's law: add missing padding
                    if paderr:
                        encoded += '==='[:4 - paderr]
                    try:
                        dec = email.base64mime.decode(encoded)
                    except binascii.Error:
                        # Turn this into a higher level exception.  BAW: Right
                        # now we throw the lower level exception away but
                        # when/if we get exception chaining, we'll preserve it.
                        raise HeaderParseError
                # Unknown encoding letter: fall back to the raw encoded text.
                if dec is None:
                    dec = encoded

                # Coalesce adjacent runs that share a charset.
                if decoded and decoded[-1][1] == charset:
                    decoded[-1] = (decoded[-1][0] + dec, decoded[-1][1])
                else:
                    decoded.append((dec, charset))
            del parts[0:3]
    return decoded
1.121 +
1.122 +
1.123 +
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Create a Header from a sequence of pairs as returned by decode_header()

    decode_header() takes a header value string and returns a sequence of
    pairs of the format (decoded_string, charset) where charset is the string
    name of the character set.

    This function takes one of those sequence of pairs and returns a Header
    instance.  Optional maxlinelen, header_name, and continuation_ws are as in
    the Header constructor.
    """
    header = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for string, charset in decoded_seq:
        # A charset of None means us-ascii; Header.append() accepts it
        # directly.  Anything else is coerced to a Charset instance first.
        if not (charset is None or isinstance(charset, Charset)):
            charset = Charset(charset)
        header.append(string, charset)
    return header
1.144 +
1.145 +
1.146 +
class Header:
    """A MIME-compliant header value that may mix several character sets.

    Text is accumulated with append() as (string, Charset) chunks and
    rendered into an RFC 2047 encoded, folded value by encode().
    NOTE(review): Python 2 era code -- __unicode__ and append() rely on the
    `unicode' builtin, which does not exist on Python 3.
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a byte string or a Unicode string, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicitly via maxlinelen.
        For splitting the first line to a shorter value (to account for the
        field header which isn't included in s, e.g. `Subject') pass in the
        name of the field in header_name.  The default maxlinelen is 76.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        if not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        # Width of the continuation whitespace in columns, tabs expanded to 8.
        cws_expanded_len = len(continuation_ws.replace('\t', SPACE8))
        # BAW: I believe `chunks' and `maxlinelen' should be non-public.
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        if header_name is None:
            # We don't know anything about the field header so the first line
            # is the same length as subsequent lines.
            self._firstlinelen = maxlinelen
        else:
            # The first line should be shorter to take into account the field
            # header.  Also subtract off 2 extra for the colon and space.
            self._firstlinelen = maxlinelen - len(header_name) - 2
        # Second and subsequent lines should subtract off the length in
        # columns of the continuation whitespace prefix.
        self._maxlinelen = maxlinelen - cws_expanded_len

    def __str__(self):
        """A synonym for self.encode()."""
        return self.encode()

    def __unicode__(self):
        """Helper for the built-in unicode function (Python 2 only)."""
        uchunks = []
        lastcs = None
        for s, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            nextcs = charset
            if uchunks:
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii'):
                        uchunks.append(USPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii'):
                    uchunks.append(USPACE)
            lastcs = nextcs
            uchunks.append(unicode(s, str(charset)))
        return UEMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a string, swap the args and do another comparison.
        return other == self.encode()

    def __ne__(self, other):
        return not self == other

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a byte string or a Unicode string.  If it is a byte string
        (i.e. isinstance(s, str) is true), then charset is the encoding of
        that byte string, and a UnicodeError will be raised if the string
        cannot be decoded with that charset.  If s is a Unicode string, then
        charset is a hint specifying the character set of the characters in
        the string.  In this case, when producing an RFC 2822 compliant header
        using RFC 2047 rules, the Unicode string will be encoded using the
        following charsets in order: us-ascii, the charset hint, utf-8.  The
        first character set not to provoke a UnicodeError is used.

        Optional `errors' is passed as the third argument to any unicode() or
        ustr.encode() call.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        # If the charset is our faux 8bit charset, leave the string unchanged
        # (Charset instances compare equal to their name strings).
        if charset != '8bit':
            # We need to test that the string can be converted to unicode and
            # back to a byte string, given the input and output codecs of the
            # charset.
            if isinstance(s, str):
                # Possibly raise UnicodeError if the byte string can't be
                # converted to a unicode with the input codec of the charset.
                incodec = charset.input_codec or 'us-ascii'
                ustr = unicode(s, incodec, errors)
                # Now make sure that the unicode could be converted back to a
                # byte string with the output codec, which may be different
                # than the input codec.  Still, use the original byte string.
                outcodec = charset.output_codec or 'us-ascii'
                ustr.encode(outcodec, errors)
            elif isinstance(s, unicode):
                # Now we have to be sure the unicode string can be converted
                # to a byte string with a reasonable output codec.  We want to
                # use the byte string in the chunk.
                # NOTE: `charset' is deliberately rebound by this loop to the
                # first charset that can represent s.
                for charset in USASCII, charset, UTF8:
                    try:
                        outcodec = charset.output_codec or 'us-ascii'
                        s = s.encode(outcodec, errors)
                        break
                    except UnicodeError:
                        pass
                else:
                    # utf-8 can encode any unicode string, so this branch
                    # should be unreachable.
                    assert False, 'utf-8 conversion failed'
        self._chunks.append((s, charset))

    def _split(self, s, charset, maxlinelen, splitchars):
        # Split up a header safely for use with encode_chunks.  Returns a
        # list of (encoded_fragment, charset) pairs, each fragment fitting
        # within maxlinelen where possible.
        splittable = charset.to_splittable(s)
        encoded = charset.from_splittable(splittable, True)
        elen = charset.encoded_header_len(encoded)
        # If the line's encoded length already fits, just return it
        if elen <= maxlinelen:
            return [(encoded, charset)]
        # If we have undetermined raw 8bit characters sitting in a byte
        # string, we really don't know what the right thing to do is.  We
        # can't really split it because it might be multibyte data which we
        # could break if we split it between pairs.  The least harm seems to
        # be to not split the header at all, but that means they could go out
        # longer than maxlinelen.
        if charset == '8bit':
            return [(s, charset)]
        # BAW: I'm not sure what the right test here is.  What we're trying to
        # do is be faithful to RFC 2822's recommendation that ($2.2.3):
        #
        # "Note: Though structured field bodies are defined in such a way that
        #  folding can take place between many of the lexical tokens (and even
        #  within some of the lexical tokens), folding SHOULD be limited to
        #  placing the CRLF at higher-level syntactic breaks."
        #
        # For now, I can only imagine doing this when the charset is us-ascii,
        # although it's possible that other charsets may also benefit from the
        # higher-level syntactic breaks.
        elif charset == 'us-ascii':
            return self._split_ascii(s, charset, maxlinelen, splitchars)
        # BAW: should we use encoded?
        elif elen == len(s):
            # We can split on _maxlinelen boundaries because we know that the
            # encoding won't change the size of the string
            splitpnt = maxlinelen
            first = charset.from_splittable(splittable[:splitpnt], False)
            last = charset.from_splittable(splittable[splitpnt:], False)
        else:
            # Binary search for split point
            first, last = _binsplit(splittable, charset, maxlinelen)
        # first is of the proper length so just wrap it in the appropriate
        # chrome.  last must be recursively split.
        fsplittable = charset.to_splittable(first)
        fencoded = charset.from_splittable(fsplittable, True)
        chunk = [(fencoded, charset)]
        return chunk + self._split(last, charset, self._maxlinelen, splitchars)

    def _split_ascii(self, s, charset, firstlen, splitchars):
        # Delegate to the module-level helper.  self._NL is assigned by
        # encode() before any splitting happens.
        chunks = _split_ascii(s, firstlen, self._maxlinelen,
                              self._continuation_ws, splitchars, self._NL)
        return zip(chunks, [charset]*len(chunks))

    def _encode_chunks(self, newchunks, maxlinelen):
        # MIME-encode a header with many different charsets and/or encodings.
        #
        # Given a list of pairs (string, charset), return a MIME-encoded
        # string suitable for use in a header field.  Each pair may have
        # different charsets and/or encodings, and the resulting header will
        # accurately reflect each setting.
        #
        # Each encoding can be email.utils.QP (quoted-printable, for
        # ASCII-like character sets like iso-8859-1), email.utils.BASE64
        # (Base64, for non-ASCII like character sets like KOI8-R and
        # iso-2022-jp), or None (no encoding).
        #
        # Each pair will be represented on a separate line; the resulting
        # string will be in the format:
        #
        # =?charset1?q?Mar=EDa_Gonz=E1lez_Alonso?=\n
        #  =?charset2?b?SvxyZ2VuIEL2aW5n?="
        chunks = []
        for header, charset in newchunks:
            if not header:
                continue
            if charset is None or charset.header_encoding is None:
                s = header
            else:
                s = charset.header_encode(header)
            # Don't add more folding whitespace than necessary
            if chunks and chunks[-1].endswith(' '):
                extra = ''
            else:
                extra = ' '
            _max_append(chunks, s, maxlinelen, extra)
        # self._NL is assigned by encode() before this method is called.
        joiner = self._NL + self._continuation_ws
        return joiner.join(chunks)

    def encode(self, splitchars=';, ', linesep=NL):
        """Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.  In addition, there is a
        75-character length limit on any given encoded header field, so
        line-wrapping must be performed, even with double-byte character sets.

        This method will do its best to convert the string to the correct
        character set used in email, and encode and line wrap it safely with
        the appropriate scheme for that character set.

        If the given charset is not known or an error occurs during
        conversion, this function will return the header untouched.

        Optional splitchars is a string containing characters to split long
        ASCII lines on, in rough support of RFC 2822's `highest level
        syntactic breaks'.  This doesn't affect RFC 2047 encoded lines.

        Optional linesep is the separator placed between folded lines
        (default newline); it is stashed on self._NL for the helpers.

        Raises HeaderParseError if the resulting value looks like it
        contains an embedded header (header injection defense).
        """
        self._NL = linesep
        newchunks = []
        maxlinelen = self._firstlinelen
        lastlen = 0
        for s, charset in self._chunks:
            # The first bit of the next chunk should be just long enough to
            # fill the next line.  Don't forget the space separating the
            # encoded words.
            targetlen = maxlinelen - lastlen - 1
            if targetlen < charset.encoded_header_len(''):
                # Stick it on the next line
                targetlen = maxlinelen
            newchunks += self._split(s, charset, targetlen, splitchars)
            lastchunk, lastcharset = newchunks[-1]
            lastlen = lastcharset.encoded_header_len(lastchunk)
        value = self._encode_chunks(newchunks, maxlinelen)
        if _embeded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value
1.419 +
1.420 +
1.421 +
1.422 +def _split_ascii(s, firstlen, restlen, continuation_ws, splitchars, linesep):
1.423 + lines = []
1.424 + maxlen = firstlen
1.425 + for line in s.split(linesep):
1.426 + # Ignore any leading whitespace (i.e. continuation whitespace) already
1.427 + # on the line, since we'll be adding our own.
1.428 + line = line.lstrip()
1.429 + if len(line) < maxlen:
1.430 + lines.append(line)
1.431 + maxlen = restlen
1.432 + continue
1.433 + # Attempt to split the line at the highest-level syntactic break
1.434 + # possible. Note that we don't have a lot of smarts about field
1.435 + # syntax; we just try to break on semi-colons, then commas, then
1.436 + # whitespace.
1.437 + for ch in splitchars:
1.438 + if ch in line:
1.439 + break
1.440 + else:
1.441 + # There's nothing useful to split the line on, not even spaces, so
1.442 + # just append this line unchanged
1.443 + lines.append(line)
1.444 + maxlen = restlen
1.445 + continue
1.446 + # Now split the line on the character plus trailing whitespace
1.447 + cre = re.compile(r'%s\s*' % ch)
1.448 + if ch in ';,':
1.449 + eol = ch
1.450 + else:
1.451 + eol = ''
1.452 + joiner = eol + ' '
1.453 + joinlen = len(joiner)
1.454 + wslen = len(continuation_ws.replace('\t', SPACE8))
1.455 + this = []
1.456 + linelen = 0
1.457 + for part in cre.split(line):
1.458 + curlen = linelen + max(0, len(this)-1) * joinlen
1.459 + partlen = len(part)
1.460 + onfirstline = not lines
1.461 + # We don't want to split after the field name, if we're on the
1.462 + # first line and the field name is present in the header string.
1.463 + if ch == ' ' and onfirstline and \
1.464 + len(this) == 1 and fcre.match(this[0]):
1.465 + this.append(part)
1.466 + linelen += partlen
1.467 + elif curlen + partlen > maxlen:
1.468 + if this:
1.469 + lines.append(joiner.join(this) + eol)
1.470 + # If this part is longer than maxlen and we aren't already
1.471 + # splitting on whitespace, try to recursively split this line
1.472 + # on whitespace.
1.473 + if partlen > maxlen and ch != ' ':
1.474 + subl = _split_ascii(part, maxlen, restlen,
1.475 + continuation_ws, ' ', self._NL)
1.476 + lines.extend(subl[:-1])
1.477 + this = [subl[-1]]
1.478 + else:
1.479 + this = [part]
1.480 + linelen = wslen + len(this[-1])
1.481 + maxlen = restlen
1.482 + else:
1.483 + this.append(part)
1.484 + linelen += partlen
1.485 + # Put any left over parts on a line by themselves
1.486 + if this:
1.487 + lines.append(joiner.join(this))
1.488 + return lines
1.489 +
1.490 +
1.491 +
1.492 +def _binsplit(splittable, charset, maxlinelen):
1.493 + i = 0
1.494 + j = len(splittable)
1.495 + while i < j:
1.496 + # Invariants:
1.497 + # 1. splittable[:k] fits for all k <= i (note that we *assume*,
1.498 + # at the start, that splittable[:0] fits).
1.499 + # 2. splittable[:k] does not fit for any k > j (at the start,
1.500 + # this means we shouldn't look at any k > len(splittable)).
1.501 + # 3. We don't know about splittable[:k] for k in i+1..j.
1.502 + # 4. We want to set i to the largest k that fits, with i <= k <= j.
1.503 + #
1.504 + m = (i+j+1) >> 1 # ceiling((i+j)/2); i < m <= j
1.505 + chunk = charset.from_splittable(splittable[:m], True)
1.506 + chunklen = charset.encoded_header_len(chunk)
1.507 + if chunklen <= maxlinelen:
1.508 + # m is acceptable, so is a new lower bound.
1.509 + i = m
1.510 + else:
1.511 + # m is not acceptable, so final i must be < m.
1.512 + j = m - 1
1.513 + # i == j. Invariant #1 implies that splittable[:i] fits, and
1.514 + # invariant #2 implies that splittable[:i+1] does not fit, so i
1.515 + # is what we're looking for.
1.516 + first = charset.from_splittable(splittable[:i], False)
1.517 + last = charset.from_splittable(splittable[i:], False)
1.518 + return first, last