diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-05-05 11:49:55 -0400 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-05-05 11:49:55 -0400 |
commit | d27e5a8000f3a8bc706e5d4fb1ae724d9dd34d05 (patch) | |
tree | 4a4a16203d0e0f9458d9e92c57434ad353580ac4 | |
parent | a70a054ad0c391a992f31b6dcecf5479a9a80b5a (diff) | |
download | patatt-d27e5a8000f3a8bc706e5d4fb1ae724d9dd34d05.tar.gz |
Some UI/usability improvements
Handle more errors and cache generated keyring when importing openpgp
keys.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r-- | patatt/__init__.py | 97 | ||||
-rwxr-xr-x | sendemail-validate-hook | 11 |
2 files changed, 74 insertions, 34 deletions
diff --git a/patatt/__init__.py b/patatt/__init__.py index 3855014..39ed6f6 100644 --- a/patatt/__init__.py +++ b/patatt/__init__.py @@ -32,7 +32,7 @@ GPGBIN = 'gpg' # Hardcoded defaults DEVSIG_HDR = b'X-Developer-Signature' -SK_HDR = b'X-Developer-Key' +DEVKEY_HDR = b'X-Developer-Key' REQ_HDRS = [b'from', b'subject'] DEFAULT_CONFIG = { 'publickeypath': ['ref::.keys', 'ref::.local-keys'], @@ -282,16 +282,23 @@ class DevsigHeader: @staticmethod def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]: + global KEYCACHE bsigdata = base64.b64decode(sigdata) vrfyargs = ['--verify', '--output', '-', '--status-fd=2'] if pubkey: with tempfile.TemporaryFile(suffix='.patch-attest-poc') as temp_keyring: - keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}'] - gpgargs = keyringargs + ['--status-fd=1', '--import'] - ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey) - # look for IMPORT_OK - if out.find(b'[GNUPG:] IMPORT_OK') < 0: - raise ValidationError('Could not import GnuPG public key') + keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring.name}'] + if pubkey in KEYCACHE: + logger.debug('Reusing cached keyring') + temp_keyring.write(KEYCACHE[pubkey]) + else: + logger.debug('Importing into new keyring') + gpgargs = keyringargs + ['--status-fd=1', '--import'] + ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey) + # look for IMPORT_OK + if out.find(b'[GNUPG:] IMPORT_OK') < 0: + raise ValidationError('Could not import GnuPG public key') + KEYCACHE[pubkey] = temp_keyring.read() gpgargs = keyringargs + vrfyargs ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata) @@ -413,6 +420,10 @@ class PatattMessage: self.canon_headers.append(header) def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None: + # Remove any devsig headers + for header in list(self.headers): + if header.startswith(DEVSIG_HDR) or header.startswith(DEVKEY_HDR): + self.headers.remove(header) self.git_canonicalize() ds = DevsigHeader() ds.set_headers(self.canon_headers) @@ -445,8 +456,8 @@ class PatattMessage: else: idata.append(b'pk=%s' % pkinfo) - skhdr = email.header.make_header([(SK_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78) - self.headers.append(skhdr.encode().encode() + self.lf) + dkhdr = email.header.make_header([(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78) + self.headers.append(dkhdr.encode().encode() + self.lf) def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> str: vds = None @@ -505,6 +516,9 @@ class PatattMessage: self.signed = True self.headers.append(line) + if not len(self.headers) or not len(self.body): + raise RuntimeError('Not a valid RFC2822 message') + def get_sigs(self) -> list: if self.sigs is not None: return self.sigs @@ -720,7 +734,7 @@ def _load_messages(cmdargs) -> dict: with open(msgfile, 'rb') as fh: messages[msgfile] = fh.read() else: - logger.critical('ERROR: Pipe a message to sign or pass filenames with individual messages') + logger.critical('E: Pipe a message to sign or pass filenames with individual messages') raise RuntimeError('Nothing to do') return messages @@ -741,18 +755,18 @@ def cmd_sign(cmdargs, config: dict) -> None: config['identity'] = usercfg.get('email') if not config.get('signingkey'): if usercfg.get('signingkey'): - logger.warning('NOTICE: Using pgp key %s defined by user.signingkey', usercfg.get('signingkey')) - logger.warning(' Override by setting patatt.signingkey') + logger.info('N: Using pgp key %s defined by user.signingkey', usercfg.get('signingkey')) + logger.info('N: Override by setting patatt.signingkey') config['signingkey'] = 'openpgp:%s' % usercfg.get('signingkey') else: - logger.critical('ERROR: patatt.signingkey is not set') - logger.critical(' Perhaps you need to run genkey first?') + logger.critical('E: patatt.signingkey is not set') + logger.critical('E: Perhaps you need to run genkey first?') sys.exit(1) try: messages = _load_messages(cmdargs) except IOError as ex: - logger.critical('ERROR: %s', ex) + logger.critical('E: %s', ex) sys.exit(1) sk = config.get('signingkey') @@ -777,10 +791,10 @@ def cmd_sign(cmdargs, config: dict) -> None: keysrc = skey if not keysrc: - logger.critical('ERROR: Could not find the key matching %s', identifier) + logger.critical('E: Could not find the key matching %s', identifier) sys.exit(1) - logger.info('Using ed25519 key: %s', keysrc) + logger.info('N: Using ed25519 key: %s', keysrc) with open(keysrc, 'r') as fh: keydata = fh.read() @@ -788,16 +802,12 @@ def cmd_sign(cmdargs, config: dict) -> None: algo = 'openpgp' keydata = sk[8:] else: - logger.critical('Unknown key type: %s', sk) + logger.critical('E: Unknown key type: %s', sk) sys.exit(1) for fn, msgdata in messages.items(): - pm = PatattMessage(msgdata) - if pm.signed: - logger.critical('Already signed: %s', fn) - continue - try: + pm = PatattMessage(msgdata) pm.sign(algo, keydata, identity=config.get('identity'), selector=config.get('selector')) logger.debug('--- SIGNED MESSAGE STARTS ---') logger.debug(pm.as_string()) @@ -807,10 +817,14 @@ def cmd_sign(cmdargs, config: dict) -> None: with open(fn, 'wb') as fh: fh.write(pm.as_bytes()) - logger.info('Signed: %s', fn) + logger.critical('SIGN: %s', os.path.basename(fn)) except SigningError as ex: - logger.critical('ERROR: %s', ex) + logger.critical('E: %s', ex) + sys.exit(1) + + except RuntimeError as ex: + logger.critical('E: %s: %s' % (fn, ex)) sys.exit(1) @@ -869,7 +883,7 @@ def cmd_validate(cmdargs, config: dict): try: messages = _load_messages(cmdargs) except IOError as ex: - logger.critical('ERROR: %s', ex) + logger.critical('E: %s', ex) sys.exit(1) ddir = get_data_dir() @@ -878,10 +892,11 @@ def cmd_validate(cmdargs, config: dict): if pdir not in sources: sources.append(pdir) - for filename, msgdata in messages.items(): + allgood = True + for fn, msgdata in messages.items(): try: goodsigs = validate_message(msgdata, sources) - logger.critical('PASS: %s', os.path.basename(filename)) + logger.critical('PASS: %s', os.path.basename(fn)) for identity, signtime, keysrc, algo in goodsigs: logger.info(' by : %s (%s)', identity, algo) if keysrc: @@ -890,8 +905,15 @@ def cmd_validate(cmdargs, config: dict): logger.info(' key: default keyring') except ValidationError as ex: - logger.critical('FAIL: %s', os.path.basename(filename)) + allgood = False + logger.critical('FAIL: %s', os.path.basename(fn)) logger.critical(' err: %s', ex) + except RuntimeError as ex: + allgood = False + logger.critical('E: %s: %s' % (fn, ex)) + + if not allgood: + sys.exit(1) def cmd_gen(cmdargs, config: dict) -> None: @@ -974,8 +996,8 @@ def command() -> None: description='Cryptographically attest patches before sending out', formatter_class=argparse.ArgumentDefaultsHelpFormatter ) - parser.add_argument('-q', '--quiet', action='store_true', default=False, - help='Only output errors to the stdout') + parser.add_argument('-v', '--verbose', action='store_true', default=False, + help='Be a bit more verbose') parser.add_argument('-d', '--debug', action='store_true', default=False, help='Show debugging output') parser.add_argument('-s', '--section', dest='section', default=None, @@ -984,6 +1006,8 @@ def command() -> None: subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd') sp_sign = subparsers.add_parser('sign', help='Cryptographically attest an RFC2822 message') + sp_sign.add_argument('--hook', dest='hookmode', action='store_true', default=False, + help='Git hook mode') sp_sign.add_argument('msgfile', nargs='*', help='RFC2822 message files to sign') sp_sign.set_defaults(func=cmd_sign) @@ -1004,14 +1028,19 @@ def command() -> None: ch = logging.StreamHandler() formatter = logging.Formatter('%(message)s') + try: + if _args.hookmode: + formatter = logging.Formatter('patatt: %(message)s') + except AttributeError: + pass ch.setFormatter(formatter) - if _args.quiet: - ch.setLevel(logging.CRITICAL) + if _args.verbose: + ch.setLevel(logging.INFO) elif _args.debug: ch.setLevel(logging.DEBUG) else: - ch.setLevel(logging.INFO) + ch.setLevel(logging.CRITICAL) logger.addHandler(ch) config = get_config_from_git(r'patatt\..*', section=_args.section, defaults=DEFAULT_CONFIG) diff --git a/sendemail-validate-hook b/sendemail-validate-hook new file mode 100755 index 0000000..17281a6 --- /dev/null +++ b/sendemail-validate-hook @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +if which patatt>/dev/null 2>&1; then + # We have it in path, so just execute it + patatt -q sign --hook "${1}" +else + # Assume we're symlinked into a git checkout + REAL_SCRIPT=$(realpath -e ${BASH_SOURCE[0]}) + SCRIPT_TOP="${SCRIPT_TOP:-$(dirname ${REAL_SCRIPT})}" + PATATT_TOP=$(realpath -e ${SCRIPT_TOP}/..) + exec env PYTHONPATH="${PATATT_TOP}" python3 "${PATATT_TOP}/patatt/__init__.py" sign --hook "${1}" +fi |