MoinMessage

Changeset

133:9ff6f746f5e9
2015-04-06 Paul Boddie raw files shortlog changelog graph Added a more specific version of the MoinMessage module. gpgutils
GPGUtils.py (file)
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/GPGUtils.py	Mon Apr 06 17:17:41 2015 +0200
     1.3 @@ -0,0 +1,376 @@
     1.4 +#!/usr/bin/env python
     1.5 +
     1.6 +"""
     1.7 +GPG utilities derived from the MoinMessage library.
     1.8 +
     1.9 +Copyright (C) 2012, 2013, 2014 Paul Boddie <paul@boddie.org.uk>
    1.10 +
    1.11 +This program is free software; you can redistribute it and/or modify it under
    1.12 +the terms of the GNU General Public License as published by the Free Software
    1.13 +Foundation; either version 3 of the License, or (at your option) any later
    1.14 +version.
    1.15 +
    1.16 +This program is distributed in the hope that it will be useful, but WITHOUT
    1.17 +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
    1.18 +FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
    1.19 +details.
    1.20 +
    1.21 +You should have received a copy of the GNU General Public License along with
    1.22 +this program.  If not, see <http://www.gnu.org/licenses/>.
    1.23 +"""
    1.24 +
    1.25 +from email.encoders import encode_noop
    1.26 +from email.generator import Generator
    1.27 +from email.mime.multipart import MIMEMultipart
    1.28 +from email.mime.application import MIMEApplication
    1.29 +from email.mime.base import MIMEBase
    1.30 +from subprocess import Popen, PIPE
    1.31 +from tempfile import mkstemp
    1.32 +import os
    1.33 +
    1.34 +try:
    1.35 +    from cStringIO import StringIO
    1.36 +except ImportError:
    1.37 +    from StringIO import StringIO
    1.38 +
    1.39 +class GPGError(Exception):
    1.40 +    pass
    1.41 +
    1.42 +class GPGDecodingError(Exception):
    1.43 +    pass
    1.44 +
    1.45 +class GPGMissingPart(GPGDecodingError):
    1.46 +    pass
    1.47 +
    1.48 +class GPGBadContent(GPGDecodingError):
    1.49 +    pass
    1.50 +
    1.51 +class GPG:
    1.52 +
    1.53 +    "A wrapper around the gpg command using a particular configuration."
    1.54 +
    1.55 +    def __init__(self, homedir=None):
    1.56 +        self.conf_args = []
    1.57 +
    1.58 +        if homedir:
    1.59 +            self.conf_args += ["--homedir", homedir]
    1.60 +
    1.61 +        self.errors = None
    1.62 +
    1.63 +    def run(self, args, text=None):
    1.64 +
    1.65 +        """
    1.66 +        Invoke gpg with the given 'args', supplying the given 'text' to the
    1.67 +        command directly or, if 'text' is omitted, using a file provided as part
    1.68 +        of the 'args' if appropriate.
    1.69 +
    1.70 +        Failure to complete the operation will result in a GPGError being
    1.71 +        raised.
    1.72 +        """
    1.73 +
    1.74 +        cmd = Popen(["gpg"] + self.conf_args + list(args), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    1.75 +
    1.76 +        # Attempt to write input to the command and to read output from the
    1.77 +        # command.
    1.78 +
    1.79 +        text, self.errors = cmd.communicate(text)
    1.80 +
    1.81 +        # Test for a zero result.
    1.82 +
    1.83 +        if not cmd.returncode:
    1.84 +            return text
    1.85 +        else:
    1.86 +            raise GPGError, self.errors
    1.87 +
    1.88 +    def verifyMessageText(self, signature, content):
    1.89 +
    1.90 +        "Using the given 'signature', verify the given message 'content'."
    1.91 +
    1.92 +        # Write the detached signature and content to files.
    1.93 +
    1.94 +        signature_fd, signature_filename = mkstemp()
    1.95 +        content_fd, content_filename = mkstemp()
    1.96 +
    1.97 +        try:
    1.98 +            signature_fp = os.fdopen(signature_fd, "w")
    1.99 +            content_fp = os.fdopen(content_fd, "w")
   1.100 +            try:
   1.101 +                signature_fp.write(signature)
   1.102 +                content_fp.write(content)
   1.103 +            finally:
   1.104 +                signature_fp.close()
   1.105 +                content_fp.close()
   1.106 +
   1.107 +            # Verify the message text.
   1.108 +
   1.109 +            text = self.run(["--status-fd", "1", "--verify", signature_filename, content_filename])
   1.110 +
   1.111 +            # Return the details of the signing key.
   1.112 +
   1.113 +            identity = None
   1.114 +            fingerprint = None
   1.115 +
   1.116 +            for line in text.split("\n"):
   1.117 +                try:
   1.118 +                    prefix, msgtype, digest, details = line.strip().split(" ", 3)
   1.119 +                except ValueError:
   1.120 +                    continue
   1.121 +
   1.122 +                # Return the fingerprint and identity details.
   1.123 +
   1.124 +                if msgtype == "GOODSIG":
   1.125 +                    identity = details
   1.126 +                elif msgtype == "VALIDSIG":
   1.127 +                    fingerprint = digest
   1.128 +
   1.129 +                if identity and fingerprint:
   1.130 +                    return fingerprint, identity
   1.131 +
   1.132 +            return None
   1.133 +
   1.134 +        finally:
   1.135 +            os.remove(signature_filename)
   1.136 +            os.remove(content_filename)
   1.137 +
   1.138 +    def verifyMessage(self, message):
   1.139 +
   1.140 +        """
   1.141 +        Verify the given RFC 3156 'message', returning a tuple of the form
   1.142 +        (fingerprint, identity, content).
   1.143 +        """
   1.144 +
   1.145 +        content, signature = getContentAndSignature(message)
   1.146 +
   1.147 +        # Verify the message format.
   1.148 +
   1.149 +        if signature.get_content_type() != "application/pgp-signature":
   1.150 +            raise GPGBadContent
   1.151 +
   1.152 +        # Verify the message.
   1.153 +
   1.154 +        fingerprint, identity = self.verifyMessageText(signature.get_payload(decode=True), as_string(content))
   1.155 +        return fingerprint, identity, content
   1.156 +
   1.157 +    def signMessage(self, message, keyid):
   1.158 +
   1.159 +        """
   1.160 +        Return a signed version of 'message' using the given 'keyid'.
   1.161 +        """
   1.162 +
   1.163 +        # Sign the container's representation.
   1.164 +
   1.165 +        signature = self.run(["--armor", "-u", keyid, "--detach-sig"], as_string(message))
   1.166 +
   1.167 +        # Make the container for the message.
   1.168 +
   1.169 +        signed_message = MIMEMultipart("signed", protocol="application/pgp-signature")
   1.170 +        signed_message.attach(message)
   1.171 +
   1.172 +        signature_part = MIMEBase("application", "pgp-signature")
   1.173 +        signature_part.set_payload(signature)
   1.174 +        signed_message.attach(signature_part)
   1.175 +
   1.176 +        return signed_message
   1.177 +
   1.178 +    def decryptMessageText(self, message):
   1.179 +
   1.180 +        "Return a decrypted version of 'message'."
   1.181 +
   1.182 +        return self.run(["--decrypt"], message)
   1.183 +
   1.184 +    def decryptMessage(self, message):
   1.185 +
   1.186 +        """
   1.187 +        Decrypt the given RFC 3156 'message', returning the message text.
   1.188 +        """
   1.189 +
   1.190 +        try:
   1.191 +            declaration, content = message.get_payload()
   1.192 +        except ValueError:
   1.193 +            raise GPGMissingPart
   1.194 +
   1.195 +        # Verify the message format.
   1.196 +
   1.197 +        if content.get_content_type() != "application/octet-stream":
   1.198 +            raise GPGBadContent
   1.199 +
   1.200 +        # Return the decrypted message text.
   1.201 +
   1.202 +        return self.decryptMessageText(content.get_payload(decode=True))
   1.203 +
   1.204 +    def encryptMessage(self, message, keyid):
   1.205 +
   1.206 +        """
   1.207 +        Return an encrypted version of 'message' using the given 'keyid'.
   1.208 +        """
   1.209 +
   1.210 +        text = as_string(message)
   1.211 +        encrypted = self.run(["--armor", "-r", keyid, "--encrypt", "--trust-model", "always"], text)
   1.212 +
   1.213 +        # Make the container for the message.
   1.214 +
   1.215 +        encrypted_message = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")
   1.216 +
   1.217 +        # For encrypted content, add the declaration and content.
   1.218 +
   1.219 +        declaration = MIMEBase("application", "pgp-encrypted")
   1.220 +        declaration.set_payload("Version: 1")
   1.221 +        encrypted_message.attach(declaration)
   1.222 +
   1.223 +        content = MIMEApplication(encrypted, "octet-stream", encode_noop)
   1.224 +        encrypted_message.attach(content)
   1.225 +
   1.226 +        return encrypted_message
   1.227 +
   1.228 +    def importKeys(self, text):
   1.229 +
   1.230 +        """
   1.231 +        Import the keys provided by the given 'text'.
   1.232 +        """
   1.233 +
   1.234 +        self.run(["--import"], text)
   1.235 +
   1.236 +    def exportKey(self, keyid):
   1.237 +
   1.238 +        """
   1.239 +        Return the "armoured" public key text for 'keyid' as a message part with
   1.240 +        a suitable media type.
   1.241 +        See: https://tools.ietf.org/html/rfc3156#section-7
   1.242 +        """
   1.243 +
   1.244 +        text = self.run(["--armor", "--export", keyid])
   1.245 +        return MIMEApplication(text, "pgp-keys", encode_noop)
   1.246 +
   1.247 +    def listKeys(self, keyid=None):
   1.248 +
   1.249 +        """
   1.250 +        Return a list of key details for keys on the keychain, selecting only
   1.251 +        one specific key if 'keyid' is specified.
   1.252 +        """
   1.253 +
   1.254 +        text = self.run(["--list-keys", "--with-colons", "--with-fingerprint"] +
   1.255 +            (keyid and ["0x%s" % keyid] or []))
   1.256 +        return self._getKeysFromResult(text)
   1.257 +
   1.258 +    def listSignatures(self, keyid=None):
   1.259 +
   1.260 +        """
   1.261 +        Return a list of key and signature details for keys on the keychain,
   1.262 +        selecting only one specific key if 'keyid' is specified.
   1.263 +        """
   1.264 +
   1.265 +        text = self.run(["--list-sigs", "--with-colons", "--with-fingerprint"] +
   1.266 +            (keyid and ["0x%s" % keyid] or []))
   1.267 +        return self._getKeysFromResult(text)
   1.268 +
   1.269 +    def getKeysFromMessagePart(self, part):
   1.270 +
   1.271 +        """
   1.272 +        Process an application/pgp-keys message 'part', returning a list of
   1.273 +        key details.
   1.274 +        """
   1.275 +
   1.276 +        return self.getKeysFromString(part.get_payload(decode=True))
   1.277 +
   1.278 +    def getKeysFromString(self, s):
   1.279 +
   1.280 +        """
   1.281 +        Return a list of key details extracted from the given key block string
   1.282 +        's'. Signature information is also included through the use of the gpg
   1.283 +        verbose option.
   1.284 +        """
   1.285 +
   1.286 +        text = self.run(["--with-colons", "--with-fingerprint", "-v"], s)
   1.287 +        return self._getKeysFromResult(text)
   1.288 +
   1.289 +    def _getKeysFromResult(self, text):
   1.290 +
   1.291 +        """
   1.292 +        Return a list of key details extracted from the given command result
   1.293 +        'text'.
   1.294 +        """
   1.295 +
   1.296 +        keys = []
   1.297 +        for line in text.split("\n"):
   1.298 +            try:
   1.299 +                recordtype, trust, keylength, algorithm, keyid, cdate, expdate, serial, ownertrust, _rest = line.split(":", 9)
   1.300 +            except ValueError:
   1.301 +                continue
   1.302 +
   1.303 +            if recordtype == "pub":
   1.304 +                userid, _rest = _rest.split(":", 1)
   1.305 +                keys.append({
   1.306 +                    "type" : recordtype, "trust" : trust, "keylength" : keylength,
   1.307 +                    "algorithm" : algorithm, "keyid" : keyid, "cdate" : cdate,
   1.308 +                    "expdate" : expdate, "userid" : userid, "ownertrust" : ownertrust,
   1.309 +                    "fingerprint" : None, "subkeys" : [], "signatures" : []
   1.310 +                    })
   1.311 +            elif recordtype == "sub" and keys:
   1.312 +                keys[-1]["subkeys"].append({
   1.313 +                    "trust" : trust, "keylength" : keylength, "algorithm" : algorithm,
   1.314 +                    "keyid" : keyid, "cdate" : cdate, "expdate" : expdate,
   1.315 +                    "ownertrust" : ownertrust
   1.316 +                    })
   1.317 +            elif recordtype == "fpr" and keys:
   1.318 +                fingerprint, _rest = _rest.split(":", 1)
   1.319 +                keys[-1]["fingerprint"] = fingerprint
   1.320 +            elif recordtype == "sig" and keys:
   1.321 +                userid, _rest = _rest.split(":", 1)
   1.322 +                keys[-1]["signatures"].append({
   1.323 +                    "keyid" : keyid, "cdate" : cdate, "expdate" : expdate,
   1.324 +                    "userid" : userid
   1.325 +                    })
   1.326 +
   1.327 +        return keys
   1.328 +
   1.329 +# Message serialisation functions, working around email module problems.
   1.330 +
   1.331 +def as_string(message):
   1.332 +
   1.333 +    """
   1.334 +    Return the string representation of 'message', attempting to preserve the
   1.335 +    precise original formatting.
   1.336 +    """
   1.337 +
   1.338 +    out = StringIO()
   1.339 +    generator = Generator(out, False, 0) # disable reformatting measures
   1.340 +    generator.flatten(message)
   1.341 +    return out.getvalue()
   1.342 +
   1.343 +# Message decoding functions.
   1.344 +
   1.345 +# Detect PGP/GPG-encoded payloads.
   1.346 +# See: http://tools.ietf.org/html/rfc3156
   1.347 +
   1.348 +def is_signed(message):
   1.349 +    mimetype = message.get_content_type()
   1.350 +    encoding = message.get_content_charset()
   1.351 +
   1.352 +    return mimetype == "multipart/signed" and \
   1.353 +        message.get_param("protocol") == "application/pgp-signature"
   1.354 +
   1.355 +def is_encrypted(message):
   1.356 +    mimetype = message.get_content_type()
   1.357 +    encoding = message.get_content_charset()
   1.358 +
   1.359 +    return mimetype == "multipart/encrypted" and \
   1.360 +        message.get_param("protocol") == "application/pgp-encrypted"
   1.361 +
   1.362 +def getContentAndSignature(message):
   1.363 +
   1.364 +    """
   1.365 +    Return the content and signature parts of the given RFC 3156 'message'.
   1.366 +
   1.367 +    NOTE: RFC 3156 states that signed messages should employ a detached
   1.368 +    NOTE: signature but then shows "BEGIN PGP MESSAGE" for signatures
   1.369 +    NOTE: instead of "BEGIN PGP SIGNATURE".
   1.370 +    NOTE: The "micalg" parameter is currently not supported.
   1.371 +    """
   1.372 +
   1.373 +    try:
   1.374 +        content, signature = message.get_payload()
   1.375 +        return content, signature
   1.376 +    except ValueError:
   1.377 +        raise GPGMissingPart
   1.378 +
   1.379 +# vim: tabstop=4 expandtab shiftwidth=4