* [PATCH 0/4] b4: Initial DKIM attestation implementation
@ 2020-11-20 21:27 Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
` (3 more replies)
0 siblings, 4 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
To: signatures; +Cc: Konstantin Ryabitsev
This implements DKIM attestation and fixes pgp-mode in-header
attestation so that it works again.
Konstantin Ryabitsev (4):
Add initial support for DKIM attestation
Add very simple dkim key caching
Fix signature verification for b4 pr
Fix in-header attestation code
b4/__init__.py | 523 ++++++++++++++++++++++++++++++++++-------------
b4/attest.py | 14 +-
b4/pr.py | 14 +-
requirements.txt | 8 +-
4 files changed, 406 insertions(+), 153 deletions(-)
--
2.26.2
^ permalink raw reply [flat|nested] 5+ messages in thread
* [PATCH 1/4] Add initial support for DKIM attestation
2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 2/4] Add very simple dkim key caching Konstantin Ryabitsev
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
To: signatures; +Cc: Konstantin Ryabitsev
Now that vger is doing a much better job preserving DKIM signatures, it
makes sense to teach b4 to check those. Verification still fails for most
mailman lists, but there are fewer of those than vger sources.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
b4/__init__.py | 468 ++++++++++++++++++++++++++++++++++-------------
requirements.txt | 8 +-
2 files changed, 352 insertions(+), 124 deletions(-)
diff --git a/b4/__init__.py b/b4/__init__.py
index a0fa086..c552a5f 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -29,8 +29,16 @@ from email import charset
charset.add_charset('utf-8', None)
emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
+try:
+ import dns.resolver
+ import dkim
+
+ can_dkim_verify = True
+ _resolver = dns.resolver.get_default_resolver()
+except ModuleNotFoundError:
+ can_dkim_verify = False
+
__VERSION__ = '0.6.0-dev'
-ATTESTATION_FORMAT_VER = '0.1'
logger = logging.getLogger('b4')
@@ -38,9 +46,14 @@ HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')
FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)')
PASS_SIMPLE = '[P]'
+WEAK_SIMPLE = '[D]'
FAIL_SIMPLE = '[F]'
-PASS_FANCY = '[\033[32m\u2713\033[0m]'
-FAIL_FANCY = '[\033[31m\u2717\033[0m]'
+PASS_FANCY = '\033[32m\u2714\033[0m'
+WEAK_FANCY = '\033[33m\u2713\033[0m'
+FAIL_FANCY = '\033[31m\u2717\033[0m'
+
+HDR_PATCH_HASHES = 'X-Patch-Hashes'
+HDR_PATCH_SIG = 'X-Patch-Sig'
# You can use bash-style globbing here
WANTHDRS = [
@@ -480,22 +493,17 @@ class LoreSeries:
logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email')
addmysob = False
- attdata = [None] * self.expected
+ attdata = [(None, None)] * self.expected
attpolicy = config['attestation-policy']
- try:
- attstaled = int(config['attestation-staleness-days'])
- except ValueError:
- attstaled = 30
- # exact_from_match = False
- # if config['attestation-uid-match'] == 'strict':
- # exact_from_match = True
if config['attestation-checkmarks'] == 'fancy':
attpass = PASS_FANCY
attfail = FAIL_FANCY
+ attweak = WEAK_FANCY
else:
attpass = PASS_SIMPLE
attfail = FAIL_SIMPLE
+ attweak = WEAK_SIMPLE
at = 1
atterrors = list()
@@ -508,6 +516,7 @@ class LoreSeries:
if lmsg is None:
logger.critical('CRITICAL: [%s/%s] is missing, cannot cherrypick', at, self.expected)
raise KeyError('Cherrypick not in series')
+
if lmsg is not None:
if self.has_cover and covertrailers and self.patches[0].followup_trailers:
lmsg.followup_trailers += self.patches[0].followup_trailers
@@ -521,17 +530,12 @@ class LoreSeries:
lmsg.load_hashes()
latt = lmsg.attestation
if latt and latt.validate(lmsg.msg):
- # Make sure it's not too old compared to the message date
- # Timezone doesn't matter as we calculate whole days
- tdelta = lmsg.date.replace(tzinfo=None) - latt.lsig.sigdate
- if tdelta.days > attstaled:
- # Uh-oh, attestation is too old!
- logger.info(' %s %s', attfail, lmsg.full_subject)
- atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected,
- attstaled, tdelta.days))
+ if latt.lsig.attestor and latt.lsig.attestor.mode == 'domain':
+ logger.info(' %s %s', attweak, lmsg.full_subject)
+ attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attweak) # noqa
else:
logger.info(' %s %s', attpass, lmsg.full_subject)
- attdata[at - 1] = latt.lsig.attestor.get_trailer(lmsg.fromemail)
+ attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attpass) # noqa
else:
if attpolicy in ('softfail', 'hardfail'):
logger.info(' %s %s', attfail, lmsg.full_subject)
@@ -561,12 +565,16 @@ class LoreSeries:
if attpolicy == 'off':
return mbx
- failed = None in attdata
+
+ failed = (None, None) in attdata
if not failed:
logger.info(' ---')
- for trailer in set(attdata):
- logger.info(' %s %s', attpass, trailer)
+ for trailer, attmode in set(attdata):
+ logger.info(' %s %s', attmode, trailer)
return mbx
+ elif not can_dkim_verify:
+ logger.info(' ---')
+ logger.info(' NOTE: install dkimpy for DKIM signature attestation.')
errors = set(atterrors)
for attdoc in ATTESTATIONS:
@@ -1452,6 +1460,27 @@ class LoreAttestor:
self.keyid = keyid
self.uids = list()
+ def __repr__(self):
+ out = list()
+ out.append(' keyid: %s' % self.keyid)
+ for uid in self.uids:
+ out.append(' uid: %s <%s>' % uid)
+ return '\n'.join(out)
+
+
+class LoreAttestorDKIM(LoreAttestor):
+ def __init__(self, keyid):
+ self.mode = 'domain'
+ super().__init__(keyid)
+
+ def get_trailer(self, fromaddr): # noqa
+ return 'Attestation-by: DKIM/%s (From: %s)' % (self.keyid, fromaddr)
+
+
+class LoreAttestorPGP(LoreAttestor):
+ def __init__(self, keyid):
+ super().__init__(keyid)
+ self.mode = 'person'
self.load_subkey_uids()
def load_subkey_uids(self):
@@ -1497,16 +1526,11 @@ class LoreAttestor:
return 'Attestation-by: %s <%s> (pgp: %s)' % (uid[0], uid[1], self.keyid)
- def __repr__(self):
- out = list()
- out.append(' keyid: %s' % self.keyid)
- for uid in self.uids:
- out.append(' uid: %s <%s>' % uid)
- return '\n'.join(out)
-
class LoreAttestationSignature:
- def __init__(self, output, trustmodel):
+ def __init__(self, msg):
+ self.msg = msg
+ self.mode = None
self.good = False
self.valid = False
self.trusted = False
@@ -1515,19 +1539,232 @@ class LoreAttestationSignature:
self.attestor = None
self.errors = set()
+ config = get_main_config()
+ try:
+ driftd = int(config['attestation-staleness-days'])
+ except ValueError:
+ driftd = 30
+
+ self.maxdrift = datetime.timedelta(days=driftd)
+
+ def verify_time_drift(self) -> None:
+ msgdt = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+ sdrift = self.sigdate - msgdt
+ if sdrift > self.maxdrift:
+ self.passing = False
+ self.errors.add('Time drift between Date and t too great (%s)' % sdrift)
+ return
+ logger.debug('PASS : time drift between Date and t (%s)', sdrift)
+
+ def verify_identity_domain(self, identity: str, domain: str):
+ # Domain is supposed to be present in identity
+ if not identity.endswith(domain):
+ logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
+ self.passing = False
+ return
+ fromeml = email.utils.getaddresses(self.msg.get_all('from', []))[0][1]
+ if identity.find('@') < 0:
+ logger.debug('identity must contain @ (i=%s)', identity)
+ self.passing = False
+ return
+ ilocal, idomain = identity.split('@')
+ # identity is supposed to be present in from
+ if not fromeml.endswith(f'@{idomain}'):
+ self.errors.add('identity (i=%s) does not match from (from=%s)' % (identity, fromeml))
+ self.passing = False
+ return
+ logger.debug('identity and domain match From header')
+
+ # @staticmethod
+ # def get_dkim_key(domain: str, selector: str, timeout: int = 5) -> Tuple[str, str]:
+ # global DNSCACHE
+ # if (domain, selector) in DNSCACHE:
+ # return DNSCACHE[(domain, selector)]
+ #
+ # name = f'{selector}._domainkey.{domain}.'
+ # logger.debug('DNS-lookup: %s', name)
+ # keydata = None
+ # try:
+ # a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout) # noqa
+ # # Find v=DKIM1
+ # for r in a.response.answer:
+ # if r.rdtype == dns.rdatatype.TXT:
+ # for item in r.items:
+ # # Concatenate all strings
+ # txtdata = b''.join(item.strings)
+ # if txtdata.find(b'v=DKIM1') >= 0:
+ # keydata = txtdata.decode()
+ # break
+ # if keydata:
+ # break
+ # except dns.resolver.NXDOMAIN: # noqa
+ # raise LookupError('Domain %s does not exist', name)
+ #
+ # if not keydata:
+ # raise LookupError('Domain %s does not contain a DKIM record', name)
+ #
+ # parts = get_parts_from_header(keydata)
+ # if 'p' not in parts:
+ # raise LookupError('Domain %s does not contain a DKIM key', name)
+ # if 'k' not in parts:
+ # raise LookupError('Domain %s does not indicate key type', name)
+ #
+ # DNSCACHE[(domain, selector)] = (parts['k'], parts['p'])
+ # logger.debug('k=%s, p=%s', parts['k'], parts['p'])
+ # return parts['k'], parts['p']
+
+ def __repr__(self):
+ out = list()
+ out.append(' mode: %s' % self.mode)
+ out.append(' good: %s' % self.good)
+ out.append(' valid: %s' % self.valid)
+ out.append(' trusted: %s' % self.trusted)
+ if self.attestor is not None:
+ out.append(' attestor: %s' % self.attestor.keyid)
+
+ out.append(' --- validation errors ---')
+ for error in self.errors:
+ out.append(' | %s' % error)
+ return '\n'.join(out)
+
+
+class LoreAttestationSignatureDKIM(LoreAttestationSignature):
+ def __init__(self, msg):
+ super().__init__(msg)
+ self.mode = 'dkim'
+ # Our own native_verify() doesn't quite work right, so just use dkimpy's verifier
+ # self.native_verify()
+ # return
+
+ if not dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt):
+ return
+ self.good = True
+
+ # Grab toplevel signature that we just verified
+ dks = self.msg.get('dkim-signature')
+ ddata = get_parts_from_header(dks)
+ self.attestor = LoreAttestorDKIM(ddata['d'])
+ self.valid = True
+ self.trusted = True
+ self.passing = True
+
+ if ddata.get('t'):
+ self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
+ else:
+ self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+
+ # def native_verify(self):
+ # dks = self.msg.get('dkim-signature')
+ # ddata = get_parts_from_header(dks)
+ # try:
+ # kt, kp = LoreAttestationSignature.get_dkim_key(ddata['d'], ddata['s'])
+ # if kt not in ('rsa',): # 'ed25519'):
+ # logger.debug('DKIM key type %s not supported', kt)
+ # return
+ # pk = base64.b64decode(kp)
+ # sig = base64.b64decode(ddata['b'])
+ # except (LookupError, binascii.Error) as ex:
+ # logger.debug('Unable to look up DKIM key: %s', ex)
+ # return
+ #
+ # headers = list()
+ #
+ # for header in ddata['h'].split(':'):
+ # # For the POC, we assume 'relaxed/'
+ # hval = self.msg.get(header)
+ # if hval is None:
+ # # Missing headers are omitted by the DKIM RFC
+ # continue
+ # if ddata['c'].startswith('relaxed/'):
+ # hname, hval = dkim_canonicalize_header(header, str(self.msg.get(header)))
+ # else:
+ # hname = header
+ # hval = str(self.msg.get(header))
+ # headers.append(f'{hname}:{hval}')
+ #
+ # # Now we add the dkim-signature header itself, without b= content
+ # if ddata['c'].startswith('relaxed/'):
+ # dname, dval = dkim_canonicalize_header('dkim-signature', dks)
+ # else:
+ # dname = 'DKIM-Signature'
+ # dval = dks
+ #
+ # dval = dval.rsplit('; b=')[0] + '; b='
+ # headers.append(f'{dname}:{dval}')
+ # payload = ('\r\n'.join(headers)).encode()
+ # key = RSA.import_key(pk)
+ # hashed = SHA256.new(payload)
+ # try:
+ # # noinspection PyTypeChecker
+ # pkcs1_15.new(key).verify(hashed, sig)
+ # except (ValueError, TypeError):
+ # logger.debug('DKIM signature did not verify')
+ # self.errors.add('The DKIM signature did NOT verify!')
+ # return
+ #
+ # self.good = True
+ # if not ddata.get('i'):
+ # ddata['i'] = '@' + ddata['d']
+ #
+ # logger.debug('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s'])
+ #
+ # self.attestor = LoreAttestorDKIM(ddata['d'])
+ # self.valid = True
+ # self.trusted = True
+ # self.passing = True
+ #
+ # self.verify_identity_domain(ddata['i'], ddata['d'])
+ # if ddata.get('t'):
+ # self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
+ # self.verify_time_drift()
+ # else:
+ # self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+
+
+class LoreAttestationSignaturePGP(LoreAttestationSignature):
+ def __init__(self, msg):
+ super().__init__(msg)
+ self.mode = 'pgp'
+
+ shdr = msg.get(HDR_PATCH_SIG)
+ sdata = get_parts_from_header(shdr)
+ hhdr = msg.get(HDR_PATCH_HASHES)
+ sig = base64.b64decode(sdata['b'])
+ headers = list()
+ hhname, hhval = dkim_canonicalize_header(HDR_PATCH_HASHES, str(hhdr))
+ headers.append(f'{hhname}:{hhval}')
+ # Now we add the sig header itself, without b= content
+ shname, shval = dkim_canonicalize_header(HDR_PATCH_SIG, shdr)
+ shval = shval.rsplit('; b=')[0] + '; b='
+ headers.append(f'{shname}:{shval}')
+ payload = ('\r\n'.join(headers)).encode()
+ savefile = mkstemp('in-header-pgp-verify')[1]
+ with open(savefile, 'wb') as fh:
+ fh.write(sig)
+
+ gpgargs = list()
+ config = get_main_config()
+ trustmodel = config.get('attestation-trust-model', 'tofu')
+ if trustmodel == 'tofu':
+ gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
+ gpgargs += ['--verify', '--status-fd=1', savefile, '-']
+ ecode, out, err = gpg_run_command(gpgargs, stdin=payload)
+ os.unlink(savefile)
+ output = out.decode()
+
gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
if gs_matches:
logger.debug(' GOODSIG')
self.good = True
keyid = gs_matches.groups()[0]
- self.attestor = LoreAttestor(keyid)
+ self.attestor = LoreAttestorPGP(keyid)
puid = '%s <%s>' % self.attestor.get_primary_uid()
vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
if vs_matches:
logger.debug(' VALIDSIG')
self.valid = True
ymd = vs_matches.groups()[1]
- self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d')
+ self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
# Do we have a TRUST_(FULLY|ULTIMATE)?
ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
if ts_matches:
@@ -1545,18 +1782,9 @@ class LoreAttestationSignature:
if matches:
self.errors.add('Missing public key: %s' % matches.groups()[0])
- def __repr__(self):
- out = list()
- out.append(' good: %s' % self.good)
- out.append(' valid: %s' % self.valid)
- out.append(' trusted: %s' % self.trusted)
- if self.attestor is not None:
- out.append(' attestor: %s' % self.attestor.keyid)
-
- out.append(' --- validation errors ---')
- for error in self.errors:
- out.append(' | %s' % error)
- return '\n'.join(out)
+ # A couple of final verifications
+ self.verify_time_drift()
+ # XXX: Need to verify identity domain
class LoreAttestation:
@@ -1568,9 +1796,6 @@ class LoreAttestation:
self.mb = base64.b64encode(_m.digest()).decode()
self.pb = base64.b64encode(_p.digest()).decode()
- self.hashes_header_name = 'X-Patch-Hashes'
- self.sig_header_name = 'X-Patch-Sig'
-
self.lsig = None
self.passing = False
self.iv = False
@@ -1592,90 +1817,50 @@ class LoreAttestation:
out.append(' iv: %s' % self.iv)
out.append(' mv: %s' % self.mv)
out.append(' pv: %s' % self.pv)
+ out.append(' pass: %s' % self.passing)
return '\n'.join(out)
- @staticmethod
- def verify_identity_domain(msg, identity: str, domain: str) -> bool:
- # Domain is supposed to be present in identity
- if not identity.endswith(domain):
- logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
- return False
- fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1]
- if identity.find('@') < 0:
- logger.debug('identity must contain @ (i=%s)', identity)
- return False
- ilocal, idomain = identity.split('@')
- # identity is supposed to be present in from
- if not fromeml.endswith(f'@{idomain}'):
- logger.debug('identity (i=%s) does not match from (from=%s)', identity, fromeml)
- return False
- logger.debug('identity and domain match From header')
- return True
-
- @staticmethod
- def get_gpg_attestation(smsg: bytes, dsig: bytes) -> LoreAttestationSignature:
- # We can't pass both the detached sig and the content on stdin, so
- # use a temporary file
- savefile = mkstemp('in-header-pgp-verify')[1]
- with open(savefile, 'wb') as fh:
- fh.write(dsig)
-
- gpgargs = list()
- config = get_main_config()
- if config['attestation-trust-model'] == 'tofu':
- gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
- gpgargs += ['--verify', '--status-fd=1', savefile, '-']
- ecode, out, err = gpg_run_command(gpgargs, stdin=smsg)
- os.unlink(savefile)
- return LoreAttestationSignature(out.decode(), config['attestation-trust-model'])
-
- @staticmethod
- def dkim_canonicalize_header(hname, hval):
- hname = hname.lower()
- hval = hval.strip()
- hval = re.sub(r'\n', '', hval)
- hval = re.sub(r'\s+', ' ', hval)
- return hname, hval
-
- @staticmethod
- def get_parts_from_header(hstr: str) -> dict:
- hstr = re.sub(r'\s*', '', hstr)
- hdata = dict()
- for chunk in hstr.split(';'):
- parts = chunk.split('=', 1)
- if len(parts) < 2:
- continue
- hdata[parts[0]] = parts[1]
- return hdata
-
def validate(self, msg):
- shdr = msg.get(self.sig_header_name)
+ # Check if we have an X-Patch-Sig header. At this time, we only support two modes:
+ # - GPG mode, which we check for first
+ # - Plain DKIM mode, which we check as fall-back
+ # More modes may be coming in the future, depending on feedback.
+ shdr = msg.get(HDR_PATCH_SIG)
+ hhdr = msg.get(HDR_PATCH_HASHES)
+ if hhdr is None:
+ # Do we have a dkim signature header?
+ if msg.get('DKIM-Signature'):
+ if can_dkim_verify:
+ self.lsig = LoreAttestationSignatureDKIM(msg)
+ if self.lsig.passing:
+ self.passing = True
+ self.iv = True
+ self.mv = True
+ self.pv = True
+ return self.passing
+ return None
+
if shdr is None:
return None
- sdata = LoreAttestation.get_parts_from_header(shdr)
- sig = base64.b64decode(sdata['b'])
- headers = list()
- hhname, hhval = LoreAttestation.dkim_canonicalize_header(self.hashes_header_name,
- str(msg.get(self.hashes_header_name)))
- headers.append(f'{hhname}:{hhval}')
- # Now we add the sig header itself, without b= content
- shname, shval = LoreAttestation.dkim_canonicalize_header(self.sig_header_name, shdr)
- shval = shval.rsplit('; b=')[0] + '; b='
- headers.append(f'{shname}:{shval}')
- payload = ('\r\n'.join(headers)).encode()
- self.lsig = LoreAttestation.get_gpg_attestation(payload, sig)
- if self.lsig.passing:
- hdata = LoreAttestation.get_parts_from_header(hhval)
- if hdata['i'] == self.ib:
- self.iv = True
- if hdata['m'] == self.mb:
- self.mv = True
- if hdata['p'] == self.pb:
- self.pv = True
+
+ sdata = get_parts_from_header(shdr)
+ if sdata.get('m') == 'pgp':
+ self.lsig = LoreAttestationSignaturePGP(msg)
+ if self.lsig.passing:
+ hdata = get_parts_from_header(hhdr)
+ if hdata['i'] == self.ib:
+ self.iv = True
+ if hdata['m'] == self.mb:
+ self.mv = True
+ if hdata['p'] == self.pb:
+ self.pv = True
if self.iv and self.mv and self.pv:
self.passing = True
+ if self.lsig is None:
+ return None
+
return self.passing
@@ -2112,3 +2297,40 @@ def parse_int_range(intrange, upper=None):
yield from range(int(nr[0]), upper+1)
else:
logger.critical('Unknown range value specified: %s', n)
+
+
+def dkim_canonicalize_header(hname, hval):
+ hname = hname.lower()
+ hval = hval.strip()
+ hval = re.sub(r'\n', '', hval)
+ hval = re.sub(r'\s+', ' ', hval)
+ return hname, hval
+
+
+def get_parts_from_header(hstr: str) -> dict:
+ hstr = re.sub(r'\s*', '', hstr)
+ hdata = dict()
+ for chunk in hstr.split(';'):
+ parts = chunk.split('=', 1)
+ if len(parts) < 2:
+ continue
+ hdata[parts[0]] = parts[1]
+ return hdata
+
+
+def dkim_get_txt(name: bytes, timeout: int = 5):
+ lookup = name.decode()
+ logger.debug('DNS-lookup: %s', lookup)
+ try:
+ a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
+ # Find v=DKIM1
+ for r in a.response.answer:
+ if r.rdtype == dns.rdatatype.TXT:
+ for item in r.items:
+ # Concatenate all strings
+ txtdata = b''.join(item.strings)
+ if txtdata.find(b'v=DKIM1') >= 0:
+ return txtdata
+ except dns.resolver.NXDOMAIN:
+ pass
+ return None
diff --git a/requirements.txt b/requirements.txt
index f229360..11d400d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,7 @@
-requests
+requests ~= 2.24.0
+# These are optional, needed for attestation features
+dnspython~=2.0.0
+dkimpy~=1.0.5
+# These may be required in the future for other patch attestation features
+#pycryptodomex~=3.9.9
+#PyNaCl~=1.4.0
--
2.26.2
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH 2/4] Add very simple dkim key caching
2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 3/4] Fix signature verification for b4 pr Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 4/4] Fix in-header attestation code Konstantin Ryabitsev
3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
To: signatures; +Cc: Konstantin Ryabitsev
We're still spending too much time in dns lookups, even though they are
supposed to be cached.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
b4/__init__.py | 37 +++++++++++++++++++++----------------
1 file changed, 21 insertions(+), 16 deletions(-)
diff --git a/b4/__init__.py b/b4/__init__.py
index c552a5f..ab5797d 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2020 by the Linux Foundation
import subprocess
@@ -139,6 +138,8 @@ SUBKEY_DATA = dict()
REQSESSION = None
# Indicates that we've cleaned cache already
_CACHE_CLEANED = False
+# Used for dkim key lookups
+_DKIM_DNS_CACHE = dict()
class LoreMailbox:
@@ -2319,18 +2320,22 @@ def get_parts_from_header(hstr: str) -> dict:
def dkim_get_txt(name: bytes, timeout: int = 5):
- lookup = name.decode()
- logger.debug('DNS-lookup: %s', lookup)
- try:
- a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
- # Find v=DKIM1
- for r in a.response.answer:
- if r.rdtype == dns.rdatatype.TXT:
- for item in r.items:
- # Concatenate all strings
- txtdata = b''.join(item.strings)
- if txtdata.find(b'v=DKIM1') >= 0:
- return txtdata
- except dns.resolver.NXDOMAIN:
- pass
- return None
+ global _DKIM_DNS_CACHE
+ if name not in _DKIM_DNS_CACHE:
+ lookup = name.decode()
+ logger.debug('DNS-lookup: %s', lookup)
+ try:
+ a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
+ # Find v=DKIM1
+ for r in a.response.answer:
+ if r.rdtype == dns.rdatatype.TXT:
+ for item in r.items:
+ # Concatenate all strings
+ txtdata = b''.join(item.strings)
+ if txtdata.find(b'v=DKIM1') >= 0:
+ _DKIM_DNS_CACHE[name] = txtdata
+ return txtdata
+ except dns.resolver.NXDOMAIN:
+ pass
+ _DKIM_DNS_CACHE[name] = None
+ return _DKIM_DNS_CACHE[name]
--
2.26.2
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH 3/4] Fix signature verification for b4 pr
2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 2/4] Add very simple dkim key caching Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 4/4] Fix in-header attestation code Konstantin Ryabitsev
3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
To: signatures; +Cc: Konstantin Ryabitsev
We moved pgp sig verification code around, so fix it for the invocation
in b4 pr.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
b4/__init__.py | 76 +++++++++++++++++++++++++++++++-------------------
b4/attest.py | 4 +--
b4/pr.py | 14 ++++++----
3 files changed, 57 insertions(+), 37 deletions(-)
diff --git a/b4/__init__.py b/b4/__init__.py
index ab5797d..ac0e85c 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -1753,35 +1753,11 @@ class LoreAttestationSignaturePGP(LoreAttestationSignature):
os.unlink(savefile)
output = out.decode()
- gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
- if gs_matches:
- logger.debug(' GOODSIG')
- self.good = True
- keyid = gs_matches.groups()[0]
- self.attestor = LoreAttestorPGP(keyid)
- puid = '%s <%s>' % self.attestor.get_primary_uid()
- vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
- if vs_matches:
- logger.debug(' VALIDSIG')
- self.valid = True
- ymd = vs_matches.groups()[1]
- self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
- # Do we have a TRUST_(FULLY|ULTIMATE)?
- ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
- if ts_matches:
- logger.debug(' TRUST_%s', ts_matches.groups()[0])
- self.trusted = True
- self.passing = True
- else:
- self.errors.add('Insufficient trust (model=%s): %s (%s)'
- % (trustmodel, keyid, puid))
- else:
- self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
- else:
- # Are we missing a key?
- matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
- if matches:
- self.errors.add('Missing public key: %s' % matches.groups()[0])
+ self.good, self.valid, self.trusted, self.attestor, self.sigdate, self.errors = \
+ validate_gpg_signature(output, trustmodel)
+
+ if self.good and self.valid and self.trusted:
+ self.passing = True
# A couple of final verifications
self.verify_time_drift()
@@ -2319,6 +2295,48 @@ def get_parts_from_header(hstr: str) -> dict:
return hdata
+def validate_gpg_signature(output, trustmodel):
+ good = False
+ valid = False
+ trusted = False
+ attestor = None
+ sigdate = None
+ errors = set()
+ gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
+ if gs_matches:
+ logger.debug(' GOODSIG')
+ good = True
+ keyid = gs_matches.groups()[0]
+ attestor = LoreAttestorPGP(keyid)
+ puid = '%s <%s>' % attestor.get_primary_uid()
+ vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
+ if vs_matches:
+ logger.debug(' VALIDSIG')
+ valid = True
+ ymd = vs_matches.groups()[1]
+ sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
+ # Do we have a TRUST_(FULLY|ULTIMATE)?
+ ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
+ if ts_matches:
+ logger.debug(' TRUST_%s', ts_matches.groups()[0])
+ trusted = True
+ else:
+ errors.add('Insufficient trust (model=%s): %s (%s)' % (trustmodel, keyid, puid))
+ else:
+ errors.add('Signature not valid from key: %s (%s)' % (attestor.keyid, puid))
+ else:
+ # Are we missing a key?
+ matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
+ if matches:
+ errors.add('Missing public key: %s' % matches.groups()[0])
+ # Is the key expired?
+ matches = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M)
+ if matches:
+ errors.add('Expired key: %s' % matches.groups()[0])
+
+ return good, valid, trusted, attestor, sigdate, errors
+
+
def dkim_get_txt(name: bytes, timeout: int = 5):
global _DKIM_DNS_CACHE
if name not in _DKIM_DNS_CACHE:
diff --git a/b4/attest.py b/b4/attest.py
index 4391b19..a4012d7 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -39,7 +39,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
f'm={lmsg.attestation.mb}',
f'p={lmsg.attestation.pb}',
]
- hhname, hhval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
+ hhname, hhval = b4.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
headers.append(f'{hhname}:{hhval}')
logger.debug('Signing with mode=%s', mode)
@@ -59,7 +59,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
'b=',
]
- shname, shval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
+ shname, shval = b4.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
headers.append(f'{shname}:{shval}')
payload = '\r\n'.join(headers).encode()
ecode, out, err = b4.gpg_run_command(gpgargs, payload)
diff --git a/b4/pr.py b/b4/pr.py
index 40d3127..b7ed9e1 100644
--- a/b4/pr.py
+++ b/b4/pr.py
@@ -127,27 +127,29 @@ def attest_fetch_head(gitdir, lmsg):
ecode, out = b4.git_run_command(gitdir, ['verify-tag', '--raw', 'FETCH_HEAD'], logstderr=True)
elif otype == 'commit':
ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True)
- lsig = b4.LoreAttestationSignature(out, 'git')
- if lsig.good and lsig.valid and lsig.trusted:
+
+ good, valid, trusted, attestor, sigdate, errors = b4.validate_gpg_signature(out, 'pgp')
+
+ if good and valid and trusted:
passing = True
out = out.strip()
if not len(out) and attpolicy != 'check':
- lsig.errors.add('Remote %s is not signed!' % otype)
+ errors.add('Remote %s is not signed!' % otype)
if passing:
- trailer = lsig.attestor.get_trailer(lmsg.fromemail)
+ trailer = attestor.get_trailer(lmsg.fromemail)
logger.info(' ---')
logger.info(' %s %s', attpass, trailer)
return
- if lsig.errors:
+ if errors:
logger.critical(' ---')
if len(out):
logger.critical(' Pull request is signed, but verification did not succeed:')
else:
logger.critical(' Pull request verification did not succeed:')
- for error in lsig.errors:
+ for error in errors:
logger.critical(' %s %s', attfail, error)
if attpolicy == 'hardfail':
--
2.26.2
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH 4/4] Fix in-header attestation code
2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
` (2 preceding siblings ...)
2020-11-20 21:27 ` [PATCH 3/4] Fix signature verification for b4 pr Konstantin Ryabitsev
@ 2020-11-20 21:27 ` Konstantin Ryabitsev
3 siblings, 0 replies; 5+ messages in thread
From: Konstantin Ryabitsev @ 2020-11-20 21:27 UTC (permalink / raw)
To: signatures; +Cc: Konstantin Ryabitsev
We've moved some constant declarations around, so fix the code to look
for them in the right places.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
---
b4/attest.py | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/b4/attest.py b/b4/attest.py
index a4012d7..1b464d0 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -19,12 +19,12 @@ logger = b4.logger
def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
- if lmsg.msg.get(lmsg.attestation.hashes_header_name):
+ if lmsg.msg.get(b4.HDR_PATCH_HASHES):
if not replace:
logger.info(' attest: message already attested')
return
- del lmsg.msg[lmsg.attestation.hashes_header_name]
- del lmsg.msg[lmsg.attestation.sig_header_name]
+ del lmsg.msg[b4.HDR_PATCH_HASHES]
+ del lmsg.msg[b4.HDR_PATCH_SIG]
logger.info(' attest: generating attestation hashes')
if not lmsg.attestation:
@@ -39,7 +39,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
f'm={lmsg.attestation.mb}',
f'p={lmsg.attestation.pb}',
]
- hhname, hhval = b4.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
+ hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts))
headers.append(f'{hhname}:{hhval}')
logger.debug('Signing with mode=%s', mode)
@@ -59,7 +59,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
'b=',
]
- shname, shval = b4.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
+ shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts))
headers.append(f'{shname}:{shval}')
payload = '\r\n'.join(headers).encode()
ecode, out, err = b4.gpg_run_command(gpgargs, payload)
@@ -74,8 +74,8 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
- lmsg.msg[lmsg.attestation.hashes_header_name] = hhdr
- lmsg.msg[lmsg.attestation.sig_header_name] = shdr
+ lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr
+ lmsg.msg[b4.HDR_PATCH_SIG] = shdr
def header_splitter(longstr: str, limit: int = 77) -> str:
--
2.26.2
^ permalink raw reply related [flat|nested] 5+ messages in thread
end of thread, other threads:[~2020-11-20 21:27 UTC | newest]
Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-11-20 21:27 [PATCH 0/4] b4: Initial DKIM attestation implementation Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 1/4] Add initial support for DKIM attestation Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 2/4] Add very simple dkim key caching Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 3/4] Fix signature verification for b4 pr Konstantin Ryabitsev
2020-11-20 21:27 ` [PATCH 4/4] Fix in-header attestation code Konstantin Ryabitsev
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.