#!/usr/bin/env python3
# Copyright (C) 2020-2021 by the Linux Foundation
# SPDX-License-Identifier: MIT-0
"""A proof of concept tool for header-based email patch attestation.

A message is canonicalized with ``git mailinfo``, hashed with SHA-256 and the
resulting digest is signed with either an ed25519 key (via PyNaCl) or with
GnuPG. The signature travels in an ``X-Developer-Signature`` header whose
layout (``v=``, ``a=``, ``i=``, ``s=``, ``h=``, ``l=``, ``bh=``, ``b=``) is
modelled on DKIM.
"""

import sys
import os
import base64
import email.utils
import email.header
import re
import subprocess
import hashlib
import urllib.parse
import logging
import tempfile

from typing import Tuple, Optional

# Name of the header carrying the developer signature
DEVSIG_HDR = b'X-Developer-Signature'
# Headers that must always be covered by the signature (lowercased names)
REQ_HDRS = [b'from', b'subject', b'date', b'message-id']

logger = logging.getLogger(__name__)


def _run_command(cmdargs: list, stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
    """Run a command and return ``(returncode, stdout, stderr)``.

    :param cmdargs: argv list passed to ``subprocess.Popen`` (no shell)
    :param stdin: optional bytes to feed to the process on standard input
    """
    sp = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    (output, error) = sp.communicate(input=stdin)
    return sp.returncode, output, error


def gpg_run_command(cmdargs: list, stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
    """Run ``gpg`` in batch mode with the given extra arguments.

    Key auto-retrieval and automatic trustdb checks are always disabled.

    :param cmdargs: arguments appended after the fixed gpg options
    :param stdin: optional bytes to feed to gpg on standard input
    :return: ``(returncode, stdout, stderr)``
    """
    gpgbin = 'gpg'
    cmdargs = [gpgbin, '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb'] + cmdargs
    return _run_command(cmdargs, stdin)


def check_gpg_status(status: bytes) -> Tuple[bool, bool, bool]:
    """Parse gpg ``--status-fd`` output.

    :param status: raw status output produced by gpg
    :return: ``(good, valid, trusted)`` where each flag tells whether a
             ``GOODSIG``, a ``VALIDSIG`` and a ``TRUST_FULLY``/``TRUST_ULTIMATE``
             status line was found, respectively
    """
    good = bool(re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M))
    valid = bool(re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M))
    trusted = bool(re.search(rb'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M))
    return good, valid, trusted


def get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
    """Run ``git mailinfo`` on a raw message.

    Exits the program with status 1 if git-mailinfo reports a failure.

    :param payload: the complete raw message (headers and body)
    :return: ``(m, p, info)``: the contents of the two files written by
             git-mailinfo (in the order they are given on its command line)
             and its standard output
    """
    with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td:
        mf = os.path.join(td, 'm')
        pf = os.path.join(td, 'p')
        cmdargs = ['git', 'mailinfo', '--encoding=utf-8', mf, pf]
        ecode, out, err = _run_command(cmdargs, stdin=payload)
        if ecode > 0:
            logger.critical('FAILED : Failed running git-mailinfo:')
            logger.critical(err.decode())
            sys.exit(1)

        with open(mf, 'rb') as mfh:
            m = mfh.read()
        with open(pf, 'rb') as pfh:
            p = pfh.read()
        return m, p, out


def load_message(msgfile: str) -> Tuple[list, bytes]:
    """Load a message from a file and split it into headers and payload.

    Exits the program with status 1 if the file starts with a continuation
    line (i.e. there is no header to attach it to).

    :param msgfile: path to the file containing the raw message
    :return: ``(headers, payload)``; ``headers`` is a list of bytes, one entry
             per header with continuation lines attached using CRLF, and
             ``payload`` is the body re-joined with CRLF line endings and
             terminated by a single CRLF
    """
    # we don't use python's email message because we don't want any processing
    # done on the contents that may result in a wrong hash being generated
    headers = list()
    payload = list()
    with open(msgfile, 'rb') as fh:
        logger.info('MSGSRC : %s', msgfile)
        in_payload = False
        for line in fh:
            # strip any trailing CRLF
            line = line.rstrip(b'\r\n')
            if in_payload:
                payload.append(line)
                continue
            if not line:
                # the first empty line separates headers from the body
                in_payload = True
                continue
            # is it a wrapped header?
            if line[:1] in (b'\t', b' '):
                if not headers:
                    # What?
                    logger.critical('Not valid RFC2822 message')
                    sys.exit(1)
                # attach it to the last header
                headers[-1] += b'\r\n' + line
                continue
            headers.append(line)

    return headers, b'\r\n'.join(payload) + b'\r\n'


def get_mailinfo_message(oheaders: list, opayload: bytes, want_hdrs: list,
                         maxlen: Optional[int]) -> Tuple[list, bytes]:
    """Canonicalize a message using ``git mailinfo``.

    :param oheaders: original headers, as returned by :func:`load_message`
    :param opayload: original payload, as returned by :func:`load_message`
    :param want_hdrs: lowercased names of the headers to keep
    :param maxlen: if truthy, the canonical payload is cut to this many bytes
    :return: ``(cheaders, cpayload)``: the wanted headers, with From, Subject
             and Date values replaced by what git-mailinfo reported, and the
             canonical payload with CRLF line endings
    """
    # We pre-canonicalize using git mailinfo
    origmsg = b'\r\n'.join(oheaders) + b'\r\n\r\n' + opayload
    m, p, i = get_git_mailinfo(origmsg)
    # we don't use python's email message because we don't want any processing
    # done on the contents that may result in a wrong hash being generated
    # Generate a new payload using m and p and canonicalize with \r\n endings,
    # trimming any excess blank lines ("simple" DKIM canonicalization).
    cpayload = b''.join(line.rstrip(b'\r\n') + b'\r\n'
                        for line in (m + p).rstrip(b'\r\n').split(b'\n'))

    if maxlen:
        logger.debug('Limiting payload length to %d bytes', maxlen)
        cpayload = cpayload[:maxlen]

    # Parse the "Key: value" lines that git-mailinfo printed on stdout
    idata = dict()
    for line in i.rstrip(b'\r\n').split(b'\n'):
        left, sep, right = line.partition(b':')
        if not sep:
            # not a "key: value" line (e.g. empty output), nothing to record
            continue
        idata[left.lower()] = right.strip()

    # Now substituting headers returned by mailinfo
    cheaders = list()
    for oheader in oheaders:
        left, sep, right = oheader.partition(b':')
        if not sep:
            # malformed header line without a name/value separator
            continue
        lleft = left.lower()
        if lleft not in want_hdrs:
            continue
        if lleft == b'from':
            right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
        elif lleft == b'subject':
            right = b' ' + idata.get(b'subject', b'')
        elif lleft == b'date':
            right = b' ' + idata.get(b'date', b'')
        cheaders.append(left + b':' + right)

    return cheaders, cpayload


def splitter(longstr: bytes, limit: int = 78) -> bytes:
    """Break a long byte string into space-separated chunks.

    The first chunk is two bytes shorter than ``limit``; all following chunks
    are at most ``limit`` bytes long.

    :param longstr: the bytes to split
    :param limit: maximum chunk length
    :return: the chunks joined with a single space
    """
    splitstr = list()
    first = True
    while len(longstr) > limit:
        at = limit
        if first:
            first = False
            at -= 2
        splitstr.append(longstr[:at])
        longstr = longstr[at:]
    splitstr.append(longstr)
    return b' '.join(splitstr)


def get_git_toplevel(gitdir: Optional[str] = None) -> str:
    """Return the top-level directory of the git work tree.

    :param gitdir: optional value for ``git --git-dir``
    :return: the path printed by ``git rev-parse --show-toplevel``, or an
             empty string if the command failed
    """
    cmdargs = ['git']
    if gitdir:
        cmdargs += ['--git-dir', gitdir]
    cmdargs += ['rev-parse', '--show-toplevel']
    ecode, out, err = _run_command(cmdargs)
    if ecode == 0:
        return out.decode().strip()
    return ''


def get_parts_from_header(hval: bytes) -> dict:
    """Parse a ``k=v; k=v`` header value into a dict.

    All whitespace is removed before parsing; chunks without ``=`` are
    ignored.

    :param hval: the header value
    :return: mapping of ``str`` keys to ``bytes`` values
    """
    hval = re.sub(rb'\s+', b'', hval)
    hdata = dict()
    for chunk in hval.split(b';'):
        parts = chunk.split(b'=', 1)
        if len(parts) < 2:
            continue
        hdata[parts[0].decode()] = parts[1]
    return hdata


def dkim_canonicalize_header(hval: bytes) -> bytes:
    """Apply DKIM "relaxed" canonicalization to a header value.

    :param hval: the header value (the part after the colon)
    :return: the unfolded, whitespace-collapsed, stripped value followed by
             CRLF
    """
    # We only do relaxed for headers
    #    o  Unfold all header field continuation lines as described in
    #       [RFC5322]; in particular, lines with terminators embedded in
    #       continued header field values (that is, CRLF sequences followed by
    #       WSP) MUST be interpreted without the CRLF.  Implementations MUST
    #       NOT remove the CRLF at the end of the header field value.
    hval = re.sub(rb'[\r\n]', b'', hval)
    #    o  Convert all sequences of one or more WSP characters to a single SP
    #       character.  WSP characters here include those before and after a
    #       line folding boundary.
    hval = re.sub(rb'\s+', b' ', hval)
    #    o  Delete all WSP characters at the end of each unfolded header field
    #       value.
    #    o  Delete any WSP characters remaining before and after the colon
    #       separating the header field name from the header field value.  The
    #       colon separator MUST be retained.
    hval = hval.strip() + b'\r\n'
    return hval


def get_public_key(source: str, keytype: str, identity: str, selector: str) -> Optional[bytes]:
    """Look up the public key for an identity.

    The key is expected at ``<keytype>/<domain>/<local>/<selector>`` below the
    source, with each component URL-quoted.

    Exits the program with status 1 if ``identity`` has no ``@``, if a
    ``ref:`` source is malformed, or if a ``ref:`` source is used outside of a
    git tree.

    :param source: either a directory path, or ``ref:<refspec>:<path>`` to
                   look the key up in git (an empty refspec means the ref
                   HEAD currently points to)
    :param keytype: key type, used as the first path component
    :param identity: identity in ``local@domain`` form
    :param selector: key selector, used as the last path component
    :return: the raw contents of the key file, or None if it was not found
    """
    chunks = identity.split('@', 1)
    if len(chunks) != 2:
        logger.critical('identity must include both local and domain parts')
        sys.exit(1)
    local = chunks[0]
    domain = chunks[1]

    # urlencode all potentially untrusted bits to make sure nobody tries path-based badness
    keypath = os.path.join(urllib.parse.quote_plus(keytype), urllib.parse.quote_plus(domain),
                           urllib.parse.quote_plus(local), urllib.parse.quote_plus(selector))

    if source.startswith('ref:'):
        # format is: ref:refspec:path
        # or it could omit the refspec, meaning "whatever the current ref"
        # but it should always have at least two ":"
        chunks = source.split(':', 2)
        if len(chunks) < 3:
            logger.critical('Invalid source: %s', source)
            logger.critical('Must have refspec and path, e.g.: ref:refs/heads/master:.keys')
            # we cannot continue without both parts
            sys.exit(1)

        gittop = get_git_toplevel()
        if not gittop:
            logger.critical('Not in a git tree, so cannot use a ref: source')
            sys.exit(1)

        # grab the key from a fully ref'ed path
        ref = chunks[1]
        pathtop = chunks[2]
        subpath = os.path.join(pathtop, keypath)

        if not ref:
            # What is our current ref?
            cmdargs = ['git', 'symbolic-ref', 'HEAD']
            ecode, out, err = _run_command(cmdargs)
            if ecode == 0:
                ref = out.decode().strip()

        cmdargs = ['git', 'show', f'{ref}:{subpath}']
        ecode, out, err = _run_command(cmdargs)
        if ecode == 0:
            logger.info('KEYSRC : %s:%s', ref, subpath)
            return out

        # Does it exist on disk in gittop?
        fullpath = os.path.join(gittop, subpath)
        if os.path.exists(fullpath):
            with open(fullpath, 'rb') as fh:
                logger.info('KEYSRC : %s', fullpath)
                return fh.read()

        logger.info('Could not find %s in %s', subpath, ref)
        # This is not a critical error for PGP
        return None

    # It's a direct path, then
    fullpath = os.path.join(source, keypath)
    if os.path.exists(fullpath):
        with open(fullpath, 'rb') as fh:
            logger.info('Loaded key from %s', fullpath)
            return fh.read()

    # This is not a critical error for PGP
    logger.info('Could not find %s', fullpath)
    return None


def make_devsig_header(headers: list, payload: bytes, algo: str, identity: Optional[str] = None,
                       selector: Optional[str] = None, maxlen: Optional[int] = None,
                       want_hdrs: Optional[list] = None) -> Tuple[bytes, bytes]:
    """Build the signature header (with an empty ``b=``) and the digest to sign.

    :param headers: message headers, as returned by :func:`load_message`
    :param payload: message payload, as returned by :func:`load_message`
    :param algo: algorithm name placed in ``a=`` (``-sha256`` is appended)
    :param identity: optional value for ``i=``; omitted when empty
    :param selector: optional value for ``s=``; omitted when empty
    :param maxlen: optional limit on the canonical payload length
    :param want_hdrs: lowercased names of the headers to sign; defaults to
                      ``REQ_HDRS``
    :return: ``(dshdr, digest)``: the complete header line ending in ``b=``
             and the SHA-256 digest over the canonicalized signed headers
             followed by the signature header itself
    """
    if not want_hdrs:
        want_hdrs = REQ_HDRS

    cheaders, cpayload = get_mailinfo_message(headers, payload, want_hdrs, maxlen)

    # Body hash, as in DKIM's bh=
    hashed = hashlib.sha256()
    hashed.update(cpayload)
    bh = base64.b64encode(hashed.digest())

    hparts = [
        b'v=1',
        b'a=%s-sha256' % algo.encode(),
    ]
    if identity:
        hparts.append(b'i=%s' % identity.encode())
    if selector:
        hparts.append(b's=%s' % selector.encode())

    hparts.append(b'h=%s' % b':'.join(want_hdrs))
    hparts.append(b'l=%d' % len(cpayload))
    hparts.append(b'bh=%s' % bh)
    hparts.append(b'b=')

    dshval = b'; '.join(hparts)

    # Header hash: every wanted header, then the signature header itself
    hashed = hashlib.sha256()
    for cheader in cheaders:
        left, right = cheader.split(b':', 1)
        hname = left.strip().lower()
        if hname not in want_hdrs:
            continue
        hashed.update(hname + b':' + dkim_canonicalize_header(right))

    hashed.update(DEVSIG_HDR.lower() + b':' + dshval)
    dshdr = DEVSIG_HDR + b': ' + dshval

    return dshdr, hashed.digest()


def get_devsig_header_info(headers) -> Tuple[str, str, str, list, dict]:
    """Extract signature parameters from the message headers.

    Exits the program with status 1 if the signature header is missing, if it
    lacks ``h=`` or does not cover every header in ``REQ_HDRS``, if no
    identity can be determined, or if ``a=`` names an unsupported algorithm.

    :param headers: message headers, as returned by :func:`load_message`
    :return: ``(identity, selector, algo, signed_hdrs, hdata)``; ``identity``
             comes from ``i=`` or else from the From: header, ``selector``
             defaults to ``'default'``, ``algo`` is ``'ed25519'`` (the
             default) or ``'openpgp'``, ``signed_hdrs`` is the parsed ``h=``
             list, and ``hdata`` is the full parsed signature header
    """
    from_hdr = None
    hdata = None
    devsig_name = DEVSIG_HDR.decode()

    need_hdrs = [b'from', DEVSIG_HDR.lower()]
    for header in headers:
        left, sep, right = header.partition(b':')
        if not sep:
            # malformed header line without a name/value separator
            continue
        hname = left.strip().lower()
        # We want a "from" header and a DEVSIG_HDR
        if hname not in need_hdrs:
            continue
        if hname == b'from':
            from_hdr = right
            continue
        hval = dkim_canonicalize_header(right)
        hdata = get_parts_from_header(hval)

    if hdata is None:
        logger.critical('FAILED : No "%s:" header in message', devsig_name)
        sys.exit(1)

    # make sure the required headers are in the sig
    if 'h' not in hdata:
        logger.critical('FAILED : h= is required but is not present in %s', devsig_name)
        sys.exit(1)

    signed_hdrs = [x.strip() for x in hdata['h'].split(b':')]
    for rhdr in REQ_HDRS:
        if rhdr not in signed_hdrs:
            logger.critical('FAILED : %s is a required header', rhdr.decode())
            sys.exit(1)

    if 'i' not in hdata:
        # Use the identity from the from header
        if not from_hdr:
            logger.critical('FAILED : No i= in %s, and no From: header!', devsig_name)
            sys.exit(1)
        parts = email.utils.parseaddr(from_hdr.decode())
        identity = parts[1]
    else:
        # hdata values are bytes, but callers need a str identity
        identity = hdata['i'].decode()

    if 'a' in hdata:
        apart = hdata['a'].decode()
        if apart.startswith('ed25519'):
            algo = 'ed25519'
        elif apart.startswith('openpgp'):
            algo = 'openpgp'
        else:
            logger.critical('FAILED : Unsupported a= in %s: %s', devsig_name, apart)
            sys.exit(1)
    else:
        # Default is ed25519-sha256
        algo = 'ed25519'

    if 's' in hdata:
        selector = hdata['s'].decode()
    else:
        selector = 'default'

    return identity, selector, algo, signed_hdrs, hdata


def _write_signed_message(headers: list, payload: bytes, dshdr: bytes, bdata: bytes) -> None:
    """Append the finished signature header and write the message to stdout."""
    hhdr = email.header.make_header([(dshdr + splitter(bdata), 'us-ascii')], maxlinelen=78)
    headers.append(hhdr.encode().encode())
    signed = b'\r\n'.join(headers) + b'\r\n\r\n' + payload
    logger.info('--- SIGNED MESSAGE STARTS ---')
    sys.stdout.buffer.write(signed)


def cmd_sign_ed25519(cmdargs) -> None:
    """Sign the message with an ed25519 private key and print the result.

    Reads ``cmdargs.message``, ``cmdargs.privkey`` (a file holding the
    base64-encoded key) and ``cmdargs.selector``. Exits with status 1 if the
    key file cannot be opened.
    """
    from nacl.signing import SigningKey
    from nacl.encoding import Base64Encoder
    logger.info('SIGNING : ED25519 using %s', cmdargs.privkey)
    headers, payload = load_message(cmdargs.message)
    dshdr, digest = make_devsig_header(headers, payload, algo='ed25519', selector=cmdargs.selector)
    try:
        with open(cmdargs.privkey, 'r') as fh:
            sk = SigningKey(fh.read(), encoder=Base64Encoder)
    except IOError:
        logger.critical('Could not open %s', cmdargs.privkey)
        sys.exit(1)

    bdata = sk.sign(digest, encoder=Base64Encoder)
    _write_signed_message(headers, payload, dshdr, bdata)


def verify_ed25519(sigdata: bytes, pk: bytes) -> Optional[bytes]:
    """Verify an ed25519 signature.

    :param sigdata: base64-encoded signed data (the ``b=`` value)
    :param pk: base64-encoded public key
    :return: whatever ``VerifyKey.verify`` returns on success (compared
             against the computed digest by the caller), or None if the
             signature is bad
    """
    from nacl.signing import VerifyKey
    from nacl.encoding import Base64Encoder
    from nacl.exceptions import BadSignatureError

    vk = VerifyKey(pk, encoder=Base64Encoder)
    try:
        return vk.verify(sigdata, encoder=Base64Encoder)
    except BadSignatureError:
        return None


def cmd_sign_pgp(cmdargs) -> None:
    """Sign the message with GnuPG and print the result.

    Reads ``cmdargs.message``, ``cmdargs.usekey`` and ``cmdargs.selector``.
    Exits with gpg's return code if gpg fails.
    """
    logger.info('SIGNING : PGP using %s', cmdargs.usekey)
    headers, payload = load_message(cmdargs.message)
    dshdr, digest = make_devsig_header(headers, payload, algo='openpgp', selector=cmdargs.selector)
    gpgargs = ['-s', '-u', cmdargs.usekey]
    ecode, out, err = gpg_run_command(gpgargs, digest)
    if ecode > 0:
        logger.critical('Running gpg failed')
        logger.critical(err.decode())
        sys.exit(ecode)
    bdata = base64.b64encode(out)
    _write_signed_message(headers, payload, dshdr, bdata)


def _gpg_verify(gpgargs: list, bsigdata: bytes) -> Tuple[Optional[bytes], bool]:
    """Run a gpg verification and return ``(signed data or None, trusted)``."""
    ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata)
    if ecode > 0:
        logger.critical('FAILED : Failed to verify PGP signature')
        return None, False

    good, valid, trusted = check_gpg_status(err)
    if good and valid:
        return out, trusted

    logger.critical('FAILED : Failed to verify PGP signature')
    return None, False


def verify_openpgp(sigdata: bytes, pk: Optional[bytes]) -> Optional[bytes]:
    """Verify an OpenPGP signature.

    :param sigdata: base64-encoded output of ``gpg -s`` (the ``b=`` value)
    :param pk: public key material to verify against; when None, the default
               gpg keyring is used instead and a warning is logged if the key
               is not fully or ultimately trusted
    :return: the data emitted by ``gpg --verify --output -`` on success, or
             None on any failure
    """
    bsigdata = base64.b64decode(sigdata)
    vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
    if pk is not None:
        # Import the key into a throw-away keyring. gpg needs a real
        # filesystem path here, so we use a file inside a temporary
        # directory that is removed when we are done.
        with tempfile.TemporaryDirectory(suffix='.patch-attest-poc') as td:
            temp_keyring = os.path.join(td, 'keyring')
            keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}']
            gpgargs = keyringargs + ['--status-fd=1', '--import']
            ecode, out, err = gpg_run_command(gpgargs, stdin=pk)
            # look for IMPORT_OK
            if out.find(b'[GNUPG:] IMPORT_OK') < 0:
                logger.critical('Could not import public key!')
                return None
            out, _trusted = _gpg_verify(keyringargs + vrfyargs, bsigdata)
            return out

    logger.info('Verifying using default keyring')
    out, trusted = _gpg_verify(vrfyargs, bsigdata)
    if out is not None and not trusted:
        logger.warning('WARNING : Insufficient trust on the key')
    return out


