aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-05 11:49:55 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-05 11:49:55 -0400
commitd27e5a8000f3a8bc706e5d4fb1ae724d9dd34d05 (patch)
tree4a4a16203d0e0f9458d9e92c57434ad353580ac4
parenta70a054ad0c391a992f31b6dcecf5479a9a80b5a (diff)
downloadpatatt-d27e5a8000f3a8bc706e5d4fb1ae724d9dd34d05.tar.gz
Some UI/usability improvements
Handle more errors and cache generated keyring when importing openpgp keys. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r--patatt/__init__.py97
-rwxr-xr-xsendemail-validate-hook11
2 files changed, 74 insertions, 34 deletions
diff --git a/patatt/__init__.py b/patatt/__init__.py
index 3855014..39ed6f6 100644
--- a/patatt/__init__.py
+++ b/patatt/__init__.py
@@ -32,7 +32,7 @@ GPGBIN = 'gpg'
# Hardcoded defaults
DEVSIG_HDR = b'X-Developer-Signature'
-SK_HDR = b'X-Developer-Key'
+DEVKEY_HDR = b'X-Developer-Key'
REQ_HDRS = [b'from', b'subject']
DEFAULT_CONFIG = {
'publickeypath': ['ref::.keys', 'ref::.local-keys'],
@@ -282,16 +282,23 @@ class DevsigHeader:
@staticmethod
def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]:
+ global KEYCACHE
bsigdata = base64.b64decode(sigdata)
vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
if pubkey:
with tempfile.TemporaryFile(suffix='.patch-attest-poc') as temp_keyring:
- keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}']
- gpgargs = keyringargs + ['--status-fd=1', '--import']
- ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey)
- # look for IMPORT_OK
- if out.find(b'[GNUPG:] IMPORT_OK') < 0:
- raise ValidationError('Could not import GnuPG public key')
+ keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring.name}']
+ if pubkey in KEYCACHE:
+ logger.debug('Reusing cached keyring')
+ temp_keyring.write(KEYCACHE[pubkey])
+ else:
+ logger.debug('Importing into new keyring')
+ gpgargs = keyringargs + ['--status-fd=1', '--import']
+ ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey)
+ # look for IMPORT_OK
+ if out.find(b'[GNUPG:] IMPORT_OK') < 0:
+ raise ValidationError('Could not import GnuPG public key')
+ KEYCACHE[pubkey] = temp_keyring.read()
gpgargs = keyringargs + vrfyargs
ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata)
@@ -413,6 +420,10 @@ class PatattMessage:
self.canon_headers.append(header)
def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None:
+ # Remove any devsig headers
+ for header in list(self.headers):
+ if header.startswith(DEVSIG_HDR) or header.startswith(DEVKEY_HDR):
+ self.headers.remove(header)
self.git_canonicalize()
ds = DevsigHeader()
ds.set_headers(self.canon_headers)
@@ -445,8 +456,8 @@ class PatattMessage:
else:
idata.append(b'pk=%s' % pkinfo)
- skhdr = email.header.make_header([(SK_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78)
- self.headers.append(skhdr.encode().encode() + self.lf)
+ dkhdr = email.header.make_header([(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78)
+ self.headers.append(dkhdr.encode().encode() + self.lf)
def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> str:
vds = None
@@ -505,6 +516,9 @@ class PatattMessage:
self.signed = True
self.headers.append(line)
+ if not len(self.headers) or not len(self.body):
+ raise RuntimeError('Not a valid RFC2822 message')
+
def get_sigs(self) -> list:
if self.sigs is not None:
return self.sigs
@@ -720,7 +734,7 @@ def _load_messages(cmdargs) -> dict:
with open(msgfile, 'rb') as fh:
messages[msgfile] = fh.read()
else:
- logger.critical('ERROR: Pipe a message to sign or pass filenames with individual messages')
+ logger.critical('E: Pipe a message to sign or pass filenames with individual messages')
raise RuntimeError('Nothing to do')
return messages
@@ -741,18 +755,18 @@ def cmd_sign(cmdargs, config: dict) -> None:
config['identity'] = usercfg.get('email')
if not config.get('signingkey'):
if usercfg.get('signingkey'):
- logger.warning('NOTICE: Using pgp key %s defined by user.signingkey', usercfg.get('signingkey'))
- logger.warning(' Override by setting patatt.signingkey')
+ logger.info('N: Using pgp key %s defined by user.signingkey', usercfg.get('signingkey'))
+ logger.info('N: Override by setting patatt.signingkey')
config['signingkey'] = 'openpgp:%s' % usercfg.get('signingkey')
else:
- logger.critical('ERROR: patatt.signingkey is not set')
- logger.critical(' Perhaps you need to run genkey first?')
+ logger.critical('E: patatt.signingkey is not set')
+ logger.critical('E: Perhaps you need to run genkey first?')
sys.exit(1)
try:
messages = _load_messages(cmdargs)
except IOError as ex:
- logger.critical('ERROR: %s', ex)
+ logger.critical('E: %s', ex)
sys.exit(1)
sk = config.get('signingkey')
@@ -777,10 +791,10 @@ def cmd_sign(cmdargs, config: dict) -> None:
keysrc = skey
if not keysrc:
- logger.critical('ERROR: Could not find the key matching %s', identifier)
+ logger.critical('E: Could not find the key matching %s', identifier)
sys.exit(1)
- logger.info('Using ed25519 key: %s', keysrc)
+ logger.info('N: Using ed25519 key: %s', keysrc)
with open(keysrc, 'r') as fh:
keydata = fh.read()
@@ -788,16 +802,12 @@ def cmd_sign(cmdargs, config: dict) -> None:
algo = 'openpgp'
keydata = sk[8:]
else:
- logger.critical('Unknown key type: %s', sk)
+ logger.critical('E: Unknown key type: %s', sk)
sys.exit(1)
for fn, msgdata in messages.items():
- pm = PatattMessage(msgdata)
- if pm.signed:
- logger.critical('Already signed: %s', fn)
- continue
-
try:
+ pm = PatattMessage(msgdata)
pm.sign(algo, keydata, identity=config.get('identity'), selector=config.get('selector'))
logger.debug('--- SIGNED MESSAGE STARTS ---')
logger.debug(pm.as_string())
@@ -807,10 +817,14 @@ def cmd_sign(cmdargs, config: dict) -> None:
with open(fn, 'wb') as fh:
fh.write(pm.as_bytes())
- logger.info('Signed: %s', fn)
+ logger.critical('SIGN: %s', os.path.basename(fn))
except SigningError as ex:
- logger.critical('ERROR: %s', ex)
+ logger.critical('E: %s', ex)
+ sys.exit(1)
+
+ except RuntimeError as ex:
+ logger.critical('E: %s: %s' % (fn, ex))
sys.exit(1)
@@ -869,7 +883,7 @@ def cmd_validate(cmdargs, config: dict):
try:
messages = _load_messages(cmdargs)
except IOError as ex:
- logger.critical('ERROR: %s', ex)
+ logger.critical('E: %s', ex)
sys.exit(1)
ddir = get_data_dir()
@@ -878,10 +892,11 @@ def cmd_validate(cmdargs, config: dict):
if pdir not in sources:
sources.append(pdir)
- for filename, msgdata in messages.items():
+ allgood = True
+ for fn, msgdata in messages.items():
try:
goodsigs = validate_message(msgdata, sources)
- logger.critical('PASS: %s', os.path.basename(filename))
+ logger.critical('PASS: %s', os.path.basename(fn))
for identity, signtime, keysrc, algo in goodsigs:
logger.info(' by : %s (%s)', identity, algo)
if keysrc:
@@ -890,8 +905,15 @@ def cmd_validate(cmdargs, config: dict):
logger.info(' key: default keyring')
except ValidationError as ex:
- logger.critical('FAIL: %s', os.path.basename(filename))
+ allgood = False
+ logger.critical('FAIL: %s', os.path.basename(fn))
logger.critical(' err: %s', ex)
+ except RuntimeError as ex:
+ allgood = False
+ logger.critical('E: %s: %s' % (fn, ex))
+
+ if not allgood:
+ sys.exit(1)
def cmd_gen(cmdargs, config: dict) -> None:
@@ -974,8 +996,8 @@ def command() -> None:
description='Cryptographically attest patches before sending out',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
- parser.add_argument('-q', '--quiet', action='store_true', default=False,
- help='Only output errors to the stdout')
+ parser.add_argument('-v', '--verbose', action='store_true', default=False,
+ help='Be a bit more verbose')
parser.add_argument('-d', '--debug', action='store_true', default=False,
help='Show debugging output')
parser.add_argument('-s', '--section', dest='section', default=None,
@@ -984,6 +1006,8 @@ def command() -> None:
subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')
sp_sign = subparsers.add_parser('sign', help='Cryptographically attest an RFC2822 message')
+ sp_sign.add_argument('--hook', dest='hookmode', action='store_true', default=False,
+ help='Git hook mode')
sp_sign.add_argument('msgfile', nargs='*', help='RFC2822 message files to sign')
sp_sign.set_defaults(func=cmd_sign)
@@ -1004,14 +1028,19 @@ def command() -> None:
ch = logging.StreamHandler()
formatter = logging.Formatter('%(message)s')
+ try:
+ if _args.hookmode:
+ formatter = logging.Formatter('patatt: %(message)s')
+ except AttributeError:
+ pass
ch.setFormatter(formatter)
- if _args.quiet:
- ch.setLevel(logging.CRITICAL)
+ if _args.verbose:
+ ch.setLevel(logging.INFO)
elif _args.debug:
ch.setLevel(logging.DEBUG)
else:
- ch.setLevel(logging.INFO)
+ ch.setLevel(logging.CRITICAL)
logger.addHandler(ch)
config = get_config_from_git(r'patatt\..*', section=_args.section, defaults=DEFAULT_CONFIG)
diff --git a/sendemail-validate-hook b/sendemail-validate-hook
new file mode 100755
index 0000000..17281a6
--- /dev/null
+++ b/sendemail-validate-hook
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+if which patatt>/dev/null 2>&1; then
+ # We have it in path, so just execute it
+ patatt -q sign --hook "${1}"
+else
+ # Assume we're symlinked into a git checkout
+ REAL_SCRIPT=$(realpath -e ${BASH_SOURCE[0]})
+ SCRIPT_TOP="${SCRIPT_TOP:-$(dirname ${REAL_SCRIPT})}"
+ PATATT_TOP=$(realpath -e ${SCRIPT_TOP}/..)
+ exec env PYTHONPATH="${PATATT_TOP}" python3 "${PATATT_TOP}/patatt/__init__.py" sign --hook "${1}"
+fi