All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH 0/4] b4: Initial DKIM attestation implementation
@ 2020-11-20 21:27 Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
  To: signatures; +Cc: Konstantin Ryabitsev

This implements DKIM attestation and fixes pgp-mode in-header
attestation to work again.

Konstantin Ryabitsev (4):
  Add initial support for DKIM attestation
  Add very simple dkim key caching
  Fix signature verification for b4 pr
  Fix in-header attestation code

 b4/__init__.py   | 523 ++++++++++++++++++++++++++++++++++-------------
 b4/attest.py     |  14 +-
 b4/pr.py         |  14 +-
 requirements.txt |   8 +-
 4 files changed, 406 insertions(+), 153 deletions(-)

-- 
2.26.2



^ permalink raw reply	[flat|nested] 5+ messages in thread

* [PATCH 1/4] Add initial support for DKIM attestation
  2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 2/4] Add very simple dkim key caching Konstantin Ryabitsev
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
  To: signatures; +Cc: Konstantin Ryabitsev

Now that vger is doing a much better job of preserving DKIM signatures,
it makes sense to teach b4 to check them. DKIM verification still fails
for most mailman-hosted lists, but those are fewer than the vger-hosted ones.

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
 b4/__init__.py   | 468 ++++++++++++++++++++++++++++++++++-------------
 requirements.txt |   8 +-
 2 files changed, 352 insertions(+), 124 deletions(-)

diff --git a/b4/__init__.py b/b4/__init__.py
index a0fa086..c552a5f 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -29,8 +29,16 @@ from email import charset
 charset.add_charset('utf-8', None)
 emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
 
+try:
+    import dns.resolver
+    import dkim
+
+    can_dkim_verify = True
+    _resolver = dns.resolver.get_default_resolver()
+except ModuleNotFoundError:
+    can_dkim_verify = False
+
 __VERSION__ = '0.6.0-dev'
-ATTESTATION_FORMAT_VER = '0.1'
 
 logger = logging.getLogger('b4')
 
@@ -38,9 +46,14 @@ HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')
 FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)')
 
 PASS_SIMPLE = '[P]'
+WEAK_SIMPLE = '[D]'
 FAIL_SIMPLE = '[F]'
-PASS_FANCY = '[\033[32m\u2713\033[0m]'
-FAIL_FANCY = '[\033[31m\u2717\033[0m]'
+PASS_FANCY = '\033[32m\u2714\033[0m'
+WEAK_FANCY = '\033[33m\u2713\033[0m'
+FAIL_FANCY = '\033[31m\u2717\033[0m'
+
+HDR_PATCH_HASHES = 'X-Patch-Hashes'
+HDR_PATCH_SIG = 'X-Patch-Sig'
 
 # You can use bash-style globbing here
 WANTHDRS = [
@@ -480,22 +493,17 @@ class LoreSeries:
                 logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email')
                 addmysob = False
 
-        attdata = [None] * self.expected
+        attdata = [(None, None)] * self.expected
         attpolicy = config['attestation-policy']
-        try:
-            attstaled = int(config['attestation-staleness-days'])
-        except ValueError:
-            attstaled = 30
-        # exact_from_match = False
-        # if config['attestation-uid-match'] == 'strict':
-        #     exact_from_match = True
 
         if config['attestation-checkmarks'] == 'fancy':
             attpass = PASS_FANCY
             attfail = FAIL_FANCY
+            attweak = WEAK_FANCY
         else:
             attpass = PASS_SIMPLE
             attfail = FAIL_SIMPLE
+            attweak = WEAK_SIMPLE
 
         at = 1
         atterrors = list()
@@ -508,6 +516,7 @@ class LoreSeries:
                 if lmsg is None:
                     logger.critical('CRITICAL: [%s/%s] is missing, cannot cherrypick', at, self.expected)
                     raise KeyError('Cherrypick not in series')
+
             if lmsg is not None:
                 if self.has_cover and covertrailers and self.patches[0].followup_trailers:
                     lmsg.followup_trailers += self.patches[0].followup_trailers
@@ -521,17 +530,12 @@ class LoreSeries:
                     lmsg.load_hashes()
                     latt = lmsg.attestation
                     if latt and latt.validate(lmsg.msg):
-                        # Make sure it's not too old compared to the message date
-                        # Timezone doesn't matter as we calculate whole days
-                        tdelta = lmsg.date.replace(tzinfo=None) - latt.lsig.sigdate
-                        if tdelta.days > attstaled:
-                            # Uh-oh, attestation is too old!
-                            logger.info('  %s %s', attfail, lmsg.full_subject)
-                            atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected,
-                                                                                             attstaled, tdelta.days))
+                        if latt.lsig.attestor and latt.lsig.attestor.mode == 'domain':
+                            logger.info('  %s %s', attweak, lmsg.full_subject)
+                            attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attweak) # noqa
                         else:
                             logger.info('  %s %s', attpass, lmsg.full_subject)
-                            attdata[at - 1] = latt.lsig.attestor.get_trailer(lmsg.fromemail)
+                            attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attpass) # noqa
                     else:
                         if attpolicy in ('softfail', 'hardfail'):
                             logger.info('  %s %s', attfail, lmsg.full_subject)
@@ -561,12 +565,16 @@ class LoreSeries:
 
         if attpolicy == 'off':
             return mbx
-        failed = None in attdata
+
+        failed = (None, None) in attdata
         if not failed:
             logger.info('  ---')
-            for trailer in set(attdata):
-                logger.info('  %s %s', attpass, trailer)
+            for trailer, attmode in set(attdata):
+                logger.info('  %s %s', attmode, trailer)
             return mbx
+        elif not can_dkim_verify:
+            logger.info('  ---')
+            logger.info('  NOTE: install dkimpy for DKIM signature attestation.')
 
         errors = set(atterrors)
         for attdoc in ATTESTATIONS:
@@ -1452,6 +1460,27 @@ class LoreAttestor:
         self.keyid = keyid
         self.uids = list()
 
+    def __repr__(self):
+        out = list()
+        out.append('  keyid: %s' % self.keyid)
+        for uid in self.uids:
+            out.append('    uid: %s <%s>' % uid)
+        return '\n'.join(out)
+
+
+class LoreAttestorDKIM(LoreAttestor):
+    def __init__(self, keyid):
+        self.mode = 'domain'
+        super().__init__(keyid)
+
+    def get_trailer(self, fromaddr): # noqa
+        return 'Attestation-by: DKIM/%s (From: %s)' % (self.keyid, fromaddr)
+
+
+class LoreAttestorPGP(LoreAttestor):
+    def __init__(self, keyid):
+        super().__init__(keyid)
+        self.mode = 'person'
         self.load_subkey_uids()
 
     def load_subkey_uids(self):
@@ -1497,16 +1526,11 @@ class LoreAttestor:
 
         return 'Attestation-by: %s <%s> (pgp: %s)' % (uid[0], uid[1], self.keyid)
 
-    def __repr__(self):
-        out = list()
-        out.append('  keyid: %s' % self.keyid)
-        for uid in self.uids:
-            out.append('    uid: %s <%s>' % uid)
-        return '\n'.join(out)
-
 
 class LoreAttestationSignature:
-    def __init__(self, output, trustmodel):
+    def __init__(self, msg):
+        self.msg = msg
+        self.mode = None
         self.good = False
         self.valid = False
         self.trusted = False
@@ -1515,19 +1539,232 @@ class LoreAttestationSignature:
         self.attestor = None
         self.errors = set()
 
+        config = get_main_config()
+        try:
+            driftd = int(config['attestation-staleness-days'])
+        except ValueError:
+            driftd = 30
+
+        self.maxdrift = datetime.timedelta(days=driftd)
+
+    def verify_time_drift(self) -> None:
+        msgdt = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+        sdrift = self.sigdate - msgdt
+        if sdrift > self.maxdrift:
+            self.passing = False
+            self.errors.add('Time drift between Date and t too great (%s)' % sdrift)
+            return
+        logger.debug('PASS : time drift between Date and t (%s)', sdrift)
+
+    def verify_identity_domain(self, identity: str, domain: str):
+        # Domain is supposed to be present in identity
+        if not identity.endswith(domain):
+            logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
+            self.passing = False
+            return
+        fromeml = email.utils.getaddresses(self.msg.get_all('from', []))[0][1]
+        if identity.find('@') < 0:
+            logger.debug('identity must contain @ (i=%s)', identity)
+            self.passing = False
+            return
+        ilocal, idomain = identity.split('@')
+        # identity is supposed to be present in from
+        if not fromeml.endswith(f'@{idomain}'):
+            self.errors.add('identity (i=%s) does not match from (from=%s)' % (identity, fromeml))
+            self.passing = False
+            return
+        logger.debug('identity and domain match From header')
+
+    # @staticmethod
+    # def get_dkim_key(domain: str, selector: str, timeout: int = 5) -> Tuple[str, str]:
+    #     global DNSCACHE
+    #     if (domain, selector) in DNSCACHE:
+    #         return DNSCACHE[(domain, selector)]
+    #
+    #     name = f'{selector}._domainkey.{domain}.'
+    #     logger.debug('DNS-lookup: %s', name)
+    #     keydata = None
+    #     try:
+    #         a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout) # noqa
+    #         # Find v=DKIM1
+    #         for r in a.response.answer:
+    #             if r.rdtype == dns.rdatatype.TXT:
+    #                 for item in r.items:
+    #                     # Concatenate all strings
+    #                     txtdata = b''.join(item.strings)
+    #                     if txtdata.find(b'v=DKIM1') >= 0:
+    #                         keydata = txtdata.decode()
+    #                         break
+    #             if keydata:
+    #                 break
+    #     except dns.resolver.NXDOMAIN: # noqa
+    #         raise LookupError('Domain %s does not exist', name)
+    #
+    #     if not keydata:
+    #         raise LookupError('Domain %s does not contain a DKIM record', name)
+    #
+    #     parts = get_parts_from_header(keydata)
+    #     if 'p' not in parts:
+    #         raise LookupError('Domain %s does not contain a DKIM key', name)
+    #     if 'k' not in parts:
+    #         raise LookupError('Domain %s does not indicate key time', name)
+    #
+    #     DNSCACHE[(domain, selector)] = (parts['k'], parts['p'])
+    #     logger.debug('k=%s, p=%s', parts['k'], parts['p'])
+    #     return parts['k'], parts['p']
+
+    def __repr__(self):
+        out = list()
+        out.append('  mode: %s' % self.mode)
+        out.append('  good: %s' % self.good)
+        out.append('  valid: %s' % self.valid)
+        out.append('  trusted: %s' % self.trusted)
+        if self.attestor is not None:
+            out.append('  attestor: %s' % self.attestor.keyid)
+
+        out.append('  --- validation errors ---')
+        for error in self.errors:
+            out.append('  | %s' % error)
+        return '\n'.join(out)
+
+
+class LoreAttestationSignatureDKIM(LoreAttestationSignature):
+    def __init__(self, msg):
+        super().__init__(msg)
+        self.mode = 'dkim'
+        # Doesn't quite work right, so just use dkimpy's native
+        # self.native_verify()
+        # return
+
+        if not dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt):
+            return
+        self.good = True
+
+        # Grab toplevel signature that we just verified
+        dks = self.msg.get('dkim-signature')
+        ddata = get_parts_from_header(dks)
+        self.attestor = LoreAttestorDKIM(ddata['d'])
+        self.valid = True
+        self.trusted = True
+        self.passing = True
+
+        if ddata.get('t'):
+            self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
+        else:
+            self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+
+    # def native_verify(self):
+    #     dks = self.msg.get('dkim-signature')
+    #     ddata = get_parts_from_header(dks)
+    #     try:
+    #         kt, kp = LoreAttestationSignature.get_dkim_key(ddata['d'], ddata['s'])
+    #         if kt not in ('rsa',):  # 'ed25519'):
+    #             logger.debug('DKIM key type %s not supported', kt)
+    #             return
+    #         pk = base64.b64decode(kp)
+    #         sig = base64.b64decode(ddata['b'])
+    #     except (LookupError, binascii.Error) as ex:
+    #         logger.debug('Unable to look up DKIM key: %s', ex)
+    #         return
+    #
+    #     headers = list()
+    #
+    #     for header in ddata['h'].split(':'):
+    #         # For the POC, we assume 'relaxed/'
+    #         hval = self.msg.get(header)
+    #         if hval is None:
+    #             # Missing headers are omitted by the DKIM RFC
+    #             continue
+    #         if ddata['c'].startswith('relaxed/'):
+    #             hname, hval = dkim_canonicalize_header(header, str(self.msg.get(header)))
+    #         else:
+    #             hname = header
+    #             hval = str(self.msg.get(header))
+    #         headers.append(f'{hname}:{hval}')
+    #
+    #     # Now we add the dkim-signature header itself, without b= content
+    #     if ddata['c'].startswith('relaxed/'):
+    #         dname, dval = dkim_canonicalize_header('dkim-signature', dks)
+    #     else:
+    #         dname = 'DKIM-Signature'
+    #         dval = dks
+    #
+    #     dval = dval.rsplit('; b=')[0] + '; b='
+    #     headers.append(f'{dname}:{dval}')
+    #     payload = ('\r\n'.join(headers)).encode()
+    #     key = RSA.import_key(pk)
+    #     hashed = SHA256.new(payload)
+    #     try:
+    #         # noinspection PyTypeChecker
+    #         pkcs1_15.new(key).verify(hashed, sig)
+    #     except (ValueError, TypeError):
+    #         logger.debug('DKIM signature did not verify')
+    #         self.errors.add('The DKIM signature did NOT verify!')
+    #         return
+    #
+    #     self.good = True
+    #     if not ddata.get('i'):
+    #         ddata['i'] = '@' + ddata['d']
+    #
+    #     logger.debug('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s'])
+    #
+    #     self.attestor = LoreAttestorDKIM(ddata['d'])
+    #     self.valid = True
+    #     self.trusted = True
+    #     self.passing = True
+    #
+    #     self.verify_identity_domain(ddata['i'], ddata['d'])
+    #     if ddata.get('t'):
+    #         self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
+    #         self.verify_time_drift()
+    #     else:
+    #         self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+
+
+class LoreAttestationSignaturePGP(LoreAttestationSignature):
+    def __init__(self, msg):
+        super().__init__(msg)
+        self.mode = 'pgp'
+
+        shdr = msg.get(HDR_PATCH_SIG)
+        sdata = get_parts_from_header(shdr)
+        hhdr = msg.get(HDR_PATCH_HASHES)
+        sig = base64.b64decode(sdata['b'])
+        headers = list()
+        hhname, hhval = dkim_canonicalize_header(HDR_PATCH_HASHES, str(hhdr))
+        headers.append(f'{hhname}:{hhval}')
+        # Now we add the sig header itself, without b= content
+        shname, shval = dkim_canonicalize_header(HDR_PATCH_SIG, shdr)
+        shval = shval.rsplit('; b=')[0] + '; b='
+        headers.append(f'{shname}:{shval}')
+        payload = ('\r\n'.join(headers)).encode()
+        savefile = mkstemp('in-header-pgp-verify')[1]
+        with open(savefile, 'wb') as fh:
+            fh.write(sig)
+
+        gpgargs = list()
+        config = get_main_config()
+        trustmodel = config.get('attestation-trust-model', 'tofu')
+        if trustmodel == 'tofu':
+            gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
+        gpgargs += ['--verify', '--status-fd=1', savefile, '-']
+        ecode, out, err = gpg_run_command(gpgargs, stdin=payload)
+        os.unlink(savefile)
+        output = out.decode()
+
         gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
         if gs_matches:
             logger.debug('  GOODSIG')
             self.good = True
             keyid = gs_matches.groups()[0]
-            self.attestor = LoreAttestor(keyid)
+            self.attestor = LoreAttestorPGP(keyid)
             puid = '%s <%s>' % self.attestor.get_primary_uid()
             vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
             if vs_matches:
                 logger.debug('  VALIDSIG')
                 self.valid = True
                 ymd = vs_matches.groups()[1]
-                self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d')
+                self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
                 # Do we have a TRUST_(FULLY|ULTIMATE)?
                 ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
                 if ts_matches:
@@ -1545,18 +1782,9 @@ class LoreAttestationSignature:
             if matches:
                 self.errors.add('Missing public key: %s' % matches.groups()[0])
 
-    def __repr__(self):
-        out = list()
-        out.append('  good: %s' % self.good)
-        out.append('  valid: %s' % self.valid)
-        out.append('  trusted: %s' % self.trusted)
-        if self.attestor is not None:
-            out.append('  attestor: %s' % self.attestor.keyid)
-
-        out.append('  --- validation errors ---')
-        for error in self.errors:
-            out.append('  | %s' % error)
-        return '\n'.join(out)
+        # A couple of final verifications
+        self.verify_time_drift()
+        # XXX: Need to verify identity domain
 
 
 class LoreAttestation:
@@ -1568,9 +1796,6 @@ class LoreAttestation:
         self.mb = base64.b64encode(_m.digest()).decode()
         self.pb = base64.b64encode(_p.digest()).decode()
 
-        self.hashes_header_name = 'X-Patch-Hashes'
-        self.sig_header_name = 'X-Patch-Sig'
-
         self.lsig = None
         self.passing = False
         self.iv = False
@@ -1592,90 +1817,50 @@ class LoreAttestation:
         out.append('    iv: %s' % self.iv)
         out.append('    mv: %s' % self.mv)
         out.append('    pv: %s' % self.pv)
+        out.append('  pass: %s' % self.passing)
         return '\n'.join(out)
 
-    @staticmethod
-    def verify_identity_domain(msg, identity: str, domain: str) -> bool:
-        # Domain is supposed to be present in identity
-        if not identity.endswith(domain):
-            logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
-            return False
-        fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1]
-        if identity.find('@') < 0:
-            logger.debug('identity must contain @ (i=%s)', identity)
-            return False
-        ilocal, idomain = identity.split('@')
-        # identity is supposed to be present in from
-        if not fromeml.endswith(f'@{idomain}'):
-            logger.debug('identity (i=%s) does not match from (from=%s)', identity, fromeml)
-            return False
-        logger.debug('identity and domain match From header')
-        return True
-
-    @staticmethod
-    def get_gpg_attestation(smsg: bytes, dsig: bytes) -> LoreAttestationSignature:
-        # We can't pass both the detached sig and the content on stdin, so
-        # use a temporary file
-        savefile = mkstemp('in-header-pgp-verify')[1]
-        with open(savefile, 'wb') as fh:
-            fh.write(dsig)
-
-        gpgargs = list()
-        config = get_main_config()
-        if config['attestation-trust-model'] == 'tofu':
-            gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
-        gpgargs += ['--verify', '--status-fd=1', savefile, '-']
-        ecode, out, err = gpg_run_command(gpgargs, stdin=smsg)
-        os.unlink(savefile)
-        return LoreAttestationSignature(out.decode(), config['attestation-trust-model'])
-
-    @staticmethod
-    def dkim_canonicalize_header(hname, hval):
-        hname = hname.lower()
-        hval = hval.strip()
-        hval = re.sub(r'\n', '', hval)
-        hval = re.sub(r'\s+', ' ', hval)
-        return hname, hval
-
-    @staticmethod
-    def get_parts_from_header(hstr: str) -> dict:
-        hstr = re.sub(r'\s*', '', hstr)
-        hdata = dict()
-        for chunk in hstr.split(';'):
-            parts = chunk.split('=', 1)
-            if len(parts) < 2:
-                continue
-            hdata[parts[0]] = parts[1]
-        return hdata
-
     def validate(self, msg):
-        shdr = msg.get(self.sig_header_name)
+        # Check if we have a X-Patch-Sig header. At this time, we only support two modes:
+        # - GPG mode, which we check for first
+        # - Plain DKIM mode, which we check as fall-back
+        # More modes may be coming in the future, depending on feedback.
+        shdr = msg.get(HDR_PATCH_SIG)
+        hhdr = msg.get(HDR_PATCH_HASHES)
+        if hhdr is None:
+            # Do we have a dkim signature header?
+            if msg.get('DKIM-Signature'):
+                if can_dkim_verify:
+                    self.lsig = LoreAttestationSignatureDKIM(msg)
+                    if self.lsig.passing:
+                        self.passing = True
+                        self.iv = True
+                        self.mv = True
+                        self.pv = True
+                    return self.passing
+            return None
+
         if shdr is None:
             return None
-        sdata = LoreAttestation.get_parts_from_header(shdr)
-        sig = base64.b64decode(sdata['b'])
-        headers = list()
-        hhname, hhval = LoreAttestation.dkim_canonicalize_header(self.hashes_header_name,
-                                                                 str(msg.get(self.hashes_header_name)))
-        headers.append(f'{hhname}:{hhval}')
-        # Now we add the sig header itself, without b= content
-        shname, shval = LoreAttestation.dkim_canonicalize_header(self.sig_header_name, shdr)
-        shval = shval.rsplit('; b=')[0] + '; b='
-        headers.append(f'{shname}:{shval}')
-        payload = ('\r\n'.join(headers)).encode()
-        self.lsig = LoreAttestation.get_gpg_attestation(payload, sig)
-        if self.lsig.passing:
-            hdata = LoreAttestation.get_parts_from_header(hhval)
-            if hdata['i'] == self.ib:
-                self.iv = True
-            if hdata['m'] == self.mb:
-                self.mv = True
-            if hdata['p'] == self.pb:
-                self.pv = True
+
+        sdata = get_parts_from_header(shdr)
+        if sdata.get('m') == 'pgp':
+            self.lsig = LoreAttestationSignaturePGP(msg)
+            if self.lsig.passing:
+                hdata = get_parts_from_header(hhdr)
+                if hdata['i'] == self.ib:
+                    self.iv = True
+                if hdata['m'] == self.mb:
+                    self.mv = True
+                if hdata['p'] == self.pb:
+                    self.pv = True
 
             if self.iv and self.mv and self.pv:
                 self.passing = True
 
+        if self.lsig is None:
+            return None
+
         return self.passing
 
 
@@ -2112,3 +2297,40 @@ def parse_int_range(intrange, upper=None):
                 yield from range(int(nr[0]), upper+1)
         else:
             logger.critical('Unknown range value specified: %s', n)
+
+
+def dkim_canonicalize_header(hname, hval):
+    hname = hname.lower()
+    hval = hval.strip()
+    hval = re.sub(r'\n', '', hval)
+    hval = re.sub(r'\s+', ' ', hval)
+    return hname, hval
+
+
+def get_parts_from_header(hstr: str) -> dict:
+    hstr = re.sub(r'\s*', '', hstr)
+    hdata = dict()
+    for chunk in hstr.split(';'):
+        parts = chunk.split('=', 1)
+        if len(parts) < 2:
+            continue
+        hdata[parts[0]] = parts[1]
+    return hdata
+
+
+def dkim_get_txt(name: bytes, timeout: int = 5):
+    lookup = name.decode()
+    logger.debug('DNS-lookup: %s', lookup)
+    try:
+        a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
+        # Find v=DKIM1
+        for r in a.response.answer:
+            if r.rdtype == dns.rdatatype.TXT:
+                for item in r.items:
+                    # Concatenate all strings
+                    txtdata = b''.join(item.strings)
+                    if txtdata.find(b'v=DKIM1') >= 0:
+                        return txtdata
+    except dns.resolver.NXDOMAIN:
+        pass
+    return None
diff --git a/requirements.txt b/requirements.txt
index f229360..11d400d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,7 @@
-requests
+requests ~= 2.24.0
+# These are optional, needed for attestation features
+dnspython~=2.0.0
+dkimpy~=1.0.5
+# These may be required in the future for other patch attestation features
+#pycryptodomex~=3.9.9
+#PyNaCl~=1.4.0
-- 
2.26.2



^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 2/4] Add very simple dkim key caching
  2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 3/4] Fix signature verification for b4 pr Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 4/4] Fix in-header attestation code Konstantin Ryabitsev
  3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
  To: signatures; +Cc: Konstantin Ryabitsev

We're still spending too much time in DNS lookups, even though they are
supposed to be cached, so keep a very simple in-process cache of the DKIM
TXT lookups (including negative results).

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
 b4/__init__.py | 37 +++++++++++++++++++++----------------
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/b4/__init__.py b/b4/__init__.py
index c552a5f..ab5797d 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # SPDX-License-Identifier: GPL-2.0-or-later
 # Copyright (C) 2020 by the Linux Foundation
 import subprocess
@@ -139,6 +138,8 @@ SUBKEY_DATA = dict()
 REQSESSION = None
 # Indicates that we've cleaned cache already
 _CACHE_CLEANED = False
+# Used for dkim key lookups
+_DKIM_DNS_CACHE = dict()
 
 
 class LoreMailbox:
@@ -2319,18 +2320,22 @@ def get_parts_from_header(hstr: str) -> dict:
 
 
 def dkim_get_txt(name: bytes, timeout: int = 5):
-    lookup = name.decode()
-    logger.debug('DNS-lookup: %s', lookup)
-    try:
-        a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
-        # Find v=DKIM1
-        for r in a.response.answer:
-            if r.rdtype == dns.rdatatype.TXT:
-                for item in r.items:
-                    # Concatenate all strings
-                    txtdata = b''.join(item.strings)
-                    if txtdata.find(b'v=DKIM1') >= 0:
-                        return txtdata
-    except dns.resolver.NXDOMAIN:
-        pass
-    return None
+    global _DKIM_DNS_CACHE
+    if name not in _DKIM_DNS_CACHE:
+        lookup = name.decode()
+        logger.debug('DNS-lookup: %s', lookup)
+        try:
+            a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
+            # Find v=DKIM1
+            for r in a.response.answer:
+                if r.rdtype == dns.rdatatype.TXT:
+                    for item in r.items:
+                        # Concatenate all strings
+                        txtdata = b''.join(item.strings)
+                        if txtdata.find(b'v=DKIM1') >= 0:
+                            _DKIM_DNS_CACHE[name] = txtdata
+                            return txtdata
+        except dns.resolver.NXDOMAIN:
+            pass
+        _DKIM_DNS_CACHE[name] = None
+    return _DKIM_DNS_CACHE[name]
-- 
2.26.2



^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 3/4] Fix signature verification for b4 pr
  2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 2/4] Add very simple dkim key caching Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
  2020-11-20 21:27 ` [PATCH 4/4] Fix in-header attestation code Konstantin Ryabitsev
  3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
  To: signatures; +Cc: Konstantin Ryabitsev

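We moved the PGP signature verification code around, which broke its
invocation in b4 pr. Fix this by splitting the gpg status parsing out
into a standalone validate_gpg_signature() function.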

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
 b4/__init__.py | 76 +++++++++++++++++++++++++++++++-------------------
 b4/attest.py   |  4 +--
 b4/pr.py       | 14 ++++++----
 3 files changed, 57 insertions(+), 37 deletions(-)

diff --git a/b4/__init__.py b/b4/__init__.py
index ab5797d..ac0e85c 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -1753,35 +1753,11 @@ class LoreAttestationSignaturePGP(LoreAttestationSignature):
         os.unlink(savefile)
         output = out.decode()
 
-        gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
-        if gs_matches:
-            logger.debug('  GOODSIG')
-            self.good = True
-            keyid = gs_matches.groups()[0]
-            self.attestor = LoreAttestorPGP(keyid)
-            puid = '%s <%s>' % self.attestor.get_primary_uid()
-            vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
-            if vs_matches:
-                logger.debug('  VALIDSIG')
-                self.valid = True
-                ymd = vs_matches.groups()[1]
-                self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
-                # Do we have a TRUST_(FULLY|ULTIMATE)?
-                ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
-                if ts_matches:
-                    logger.debug('  TRUST_%s', ts_matches.groups()[0])
-                    self.trusted = True
-                    self.passing = True
-                else:
-                    self.errors.add('Insufficient trust (model=%s): %s (%s)'
-                                    % (trustmodel, keyid, puid))
-            else:
-                self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
-        else:
-            # Are we missing a key?
-            matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
-            if matches:
-                self.errors.add('Missing public key: %s' % matches.groups()[0])
+        self.good, self.valid, self.trusted, self.attestor, self.sigdate, self.errors = \
+            validate_gpg_signature(output, trustmodel)
+
+        if self.good and self.valid and self.trusted:
+            self.passing = True
 
         # A couple of final verifications
         self.verify_time_drift()
@@ -2319,6 +2295,48 @@ def get_parts_from_header(hstr: str) -> dict:
     return hdata
 
 
+def validate_gpg_signature(output, trustmodel):
+    good = False
+    valid = False
+    trusted = False
+    attestor = None
+    sigdate = None
+    errors = set()
+    gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
+    if gs_matches:
+        logger.debug('  GOODSIG')
+        good = True
+        keyid = gs_matches.groups()[0]
+        attestor = LoreAttestorPGP(keyid)
+        puid = '%s <%s>' % attestor.get_primary_uid()
+        vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
+        if vs_matches:
+            logger.debug('  VALIDSIG')
+            valid = True
+            ymd = vs_matches.groups()[1]
+            sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
+            # Do we have a TRUST_(FULLY|ULTIMATE)?
+            ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
+            if ts_matches:
+                logger.debug('  TRUST_%s', ts_matches.groups()[0])
+                trusted = True
+            else:
+                errors.add('Insufficient trust (model=%s): %s (%s)' % (trustmodel, keyid, puid))
+        else:
+            errors.add('Signature not valid from key: %s (%s)' % (attestor.keyid, puid))
+    else:
+        # Are we missing a key?
+        matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
+        if matches:
+            errors.add('Missing public key: %s' % matches.groups()[0])
+        # Is the key expired?
+        matches = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M)
+        if matches:
+            errors.add('Expired key: %s' % matches.groups()[0])
+
+    return good, valid, trusted, attestor, sigdate, errors
+
+
 def dkim_get_txt(name: bytes, timeout: int = 5):
     global _DKIM_DNS_CACHE
     if name not in _DKIM_DNS_CACHE:
diff --git a/b4/attest.py b/b4/attest.py
index 4391b19..a4012d7 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -39,7 +39,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
         f'm={lmsg.attestation.mb}',
         f'p={lmsg.attestation.pb}',
     ]
-    hhname, hhval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
+    hhname, hhval = b4.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
     headers.append(f'{hhname}:{hhval}')
 
     logger.debug('Signing with mode=%s', mode)
@@ -59,7 +59,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
             'b=',
         ]
 
-        shname, shval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
+        shname, shval = b4.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
         headers.append(f'{shname}:{shval}')
         payload = '\r\n'.join(headers).encode()
         ecode, out, err = b4.gpg_run_command(gpgargs, payload)
diff --git a/b4/pr.py b/b4/pr.py
index 40d3127..b7ed9e1 100644
--- a/b4/pr.py
+++ b/b4/pr.py
@@ -127,27 +127,29 @@ def attest_fetch_head(gitdir, lmsg):
         ecode, out = b4.git_run_command(gitdir, ['verify-tag', '--raw', 'FETCH_HEAD'], logstderr=True)
     elif otype == 'commit':
         ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True)
-    lsig = b4.LoreAttestationSignature(out, 'git')
-    if lsig.good and lsig.valid and lsig.trusted:
+
+    good, valid, trusted, attestor, sigdate, errors = b4.validate_gpg_signature(out, 'pgp')
+
+    if good and valid and trusted:
         passing = True
 
     out = out.strip()
     if not len(out) and attpolicy != 'check':
-        lsig.errors.add('Remote %s is not signed!' % otype)
+        errors.add('Remote %s is not signed!' % otype)
 
     if passing:
-        trailer = lsig.attestor.get_trailer(lmsg.fromemail)
+        trailer = attestor.get_trailer(lmsg.fromemail)
         logger.info('  ---')
         logger.info('  %s %s', attpass, trailer)
         return
 
-    if lsig.errors:
+    if errors:
         logger.critical('  ---')
         if len(out):
             logger.critical('  Pull request is signed, but verification did not succeed:')
         else:
             logger.critical('  Pull request verification did not succeed:')
-        for error in lsig.errors:
+        for error in errors:
             logger.critical('    %s %s', attfail, error)
 
         if attpolicy == 'hardfail':
-- 
2.26.2



^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 4/4] Fix in-header attestation code
  2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
                   ` (2 preceding siblings ...)
  2020-11-20 21:27 ` [PATCH 3/4] Fix signature verification for b4 pr Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
  3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
  To: signatures; +Cc: Konstantin Ryabitsev

We've moved some constant declarations around, so fix the code to look
for them in the right places.

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
 b4/attest.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/b4/attest.py b/b4/attest.py
index a4012d7..1b464d0 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -19,12 +19,12 @@ logger = b4.logger
 
 
 def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
-    if lmsg.msg.get(lmsg.attestation.hashes_header_name):
+    if lmsg.msg.get(b4.HDR_PATCH_HASHES):
         if not replace:
             logger.info(' attest: message already attested')
             return
-        del lmsg.msg[lmsg.attestation.hashes_header_name]
-        del lmsg.msg[lmsg.attestation.sig_header_name]
+        del lmsg.msg[b4.HDR_PATCH_HASHES]
+        del lmsg.msg[b4.HDR_PATCH_SIG]
 
     logger.info(' attest: generating attestation hashes')
     if not lmsg.attestation:
@@ -39,7 +39,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
         f'm={lmsg.attestation.mb}',
         f'p={lmsg.attestation.pb}',
     ]
-    hhname, hhval = b4.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
+    hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts))
     headers.append(f'{hhname}:{hhval}')
 
     logger.debug('Signing with mode=%s', mode)
@@ -59,7 +59,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
             'b=',
         ]
 
-        shname, shval = b4.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
+        shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts))
         headers.append(f'{shname}:{shval}')
         payload = '\r\n'.join(headers).encode()
         ecode, out, err = b4.gpg_run_command(gpgargs, payload)
@@ -74,8 +74,8 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
 
     hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
     shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
-    lmsg.msg[lmsg.attestation.hashes_header_name] = hhdr
-    lmsg.msg[lmsg.attestation.sig_header_name] = shdr
+    lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr
+    lmsg.msg[b4.HDR_PATCH_SIG] = shdr
 
 
 def header_splitter(longstr: str, limit: int = 77) -> str:
-- 
2.26.2



^ permalink raw reply related	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2020-11-20 21:27 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 2/4] Add very simple dkim key caching Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 3/4] Fix signature verification for b4 pr Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 4/4] Fix in-header attestation code Konstantin Ryabitsev

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.