def cmd_verify(cmdargs):
    """Verify the signature on a message.

    Reads ``cmdargs.message`` and ``cmdargs.keypath``. Returns normally on
    success and exits with status 1 on any failure.
    """
    headers, payload = load_message(cmdargs.message)
    identity, selector, algo, signed_hdrs, hdata = get_devsig_header_info(headers)
    if 'b' not in hdata:
        logger.critical('FAILED : b= is required but is not present in %s', DEVSIG_HDR.decode())
        sys.exit(1)

    # Check if we have this public key
    pk = get_public_key(cmdargs.keypath, algo, identity, selector)
    sdigest = None
    if algo == 'ed25519':
        if not pk:
            sys.exit(1)
        sdigest = verify_ed25519(hdata['b'], pk)
    elif algo == 'openpgp':
        sdigest = verify_openpgp(hdata['b'], pk)

    if not sdigest:
        logger.critical('Failed to verify signature!')
        sys.exit(1)

    # Rebuild the header with exactly the i= and s= that the signer used
    # (either may be absent), so that the header hash matches.
    sig_identity = hdata.get('i', b'').decode()
    sig_selector = hdata.get('s', b'').decode()

    # Now calculate our own digest and compare
    dshdr, digest = make_devsig_header(headers, payload, algo, identity=sig_identity,
                                       selector=sig_selector, want_hdrs=signed_hdrs)
    success = sdigest == digest
    if not success:
        # Try to limit the payload to just the number of bytes specified in the sig header
        try:
            maxlen = int(hdata.get('l', b'0'))
        except ValueError:
            maxlen = 0
        if maxlen > 0:
            dshdr, digest = make_devsig_header(headers, payload, algo, identity=sig_identity,
                                               selector=sig_selector, maxlen=maxlen,
                                               want_hdrs=signed_hdrs)
            if sdigest == digest:
                logger.warning('WARNING : Succeeded after trimming payload; the following content was discarded:')
                # NOTE(review): maxlen counts bytes of the canonicalized
                # payload, while this slices the raw one, so the excerpt
                # shown may not line up exactly -- confirm if this matters.
                for line in payload[maxlen:].strip().split(b'\n'):
                    sys.stderr.buffer.write(b'        : %s\n' % line)
                success = True

    if success:
        logger.info('SUCCESS : Signature and content hashes verified')
        return

    logger.critical('FAILED : Failed to verify signature')
    sys.exit(1)


