diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-05-05 10:01:04 -0400 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-05-05 10:01:04 -0400 |
commit | 29e8e42d5b1c98e2975e1d49cf693568ebeca2a2 (patch) | |
tree | 5459a1bb93a722f607c89c9ae32d9a1279f24717 | |
parent | 30503532619445f808184a286351fefc4e7f9980 (diff) | |
download | patatt-29e8e42d5b1c98e2975e1d49cf693568ebeca2a2.tar.gz |
Add X-Developer-Key header
Let's include public key information in the header as well, for
informational purposes. Obviously, we won't use it for validation, but
since most of these messages will be logged in public-inbox, it provides
a nice trail of historical key usage data.
Additionally, we may want to implement some kind of trust-on-first-use
setup in the future, and this allows us easy access to key information
in every message.
The header is intentionally unsigned.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r-- | patatt/__init__.py | 83 |
1 files changed, 64 insertions, 19 deletions
diff --git a/patatt/__init__.py b/patatt/__init__.py index 78393ed..5981a27 100644 --- a/patatt/__init__.py +++ b/patatt/__init__.py @@ -32,15 +32,19 @@ GPGBIN = 'gpg' # Hardcoded defaults DEVSIG_HDR = b'X-Developer-Signature' +SK_HDR = b'X-Developer-Key' REQ_HDRS = [b'from', b'subject'] DEFAULT_CONFIG = { 'publickeypath': ['ref::.keys', 'ref::.local-keys'], 'gpgusedefaultkeyring': 'yes', } +# Quick cache for key info +KEYCACHE = dict() + # My version __VERSION__ = '0.1.0' -MAX_SUPPORTED_VERSION = 1 +MAX_SUPPORTED_FORMAT_VERSION = 1 class SigningError(Exception): @@ -205,25 +209,33 @@ class DevsigHeader: digest = hashed.digest() if algo.startswith('ed25519'): - bval = DevsigHeader._sign_ed25519(digest, keyinfo) + bval, pkinfo = DevsigHeader._sign_ed25519(digest, keyinfo) elif algo.startswith('openpgp'): - bval = DevsigHeader._sign_openpgp(digest, keyinfo) + bval, pkinfo = DevsigHeader._sign_openpgp(digest, keyinfo) else: raise RuntimeError('Unknown a=%s' % algo) if split: - return DEVSIG_HDR, dshval + DevsigHeader.splitter(bval) + return dshval + DevsigHeader.splitter(bval), pkinfo - return DEVSIG_HDR, dshval + bval + return dshval + bval, pkinfo @staticmethod - def _sign_ed25519(payload: bytes, privkey: bytes) -> bytes: + def _sign_ed25519(payload: bytes, privkey: bytes) -> Tuple[bytes, bytes]: + global KEYCACHE from nacl.signing import SigningKey from nacl.encoding import Base64Encoder - sk = SigningKey(privkey, encoder=Base64Encoder) + if privkey not in KEYCACHE: + sk = SigningKey(privkey, encoder=Base64Encoder) + vk = base64.b64encode(sk.verify_key.encode()) + KEYCACHE[privkey] = (sk, vk) + else: + sk, vk = KEYCACHE[privkey] + bdata = sk.sign(payload, encoder=Base64Encoder) - return bdata + + return bdata, vk @staticmethod def _validate_ed25519(sigdata: bytes, pubkey: bytes) -> bytes: @@ -238,15 +250,35 @@ class DevsigHeader: raise ValidationError('Failed to validate signature') @staticmethod - def _sign_openpgp(payload: bytes, keyid: Optional[bytes]) -> bytes: - gpgargs = ['-s'] - if keyid: - gpgargs += ['-u', keyid] + def _sign_openpgp(payload: bytes, keyid: bytes) -> Tuple[bytes, bytes]: + global KEYCACHE + gpgargs = ['-s', '-u', keyid] ecode, out, err = gpg_run_command(gpgargs, payload) if ecode > 0: raise SigningError('Running gpg failed', errors=err.decode().split('\n')) bdata = base64.b64encode(out) - return bdata + # Now get the fingerprint of this keyid + if keyid not in KEYCACHE: + gpgargs = ['--with-colons', '--fingerprint', keyid] + ecode, out, err = gpg_run_command(gpgargs) + if ecode > 0: + raise SigningError('Running gpg failed', errors=err.decode().split('\n')) + pkid = None + keyfp = None + for line in out.split(b'\n'): + if line.startswith(b'pub:'): + fields = line.split(b':') + pkid = fields[4] + elif line.startswith(b'fpr:') and pkid: + fields = line.split(b':') + if fields[9].find(pkid) > 0: + keyfp = fields[9] + break + KEYCACHE[keyid] = keyfp + else: + keyfp = KEYCACHE[keyid] + + return bdata, keyfp @staticmethod def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]: @@ -398,10 +430,23 @@ class PatattMessage: if algo == 'ed25519': # Set signing time for ed25519 sigs ds.set_field('t', str(int(time.time()))) - hn, hv = ds.sign(keyinfo) + hv, pkinfo = ds.sign(keyinfo) + + dshdr = email.header.make_header([(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78) + self.headers.append(dshdr.encode().encode() + self.lf) + + # Make informational header about the key used + idata = [ + b'i=%s' % identity.encode(), + b'a=%s' % algo.encode(), + ] + if algo == 'openpgp': + idata.append(b'fpr=%s' % pkinfo) + else: + idata.append(b'pk=%s' % pkinfo) - hhdr = email.header.make_header([(hn + b': ' + hv, 'us-ascii')], maxlinelen=78) - self.headers.append(hhdr.encode().encode() + self.lf) + skhdr = email.header.make_header([(SK_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78) + self.headers.append(skhdr.encode().encode() + self.lf) def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> str: vds = None @@ -883,7 +928,7 @@ def cmd_gen(cmdargs, config: dict) -> None: logger.info('Wrote: %s', skey) with open(pkey, 'wb') as fh: - fh.write(base64.b64encode(bytes(newkey.verify_key))) + fh.write(base64.b64encode(newkey.verify_key.encode())) logger.info('Wrote: %s', pkey) # Also copy it into our local keyring @@ -891,12 +936,12 @@ def cmd_gen(cmdargs, config: dict) -> None: Path(os.path.dirname(dpkey)).mkdir(parents=True, exist_ok=True) if not os.path.exists(dpkey): with open(dpkey, 'wb') as fh: - fh.write(base64.b64encode(bytes(newkey.verify_key))) + fh.write(base64.b64encode(newkey.verify_key.encode())) logger.info('Wrote: %s', dpkey) else: spkey = os.path.join(pdir, make_pkey_path('ed25519', config.get('identity'), identifier)) with open(spkey, 'wb') as fh: - fh.write(base64.b64encode(bytes(newkey.verify_key))) + fh.write(base64.b64encode(newkey.verify_key.encode())) logger.info('Wrote: %s', spkey) logger.info('Add the following to your .git/config (or global ~/.gitconfig):') |