#!/usr/bin/env python3
#
# This script will check random content published on www.kernel.org/pub against
# authorized signatures to identify when corruption or substitution happens. The name
# comes from the Russian word /proveryat/, meaning "to verify".
#
# The script is supposed to be fire-and-forget, running in a screen session, as
# a background task, or as a systemd service, with reports sent to admin@kernel.org.
#
# E.g. (after you play with it to verify that it's doing the right thing):
# ./sig-prover -c sig-prover.conf -q &
#
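# A config sketch, purely for illustration (option names match what the code
# below reads; the section name, URLs, and values are placeholders to adapt):
#
#   [kernel]
#   weight = 10
#   sleep = 600
#   json = https://www.kernel.org/releases.json
#   hosts = https://mirrors.edge.kernel.org
#   keyringdir = /etc/sig-prover
#   keyring = linux.gpg
#   unxz = /usr/bin/unxz
#   notify = admin@kernel.org
#   mailhost = localhost
#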
# CAUTION:
#    This script is not a guaranteed mechanism to detect intrusion -- an
#    attacker can defeat it by analyzing access patterns/IPs and serving
#    different content when they suspect that someone is running an automated
#    signature verification check. The script can probably be improved by
#    adding random delays between retrieving the tarball and the detached
#    signature, setting a referrer value, etc. However, even with added
#    measures, it will always act fairly predictably, so there will always
#    remain a way to detect and defeat it.
#
#    If you download tarballs from kernel.org for any purpose, you should
#    always run your own verification on each downloaded file.
#    https://www.kernel.org/signature.html
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# -*- coding: utf-8 -*-
#
__author__ = 'Konstantin Ryabitsev <konstantin@linuxfoundation.org>'

import sys
import os
import logging
import argparse
import requests
import random
import subprocess
import tempfile
import re
import time
import json

import email
import email.message
import email.utils
import smtplib

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

logger = logging.getLogger(__name__)
REQSESSION = None
GPGBIN = '/usr/bin/gpg'
SEEN = dict()
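# REQSESSION is a shared, lazily-created requests session; GPGBIN can be
# overridden per section via the 'gpgbin' config option; SEEN keeps track of
# which URLs have already been verified in each section, so a single run
# does not keep re-checking the same candidates.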

__VERSION__ = '0.1'


def get_requests_session(useragent=None):
    global REQSESSION
    if REQSESSION is None:
        REQSESSION = requests.session()
        retry = Retry(connect=3, backoff_factor=1)
        adapter = HTTPAdapter(max_retries=retry)
        REQSESSION.mount('http://', adapter)
        REQSESSION.mount('https://', adapter)
        if useragent is None:
            useragent = f'Sig-Prover/{__VERSION__}'

        headers = {
            'User-Agent': useragent,
        }
        REQSESSION.headers.update(headers)

    return REQSESSION


def get_random_target(config, rsect):
    global SEEN
    if rsect not in SEEN:
        SEEN[rsect] = set()

    ua = config[rsect].get('useragent')
    if ua:
        ua = random.choice(ua.split('\n'))
    rses = get_requests_session(useragent=ua)
    candidates = list()

    # Is it a releases.json, or a collection of hosts and paths?
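    # Two ways to pick candidates: either the section points at a JSON
    # release index (like kernel.org's releases.json) and we take signed
    # source tarballs from it, or it lists hosts/paths/masks and we pick
    # files out of a gpg-signed sha256sums.asc listing.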
    jurl = config[rsect].get('json')
    if jurl:
        logger.info('  retrieving %s', jurl)
        resp = rses.get(jurl)
        resp.raise_for_status()
        rels = json.loads(resp.content)
        for release in rels['releases']:
            if not release['pgp']:
                continue
            candidate = release['source']
            # Do we define hosts?
            hosts = config[rsect].get('hosts')
            if hosts and candidate.startswith('https://cdn'):
                # Swap the CDN URL for an actual host URL, as it doesn't make
                # much sense to check content served through a CDN cache that
                # we don't control and can't do anything about.
                for rhost in hosts.split('\n'):
                    hostcand = candidate.replace('https://cdn.kernel.org', rhost)
                    if hostcand not in SEEN[rsect]:
                        candidate = hostcand
                        break

            if candidate in SEEN[rsect]:
                logger.debug('Already checked %s in this session', candidate)
                continue
            candidates.append(candidate)

    else:
        # Grab a random host
        rhost = random.choice(config[rsect].get('hosts').split('\n'))
        # Grab a random path
        rpath = random.choice(config[rsect].get('paths').split('\n'))
        rurl = rhost + rpath
        # Now we grab the signed sha256sums.asc file from there
        shapath = rurl + 'sha256sums.asc'
        logger.info('  retrieving %s', shapath)
        resp = rses.get(shapath)
        resp.raise_for_status()

        keyring = os.path.join(config[rsect].get('keyringdir'), config[rsect].get('dirsigner_keyring'))
        logger.info('    verifying with %s', keyring)
        gpgargs = ['--verify', '--status-fd=2', '-']
        ecode, out, err = gpg_run_command(gpgargs, keyring, stdin=resp.content)
        if ecode == 0:
            good, valid, created, errors = validate_gpg_signature(err.decode())
            if good and valid:
                logger.info('    checksums signature is good and valid (created: %s)', created)
        else:
            errors = err.decode().split('\n')

        if errors:
            report_badness(config[rsect], shapath, errors)

        rmask = random.choice(config[rsect].get('masks').split('\n'))
        for line in resp.content.split(b'\n'):
            if re.search(rmask.encode(), line):
                filen = line.split()[1].decode()
                candidate = rurl + filen
                if candidate in SEEN[rsect]:
                    logger.debug('Already checked %s in this session', candidate)
                    continue
                candidates.append(candidate)

    if not candidates:
        logger.debug('Already tried all possible choices for %s', rsect)
        candidates = list(SEEN[rsect])
        SEEN[rsect] = set()

    if not candidates:
        logger.info('No suitable candidates found for %s', rsect)
        return None

    candidate = random.choice(candidates)
    SEEN[rsect].add(candidate)
    return candidate


def _run_command(cmdargs, stdin=None):
    logger.debug('Running %s', ' '.join(cmdargs))

    sp = subprocess.Popen(cmdargs,
                          stdout=subprocess.PIPE,
                          stdin=subprocess.PIPE,
                          stderr=subprocess.PIPE)

    (output, error) = sp.communicate(input=stdin)

    return sp.returncode, output, error


def gpg_run_command(args, keyring, stdin=None):
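    # Run gpg in batch mode against an explicit keyring only
    # (--no-default-keyring), so verification trusts just the keys we
    # deliberately point it at, not whatever is in the default keyring.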
    cmdargs = [GPGBIN, '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb', '--no-default-keyring',
               '--keyring', keyring]
    cmdargs += args

    return _run_command(cmdargs, stdin=stdin)


def validate_gpg_signature(output):
    good = False
    valid = False
    created = None
    errors = set()
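    # We parse gpg's machine-readable --status-fd output: GOODSIG means the
    # signature matched a key in our keyring, VALIDSIG additionally carries
    # the full fingerprint and the signature creation date; anything else
    # counts as a verification failure.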
    gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
    if gs_matches:
        logger.debug('  GOODSIG')
        good = True
        keyid = gs_matches.groups()[0]
        vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
        if vs_matches:
            logger.debug('  VALIDSIG')
            valid = True
            created = vs_matches.groups()[1]
        else:
            errors.add('Signature not valid from key: %s' % keyid)
    else:
        # Are we missing a key?
        matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
        if matches:
            errors.add('Missing public key: %s' % matches.groups()[0])
        # Is the key expired?
        matches = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M)
        if matches:
            errors.add('Expired key: %s' % matches.groups()[0])

    return good, valid, created, errors


def report_badness(config, furl, errors):
    if not config.get('notify'):
        logger.critical('ERROR: failed verifying: %s', furl)
        for entry in errors:
            logger.critical('  %s', entry)
        logger.debug('WARNING: notify not set, not sending a mail report')
        sys.exit(1)

    logger.info('ERROR: failed verifying: %s', furl)
    msg = email.message.Message()

    # Set to and cc
    msg['To'] = config.get('notify')
    targets = [msg['To']]

    ccs = config.get('notify_cc', '')
    if ccs:
        msg['Cc'] = ccs
        targets += [x.strip() for x in ccs.split(',')]

    msg['Subject'] = f'SIGFAIL: {furl}'
    msg['From'] = config.get('mailfrom', 'devnull@kernel.org')

    msg['Message-Id'] = email.utils.make_msgid('sig-prover')
    msg['Date'] = email.utils.formatdate(localtime=True)

    body = list()
    body.append('Hello:')
    body.append('')
    body.append('The following URL failed signature verification:')
    body.append(f'  {furl}')
    body.append('')
    body.append('Errors:')
    for error in errors:
        body.append(f'  {error}')

    msg.set_payload('\r\n'.join(body))

    logger.debug('Message follows')
    logger.debug(msg.as_string())

    mailhost = config.get('mailhost', 'localhost')

    try:
        server = smtplib.SMTP(mailhost)
        if config.getboolean('mailtls'):
            server.starttls()

        muser = config.get('mailuser')
        mpass = config.get('mailpass')
        if muser and mpass:
            server.login(muser, mpass)

        logger.info('Sending mail to %s', ', '.join(targets))
        server.sendmail(msg['From'], targets, msg.as_string())
        server.close()
    except Exception as ex: # noqa
        logger.critical('Unable to send mail to %s', ', '.join(targets))
        logger.critical('Attempting to use %s returned:', mailhost)
        logger.critical(ex)


def verify_tarball(config, turl):
    # Try the exact filename + .sign first
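    # kernel.org detached signatures are made against the uncompressed
    # tarball, so if no .sign exists for the compressed name we fall back to
    # the uncompressed name's .sign and decompress the content before
    # verifying it.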
    signurl = turl + '.sign'
    rses = get_requests_session()
    resp = rses.get(signurl)
    zext = None
    zbin = None
    if resp.status_code > 200:
        # Try dropping the last .foo and trying again
        parts = turl.rsplit('.', 1)
        signurl = parts[0] + '.sign'
        zext = parts[1]
        # Are we capable of dealing with zext?
        zbin = config.get(f'un{zext}')
        if not zbin:
            logger.critical('Not aware of how to deal with %s compression', zext)
            sys.exit(1)
        logger.debug('Will use %s for uncompression', zbin)
        resp = rses.get(signurl)
    resp.raise_for_status()
    logger.info('  retrieving %s', signurl)
    with tempfile.TemporaryDirectory(suffix='.sig-prover', dir=config.get('tempdir', '/tmp')) as td:
        signfile = os.path.join(td, 'content.sig')
        with open(signfile, 'wb') as sfh:
            sfh.write(resp.content)
        resp.close()
        logger.info('  retrieving %s', turl)
        resp = rses.get(turl, stream=True)
        resp.raise_for_status()
        contentfile = os.path.join(td, 'content')
        if zext:
            contentfile = f'{contentfile}.{zext}'
        with open(contentfile, 'wb') as cfh:
            for chunk in resp.iter_content(chunk_size=8192):
                cfh.write(chunk)
        resp.close()
        if zext:
            logger.info('    uncompressing %s', zext)
            cmdargs = [zbin, contentfile]
            ecode, out, err = _run_command(cmdargs)
            if ecode > 0:
                # Failure to uncompress is not a critical failure, because
                # this could be the result of any number of things: bad cache,
                # errors during transmission, etc. We don't care about such
                # situations -- we are looking specifically for bad signatures.
                logger.info('Failed uncompressing %s', contentfile)
                return
            contentfile = os.path.join(td, 'content')
        gpgargs = ['--verify', '--status-fd=2', signfile, contentfile]
        keyring = os.path.join(config.get('keyringdir'), config.get('keyring'))
        logger.info('    verifying with %s', keyring)
        ecode, out, err = gpg_run_command(gpgargs, keyring=keyring)
        if ecode == 0:
            good, valid, created, errors = validate_gpg_signature(err.decode())
            if good and valid:
                logger.info('  signature is good and valid (created: %s)', created)
                return
        else:
            errors = err.decode().split('\n')

        report_badness(config, turl, errors)


def get_random_sect(config):
    global GPGBIN
    sects = list(config.sections())
    weights = list()
    for sect in sects:
        weights.append(config[sect].getint('weight', 10))

    rsect = random.choices(sects, weights=weights, k=1)[0]
    if config[rsect].get('gpgbin'):
        GPGBIN = config[rsect].get('gpgbin')

    return rsect


def sig_verify(config):
    rsect = get_random_sect(config)
    logger.info('[%s]', rsect)
    try:
        target = get_random_target(config, rsect)
        if target:
            verify_tarball(config[rsect], target)
    except requests.exceptions.RequestException as ex:
        # Treat failures as non-critical, because hosts can be intermittently
        # unreachable for various reasons.
        logger.info('Failed getting remote content:')
        logger.info(ex)

    return config[rsect].getint('sleep', 0)


def read_config(cfgfile):
    from configparser import ConfigParser, ExtendedInterpolation
    if not os.path.exists(cfgfile):
        sys.stderr.write('ERROR: config file %s does not exist' % cfgfile)
        sys.exit(1)
    fconfig = ConfigParser(interpolation=ExtendedInterpolation())
    fconfig.read(cfgfile)

    return fconfig


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config-file', dest='cfgfile', required=True,
                        help='Config file to use')
    parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False,
                        help='Quiet operation (cron mode)')
    parser.add_argument('-d', '--debug', dest='debug', action='store_true', default=False,
                        help='Output debug information')
    parser.add_argument('-l', '--logfile', dest='logfile',
                        help='Record activity in this log file')

    _cmdargs = parser.parse_args()
    _config = read_config(_cmdargs.cfgfile)
    logger.setLevel(logging.DEBUG)

    if _cmdargs.logfile:
        ch = logging.FileHandler(_cmdargs.logfile)
        formatter = logging.Formatter('[%(asctime)s] %(message)s')
        ch.setFormatter(formatter)
        ch.setLevel(logging.INFO)
        logger.addHandler(ch)

    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    if _cmdargs.quiet:
        ch.setLevel(logging.CRITICAL)
    elif _cmdargs.debug:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)
    logger.addHandler(ch)

    while True:
        sleep = sig_verify(_config)
        if not sleep:
            break
        logger.info('--- sleeping %s seconds ---', sleep)
        try:
            time.sleep(sleep)
        except KeyboardInterrupt:
            logger.info('Bye')
            sys.exit(0)