def cmd_gen_ed25519(cmdargs):
    """Generate a new ed25519 keypair.

    Writes the base64-encoded private key to ``<cmdargs.output>.key`` and the
    base64-encoded public key to ``<cmdargs.output>.pub``, then exits with
    status 0.
    """
    from nacl.signing import SigningKey
    logger.info('Generating: new ED25519 key')
    newkey = SigningKey.generate()
    with open(cmdargs.output + '.key', 'wb') as fh:
        fh.write(base64.b64encode(bytes(newkey)))
        logger.info('Wrote: %s.key', cmdargs.output)
    with open(cmdargs.output + '.pub', 'wb') as fh:
        fh.write(base64.b64encode(bytes(newkey.verify_key)))
        logger.info('Wrote: %s.pub', cmdargs.output)
    sys.exit(0)


if __name__ == '__main__':
    import argparse
    # noinspection PyTypeChecker
    parser = argparse.ArgumentParser(
        prog='main',
        description='A proof of concept tool for header-based email patch attestation',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('-m', '--message', default='emails/dev-unsigned.eml',
                        help='File with the message to work with')
    # store_true makes this a real flag; without it "-v" demanded a value
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='Print extra debugging output')

    subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')

    # sign-pgp
    sp_spgp = subparsers.add_parser('sign-pgp', help='Sign with PGP key')
    sp_spgp.add_argument('-k', '--usekey', default='AAAABBBBCCCCDDDD')
    sp_spgp.add_argument('-s', '--selector')
    sp_spgp.set_defaults(func=cmd_sign_pgp)

    # sign-ed25519
    sp_sed25519 = subparsers.add_parser('sign-ed25519', help='Sign with an ed25519 key')
    sp_sed25519.add_argument('-k', '--privkey', default='dev.key')
    sp_sed25519.add_argument('-s', '--selector')
    sp_sed25519.set_defaults(func=cmd_sign_ed25519)

    # gen-ed25519
    sp_gened25519 = subparsers.add_parser('gen-ed25519', help='Generate an ed25519 keypair')
    sp_gened25519.add_argument('-o', '--output', default='new_ed25519')
    sp_gened25519.set_defaults(func=cmd_gen_ed25519)

    # verify
    sp_verify = subparsers.add_parser('verify', help='Verify a signed message')
    sp_verify.add_argument('-p', '--keypath', default='ref:refs/heads/master:.keys')
    sp_verify.set_defaults(func=cmd_verify)

    args = parser.parse_args()

    logger.setLevel(logging.DEBUG)

    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)

    if args.verbose:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)

    logger.addHandler(ch)

    if 'func' not in args:
        parser.print_help()
        sys.exit(1)

    args.func(args)