aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-05 10:01:04 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-05 10:01:04 -0400
commit29e8e42d5b1c98e2975e1d49cf693568ebeca2a2 (patch)
tree5459a1bb93a722f607c89c9ae32d9a1279f24717
parent30503532619445f808184a286351fefc4e7f9980 (diff)
downloadpatatt-29e8e42d5b1c98e2975e1d49cf693568ebeca2a2.tar.gz
Add X-Developer-Key header
Let's include public key information in the header as well, for informational purposes. Obviously, we won't use it for validation, but since most of these messages will be logged in public-inbox, it provides a nice trail of historical key usage data. Additionally, we may want to implement some kind of trust-on-first-use setup in the future, and this allows us easy access to key information in every message. The header is intentionally unsigned. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r--patatt/__init__.py83
1 files changed, 64 insertions, 19 deletions
diff --git a/patatt/__init__.py b/patatt/__init__.py
index 78393ed..5981a27 100644
--- a/patatt/__init__.py
+++ b/patatt/__init__.py
@@ -32,15 +32,19 @@ GPGBIN = 'gpg'
# Hardcoded defaults
DEVSIG_HDR = b'X-Developer-Signature'
+SK_HDR = b'X-Developer-Key'
REQ_HDRS = [b'from', b'subject']
DEFAULT_CONFIG = {
'publickeypath': ['ref::.keys', 'ref::.local-keys'],
'gpgusedefaultkeyring': 'yes',
}
+# Quick cache for key info
+KEYCACHE = dict()
+
# My version
__VERSION__ = '0.1.0'
-MAX_SUPPORTED_VERSION = 1
+MAX_SUPPORTED_FORMAT_VERSION = 1
class SigningError(Exception):
@@ -205,25 +209,33 @@ class DevsigHeader:
digest = hashed.digest()
if algo.startswith('ed25519'):
- bval = DevsigHeader._sign_ed25519(digest, keyinfo)
+ bval, pkinfo = DevsigHeader._sign_ed25519(digest, keyinfo)
elif algo.startswith('openpgp'):
- bval = DevsigHeader._sign_openpgp(digest, keyinfo)
+ bval, pkinfo = DevsigHeader._sign_openpgp(digest, keyinfo)
else:
raise RuntimeError('Unknown a=%s' % algo)
if split:
- return DEVSIG_HDR, dshval + DevsigHeader.splitter(bval)
+ return dshval + DevsigHeader.splitter(bval), pkinfo
- return DEVSIG_HDR, dshval + bval
+ return dshval + bval, pkinfo
@staticmethod
- def _sign_ed25519(payload: bytes, privkey: bytes) -> bytes:
+ def _sign_ed25519(payload: bytes, privkey: bytes) -> Tuple[bytes, bytes]:
+ global KEYCACHE
from nacl.signing import SigningKey
from nacl.encoding import Base64Encoder
- sk = SigningKey(privkey, encoder=Base64Encoder)
+ if privkey not in KEYCACHE:
+ sk = SigningKey(privkey, encoder=Base64Encoder)
+ vk = base64.b64encode(sk.verify_key.encode())
+ KEYCACHE[privkey] = (sk, vk)
+ else:
+ sk, vk = KEYCACHE[privkey]
+
bdata = sk.sign(payload, encoder=Base64Encoder)
- return bdata
+
+ return bdata, vk
@staticmethod
def _validate_ed25519(sigdata: bytes, pubkey: bytes) -> bytes:
@@ -238,15 +250,35 @@ class DevsigHeader:
raise ValidationError('Failed to validate signature')
@staticmethod
- def _sign_openpgp(payload: bytes, keyid: Optional[bytes]) -> bytes:
- gpgargs = ['-s']
- if keyid:
- gpgargs += ['-u', keyid]
+ def _sign_openpgp(payload: bytes, keyid: bytes) -> Tuple[bytes, bytes]:
+ global KEYCACHE
+ gpgargs = ['-s', '-u', keyid]
ecode, out, err = gpg_run_command(gpgargs, payload)
if ecode > 0:
raise SigningError('Running gpg failed', errors=err.decode().split('\n'))
bdata = base64.b64encode(out)
- return bdata
+ # Now get the fingerprint of this keyid
+ if keyid not in KEYCACHE:
+ gpgargs = ['--with-colons', '--fingerprint', keyid]
+ ecode, out, err = gpg_run_command(gpgargs)
+ if ecode > 0:
+ raise SigningError('Running gpg failed', errors=err.decode().split('\n'))
+ pkid = None
+ keyfp = None
+ for line in out.split(b'\n'):
+ if line.startswith(b'pub:'):
+ fields = line.split(b':')
+ pkid = fields[4]
+ elif line.startswith(b'fpr:') and pkid:
+ fields = line.split(b':')
+ if fields[9].find(pkid) > 0:
+ keyfp = fields[9]
+ break
+ KEYCACHE[keyid] = keyfp
+ else:
+ keyfp = KEYCACHE[keyid]
+
+ return bdata, keyfp
@staticmethod
def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]:
@@ -398,10 +430,23 @@ class PatattMessage:
if algo == 'ed25519':
# Set signing time for ed25519 sigs
ds.set_field('t', str(int(time.time())))
- hn, hv = ds.sign(keyinfo)
+ hv, pkinfo = ds.sign(keyinfo)
+
+ dshdr = email.header.make_header([(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78)
+ self.headers.append(dshdr.encode().encode() + self.lf)
+
+ # Make informational header about the key used
+ idata = [
+ b'i=%s' % identity.encode(),
+ b'a=%s' % algo.encode(),
+ ]
+ if algo == 'openpgp':
+ idata.append(b'fpr=%s' % pkinfo)
+ else:
+ idata.append(b'pk=%s' % pkinfo)
- hhdr = email.header.make_header([(hn + b': ' + hv, 'us-ascii')], maxlinelen=78)
- self.headers.append(hhdr.encode().encode() + self.lf)
+ skhdr = email.header.make_header([(SK_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78)
+ self.headers.append(skhdr.encode().encode() + self.lf)
def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> str:
vds = None
@@ -883,7 +928,7 @@ def cmd_gen(cmdargs, config: dict) -> None:
logger.info('Wrote: %s', skey)
with open(pkey, 'wb') as fh:
- fh.write(base64.b64encode(bytes(newkey.verify_key)))
+ fh.write(base64.b64encode(newkey.verify_key.encode()))
logger.info('Wrote: %s', pkey)
# Also copy it into our local keyring
@@ -891,12 +936,12 @@ def cmd_gen(cmdargs, config: dict) -> None:
Path(os.path.dirname(dpkey)).mkdir(parents=True, exist_ok=True)
if not os.path.exists(dpkey):
with open(dpkey, 'wb') as fh:
- fh.write(base64.b64encode(bytes(newkey.verify_key)))
+ fh.write(base64.b64encode(newkey.verify_key.encode()))
logger.info('Wrote: %s', dpkey)
else:
spkey = os.path.join(pdir, make_pkey_path('ed25519', config.get('identity'), identifier))
with open(spkey, 'wb') as fh:
- fh.write(base64.b64encode(bytes(newkey.verify_key)))
+ fh.write(base64.b64encode(newkey.verify_key.encode()))
logger.info('Wrote: %s', spkey)
logger.info('Add the following to your .git/config (or global ~/.gitconfig):')