1 #!/usr/bin/env python 2 3 """ 4 GPG utilities derived from the MoinMessage library. 5 6 Copyright (C) 2012, 2013, 2014, 2015 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT 14 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 15 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 16 details. 17 18 You should have received a copy of the GNU General Public License along with 19 this program. If not, see <http://www.gnu.org/licenses/>. 20 """ 21 22 # NOTE: Nasty fix-up for the email module, thanks to the Python 2 abandonment 23 # NOTE: policy of the core developers. 24 25 from emailfix.generator import Generator 26 27 from email.encoders import encode_noop 28 from email.mime.multipart import MIMEMultipart 29 from email.mime.application import MIMEApplication 30 from email.mime.base import MIMEBase 31 from subprocess import Popen, PIPE 32 from tempfile import mkstemp 33 import os 34 35 try: 36 from cStringIO import StringIO 37 except ImportError: 38 from StringIO import StringIO 39 40 class GPGError(Exception): 41 pass 42 43 class GPGDecodingError(Exception): 44 pass 45 46 class GPGMissingPart(GPGDecodingError): 47 pass 48 49 class GPGBadContent(GPGDecodingError): 50 pass 51 52 class GPG: 53 54 "A wrapper around the gpg command using a particular configuration." 55 56 def __init__(self, homedir=None): 57 self.conf_args = [] 58 59 if homedir: 60 self.conf_args += ["--homedir", homedir] 61 62 self.errors = None 63 64 def run(self, args, text=None): 65 66 """ 67 Invoke gpg with the given 'args', supplying the given 'text' to the 68 command directly or, if 'text' is omitted, using a file provided as part 69 of the 'args' if appropriate. 70 71 Failure to complete the operation will result in a GPGError being 72 raised. 73 """ 74 75 cmd = Popen(["gpg"] + self.conf_args + list(args), stdin=PIPE, stdout=PIPE, stderr=PIPE) 76 77 # Attempt to write input to the command and to read output from the 78 # command. 79 80 text, self.errors = cmd.communicate(text) 81 82 # Test for a zero result. 83 84 if not cmd.returncode: 85 return text 86 else: 87 raise GPGError, self.errors 88 89 def verifyMessageText(self, signature, content): 90 91 "Using the given 'signature', verify the given message 'content'." 92 93 # Write the detached signature and content to files. 94 95 signature_fd, signature_filename = mkstemp() 96 content_fd, content_filename = mkstemp() 97 98 try: 99 signature_fp = os.fdopen(signature_fd, "w") 100 content_fp = os.fdopen(content_fd, "w") 101 try: 102 signature_fp.write(signature) 103 content_fp.write(content) 104 finally: 105 signature_fp.close() 106 content_fp.close() 107 108 # Verify the message text. 109 110 text = self.run(["--status-fd", "1", "--verify", signature_filename, content_filename]) 111 112 # Return the details of the signing key. 113 114 identity = None 115 fingerprint = None 116 117 for line in text.split("\n"): 118 try: 119 prefix, msgtype, digest, details = line.strip().split(" ", 3) 120 except ValueError: 121 continue 122 123 # Return the fingerprint and identity details. 124 125 if msgtype == "GOODSIG": 126 identity = details 127 elif msgtype == "VALIDSIG": 128 fingerprint = digest 129 130 if identity and fingerprint: 131 return fingerprint, identity 132 133 return None 134 135 finally: 136 os.remove(signature_filename) 137 os.remove(content_filename) 138 139 def verifyMessage(self, message): 140 141 """ 142 Verify the given RFC 3156 'message', returning a tuple of the form 143 (fingerprint, identity, content). 144 """ 145 146 content, signature = getContentAndSignature(message) 147 148 # Verify the message format. 149 150 if signature.get_content_type() != "application/pgp-signature": 151 raise GPGBadContent 152 153 # Verify the message. 154 155 fingerprint, identity = self.verifyMessageText(signature.get_payload(decode=True), as_string(content)) 156 return fingerprint, identity, content 157 158 def signMessageText(self, text, keyid): 159 160 "Return a signature for 'text' using the given 'keyid'." 161 162 return self.run(["--armor", "-u", keyid, "--detach-sig"], text) 163 164 def signMessage(self, message, keyid): 165 166 """ 167 Return a signed version of 'message' using the given 'keyid'. 168 """ 169 170 # Sign the container's representation. 171 172 text = as_string(message) 173 signature = self.signMessageText(text, keyid) 174 175 # Make the container for the message. 176 177 signed_message = MIMEMultipart("signed", protocol="application/pgp-signature") 178 signed_message.attach(message) 179 180 signature_part = MIMEBase("application", "pgp-signature") 181 signature_part.set_payload(signature) 182 signed_message.attach(signature_part) 183 184 return signed_message 185 186 def decryptMessageText(self, text): 187 188 "Return a decrypted version of 'text'." 189 190 return self.run(["--decrypt"], text) 191 192 def decryptMessage(self, message): 193 194 """ 195 Decrypt the given RFC 3156 'message', returning the message text. 196 """ 197 198 try: 199 declaration, content = message.get_payload() 200 except ValueError: 201 raise GPGMissingPart 202 203 # Verify the message format. 204 205 if content.get_content_type() != "application/octet-stream": 206 raise GPGBadContent 207 208 # Return the decrypted message text. 209 210 return self.decryptMessageText(content.get_payload(decode=True)) 211 212 def encryptMessageText(self, text, keyid): 213 214 "Return an encrypted version of 'text' using the given 'keyid'." 215 216 return self.run(["--armor", "-r", keyid, "--encrypt", "--trust-model", "always"], text) 217 218 def encryptMessage(self, message, keyid): 219 220 """ 221 Return an encrypted version of 'message' using the given 'keyid'. 222 """ 223 224 text = as_string(message) 225 encrypted = self.encryptMessageText(text, keyid) 226 227 # Make the container for the message. 228 229 encrypted_message = MIMEMultipart("encrypted", protocol="application/pgp-encrypted") 230 231 # For encrypted content, add the declaration and content. 232 233 declaration = MIMEBase("application", "pgp-encrypted") 234 declaration.set_payload("Version: 1") 235 encrypted_message.attach(declaration) 236 237 content = MIMEApplication(encrypted, "octet-stream", encode_noop) 238 encrypted_message.attach(content) 239 240 return encrypted_message 241 242 def importKeys(self, text): 243 244 """ 245 Import the keys provided by the given 'text'. 246 """ 247 248 self.run(["--import"], text) 249 250 def exportKey(self, keyid): 251 252 """ 253 Return the "armoured" public key text for 'keyid' as a message part with 254 a suitable media type. 255 See: https://tools.ietf.org/html/rfc3156#section-7 256 """ 257 258 text = self.run(["--armor", "--export", keyid]) 259 return MIMEApplication(text, "pgp-keys", encode_noop) 260 261 def listKeys(self, keyid=None): 262 263 """ 264 Return a list of key details for keys on the keychain, selecting only 265 one specific key if 'keyid' is specified. 266 """ 267 268 text = self.run(["--list-keys", "--with-colons", "--with-fingerprint"] + 269 (keyid and ["0x%s" % keyid] or [])) 270 return self._getKeysFromResult(text) 271 272 def listSignatures(self, keyid=None): 273 274 """ 275 Return a list of key and signature details for keys on the keychain, 276 selecting only one specific key if 'keyid' is specified. 277 """ 278 279 text = self.run(["--list-sigs", "--with-colons", "--with-fingerprint"] + 280 (keyid and ["0x%s" % keyid] or [])) 281 return self._getKeysFromResult(text) 282 283 def getKeysFromMessagePart(self, part): 284 285 """ 286 Process an application/pgp-keys message 'part', returning a list of 287 key details. 288 """ 289 290 return self.getKeysFromString(part.get_payload(decode=True)) 291 292 def getKeysFromString(self, s): 293 294 """ 295 Return a list of key details extracted from the given key block string 296 's'. Signature information is also included through the use of the gpg 297 verbose option. 298 """ 299 300 text = self.run(["--with-colons", "--with-fingerprint", "-v"], s) 301 return self._getKeysFromResult(text) 302 303 def _getKeysFromResult(self, text): 304 305 """ 306 Return a list of key details extracted from the given command result 307 'text'. 308 """ 309 310 keys = [] 311 for line in text.split("\n"): 312 try: 313 recordtype, trust, keylength, algorithm, keyid, cdate, expdate, serial, ownertrust, _rest = line.split(":", 9) 314 except ValueError: 315 continue 316 317 if recordtype == "pub": 318 userid, _rest = _rest.split(":", 1) 319 keys.append({ 320 "type" : recordtype, "trust" : trust, "keylength" : keylength, 321 "algorithm" : algorithm, "keyid" : keyid, "cdate" : cdate, 322 "expdate" : expdate, "userid" : userid, "ownertrust" : ownertrust, 323 "fingerprint" : None, "subkeys" : [], "signatures" : [] 324 }) 325 elif recordtype == "sub" and keys: 326 keys[-1]["subkeys"].append({ 327 "trust" : trust, "keylength" : keylength, "algorithm" : algorithm, 328 "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, 329 "ownertrust" : ownertrust 330 }) 331 elif recordtype == "fpr" and keys: 332 fingerprint, _rest = _rest.split(":", 1) 333 keys[-1]["fingerprint"] = fingerprint 334 elif recordtype == "sig" and keys: 335 userid, _rest = _rest.split(":", 1) 336 keys[-1]["signatures"].append({ 337 "keyid" : keyid, "cdate" : cdate, "expdate" : expdate, 338 "userid" : userid 339 }) 340 341 return keys 342 343 # Message serialisation functions, working around email module problems. 344 345 def as_string(message): 346 347 """ 348 Return the string representation of 'message', attempting to preserve the 349 precise original formatting. 350 """ 351 352 out = StringIO() 353 generator = Generator(out, False, 0) # disable reformatting measures 354 generator.flatten(message, linesep="\r\n") 355 return out.getvalue() 356 357 # Message decoding functions. 358 359 # Detect PGP/GPG-encoded payloads. 360 # See: http://tools.ietf.org/html/rfc3156 361 362 def is_signed(message): 363 mimetype = message.get_content_type() 364 encoding = message.get_content_charset() 365 366 return mimetype == "multipart/signed" and \ 367 message.get_param("protocol") == "application/pgp-signature" 368 369 def is_encrypted(message): 370 mimetype = message.get_content_type() 371 encoding = message.get_content_charset() 372 373 return mimetype == "multipart/encrypted" and \ 374 message.get_param("protocol") == "application/pgp-encrypted" 375 376 def getContentAndSignature(message): 377 378 """ 379 Return the content and signature parts of the given RFC 3156 'message'. 380 381 NOTE: RFC 3156 states that signed messages should employ a detached 382 NOTE: signature but then shows "BEGIN PGP MESSAGE" for signatures 383 NOTE: instead of "BEGIN PGP SIGNATURE". 384 NOTE: The "micalg" parameter is currently not supported. 385 """ 386 387 try: 388 content, signature = message.get_payload() 389 return content, signature 390 except ValueError: 391 raise GPGMissingPart 392 393 # vim: tabstop=4 expandtab shiftwidth=4