diff options
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-04 17:51:47 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-04 17:51:47 -0400
commit30503532619445f808184a286351fefc4e7f9980 (patch)
parent1ad11609a4af6c45c28e76d098327eea3937f769 (diff)
Refactor code to be cleaner
There was too much passing around of parameters, so switch to using two auxiliary classes for most of the signing/validation work. This also more properly implements DKIM and is just generally much cleaner. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
1 files changed, 525 insertions, 417 deletions
diff --git a/patatt/__init__.py b/patatt/__init__.py
index 7a8cd1c..78393ed 100644
--- a/patatt/__init__.py
+++ b/patatt/__init__.py
@@ -22,7 +22,7 @@ import email.utils
import email.header
from pathlib import Path
-from typing import Optional, Tuple
+from typing import Optional, Tuple, Union
from io import BytesIO
logger = logging.getLogger(__name__)
@@ -40,6 +40,7 @@ DEFAULT_CONFIG = {
# My version
__VERSION__ = '0.1.0'
class SigningError(Exception):
@@ -48,18 +49,464 @@ class SigningError(Exception):
self.errors = errors
-class ValidationError(Exception):
+class ConfigurationError(Exception):
def __init__(self, message: str, errors: Optional[list] = None):
self.errors = errors
-class ConfigurationError(Exception):
+class ValidationError(Exception):
def __init__(self, message: str, errors: Optional[list] = None):
self.errors = errors
+class BodyValidationError(ValidationError):
+ def __init__(self, message: str, errors: Optional[list] = None):
+ super().__init__(message, errors)
+class DevsigHeader:
+ def __init__(self, hval: Optional[bytes] = None):
+ self._headervals = list()
+ self._body_hash = None
+ # it doesn't need to be in any particular order,
+ # but that's just anarchy, anarchy, I say!
+ self._order = ['v', 'a', 't', 'l', 'i', 's', 'h', 'bh']
+ self.hval = None
+ self.hdata = dict()
+ if hval:
+ self.from_bytes(hval)
+ else:
+ self.hdata['v'] = b'1'
+ def from_bytes(self, hval: bytes) -> None:
+ self.hval = DevsigHeader._dkim_canonicalize_header(hval)
+ hval = re.sub(rb'\s*', b'', hval)
+ for chunk in hval.split(b';'):
+ parts = chunk.split(b'=', 1)
+ if len(parts) < 2:
+ continue
+ self.set_field(parts[0].decode(), parts[1])
+ def get_field(self, field: str, decode: bool = False) -> Union[None, str, bytes]:
+ value = self.hdata.get(field)
+ if isinstance(value, bytes) and decode:
+ return value.decode()
+ return value
+ def set_field(self, field: str, value: Union[None, str, bytes]) -> None:
+ if value is None:
+ del self.hdata[field]
+ return
+ if isinstance(value, str):
+ value = value.encode()
+ self.hdata[field] = value
+ # do any git-mailinfo normalization prior to calling this
+ def set_body(self, body: bytes, maxlen: Optional[int] = None) -> None:
+ if maxlen:
+ if maxlen > len(body):
+ raise ValidationError('maxlen is larger than payload')
+ if maxlen < len(body):
+ body = body[:maxlen]
+ self.hdata['l'] = bytes(len(body))
+ hashed = hashlib.sha256()
+ hashed.update(body)
+ self._body_hash = base64.b64encode(hashed.digest())
+ # do any git-mailinfo normalization prior to calling this
+ def set_headers(self, headers: list) -> None:
+ hfield = self.get_field('h')
+ if hfield:
+ # Make sure REQ_HEADERS are in this list
+ want_headers = [x.strip() for x in hfield.split(b':')]
+ for rqhdr in REQ_HDRS:
+ if rqhdr not in want_headers:
+ raise ValidationError('Signature is missing a required header %s' % rqhdr.decode())
+ else:
+ want_headers = REQ_HDRS
+ self._headervals = list()
+ for header in headers:
+ try:
+ left, right = header.split(b':', 1)
+ hname = left.strip().lower()
+ if hname not in want_headers:
+ continue
+ except ValueError:
+ continue
+ self._headervals.append(hname + b':' + DevsigHeader._dkim_canonicalize_header(right))
+ self.hdata['h'] = b':'.join(want_headers)
+ def sanity_check(self) -> None:
+ if 'a' not in self.hdata:
+ raise RuntimeError('Must set "a" field first')
+ if not self._body_hash:
+ raise RuntimeError('Must use set_body first')
+ if not self._headervals:
+ raise RuntimeError('Must use set_headers first')
+ def validate(self, keyinfo: Union[str, bytes, None]) -> str:
+ self.sanity_check()
+ # Check that we have a b= field
+ if not self.get_field('b'):
+ raise RuntimeError('Missing "b=" value')
+ pts = self.hval.rsplit(b'b=', 1)
+ dshdr = pts[0] + b'b='
+ bdata = re.sub(rb'\s*', b'', pts[1])
+ algo = self.get_field('a', decode=True)
+ if algo.startswith('ed25519'):
+ sdigest = DevsigHeader._validate_ed25519(bdata, keyinfo)
+ signtime = self.get_field('t', decode=True)
+ if not signtime:
+ raise ValidationError('t= field is required for ed25519 sigs')
+ elif algo.startswith('openpgp'):
+ sdigest, (good, valid, trusted, signtime) = DevsigHeader._validate_openpgp(bdata, keyinfo)
+ else:
+ raise ValidationError('Unknown algorithm: %s', algo)
+ # Now we calculate our own digest
+ hashed = hashlib.sha256()
+ # Add in our _headervals first (they aready have CRLF endings)
+ hashed.update(b''.join(self._headervals))
+ # and the devsig header now, without the trailing CRLF
+ hashed.update(DEVSIG_HDR.lower() + b':' + dshdr)
+ vdigest = hashed.digest()
+ if sdigest != vdigest:
+ raise ValidationError('Header validation failed')
+ # Now validate body hash
+ if self.get_field('bh') != self._body_hash:
+ raise BodyValidationError('Body content validation failed')
+ return signtime
+ def sign(self, keyinfo: bytes, split: bool = True) -> Tuple[bytes, bytes]:
+ self.sanity_check()
+ self.set_field('bh', self._body_hash)
+ algo = self.get_field('a', decode=True)
+ hparts = list()
+ for fn in self._order:
+ fv = self.get_field(fn)
+ if fv is not None:
+ hparts.append(b'%s=%s' % (fn.encode(), fv))
+ hparts.append(b'b=')
+ dshval = b'; '.join(hparts)
+ hashed = hashlib.sha256()
+ # Add in our _headervals first (they aready have CRLF endings)
+ hashed.update(b''.join(self._headervals))
+ # and ourselves now, without the trailing CRLF
+ hashed.update(DEVSIG_HDR.lower() + b':' + dshval)
+ digest = hashed.digest()
+ if algo.startswith('ed25519'):
+ bval = DevsigHeader._sign_ed25519(digest, keyinfo)
+ elif algo.startswith('openpgp'):
+ bval = DevsigHeader._sign_openpgp(digest, keyinfo)
+ else:
+ raise RuntimeError('Unknown a=%s' % algo)
+ if split:
+ return DEVSIG_HDR, dshval + DevsigHeader.splitter(bval)
+ return DEVSIG_HDR, dshval + bval
+ @staticmethod
+ def _sign_ed25519(payload: bytes, privkey: bytes) -> bytes:
+ from nacl.signing import SigningKey
+ from nacl.encoding import Base64Encoder
+ sk = SigningKey(privkey, encoder=Base64Encoder)
+ bdata = sk.sign(payload, encoder=Base64Encoder)
+ return bdata
+ @staticmethod
+ def _validate_ed25519(sigdata: bytes, pubkey: bytes) -> bytes:
+ from nacl.signing import VerifyKey
+ from nacl.encoding import Base64Encoder
+ from nacl.exceptions import BadSignatureError
+ vk = VerifyKey(pubkey, encoder=Base64Encoder)
+ try:
+ return vk.verify(sigdata, encoder=Base64Encoder)
+ except BadSignatureError:
+ raise ValidationError('Failed to validate signature')
+ @staticmethod
+ def _sign_openpgp(payload: bytes, keyid: Optional[bytes]) -> bytes:
+ gpgargs = ['-s']
+ if keyid:
+ gpgargs += ['-u', keyid]
+ ecode, out, err = gpg_run_command(gpgargs, payload)
+ if ecode > 0:
+ raise SigningError('Running gpg failed', errors=err.decode().split('\n'))
+ bdata = base64.b64encode(out)
+ return bdata
+ @staticmethod
+ def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]:
+ bsigdata = base64.b64decode(sigdata)
+ vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
+ if pubkey:
+ with tempfile.TemporaryFile(suffix='.patch-attest-poc') as temp_keyring:
+ keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}']
+ gpgargs = keyringargs + ['--status-fd=1', '--import']
+ ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey)
+ # look for IMPORT_OK
+ if out.find(b'[GNUPG:] IMPORT_OK') < 0:
+ raise ValidationError('Could not import GnuPG public key')
+ gpgargs = keyringargs + vrfyargs
+ ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata)
+ else:
+ logger.debug('Verifying using default keyring')
+ ecode, out, err = gpg_run_command(vrfyargs, stdin=bsigdata)
+ if ecode > 0:
+ raise ValidationError('Failed to validate PGP signature')
+ good, valid, trusted, signtime = DevsigHeader._check_gpg_status(err)
+ if good and valid:
+ return out, (good, valid, trusted, signtime)
+ raise ValidationError('Failed to validate PGP signature')
+ @staticmethod
+ def _check_gpg_status(status: bytes) -> Tuple[bool, bool, bool, str]:
+ good = False
+ valid = False
+ trusted = False
+ signtime = ''
+ gs_matches = re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M)
+ if gs_matches:
+ good = True
+ vs_matches = re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)
+ if vs_matches:
+ valid = True
+ signtime = vs_matches.groups()[2].decode()
+ ts_matches = re.search(rb'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M)
+ if ts_matches:
+ trusted = True
+ return good, valid, trusted, signtime
+ @staticmethod
+ def splitter(longstr: bytes, limit: int = 78) -> bytes:
+ splitstr = list()
+ first = True
+ while len(longstr) > limit:
+ at = limit
+ if first:
+ first = False
+ at -= 2
+ splitstr.append(longstr[:at])
+ longstr = longstr[at:]
+ splitstr.append(longstr)
+ return b' '.join(splitstr)
+ @staticmethod
+ def _dkim_canonicalize_header(hval: bytes) -> bytes:
+ # We only do relaxed for headers
+ # o Unfold all header field continuation lines as described in
+ # [RFC5322]; in particular, lines with terminators embedded in
+ # continued header field values (that is, CRLF sequences followed by
+ # WSP) MUST be interpreted without the CRLF. Implementations MUST
+ # NOT remove the CRLF at the end of the header field value.
+ hval = re.sub(rb'[\r\n]', b'', hval)
+ # o Convert all sequences of one or more WSP characters to a single SP
+ # character. WSP characters here include those before and after a
+ # line folding boundary.
+ hval = re.sub(rb'\s+', b' ', hval)
+ # o Delete all WSP characters at the end of each unfolded header field
+ # value.
+ # o Delete any WSP characters remaining before and after the colon
+ # separating the header field name from the header field value. The
+ # colon separator MUST be retained.
+ hval = hval.strip() + b'\r\n'
+ return hval
+class PatattMessage:
+ def __init__(self, msgdata: bytes):
+ self.headers = list()
+ self.body = b''
+ self.lf = b'\n'
+ self.signed = False
+ self.canon_headers = None
+ self.canon_body = None
+ self.canon_identity = None
+ self.sigs = None
+ self.load_from_bytes(msgdata)
+ def git_canonicalize(self):
+ if self.canon_body is not None:
+ return
+ # Generate a new payload using m and p and canonicalize with \r\n endings,
+ # trimming any excess blank lines ("simple" DKIM canonicalization).
+ m, p, i = PatattMessage._get_git_mailinfo(b''.join(self.headers) + self.lf + self.body)
+ self.canon_body = b''
+ for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
+ self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
+ idata = dict()
+ for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'):
+ left, right = line.split(b':', 1)
+ idata[left.lower()] = right.strip()
+ # Theoretically, we should always see an "Email" line
+ self.canon_identity = idata.get(b'email', b'').decode()
+ # Now substituting headers returned by mailinfo
+ self.canon_headers = list()
+ for header in self.headers:
+ try:
+ left, right = header.split(b':', 1)
+ lleft = left.lower()
+ if lleft == b'from':
+ right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
+ elif lleft == b'subject':
+ right = b' ' + idata.get(b'subject', b'')
+ self.canon_headers.append(left + b':' + right)
+ except ValueError:
+ self.canon_headers.append(header)
+ def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None:
+ self.git_canonicalize()
+ ds = DevsigHeader()
+ ds.set_headers(self.canon_headers)
+ ds.set_body(self.canon_body)
+ ds.set_field('l', str(len(self.body)))
+ if identity and identity != self.canon_identity:
+ ds.set_field('i', identity)
+ if selector:
+ ds.set_field('s', selector)
+ if algo not in ('ed25519', 'openpgp'):
+ raise SigningError('Unsupported algorithm: %s' % algo)
+ ds.set_field('a', '%s-sha256' % algo)
+ if algo == 'ed25519':
+ # Set signing time for ed25519 sigs
+ ds.set_field('t', str(int(time.time())))
+ hn, hv = ds.sign(keyinfo)
+ hhdr = email.header.make_header([(hn + b': ' + hv, 'us-ascii')], maxlinelen=78)
+ self.headers.append(hhdr.encode().encode() + self.lf)
+ def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> str:
+ vds = None
+ for ds in self.sigs:
+ if ds.get_field('i', decode=True) == identity:
+ vds = ds
+ break
+ if vds is None:
+ raise ValidationError('No signatures matching identity %s' % identity)
+ self.git_canonicalize()
+ vds.set_headers(self.canon_headers)
+ if trim_body:
+ lfield = vds.get_field('l')
+ if lfield:
+ try:
+ maxlen = int(lfield)
+ vds.set_body(self.canon_body, maxlen=maxlen)
+ except ValueError:
+ vds.set_body(self.canon_body)
+ else:
+ vds.set_body(self.canon_body)
+ return vds.validate(pkey)
+ def as_bytes(self):
+ return b''.join(self.headers) + self.lf + self.body
+ def as_string(self, encoding='utf-8'):
+ return self.as_bytes().decode(encoding)
+ def load_from_bytes(self, msgdata: bytes) -> None:
+ # We use simplest parsing -- using Python's email module would be overkill
+ ldshn = DEVSIG_HDR.lower()
+ with BytesIO(msgdata) as fh:
+ while True:
+ line = fh.readline()
+ if not len(line):
+ break
+ if not len(line.strip()):
+ self.lf = line
+ self.body = fh.read()
+ break
+ # is it a wrapped header?
+ if line[0] in ("\x09", "\x20", 0x09, 0x20):
+ if not len(self.headers):
+ raise RuntimeError('Not a valid RFC2822 message')
+ # attach it to the previous header
+ self.headers[-1] += line
+ continue
+ # Is it a signature header?
+ if line.lower().startswith(ldshn):
+ self.signed = True
+ self.headers.append(line)
+ def get_sigs(self) -> list:
+ if self.sigs is not None:
+ return self.sigs
+ ldshn = DEVSIG_HDR.lower()
+ self.sigs = list()
+ from_id = None
+ for header in self.headers:
+ try:
+ left, right = header.split(b':', 1)
+ hn = left.strip().lower()
+ hv = right
+ if hn == ldshn:
+ self.sigs.append(DevsigHeader(hv))
+ elif hn == b'from':
+ parts = email.utils.parseaddr(hv.decode().strip())
+ from_id = parts[1]
+ except ValueError:
+ raise RuntimeError('Error parsing headers')
+ if from_id:
+ for ds in self.sigs:
+ if 'i' not in ds.hdata:
+ ds.set_field('i', from_id)
+ return self.sigs
+ @staticmethod
+ def _get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
+ with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td:
+ mf = os.path.join(td, 'm')
+ pf = os.path.join(td, 'p')
+ cmdargs = ['git', 'mailinfo', '--encoding=utf-8', mf, pf]
+ ecode, i, err = _run_command(cmdargs, stdin=payload)
+ if ecode > 0:
+ logger.debug('FAILED : Failed running git-mailinfo:')
+ logger.debug(err.decode())
+ raise RuntimeError('Failed to run git-mailinfo: %s' % err.decode())
+ with open(mf, 'rb') as mfh:
+ m = mfh.read()
+ with open(pf, 'rb') as pfh:
+ p = pfh.read()
+ return m, p, i
def get_data_dir():
if 'XDG_DATA_HOME' in os.environ:
datahome = os.environ['XDG_DATA_HOME']
@@ -139,140 +586,6 @@ def gpg_run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, byt
return _run_command(cmdargs, stdin)
-def check_gpg_status(status: bytes) -> Tuple[bool, bool, bool, str]:
- good = False
- valid = False
- trusted = False
- signtime = ''
- gs_matches = re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M)
- if gs_matches:
- good = True
- vs_matches = re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)
- if vs_matches:
- valid = True
- signtime = vs_matches.groups()[2].decode()
- ts_matches = re.search(rb'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M)
- if ts_matches:
- trusted = True
- return good, valid, trusted, signtime
-def get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
- with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td:
- mf = os.path.join(td, 'm')
- pf = os.path.join(td, 'p')
- cmdargs = ['git', 'mailinfo', '--encoding=utf-8', mf, pf]
- ecode, out, err = _run_command(cmdargs, stdin=payload)
- if ecode > 0:
- logger.debug('FAILED : Failed running git-mailinfo:')
- logger.debug(err.decode())
- raise RuntimeError('Failed to run git-mailinfo: %s' % err.decode())
- with open(mf, 'rb') as mfh:
- m = mfh.read()
- with open(pf, 'rb') as pfh:
- p = pfh.read()
- return m, p, out
-def is_signed(headers: list):
- for header in headers:
- try:
- left, right = header.split(b':', 1)
- if left.strip().lower() == DEVSIG_HDR.lower():
- return True
- except ValueError:
- continue
- return False
-def parse_message(msgdata: bytes) -> Tuple[list, bytes]:
- # We use simplest parsing -- using Python's email module would be overkill
- headers = list()
- with BytesIO(msgdata) as fh:
- while True:
- line = fh.readline()
- if not len(line):
- break
- if not len(line.strip()):
- # Keep extra LF in headers so we don't have to track LF/CRLF endings
- headers.append(line)
- payload = fh.read()
- break
- # is it a wrapped header?
- if line[0] in ("\x09", "\x20", 0x09, 0x20):
- if not len(headers):
- raise RuntimeError('Not a valid RFC2822 message')
- # attach it to the previous header
- headers[-1] += line
- continue
- headers.append(line)
- return headers, payload
-def get_mailinfo_message(oheaders: list, opayload: bytes, want_hdrs: list,
- maxlen: Optional[int]) -> Tuple[list, bytes, str]:
- # We pre-canonicalize using git mailinfo
- # use whatever lf is used in the headers
- origmsg = b''.join(oheaders) + opayload
- m, p, i = get_git_mailinfo(origmsg)
- # Generate a new payload using m and p and canonicalize with \r\n endings,
- # trimming any excess blank lines ("simple" DKIM canonicalization).
- cpayload = b''
- for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
- cpayload += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
- if maxlen:
- logger.debug('Limiting payload length to %d bytes', maxlen)
- cpayload = cpayload[:maxlen]
- idata = dict()
- for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'):
- left, right = line.split(b':', 1)
- idata[left.lower()] = right.strip()
- # Theoretically, we should always see an "Email" line
- identity = idata.get(b'email', b'').decode()
- # Now substituting headers returned by mailinfo
- cheaders = list()
- for oheader in oheaders:
- try:
- left, right = oheader.split(b':', 1)
- lleft = left.lower()
- if lleft not in want_hdrs:
- continue
- if lleft == b'from':
- right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
- elif lleft == b'subject':
- right = b' ' + idata.get(b'subject', b'')
- cheaders.append(left + b':' + right)
- except ValueError:
- cheaders.append(oheader)
- return cheaders, cpayload, identity
-def splitter(longstr: bytes, limit: int = 78) -> bytes:
- splitstr = list()
- first = True
- while len(longstr) > limit:
- at = limit
- if first:
- first = False
- at -= 2
- splitstr.append(longstr[:at])
- longstr = longstr[at:]
- splitstr.append(longstr)
- return b' '.join(splitstr)
def get_git_toplevel(gitdir: str = None) -> str:
cmdargs = ['git']
if gitdir:
@@ -284,38 +597,6 @@ def get_git_toplevel(gitdir: str = None) -> str:
return ''
-def get_parts_from_header(hval: bytes) -> dict:
- hval = re.sub(rb'\s*', b'', hval)
- hdata = dict()
- for chunk in hval.split(b';'):
- parts = chunk.split(b'=', 1)
- if len(parts) < 2:
- continue
- hdata[parts[0].decode()] = parts[1]
- return hdata
-def dkim_canonicalize_header(hval: bytes) -> bytes:
- # We only do relaxed for headers
- # o Unfold all header field continuation lines as described in
- # [RFC5322]; in particular, lines with terminators embedded in
- # continued header field values (that is, CRLF sequences followed by
- # WSP) MUST be interpreted without the CRLF. Implementations MUST
- # NOT remove the CRLF at the end of the header field value.
- hval = re.sub(rb'[\r\n]', b'', hval)
- # o Convert all sequences of one or more WSP characters to a single SP
- # character. WSP characters here include those before and after a
- # line folding boundary.
- hval = re.sub(rb'\s+', b' ', hval)
- # o Delete all WSP characters at the end of each unfolded header field
- # value.
- # o Delete any WSP characters remaining before and after the colon
- # separating the header field name from the header field value. The
- # colon separator MUST be retained.
- hval = hval.strip() + b'\r\n'
- return hval
def make_pkey_path(keytype: str, identity: str, selector: str) -> str:
chunks = identity.split('@', 1)
if len(chunks) != 2:
@@ -383,188 +664,6 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
raise KeyError('Could not find %s' % fullpath)
-def make_devsig_header(headers: list, payload: bytes, algo: str, signtime: Optional[str] = None,
- identity: Optional[str] = None, selector: Optional[str] = None, maxlen: Optional[int] = None,
- want_hdrs: Optional[list] = None, strict: bool = False) -> Tuple[bytes, bytes]:
- if not want_hdrs:
- want_hdrs = REQ_HDRS
- cheaders, cpayload, cidentity = get_mailinfo_message(headers, payload, want_hdrs, maxlen)
- hashed = hashlib.sha256()
- hashed.update(cpayload)
- bh = base64.b64encode(hashed.digest())
- hparts = [
- b'v=1',
- b'a=%s-sha256' % algo.encode(),
- ]
- if (identity and strict) or (not strict and identity != cidentity):
- hparts.append(b'i=%s' % identity.encode())
- if selector:
- hparts.append(b's=%s' % selector.encode())
- if signtime:
- hparts.append(b't=%s' % signtime.encode())
- hparts.append(b'h=%s' % b':'.join(want_hdrs))
- hparts.append(b'l=%d' % len(cpayload))
- hparts.append(b'bh=%s' % bh)
- hparts.append(b'b=')
- dshval = b'; '.join(hparts)
- hashed = hashlib.sha256()
- for cheader in cheaders:
- try:
- left, right = cheader.split(b':', 1)
- hname = left.strip().lower()
- if hname not in want_hdrs:
- continue
- except ValueError:
- continue
- hashed.update(hname + b':' + dkim_canonicalize_header(right))
- hashed.update(DEVSIG_HDR.lower() + b':' + dshval)
- dshdr = DEVSIG_HDR + b': ' + dshval
- return dshdr, hashed.digest()
-def get_devsig_header_info(headers) -> Tuple[Optional[str], str, str, str, list, dict]:
- from_hdr = None
- hdata = None
- need_hdrs = [b'from', DEVSIG_HDR.lower()]
- for header in headers:
- try:
- left, right = header.split(b':', 1)
- hname = left.strip().lower()
- # We want a "from" header and a DEVSIG_HDR
- if hname not in need_hdrs:
- continue
- if hname == b'from':
- from_hdr = right
- continue
- hval = dkim_canonicalize_header(right)
- hdata = get_parts_from_header(hval)
- except ValueError:
- continue
- if hdata is None:
- raise ValidationError('No "%s:" header in message' % DEVSIG_HDR.decode())
- # make sure the required headers are in the sig
- if 'h' not in hdata:
- raise ValidationError('h= is required but is not present in %s' % DEVSIG_HDR.decode())
- signed_hdrs = [x.strip() for x in hdata['h'].split(b':')]
- for rhdr in REQ_HDRS:
- if rhdr not in signed_hdrs:
- raise ValidationError('%s is a required header' % rhdr.decode())
- if 'i' not in hdata:
- # Use the identity from the from header
- if not from_hdr:
- raise ValidationError('No i= in %s, and no From: header!' % DEVSIG_HDR.decode())
- parts = email.utils.parseaddr(from_hdr.decode())
- identity = parts[1]
- else:
- identity = hdata['i'].decode()
- if 'a' in hdata:
- apart = hdata['a'].decode()
- if apart.startswith('ed25519'):
- algo = 'ed25519'
- elif apart.startswith('openpgp'):
- algo = 'openpgp'
- else:
- raise ValidationError('Unsupported a= in %s: %s' % (DEVSIG_HDR.decode(), apart))
- else:
- # Default is ed25519-sha256
- algo = 'ed25519'
- if 's' in hdata:
- selector = hdata['s'].decode()
- else:
- selector = 'default'
- if 't' in hdata:
- signtime = hdata['t'].decode()
- else:
- signtime = None
- return signtime, identity, selector, algo, signed_hdrs, hdata
-def sign_ed25519(headers: list, payload: bytes, keydata: str,
- identity: Optional[str] = None, selector: Optional[str] = None) -> email.header.Header:
- from nacl.signing import SigningKey
- from nacl.encoding import Base64Encoder
- logger.debug('SIGNING : ED25519')
- signtime = str(int(time.time()))
- dshdr, digest = make_devsig_header(headers, payload, algo='ed25519', signtime=signtime,
- identity=identity, selector=selector)
- sk = SigningKey(keydata, encoder=Base64Encoder)
- bdata = sk.sign(digest, encoder=Base64Encoder)
- hhdr = email.header.make_header([(dshdr + splitter(bdata), 'us-ascii')], maxlinelen=78)
- return hhdr
-def validate_ed25519(sigdata: bytes, pubkey: bytes) -> bytes:
- from nacl.signing import VerifyKey
- from nacl.encoding import Base64Encoder
- from nacl.exceptions import BadSignatureError
- vk = VerifyKey(pubkey, encoder=Base64Encoder)
- try:
- return vk.verify(sigdata, encoder=Base64Encoder)
- except BadSignatureError:
- raise ValidationError('Failed to validate signature')
-def sign_openpgp(headers: list, payload: bytes, keyid: Optional[str],
- identity: Optional[str] = None, selector: Optional[str] = None) -> email.header.Header:
- logger.debug('SIGNING : OpenPGP')
- # OpenPGP header includes signing time, so we don't need to include t=
- dshdr, digest = make_devsig_header(headers, payload, algo='openpgp', identity=identity, selector=selector)
- gpgargs = ['-s']
- if keyid:
- gpgargs += ['-u', keyid]
- ecode, out, err = gpg_run_command(gpgargs, digest)
- if ecode > 0:
- raise SigningError('Running gpg failed', errors=err.decode().split('\n'))
- bdata = base64.b64encode(out)
- hhdr = email.header.make_header([(dshdr + splitter(bdata), 'us-ascii')], maxlinelen=78)
- return hhdr
-def validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]:
- bsigdata = base64.b64decode(sigdata)
- vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
- if pubkey:
- with tempfile.TemporaryFile(suffix='.patch-attest-poc') as temp_keyring:
- keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}']
- gpgargs = keyringargs + ['--status-fd=1', '--import']
- ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey)
- # look for IMPORT_OK
- if out.find(b'[GNUPG:] IMPORT_OK') < 0:
- raise ValidationError('Could not import GnuPG public key')
- gpgargs = keyringargs + vrfyargs
- ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata)
- else:
- logger.debug('Verifying using default keyring')
- ecode, out, err = gpg_run_command(vrfyargs, stdin=bsigdata)
- if ecode > 0:
- raise ValidationError('Failed to validate PGP signature')
- good, valid, trusted, signtime = check_gpg_status(err)
- if good and valid:
- return out, (good, valid, trusted, signtime)
- raise ValidationError('Failed to validate PGP signature')
def _load_messages(cmdargs) -> dict:
import sys
if not sys.stdin.isatty():
@@ -582,6 +681,13 @@ def _load_messages(cmdargs) -> dict:
return messages
+def sign_message(msgdata: bytes, algo: str, keyinfo: Union[str, bytes],
+ identity: Optional[str], selector: Optional[str]) -> bytes:
+ pm = PatattMessage(msgdata)
+ pm.sign(algo, keyinfo, identity=identity, selector=selector)
+ return pm.as_bytes()
def cmd_sign(cmdargs, config: dict) -> None:
# Do we have the signingkey defined?
usercfg = get_config_from_git(r'user\..*')
@@ -602,7 +708,7 @@ def cmd_sign(cmdargs, config: dict) -> None:
sk = config.get('signingkey')
if sk.startswith('ed25519:'):
- _sign_func = sign_ed25519
+ algo = 'ed25519'
identifier = sk[8:]
keysrc = None
if identifier.startswith('/') and os.path.exists(identifier):
@@ -630,84 +736,84 @@ def cmd_sign(cmdargs, config: dict) -> None:
keydata = fh.read()
elif sk.startswith('openpgp:'):
- _sign_func = sign_openpgp
+ algo = 'openpgp'
keydata = sk[8:]
logger.critical('Unknown key type: %s', sk)
- for filename, msgdata in messages.items():
- headers, payload = parse_message(msgdata)
- if is_signed(headers):
- logger.critical('Already signed: %s', filename)
+ for fn, msgdata in messages.items():
+ pm = PatattMessage(msgdata)
+ if pm.signed:
+ logger.critical('Already signed: %s', fn)
- hhdr = _sign_func(headers, payload, keydata, identity=config.get('identity', ''),
- selector=config.get('selector', ''))
+ pm.sign(algo, keydata, identity=config.get('identity'), selector=config.get('selector'))
+ logger.debug('--- SIGNED MESSAGE STARTS ---')
+ logger.debug(pm.as_string())
+ if fn == '-':
+ sys.stdout.buffer.write(pm.as_bytes())
+ else:
+ with open(fn, 'wb') as fh:
+ fh.write(pm.as_bytes())
+ logger.info('Signed: %s', fn)
except SigningError as ex:
logger.critical('ERROR: %s', ex)
- dshdr = hhdr.encode().encode()
- # insert it before the blank line
- lf = headers.pop(-1)
- headers.append(dshdr + lf)
- headers.append(lf)
- payload = b''.join(headers) + payload
- logger.debug('--- SIGNED MESSAGE STARTS ---')
- logger.debug(payload)
- if filename == '-':
- sys.stdout.buffer.write(payload)
+def validate_message(msgdata: bytes, sources: list) -> list:
+ errors = list()
+ goodsigs = list()
+ success = False
+ pm = PatattMessage(msgdata)
+ if not pm.signed:
+ errors.append('message is not signed')
+ raise ValidationError('message is not signed', errors)
+ # Find all identities for which we have public keys
+ for ds in pm.get_sigs():
+ a = ds.get_field('a', decode=True)
+ i = ds.get_field('i', decode=True)
+ s = ds.get_field('s', decode=True)
+ if not s:
+ s = 'default'
+ if a.startswith('ed25519'):
+ algo = 'ed25519'
+ elif a.startswith('openpgp'):
+ algo = 'openpgp'
- with open(filename, 'wb') as fh:
- fh.write(payload)
+ errors.append('%s/%s Unknown algorigthm: %s' % (i, s, a))
+ continue
- logger.info('Signed: %s', filename)
+ pkey = keysrc = None
+ for source in sources:
+ try:
+ pkey, keysrc = get_public_key(source, algo, i, s)
+ break
+ except KeyError:
+ pass
+ if not pkey and algo == 'ed25519':
+ errors.append('%s/%s no matching ed25519 key found' % (i, s))
+ continue
-def validate_message(msgdata: bytes, sources: list):
- headers, payload = parse_message(msgdata)
+ try:
+ signtime = pm.validate(i, pkey)
+ success = True
+ except ValidationError:
+ errors.append('%s/%s failed to validate using a=%s, pkey=%s' % (i, s, a, keysrc))
+ continue
- if not is_signed(headers):
- raise ValidationError('message is not signed')
+ goodsigs.append((i, signtime, keysrc, algo))
- signtime, identity, selector, algo, signed_hdrs, hdata = get_devsig_header_info(headers)
+ if not success:
+ raise ValidationError('Failed to validate message', errors)
- pkey = None
- keysrc = None
- for source in sources:
- try:
- pkey, keysrc = get_public_key(source, algo, identity, selector)
- break
- except KeyError:
- pass
- if not pkey and algo == 'ed25519':
- raise ValidationError('no %s public key for %s/%s' % (algo, identity, selector))
- sdigest = None
- if algo == 'ed25519':
- sdigest = validate_ed25519(hdata['b'], pkey)
- # signtime is required for ed25519 signatures
- signtime = hdata.get('t', b'').decode()
- if not signtime:
- raise ValidationError('signature does not include t= signing time')
- elif algo == 'openpgp':
- sdigest, signtime = validate_openpgp(hdata['b'], pkey)
- if not sdigest:
- raise ValidationError('faled to verify %s signature for %s/%s' % (algo, identity, selector))
- # Now calculate our own digest and compare
- dshdr, digest = make_devsig_header(headers, payload, algo, signtime=hdata.get('t', b'').decode(),
- identity=hdata.get('i', b'').decode(),
- selector=hdata.get('s', b'').decode(), want_hdrs=signed_hdrs,
- strict=True)
- if sdigest == digest:
- return signtime, identity, selector, algo, keysrc
- raise ValidationError('failed to verify message content')
+ return goodsigs
def cmd_validate(cmdargs, config: dict):
@@ -720,13 +826,15 @@ def cmd_validate(cmdargs, config: dict):
for filename, msgdata in messages.items():
- signtime, identity, selector, algo, pkey = validate_message(msgdata, sources)
+ goodsigs = validate_message(msgdata, sources)
logger.critical('PASS: %s', os.path.basename(filename))
- logger.info(' by : %s (%s)', identity, algo)
- if pkey:
- logger.info(' key: %s', pkey)
- else:
- logger.info(' key: in default GnuPG keyring')
+ for identity, signtime, keysrc, algo in goodsigs:
+ logger.info(' by : %s (%s)', identity, algo)
+ if keysrc:
+ logger.info(' key: %s', keysrc)
+ else:
+ logger.info(' key: default keyring')
except ValidationError as ex:
logger.critical('FAIL: %s', os.path.basename(filename))
logger.critical(' err: %s', ex)