diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-05-04 17:51:47 -0400 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-05-04 17:51:47 -0400 |
commit | 30503532619445f808184a286351fefc4e7f9980 (patch) | |
tree | 508eaec08d66387d5751c14e138940238bc21229 | |
parent | 1ad11609a4af6c45c28e76d098327eea3937f769 (diff) | |
download | patatt-30503532619445f808184a286351fefc4e7f9980.tar.gz |
Refactor code to be cleaner
There was too much passing around of parameters, so switch to using two
auxiliary classes for most of the signing/validation work. This also
more properly implements DKIM and is just generally much cleaner.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r-- | patatt/__init__.py | 942 |
1 files changed, 525 insertions, 417 deletions
diff --git a/patatt/__init__.py b/patatt/__init__.py index 7a8cd1c..78393ed 100644 --- a/patatt/__init__.py +++ b/patatt/__init__.py @@ -22,7 +22,7 @@ import email.utils import email.header from pathlib import Path -from typing import Optional, Tuple +from typing import Optional, Tuple, Union from io import BytesIO logger = logging.getLogger(__name__) @@ -40,6 +40,7 @@ DEFAULT_CONFIG = { # My version __VERSION__ = '0.1.0' +MAX_SUPPORTED_VERSION = 1 class SigningError(Exception): @@ -48,18 +49,464 @@ class SigningError(Exception): self.errors = errors -class ValidationError(Exception): +class ConfigurationError(Exception): def __init__(self, message: str, errors: Optional[list] = None): super().__init__(message) self.errors = errors -class ConfigurationError(Exception): +class ValidationError(Exception): def __init__(self, message: str, errors: Optional[list] = None): super().__init__(message) self.errors = errors +class BodyValidationError(ValidationError): + def __init__(self, message: str, errors: Optional[list] = None): + super().__init__(message, errors) + + +class DevsigHeader: + def __init__(self, hval: Optional[bytes] = None): + self._headervals = list() + self._body_hash = None + # it doesn't need to be in any particular order, + # but that's just anarchy, anarchy, I say! + self._order = ['v', 'a', 't', 'l', 'i', 's', 'h', 'bh'] + self.hval = None + self.hdata = dict() + + if hval: + self.from_bytes(hval) + else: + self.hdata['v'] = b'1' + + def from_bytes(self, hval: bytes) -> None: + self.hval = DevsigHeader._dkim_canonicalize_header(hval) + hval = re.sub(rb'\s*', b'', hval) + for chunk in hval.split(b';'): + parts = chunk.split(b'=', 1) + if len(parts) < 2: + continue + self.set_field(parts[0].decode(), parts[1]) + + def get_field(self, field: str, decode: bool = False) -> Union[None, str, bytes]: + value = self.hdata.get(field) + if isinstance(value, bytes) and decode: + return value.decode() + return value + + def set_field(self, field: str, value: Union[None, str, bytes]) -> None: + if value is None: + del self.hdata[field] + return + if isinstance(value, str): + value = value.encode() + self.hdata[field] = value + + # do any git-mailinfo normalization prior to calling this + def set_body(self, body: bytes, maxlen: Optional[int] = None) -> None: + if maxlen: + if maxlen > len(body): + raise ValidationError('maxlen is larger than payload') + if maxlen < len(body): + body = body[:maxlen] + + self.hdata['l'] = bytes(len(body)) + + hashed = hashlib.sha256() + hashed.update(body) + self._body_hash = base64.b64encode(hashed.digest()) + + # do any git-mailinfo normalization prior to calling this + def set_headers(self, headers: list) -> None: + hfield = self.get_field('h') + if hfield: + # Make sure REQ_HEADERS are in this list + want_headers = [x.strip() for x in hfield.split(b':')] + for rqhdr in REQ_HDRS: + if rqhdr not in want_headers: + raise ValidationError('Signature is missing a required header %s' % rqhdr.decode()) + else: + want_headers = REQ_HDRS + + self._headervals = list() + for header in headers: + try: + left, right = header.split(b':', 1) + hname = left.strip().lower() + if hname not in want_headers: + continue + except ValueError: + continue + self._headervals.append(hname + b':' + DevsigHeader._dkim_canonicalize_header(right)) + + self.hdata['h'] = b':'.join(want_headers) + + def sanity_check(self) -> None: + if 'a' not in self.hdata: + raise RuntimeError('Must set "a" field first') + if not self._body_hash: + raise RuntimeError('Must use set_body first') + if not self._headervals: + raise RuntimeError('Must use set_headers first') + + def validate(self, keyinfo: Union[str, bytes, None]) -> str: + self.sanity_check() + # Check that we have a b= field + if not self.get_field('b'): + raise RuntimeError('Missing "b=" value') + pts = self.hval.rsplit(b'b=', 1) + dshdr = pts[0] + b'b=' + bdata = re.sub(rb'\s*', b'', pts[1]) + algo = self.get_field('a', decode=True) + if algo.startswith('ed25519'): + sdigest = DevsigHeader._validate_ed25519(bdata, keyinfo) + signtime = self.get_field('t', decode=True) + if not signtime: + raise ValidationError('t= field is required for ed25519 sigs') + elif algo.startswith('openpgp'): + sdigest, (good, valid, trusted, signtime) = DevsigHeader._validate_openpgp(bdata, keyinfo) + else: + raise ValidationError('Unknown algorithm: %s', algo) + + # Now we calculate our own digest + hashed = hashlib.sha256() + # Add in our _headervals first (they aready have CRLF endings) + hashed.update(b''.join(self._headervals)) + # and the devsig header now, without the trailing CRLF + hashed.update(DEVSIG_HDR.lower() + b':' + dshdr) + vdigest = hashed.digest() + if sdigest != vdigest: + raise ValidationError('Header validation failed') + # Now validate body hash + if self.get_field('bh') != self._body_hash: + raise BodyValidationError('Body content validation failed') + + return signtime + + def sign(self, keyinfo: bytes, split: bool = True) -> Tuple[bytes, bytes]: + self.sanity_check() + self.set_field('bh', self._body_hash) + algo = self.get_field('a', decode=True) + hparts = list() + for fn in self._order: + fv = self.get_field(fn) + if fv is not None: + hparts.append(b'%s=%s' % (fn.encode(), fv)) + + hparts.append(b'b=') + dshval = b'; '.join(hparts) + hashed = hashlib.sha256() + # Add in our _headervals first (they aready have CRLF endings) + hashed.update(b''.join(self._headervals)) + # and ourselves now, without the trailing CRLF + hashed.update(DEVSIG_HDR.lower() + b':' + dshval) + digest = hashed.digest() + + if algo.startswith('ed25519'): + bval = DevsigHeader._sign_ed25519(digest, keyinfo) + elif algo.startswith('openpgp'): + bval = DevsigHeader._sign_openpgp(digest, keyinfo) + else: + raise RuntimeError('Unknown a=%s' % algo) + + if split: + return DEVSIG_HDR, dshval + DevsigHeader.splitter(bval) + + return DEVSIG_HDR, dshval + bval + + @staticmethod + def _sign_ed25519(payload: bytes, privkey: bytes) -> bytes: + from nacl.signing import SigningKey + from nacl.encoding import Base64Encoder + + sk = SigningKey(privkey, encoder=Base64Encoder) + bdata = sk.sign(payload, encoder=Base64Encoder) + return bdata + + @staticmethod + def _validate_ed25519(sigdata: bytes, pubkey: bytes) -> bytes: + from nacl.signing import VerifyKey + from nacl.encoding import Base64Encoder + from nacl.exceptions import BadSignatureError + + vk = VerifyKey(pubkey, encoder=Base64Encoder) + try: + return vk.verify(sigdata, encoder=Base64Encoder) + except BadSignatureError: + raise ValidationError('Failed to validate signature') + + @staticmethod + def _sign_openpgp(payload: bytes, keyid: Optional[bytes]) -> bytes: + gpgargs = ['-s'] + if keyid: + gpgargs += ['-u', keyid] + ecode, out, err = gpg_run_command(gpgargs, payload) + if ecode > 0: + raise SigningError('Running gpg failed', errors=err.decode().split('\n')) + bdata = base64.b64encode(out) + return bdata + + @staticmethod + def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]: + bsigdata = base64.b64decode(sigdata) + vrfyargs = ['--verify', '--output', '-', '--status-fd=2'] + if pubkey: + with tempfile.TemporaryFile(suffix='.patch-attest-poc') as temp_keyring: + keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}'] + gpgargs = keyringargs + ['--status-fd=1', '--import'] + ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey) + # look for IMPORT_OK + if out.find(b'[GNUPG:] IMPORT_OK') < 0: + raise ValidationError('Could not import GnuPG public key') + gpgargs = keyringargs + vrfyargs + ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata) + + else: + logger.debug('Verifying using default keyring') + ecode, out, err = gpg_run_command(vrfyargs, stdin=bsigdata) + + if ecode > 0: + raise ValidationError('Failed to validate PGP signature') + + good, valid, trusted, signtime = DevsigHeader._check_gpg_status(err) + if good and valid: + return out, (good, valid, trusted, signtime) + + raise ValidationError('Failed to validate PGP signature') + + @staticmethod + def _check_gpg_status(status: bytes) -> Tuple[bool, bool, bool, str]: + good = False + valid = False + trusted = False + signtime = '' + + gs_matches = re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M) + if gs_matches: + good = True + vs_matches = re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M) + if vs_matches: + valid = True + signtime = vs_matches.groups()[2].decode() + ts_matches = re.search(rb'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M) + if ts_matches: + trusted = True + + return good, valid, trusted, signtime + + @staticmethod + def splitter(longstr: bytes, limit: int = 78) -> bytes: + splitstr = list() + first = True + while len(longstr) > limit: + at = limit + if first: + first = False + at -= 2 + splitstr.append(longstr[:at]) + longstr = longstr[at:] + splitstr.append(longstr) + return b' '.join(splitstr) + + @staticmethod + def _dkim_canonicalize_header(hval: bytes) -> bytes: + # We only do relaxed for headers + # o Unfold all header field continuation lines as described in + # [RFC5322]; in particular, lines with terminators embedded in + # continued header field values (that is, CRLF sequences followed by + # WSP) MUST be interpreted without the CRLF. Implementations MUST + # NOT remove the CRLF at the end of the header field value. + hval = re.sub(rb'[\r\n]', b'', hval) + # o Convert all sequences of one or more WSP characters to a single SP + # character. WSP characters here include those before and after a + # line folding boundary. + hval = re.sub(rb'\s+', b' ', hval) + # o Delete all WSP characters at the end of each unfolded header field + # value. + # o Delete any WSP characters remaining before and after the colon + # separating the header field name from the header field value. The + # colon separator MUST be retained. + hval = hval.strip() + b'\r\n' + return hval + + +class PatattMessage: + def __init__(self, msgdata: bytes): + self.headers = list() + self.body = b'' + self.lf = b'\n' + self.signed = False + + self.canon_headers = None + self.canon_body = None + self.canon_identity = None + + self.sigs = None + + self.load_from_bytes(msgdata) + + def git_canonicalize(self): + if self.canon_body is not None: + return + + # Generate a new payload using m and p and canonicalize with \r\n endings, + # trimming any excess blank lines ("simple" DKIM canonicalization). + m, p, i = PatattMessage._get_git_mailinfo(b''.join(self.headers) + self.lf + self.body) + self.canon_body = b'' + for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'): + self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n' + + idata = dict() + for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'): + left, right = line.split(b':', 1) + idata[left.lower()] = right.strip() + + # Theoretically, we should always see an "Email" line + self.canon_identity = idata.get(b'email', b'').decode() + + # Now substituting headers returned by mailinfo + self.canon_headers = list() + for header in self.headers: + try: + left, right = header.split(b':', 1) + lleft = left.lower() + if lleft == b'from': + right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>' + elif lleft == b'subject': + right = b' ' + idata.get(b'subject', b'') + self.canon_headers.append(left + b':' + right) + except ValueError: + self.canon_headers.append(header) + + def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None: + self.git_canonicalize() + ds = DevsigHeader() + ds.set_headers(self.canon_headers) + ds.set_body(self.canon_body) + ds.set_field('l', str(len(self.body))) + if identity and identity != self.canon_identity: + ds.set_field('i', identity) + if selector: + ds.set_field('s', selector) + + if algo not in ('ed25519', 'openpgp'): + raise SigningError('Unsupported algorithm: %s' % algo) + + ds.set_field('a', '%s-sha256' % algo) + if algo == 'ed25519': + # Set signing time for ed25519 sigs + ds.set_field('t', str(int(time.time()))) + hn, hv = ds.sign(keyinfo) + + hhdr = email.header.make_header([(hn + b': ' + hv, 'us-ascii')], maxlinelen=78) + self.headers.append(hhdr.encode().encode() + self.lf) + + def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> str: + vds = None + for ds in self.sigs: + if ds.get_field('i', decode=True) == identity: + vds = ds + break + if vds is None: + raise ValidationError('No signatures matching identity %s' % identity) + + self.git_canonicalize() + vds.set_headers(self.canon_headers) + + if trim_body: + lfield = vds.get_field('l') + if lfield: + try: + maxlen = int(lfield) + vds.set_body(self.canon_body, maxlen=maxlen) + except ValueError: + vds.set_body(self.canon_body) + else: + vds.set_body(self.canon_body) + + return vds.validate(pkey) + + def as_bytes(self): + return b''.join(self.headers) + self.lf + self.body + + def as_string(self, encoding='utf-8'): + return self.as_bytes().decode(encoding) + + def load_from_bytes(self, msgdata: bytes) -> None: + # We use simplest parsing -- using Python's email module would be overkill + ldshn = DEVSIG_HDR.lower() + with BytesIO(msgdata) as fh: + while True: + line = fh.readline() + if not len(line): + break + + if not len(line.strip()): + self.lf = line + self.body = fh.read() + break + + # is it a wrapped header? + if line[0] in ("\x09", "\x20", 0x09, 0x20): + if not len(self.headers): + raise RuntimeError('Not a valid RFC2822 message') + # attach it to the previous header + self.headers[-1] += line + continue + # Is it a signature header? + if line.lower().startswith(ldshn): + self.signed = True + self.headers.append(line) + + def get_sigs(self) -> list: + if self.sigs is not None: + return self.sigs + + ldshn = DEVSIG_HDR.lower() + self.sigs = list() + from_id = None + + for header in self.headers: + try: + left, right = header.split(b':', 1) + hn = left.strip().lower() + hv = right + if hn == ldshn: + self.sigs.append(DevsigHeader(hv)) + elif hn == b'from': + parts = email.utils.parseaddr(hv.decode().strip()) + from_id = parts[1] + except ValueError: + raise RuntimeError('Error parsing headers') + + if from_id: + for ds in self.sigs: + if 'i' not in ds.hdata: + ds.set_field('i', from_id) + + return self.sigs + + @staticmethod + def _get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]: + with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td: + mf = os.path.join(td, 'm') + pf = os.path.join(td, 'p') + cmdargs = ['git', 'mailinfo', '--encoding=utf-8', mf, pf] + ecode, i, err = _run_command(cmdargs, stdin=payload) + if ecode > 0: + logger.debug('FAILED : Failed running git-mailinfo:') + logger.debug(err.decode()) + raise RuntimeError('Failed to run git-mailinfo: %s' % err.decode()) + + with open(mf, 'rb') as mfh: + m = mfh.read() + with open(pf, 'rb') as pfh: + p = pfh.read() + return m, p, i + + def get_data_dir(): if 'XDG_DATA_HOME' in os.environ: datahome = os.environ['XDG_DATA_HOME'] @@ -139,140 +586,6 @@ def gpg_run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, byt return _run_command(cmdargs, stdin) -def check_gpg_status(status: bytes) -> Tuple[bool, bool, bool, str]: - good = False - valid = False - trusted = False - signtime = '' - - gs_matches = re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M) - if gs_matches: - good = True - vs_matches = re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M) - if vs_matches: - valid = True - signtime = vs_matches.groups()[2].decode() - ts_matches = re.search(rb'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M) - if ts_matches: - trusted = True - - return good, valid, trusted, signtime - - -def get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]: - with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td: - mf = os.path.join(td, 'm') - pf = os.path.join(td, 'p') - cmdargs = ['git', 'mailinfo', '--encoding=utf-8', mf, pf] - ecode, out, err = _run_command(cmdargs, stdin=payload) - if ecode > 0: - logger.debug('FAILED : Failed running git-mailinfo:') - logger.debug(err.decode()) - raise RuntimeError('Failed to run git-mailinfo: %s' % err.decode()) - - with open(mf, 'rb') as mfh: - m = mfh.read() - with open(pf, 'rb') as pfh: - p = pfh.read() - return m, p, out - - -def is_signed(headers: list): - for header in headers: - try: - left, right = header.split(b':', 1) - if left.strip().lower() == DEVSIG_HDR.lower(): - return True - except ValueError: - continue - - return False - - -def parse_message(msgdata: bytes) -> Tuple[list, bytes]: - # We use simplest parsing -- using Python's email module would be overkill - headers = list() - with BytesIO(msgdata) as fh: - while True: - line = fh.readline() - if not len(line): - break - - if not len(line.strip()): - # Keep extra LF in headers so we don't have to track LF/CRLF endings - headers.append(line) - payload = fh.read() - break - - # is it a wrapped header? - if line[0] in ("\x09", "\x20", 0x09, 0x20): - if not len(headers): - raise RuntimeError('Not a valid RFC2822 message') - # attach it to the previous header - headers[-1] += line - continue - headers.append(line) - - return headers, payload - - -def get_mailinfo_message(oheaders: list, opayload: bytes, want_hdrs: list, - maxlen: Optional[int]) -> Tuple[list, bytes, str]: - # We pre-canonicalize using git mailinfo - # use whatever lf is used in the headers - origmsg = b''.join(oheaders) + opayload - m, p, i = get_git_mailinfo(origmsg) - # Generate a new payload using m and p and canonicalize with \r\n endings, - # trimming any excess blank lines ("simple" DKIM canonicalization). - cpayload = b'' - for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'): - cpayload += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n' - - if maxlen: - logger.debug('Limiting payload length to %d bytes', maxlen) - cpayload = cpayload[:maxlen] - - idata = dict() - for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'): - left, right = line.split(b':', 1) - idata[left.lower()] = right.strip() - - # Theoretically, we should always see an "Email" line - identity = idata.get(b'email', b'').decode() - - # Now substituting headers returned by mailinfo - cheaders = list() - for oheader in oheaders: - try: - left, right = oheader.split(b':', 1) - lleft = left.lower() - if lleft not in want_hdrs: - continue - if lleft == b'from': - right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>' - elif lleft == b'subject': - right = b' ' + idata.get(b'subject', b'') - cheaders.append(left + b':' + right) - except ValueError: - cheaders.append(oheader) - - return cheaders, cpayload, identity - - -def splitter(longstr: bytes, limit: int = 78) -> bytes: - splitstr = list() - first = True - while len(longstr) > limit: - at = limit - if first: - first = False - at -= 2 - splitstr.append(longstr[:at]) - longstr = longstr[at:] - splitstr.append(longstr) - return b' '.join(splitstr) - - def get_git_toplevel(gitdir: str = None) -> str: cmdargs = ['git'] if gitdir: @@ -284,38 +597,6 @@ def get_git_toplevel(gitdir: str = None) -> str: return '' -def get_parts_from_header(hval: bytes) -> dict: - hval = re.sub(rb'\s*', b'', hval) - hdata = dict() - for chunk in hval.split(b';'): - parts = chunk.split(b'=', 1) - if len(parts) < 2: - continue - hdata[parts[0].decode()] = parts[1] - return hdata - - -def dkim_canonicalize_header(hval: bytes) -> bytes: - # We only do relaxed for headers - # o Unfold all header field continuation lines as described in - # [RFC5322]; in particular, lines with terminators embedded in - # continued header field values (that is, CRLF sequences followed by - # WSP) MUST be interpreted without the CRLF. Implementations MUST - # NOT remove the CRLF at the end of the header field value. - hval = re.sub(rb'[\r\n]', b'', hval) - # o Convert all sequences of one or more WSP characters to a single SP - # character. WSP characters here include those before and after a - # line folding boundary. - hval = re.sub(rb'\s+', b' ', hval) - # o Delete all WSP characters at the end of each unfolded header field - # value. - # o Delete any WSP characters remaining before and after the colon - # separating the header field name from the header field value. The - # colon separator MUST be retained. - hval = hval.strip() + b'\r\n' - return hval - - def make_pkey_path(keytype: str, identity: str, selector: str) -> str: chunks = identity.split('@', 1) if len(chunks) != 2: @@ -383,188 +664,6 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T raise KeyError('Could not find %s' % fullpath) -def make_devsig_header(headers: list, payload: bytes, algo: str, signtime: Optional[str] = None, - identity: Optional[str] = None, selector: Optional[str] = None, maxlen: Optional[int] = None, - want_hdrs: Optional[list] = None, strict: bool = False) -> Tuple[bytes, bytes]: - if not want_hdrs: - want_hdrs = REQ_HDRS - cheaders, cpayload, cidentity = get_mailinfo_message(headers, payload, want_hdrs, maxlen) - hashed = hashlib.sha256() - hashed.update(cpayload) - bh = base64.b64encode(hashed.digest()) - - hparts = [ - b'v=1', - b'a=%s-sha256' % algo.encode(), - ] - if (identity and strict) or (not strict and identity != cidentity): - hparts.append(b'i=%s' % identity.encode()) - - if selector: - hparts.append(b's=%s' % selector.encode()) - if signtime: - hparts.append(b't=%s' % signtime.encode()) - - hparts.append(b'h=%s' % b':'.join(want_hdrs)) - hparts.append(b'l=%d' % len(cpayload)) - hparts.append(b'bh=%s' % bh) - hparts.append(b'b=') - dshval = b'; '.join(hparts) - - hashed = hashlib.sha256() - for cheader in cheaders: - try: - left, right = cheader.split(b':', 1) - hname = left.strip().lower() - if hname not in want_hdrs: - continue - except ValueError: - continue - - hashed.update(hname + b':' + dkim_canonicalize_header(right)) - hashed.update(DEVSIG_HDR.lower() + b':' + dshval) - dshdr = DEVSIG_HDR + b': ' + dshval - - return dshdr, hashed.digest() - - -def get_devsig_header_info(headers) -> Tuple[Optional[str], str, str, str, list, dict]: - from_hdr = None - hdata = None - need_hdrs = [b'from', DEVSIG_HDR.lower()] - for header in headers: - try: - left, right = header.split(b':', 1) - hname = left.strip().lower() - # We want a "from" header and a DEVSIG_HDR - if hname not in need_hdrs: - continue - if hname == b'from': - from_hdr = right - continue - hval = dkim_canonicalize_header(right) - hdata = get_parts_from_header(hval) - except ValueError: - continue - - if hdata is None: - raise ValidationError('No "%s:" header in message' % DEVSIG_HDR.decode()) - - # make sure the required headers are in the sig - if 'h' not in hdata: - raise ValidationError('h= is required but is not present in %s' % DEVSIG_HDR.decode()) - - signed_hdrs = [x.strip() for x in hdata['h'].split(b':')] - for rhdr in REQ_HDRS: - if rhdr not in signed_hdrs: - raise ValidationError('%s is a required header' % rhdr.decode()) - - if 'i' not in hdata: - # Use the identity from the from header - if not from_hdr: - raise ValidationError('No i= in %s, and no From: header!' % DEVSIG_HDR.decode()) - parts = email.utils.parseaddr(from_hdr.decode()) - identity = parts[1] - else: - identity = hdata['i'].decode() - - if 'a' in hdata: - apart = hdata['a'].decode() - if apart.startswith('ed25519'): - algo = 'ed25519' - elif apart.startswith('openpgp'): - algo = 'openpgp' - else: - raise ValidationError('Unsupported a= in %s: %s' % (DEVSIG_HDR.decode(), apart)) - else: - # Default is ed25519-sha256 - algo = 'ed25519' - - if 's' in hdata: - selector = hdata['s'].decode() - else: - selector = 'default' - - if 't' in hdata: - signtime = hdata['t'].decode() - else: - signtime = None - - return signtime, identity, selector, algo, signed_hdrs, hdata - - -def sign_ed25519(headers: list, payload: bytes, keydata: str, - identity: Optional[str] = None, selector: Optional[str] = None) -> email.header.Header: - from nacl.signing import SigningKey - from nacl.encoding import Base64Encoder - - logger.debug('SIGNING : ED25519') - signtime = str(int(time.time())) - dshdr, digest = make_devsig_header(headers, payload, algo='ed25519', signtime=signtime, - identity=identity, selector=selector) - sk = SigningKey(keydata, encoder=Base64Encoder) - bdata = sk.sign(digest, encoder=Base64Encoder) - hhdr = email.header.make_header([(dshdr + splitter(bdata), 'us-ascii')], maxlinelen=78) - return hhdr - - -def validate_ed25519(sigdata: bytes, pubkey: bytes) -> bytes: - from nacl.signing import VerifyKey - from nacl.encoding import Base64Encoder - from nacl.exceptions import BadSignatureError - - vk = VerifyKey(pubkey, encoder=Base64Encoder) - try: - return vk.verify(sigdata, encoder=Base64Encoder) - except BadSignatureError: - raise ValidationError('Failed to validate signature') - - -def sign_openpgp(headers: list, payload: bytes, keyid: Optional[str], - identity: Optional[str] = None, selector: Optional[str] = None) -> email.header.Header: - logger.debug('SIGNING : OpenPGP') - # OpenPGP header includes signing time, so we don't need to include t= - dshdr, digest = make_devsig_header(headers, payload, algo='openpgp', identity=identity, selector=selector) - gpgargs = ['-s'] - if keyid: - gpgargs += ['-u', keyid] - ecode, out, err = gpg_run_command(gpgargs, digest) - if ecode > 0: - raise SigningError('Running gpg failed', errors=err.decode().split('\n')) - - bdata = base64.b64encode(out) - hhdr = email.header.make_header([(dshdr + splitter(bdata), 'us-ascii')], maxlinelen=78) - return hhdr - - -def validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, tuple]: - bsigdata = base64.b64decode(sigdata) - vrfyargs = ['--verify', '--output', '-', '--status-fd=2'] - if pubkey: - with tempfile.TemporaryFile(suffix='.patch-attest-poc') as temp_keyring: - keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}'] - gpgargs = keyringargs + ['--status-fd=1', '--import'] - ecode, out, err = gpg_run_command(gpgargs, stdin=pubkey) - # look for IMPORT_OK - if out.find(b'[GNUPG:] IMPORT_OK') < 0: - raise ValidationError('Could not import GnuPG public key') - gpgargs = keyringargs + vrfyargs - ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata) - - else: - logger.debug('Verifying using default keyring') - ecode, out, err = gpg_run_command(vrfyargs, stdin=bsigdata) - - if ecode > 0: - raise ValidationError('Failed to validate PGP signature') - - good, valid, trusted, signtime = check_gpg_status(err) - if good and valid: - return out, (good, valid, trusted, signtime) - - raise ValidationError('Failed to validate PGP signature') - - def _load_messages(cmdargs) -> dict: import sys if not sys.stdin.isatty(): @@ -582,6 +681,13 @@ def _load_messages(cmdargs) -> dict: return messages +def sign_message(msgdata: bytes, algo: str, keyinfo: Union[str, bytes], + identity: Optional[str], selector: Optional[str]) -> bytes: + pm = PatattMessage(msgdata) + pm.sign(algo, keyinfo, identity=identity, selector=selector) + return pm.as_bytes() + + def cmd_sign(cmdargs, config: dict) -> None: # Do we have the signingkey defined? usercfg = get_config_from_git(r'user\..*') @@ -602,7 +708,7 @@ def cmd_sign(cmdargs, config: dict) -> None: sk = config.get('signingkey') if sk.startswith('ed25519:'): - _sign_func = sign_ed25519 + algo = 'ed25519' identifier = sk[8:] keysrc = None if identifier.startswith('/') and os.path.exists(identifier): @@ -630,84 +736,84 @@ def cmd_sign(cmdargs, config: dict) -> None: keydata = fh.read() elif sk.startswith('openpgp:'): - _sign_func = sign_openpgp + algo = 'openpgp' keydata = sk[8:] else: logger.critical('Unknown key type: %s', sk) sys.exit(1) - for filename, msgdata in messages.items(): - headers, payload = parse_message(msgdata) - if is_signed(headers): - logger.critical('Already signed: %s', filename) + for fn, msgdata in messages.items(): + pm = PatattMessage(msgdata) + if pm.signed: + logger.critical('Already signed: %s', fn) continue try: - hhdr = _sign_func(headers, payload, keydata, identity=config.get('identity', ''), - selector=config.get('selector', '')) + pm.sign(algo, keydata, identity=config.get('identity'), selector=config.get('selector')) + logger.debug('--- SIGNED MESSAGE STARTS ---') + logger.debug(pm.as_string()) + if fn == '-': + sys.stdout.buffer.write(pm.as_bytes()) + else: + with open(fn, 'wb') as fh: + fh.write(pm.as_bytes()) + + logger.info('Signed: %s', fn) + except SigningError as ex: logger.critical('ERROR: %s', ex) sys.exit(1) - dshdr = hhdr.encode().encode() - # insert it before the blank line - lf = headers.pop(-1) - headers.append(dshdr + lf) - headers.append(lf) - payload = b''.join(headers) + payload - logger.debug('--- SIGNED MESSAGE STARTS ---') - logger.debug(payload) - if filename == '-': - sys.stdout.buffer.write(payload) + +def validate_message(msgdata: bytes, sources: list) -> list: + errors = list() + goodsigs = list() + success = False + pm = PatattMessage(msgdata) + if not pm.signed: + errors.append('message is not signed') + raise ValidationError('message is not signed', errors) + + # Find all identities for which we have public keys + for ds in pm.get_sigs(): + a = ds.get_field('a', decode=True) + i = ds.get_field('i', decode=True) + s = ds.get_field('s', decode=True) + if not s: + s = 'default' + if a.startswith('ed25519'): + algo = 'ed25519' + elif a.startswith('openpgp'): + algo = 'openpgp' else: - with open(filename, 'wb') as fh: - fh.write(payload) + errors.append('%s/%s Unknown algorigthm: %s' % (i, s, a)) + continue - logger.info('Signed: %s', filename) + pkey = keysrc = None + for source in sources: + try: + pkey, keysrc = get_public_key(source, algo, i, s) + break + except KeyError: + pass + if not pkey and algo == 'ed25519': + errors.append('%s/%s no matching ed25519 key found' % (i, s)) + continue -def validate_message(msgdata: bytes, sources: list): - headers, payload = parse_message(msgdata) + try: + signtime = pm.validate(i, pkey) + success = True + except ValidationError: + errors.append('%s/%s failed to validate using a=%s, pkey=%s' % (i, s, a, keysrc)) + continue - if not is_signed(headers): - raise ValidationError('message is not signed') + goodsigs.append((i, signtime, keysrc, algo)) - signtime, identity, selector, algo, signed_hdrs, hdata = get_devsig_header_info(headers) + if not success: + raise ValidationError('Failed to validate message', errors) - pkey = None - keysrc = None - for source in sources: - try: - pkey, keysrc = get_public_key(source, algo, identity, selector) - break - except KeyError: - pass - - if not pkey and algo == 'ed25519': - raise ValidationError('no %s public key for %s/%s' % (algo, identity, selector)) - - sdigest = None - if algo == 'ed25519': - sdigest = validate_ed25519(hdata['b'], pkey) - # signtime is required for ed25519 signatures - signtime = hdata.get('t', b'').decode() - if not signtime: - raise ValidationError('signature does not include t= signing time') - elif algo == 'openpgp': - sdigest, signtime = validate_openpgp(hdata['b'], pkey) - - if not sdigest: - raise ValidationError('faled to verify %s signature for %s/%s' % (algo, identity, selector)) - - # Now calculate our own digest and compare - dshdr, digest = make_devsig_header(headers, payload, algo, signtime=hdata.get('t', b'').decode(), - identity=hdata.get('i', b'').decode(), - selector=hdata.get('s', b'').decode(), want_hdrs=signed_hdrs, - strict=True) - if sdigest == digest: - return signtime, identity, selector, algo, keysrc - - raise ValidationError('failed to verify message content') + return goodsigs def cmd_validate(cmdargs, config: dict): @@ -720,13 +826,15 @@ def cmd_validate(cmdargs, config: dict): for filename, msgdata in messages.items(): try: - signtime, identity, selector, algo, pkey = validate_message(msgdata, sources) + goodsigs = validate_message(msgdata, sources) logger.critical('PASS: %s', os.path.basename(filename)) - logger.info(' by : %s (%s)', identity, algo) - if pkey: - logger.info(' key: %s', pkey) - else: - logger.info(' key: in default GnuPG keyring') + for identity, signtime, keysrc, algo in goodsigs: + logger.info(' by : %s (%s)', identity, algo) + if keysrc: + logger.info(' key: %s', keysrc) + else: + logger.info(' key: default keyring') + except ValidationError as ex: logger.critical('FAIL: %s', os.path.basename(filename)) logger.critical(' err: %s', ex) |