1 #!/usr/bin/env python 2 3 """ 4 GPG utilities derived from the MoinMessage library. 5 6 Copyright (C) 2012, 2013, 2014 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT 14 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 15 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 16 details. 17 18 You should have received a copy of the GNU General Public License along with 19 this program. If not, see <http://www.gnu.org/licenses/>. 20 """ 21 22 from email.encoders import encode_noop 23 from email.generator import Generator 24 from email.mime.multipart import MIMEMultipart 25 from email.mime.application import MIMEApplication 26 from email.mime.base import MIMEBase 27 from subprocess import Popen, PIPE 28 from tempfile import mkstemp 29 import os 30 31 try: 32 from cStringIO import StringIO 33 except ImportError: 34 from StringIO import StringIO 35 36 class GPGError(Exception): 37 pass 38 39 class GPGDecodingError(Exception): 40 pass 41 42 class GPGMissingPart(GPGDecodingError): 43 pass 44 45 class GPGBadContent(GPGDecodingError): 46 pass 47 48 class GPG: 49 50 "A wrapper around the gpg command using a particular configuration." 51 52 def __init__(self, homedir=None): 53 self.conf_args = [] 54 55 if homedir: 56 self.conf_args += ["--homedir", homedir] 57 58 self.errors = None 59 60 def run(self, args, text=None): 61 62 """ 63 Invoke gpg with the given 'args', supplying the given 'text' to the 64 command directly or, if 'text' is omitted, using a file provided as part 65 of the 'args' if appropriate. 66 67 Failure to complete the operation will result in a GPGError being 68 raised. 69 """ 70 71 cmd = Popen(["gpg"] + self.conf_args + list(args), stdin=PIPE, stdout=PIPE, stderr=PIPE) 72 73 # Attempt to write input to the command and to read output from the 74 # command. 75 76 text, self.errors = cmd.communicate(text) 77 78 # Test for a zero result. 79 80 if not cmd.returncode: 81 return text 82 else: 83 raise GPGError, self.errors 84 85 def verifyMessageText(self, signature, content): 86 87 "Using the given 'signature', verify the given message 'content'." 88 89 # Write the detached signature and content to files. 90 91 signature_fd, signature_filename = mkstemp() 92 content_fd, content_filename = mkstemp() 93 94 try: 95 signature_fp = os.fdopen(signature_fd, "w") 96 content_fp = os.fdopen(content_fd, "w") 97 try: 98 signature_fp.write(signature) 99 content_fp.write(content) 100 finally: 101 signature_fp.close() 102 content_fp.close() 103 104 # Verify the message text. 105 106 text = self.run(["--status-fd", "1", "--verify", signature_filename, content_filename]) 107 108 # Return the details of the signing key. 109 110 identity = None 111 fingerprint = None 112 113 for line in text.split("\n"): 114 try: 115 prefix, msgtype, digest, details = line.strip().split(" ", 3) 116 except ValueError: 117 continue 118 119 # Return the fingerprint and identity details. 120 121 if msgtype == "GOODSIG": 122 identity = details 123 elif msgtype == "VALIDSIG": 124 fingerprint = digest 125 126 if identity and fingerprint: 127 return fingerprint, identity 128 129 return None 130 131 finally: 132 os.remove(signature_filename) 133 os.remove(content_filename) 134 135 def verifyMessage(self, message): 136 137 """ 138 Verify the given RFC 3156 'message', returning a tuple of the form 139 (fingerprint, identity, content). 140 """ 141 142 content, signature = getContentAndSignature(message) 143 144 # Verify the message format. 145 146 if signature.get_content_type() != "application/pgp-signature": 147 raise GPGBadContent 148 149 # Verify the message. 150 151 fingerprint, identity = self.verifyMessageText(signature.get_payload(decode=True), as_string(content)) 152 return fingerprint, identity, content 153 154 def signMessageText(self, text, keyid): 155 156 "Return a signature for 'text' using the given 'keyid'." 157 158 return self.run(["--armor", "-u", keyid, "--detach-sig"], text) 159 160 def signMessage(self, message, keyid): 161 162 """ 163 Return a signed version of 'message' using the given 'keyid'. 164 """ 165 166 # Sign the container's representation. 167 168 text = as_string(message) 169 signature = self.signMessageText(text, keyid) 170 171 # Make the container for the message. 172 173 signed_message = MIMEMultipart("signed", protocol="application/pgp-signature") 174 signed_message.attach(message) 175 176 signature_part = MIMEBase("application", "pgp-signature") 177 signature_part.set_payload(signature) 178 signed_message.attach(signature_part) 179 180 return signed_message 181 182 def decryptMessageText(self, text): 183 184 "Return a decrypted version of 'text'." 185 186 return self.run(["--decrypt"], text) 187 188 def decryptMessage(self, message): 189 190 """ 191 Decrypt the given RFC 3156 'message', returning the message text. 192 """ 193 194 try: 195 declaration, content = message.get_payload() 196 except ValueError: 197 raise GPGMissingPart 198 199 # Verify the message format. 200 201 if content.get_content_type() != "application/octet-stream": 202 raise GPGBadContent 203 204 # Return the decrypted message text. 205 206 return self.decryptMessageText(content.get_payload(decode=True)) 207 208 def encryptMessageText(self, text, keyid): 209 210 "Return an encrypted version of 'text' using the given 'keyid'." 211 212 return self.run(["--armor", "-r", keyid, "--encrypt", "--trust-model", "always"], text) 213 214 def encryptMessage(self, message, keyid): 215 216 """ 217 Return an encrypted version of 'message' using the given 'keyid'. 218 """ 219 220 text = as_string(message) 221 encrypted = self.encryptMessageText(text, keyid) 222 223 # Make the container for the message. 224 225 encrypted_message = MIMEMultipart("encrypted", protocol="application/pgp-encrypted") 226 227 # For encrypted content, add the declaration and content. 228 229 declaration = MIMEBase("application", "pgp-encrypted") 230 declaration.set_payload("Version: 1") 231 encrypted_message.attach(declaration) 232 233 content = MIMEApplication(encrypted, "octet-stream", encode_noop) 234 encrypted_message.attach(content) 235 236 return encrypted_message 237 238 def importKeys(self, text): 239 240 """ 241 Import the keys provided by the given 'text'. 242 """ 243 244 self.run(["--import"], text) 245 246 def exportKey(self, keyid): 247 248 """ 249 Return the "armoured" public key text for 'keyid' as a message part with 250 a suitable media type. 251 See: https://tools.ietf.org/html/rfc3156#section-7 252 """ 253 254 text = self.run(["--armor", "--export", keyid]) 255 return MIMEApplication(text, "pgp-keys", encode_noop) 256 257 def listKeys(self, keyid=None): 258 259 """ 260 Return a list of key details for keys on the keychain, selecting only 261 one specific key if 'keyid' is specified. 262 """ 263 264 text = self.run(["--list-keys", "--with-colons", "--with-fingerprint"] + 265 (keyid and ["0x%s" % keyid] or [])) 266 return self._getKeysFromResult(text) 267 268 def listSignatures(self, keyid=None): 269 270 """ 271 Return a list of key and signature details for keys on the keychain, 272 selecting only one specific key if 'keyid' is specified. 273 """ 274 275 text = self.run(["--list-sigs", "--with-colons", "--with-fingerprint"] + 276 (keyid and ["0x%s" % keyid] or [])) 277 return self._getKeysFromResult(text) 278 279 def getKeysFromMessagePart(self, part): 280 281 """ 282 Process an application/pgp-keys message 'part', returning a list of 283 key details. 284 """ 285 286 return self.getKeysFromString(part.get_payload(decode=True)) 287 288 def getKeysFromString(self, s): 289 290 """ 291 Return a list of key details extracted from the given key block string 292 's'. Signature information is also included through the use of the gpg 293 verbose option. 294 """ 295 296 text = self.run(["--with-colons", "--with-fingerprint", "-v"], s) 297 return self._getKeysFromResult(text) 298 299 def _getKeysFromResult(self, text): 300 301 """ 302 Return a list of key details extracted from the given command result 303 'text'. 304 """ 305 306 keys = [] 307 for line in text.split("\n"): 308 try: 309 recordtype, trust, keylength, algorithm, keyid, cdate, expdate, serial, ownertrust, _rest = line.split(":", 9) 310 except ValueError: 311 continue 312 313 if recordtype == "pub": 314 userid, _rest = _rest.split(":", 1) 315 keys.append({ 316 "type" : recordtype, "trust" : trust, "keylength" : keylength, 317 "algorithm" : algorithm, "keyid" : keyid, "cdate" : cdate, 318 "expdate" : expdate, "userid" : userid, "ownertrust" : ownertrust, 319 "fingerprint" : None, "subkeys" : [], "signatures" : [] 320 }) 321 elif recordtype == "sub" and keys: 322 keys[-1]["subkeys"].append({ 323 "trust" : trust, "keylength" : keylength, "algorithm" : algorithm, 324 "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, 325 "ownertrust" : ownertrust 326 }) 327 elif recordtype == "fpr" and keys: 328 fingerprint, _rest = _rest.split(":", 1) 329 keys[-1]["fingerprint"] = fingerprint 330 elif recordtype == "sig" and keys: 331 userid, _rest = _rest.split(":", 1) 332 keys[-1]["signatures"].append({ 333 "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, 334 "userid" : userid 335 }) 336 337 return keys 338 339 # Message serialisation functions, working around email module problems. 340 341 def as_string(message): 342 343 """ 344 Return the string representation of 'message', attempting to preserve the 345 precise original formatting. 346 """ 347 348 out = StringIO() 349 generator = Generator(out, False, 0) # disable reformatting measures 350 generator.flatten(message) 351 return out.getvalue() 352 353 # Message decoding functions. 354 355 # Detect PGP/GPG-encoded payloads. 356 # See: http://tools.ietf.org/html/rfc3156 357 358 def is_signed(message): 359 mimetype = message.get_content_type() 360 encoding = message.get_content_charset() 361 362 return mimetype == "multipart/signed" and \ 363 message.get_param("protocol") == "application/pgp-signature" 364 365 def is_encrypted(message): 366 mimetype = message.get_content_type() 367 encoding = message.get_content_charset() 368 369 return mimetype == "multipart/encrypted" and \ 370 message.get_param("protocol") == "application/pgp-encrypted" 371 372 def getContentAndSignature(message): 373 374 """ 375 Return the content and signature parts of the given RFC 3156 'message'. 376 377 NOTE: RFC 3156 states that signed messages should employ a detached 378 NOTE: signature but then shows "BEGIN PGP MESSAGE" for signatures 379 NOTE: instead of "BEGIN PGP SIGNATURE". 380 NOTE: The "micalg" parameter is currently not supported. 381 """ 382 383 try: 384 content, signature = message.get_payload() 385 return content, signature 386 except ValueError: 387 raise GPGMissingPart 388 389 # vim: tabstop=4 expandtab shiftwidth=4