diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2019-03-13 18:50:39 -0400 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2019-03-13 18:50:39 -0400 |
commit | 9bc20f52a85fb8d27528973dc69a4acb8702fc85 (patch) | |
tree | 639b8a9baf3a78ff0a7acbdd870a6f1eb78c47f5 | |
parent | 4040fb4f18e1e8d9f164593ee657e65f1326dd44 (diff) | |
download | korg-helpers-9bc20f52a85fb8d27528973dc69a4acb8702fc85.tar.gz |
Properly roll over between public-inbox shards
This does a much better job of handling shard roll-over.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rwxr-xr-x | pr-tracker-bot.py | 301 |
1 file changed, 127 insertions, 174 deletions
diff --git a/pr-tracker-bot.py b/pr-tracker-bot.py index 3c876a6..4ac9287 100755 --- a/pr-tracker-bot.py +++ b/pr-tracker-bot.py @@ -31,12 +31,11 @@ import sqlite3 import logging import re import glob -import json from fcntl import lockf, LOCK_EX, LOCK_NB from string import Template -DB_VERSION = 1 +DB_VERSION = 2 # Case doesn't matter PULL_SUBJECT_RE = [ @@ -72,19 +71,53 @@ def make_msgid(idstring=None, domain='kernel.org'): return '<%d.%d.%d%s@%s>' % (timeval, pid, randint, idstring, domain) +def db_migrate_1_to_2(projpath): + pirepo, maxshard = get_pirepo_dir(projpath, None) + old_dbpath = os.path.join(projpath, '{0}.git'.format(maxshard), 'prs.db') + dbconn_old = sqlite3.connect(old_dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) + c_old = dbconn_old.cursor() + # Get all values from old db + pr_rows = c_old.execute('SELECT * FROM prs').fetchall() + h_rows = c_old.execute('SELECT * FROM heads').fetchall() + dbconn_old.close() + + dbpath = os.path.join(projpath, 'prtracker.db') + dbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) + c = dbconn.cursor() + db_init_pr_sqlite_db(c) + + for row in pr_rows: + c.execute('INSERT INTO prs VALUES(?,?,?,?,?,?)', [maxshard]+list(row)) + + for row in h_rows: + c.execute('INSERT INTO heads VALUES(?,?,?)', [maxshard] + list(row)) + + dbconn.commit() + dbconn.close() + + def db_save_meta(c): c.execute('DELETE FROM meta') c.execute('''INSERT INTO meta VALUES(?)''', (DB_VERSION,)) -def db_save_repo_heads(c, heads): - c.execute('DELETE FROM heads') +def db_save_repo_heads(c, heads, shard=None): + if shard is None: + c.execute('DELETE FROM heads') + for refname, commit_id in heads: + c.execute('''INSERT INTO heads VALUES(?,?)''', (refname, commit_id)) + return + + c.execute('DELETE FROM heads WHERE pi_shard=?', (shard,)) for refname, commit_id in heads: - c.execute('''INSERT INTO heads VALUES(?,?)''', (refname, commit_id)) + c.execute('''INSERT INTO heads VALUES(?,?,?)''', (shard, refname, 
commit_id)) -def db_get_repo_heads(c): - return c.execute('SELECT refname, commit_id FROM heads').fetchall() +def db_get_repo_heads(c, shard=None): + if shard is None: + return c.execute('SELECT refname, commit_id FROM heads').fetchall() + return c.execute('SELECT refname, commit_id FROM heads WHERE pi_shard=?', + (shard,)).fetchall() def db_init_common_sqlite_db(c): @@ -93,19 +126,20 @@ def db_init_common_sqlite_db(c): version INTEGER )''') db_save_meta(c) - c.execute(''' - CREATE TABLE heads ( - refname TEXT, - commit_id TEXT - )''') def db_init_pr_sqlite_db(c): logger.info('Initializing new sqlite3 db with metadata version %s', DB_VERSION) db_init_common_sqlite_db(c) - + c.execute(''' + CREATE TABLE heads ( + pi_shard INTEGER, + refname TEXT, + commit_id TEXT + )''') c.execute(''' CREATE TABLE prs ( + pi_shard INTEGER, msg_commit_id TEXT UNIQUE, msg_id TEXT UNIQUE, subject TEXT, @@ -118,6 +152,11 @@ def db_init_thanks_sqlite_db(c): logger.info('Initializing new sqlite3 db with metadata version %s', DB_VERSION) db_init_common_sqlite_db(c) c.execute(''' + CREATE TABLE heads ( + refname TEXT, + commit_id TEXT + )''') + c.execute(''' CREATE TABLE thanks ( pr_commit_id TEXT, --- pi.prs.pr_commit_id refname TEXT, @@ -166,24 +205,11 @@ def git_get_repo_heads(gitdir): return refs -def git_get_message_from_pi(pirepo, commit_id): +def git_get_message_from_pi(projpath, shard, commit_id): + pirepo = os.path.join(projpath, '{0}.git'.format(shard)) full_email = git_run_command(pirepo, ['show', '%s:m' % commit_id]) if not len(full_email): - logger.debug('%s not found in %s, attempting to find in the previous shard', - commit_id, pirepo) - shardrepo = os.path.basename(pirepo) - shard = int(shardrepo.split('.')[0]) - if shard <= 0: - # Well, we tried - return None - prevpirepo = os.path.join(os.path.dirname(pirepo), - '{0}.git'.format(shard-1)) - logger.debug('previous shard is in %s', prevpirepo) - full_email = git_run_command(prevpirepo, ['show', '%s:m' % commit_id]) - if not 
len(full_email): - # it's *not* going to be in the shard before that, so just - # give up at this point -- we assume it got deleted - return None + return None msg = email.message_from_string(full_email.encode('utf-8')) return msg @@ -239,7 +265,7 @@ def get_remote_ref_from_body(body): return repo, ref -def record_pr_data(msg_commit_id, msg, c): +def record_pr_data(shard, msg_commit_id, msg, c): body = get_plain_part(msg) if body is None: return False @@ -278,8 +304,8 @@ def record_pr_data(msg_commit_id, msg, c): logger.debug('Found a new PR: %s', subject) logger.debug(' commit_id: %s', pr_commit_id) try: - c.execute('INSERT INTO prs VALUES(?, ?, ?, ?, ?)', - (msg_commit_id, msg_id, subject, pr_commit_id, received)) + c.execute('INSERT INTO prs VALUES(?, ?, ?, ?, ?, ?)', + (shard, msg_commit_id, msg_id, subject, pr_commit_id, received)) return True except sqlite3.IntegrityError: logger.debug('Got integrity-error for %s', msg_id) @@ -310,7 +336,11 @@ def git_get_new_revs(gitdir, db_heads, git_heads): # No changes in this head continue - rev_range = '%s..%s' % (db_commit_id, git_commit_id) + if len(db_commit_id): + rev_range = '%s..%s' % (db_commit_id, git_commit_id) + else: + rev_range = git_commit_id + lines = git_get_command_lines(gitdir, ['rev-list', '--pretty=oneline', '--reverse', rev_range, refname]) if not lines: @@ -334,55 +364,82 @@ def git_get_merge_id(repo, commit_id): def get_pirepo_dir(projpath, topdir): - # Find the largest #.git in a public-inbox sharded repo - # TODO: handle roll-over between shards if topdir: projpath = os.path.join(topdir, projpath) - # if the path does not end with .git, find the latest shard - pirepo = projpath - if pirepo[-4:] != '.git': - subs = os.listdir(projpath) - at = 0 - while True: - at += 1 - if '{0}.git'.format(at) not in subs: - break + projpath = projpath.rstrip('/') - pirepo = os.path.join(projpath, '{0}.git'.format(at-1)) + # drop the #.git if we find it + if projpath[-4:] == '.git': + projpath = 
os.path.dirname(projpath) - return pirepo + subs = os.listdir(projpath) + at = 0 + while True: + if '{0}.git'.format(at+1) not in subs: + return projpath, at + at += 1 def parse_pull_requests(pirepo, topdir, dryrun): - pirepo = get_pirepo_dir(pirepo, topdir) + projpath, maxshard = get_pirepo_dir(pirepo, topdir) + pirepo = os.path.join(projpath, '{0}.git'.format(maxshard)) + logger.debug('pirepo=%s', pirepo) git_heads = git_get_repo_heads(pirepo) if not git_heads: logger.critical('Could not get the latest ref in %s', pirepo) sys.exit(1) try: - lockfh = open(os.path.join(pirepo, '.prtracker.lock'), 'w') + lockfh = open(os.path.join(projpath, 'prtracker.lock'), 'w') lockf(lockfh, LOCK_EX | LOCK_NB) except IOError: logger.debug('Could not obtain an exclusive lock, assuming another process is running.') return - # Do we have a prs.db there yet? - dbpath = os.path.join(pirepo, 'prs.db') + # Do we have a prtracker.db there yet? + dbpath = os.path.join(projpath, 'prtracker.db') db_exists = os.path.isfile(dbpath) + if not db_exists: + # Do we have a prs.db in the latest shard (metadata 1.0) + old_dbpath = os.path.join(pirepo, 'prs.db') + if os.path.exists(old_dbpath): + db_migrate_1_to_2(projpath) + db_exists = True + dbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) c = dbconn.cursor() if not db_exists: # initialize it once we get the latest ref db_init_pr_sqlite_db(c) - db_save_repo_heads(c, git_heads) + db_save_repo_heads(c, git_heads, maxshard) # Exit early dbconn.commit() return - db_heads = db_get_repo_heads(c) + db_heads = db_get_repo_heads(c, maxshard) + if not len(db_heads) and maxshard == 0: + logger.info('Not sure what happened, but I did not find any db heads and maxshard=0') + return + + if not len(db_heads): + # This should only happen when we have rolled over + # between shards, so we need to first finish the + # old shard and then prep next run from starting from + # the new shard + # Prep the next run with the new shard. 
We use empty + # space so that rev-list formulates a ..foo rev-list + # that returns the revs from the beginning of history + logger.info('Preparing for shard roll-over to %s.git', maxshard) + db_save_repo_heads(c, (('refs/heads/master', ''),), maxshard) + # Now let's finish with the previous shard + maxshard -= 1 + # Refresh these + pirepo = os.path.join(projpath, '{0}.git'.format(maxshard)) + logger.info('Finishing previous shard %s', pirepo) + db_heads = db_get_repo_heads(c, maxshard) + git_heads = git_get_repo_heads(pirepo) newrevs = git_get_new_revs(pirepo, db_heads, git_heads) if not newrevs: @@ -397,15 +454,15 @@ def parse_pull_requests(pirepo, topdir, dryrun): for subject_re in PULL_SUBJECT_RE: if subject_re.match(subject): logger.debug('potential match: "%s"', subject) - msg = git_get_message_from_pi(pirepo, commit_id) - if msg is not None and record_pr_data(commit_id, msg, c): + msg = git_get_message_from_pi(projpath, maxshard, commit_id) + if msg is not None and record_pr_data(maxshard, commit_id, msg, c): new_prs += 1 logger.info('Started tracking: %s', subject) if not new_prs: logger.info('No new PRs found.') - db_save_repo_heads(c, git_heads) + db_save_repo_heads(c, git_heads, maxshard) if not dryrun: dbconn.commit() @@ -474,8 +531,8 @@ def get_all_thanked_prs(c, cutoffdays=30): return sent_prs -def get_all_prs(pirepo, cutoffdays=30): - dbpath = os.path.join(pirepo, 'prs.db') +def get_all_prs(projpath, cutoffdays=30): + dbpath = os.path.join(projpath, 'prtracker.db') if not os.path.isfile(dbpath): # The DB does not exist, and we don't create it # from this location. @@ -485,7 +542,7 @@ def get_all_prs(pirepo, cutoffdays=30): prdbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) prc = prdbconn.cursor() rows = prc.execute(''' - SELECT pr_commit_id, msg_commit_id + SELECT pi_shard, pr_commit_id, msg_commit_id FROM prs WHERE received >= datetime('now', ?) 
''', ('-%d day' % cutoffdays,)).fetchall() @@ -524,7 +581,7 @@ def to_or_cc_contains(tocc, addrs): return False -def thank_for_pr(c, repo, refname, commit_id, pirepo, msg_commit_id, config, dryrun, nomail): +def thank_for_pr(c, repo, refname, commit_id, projpath, pi_shard, msg_commit_id, config, dryrun, nomail): # Make sure we haven't thanked for it already c.execute('SELECT sent_msgid FROM thanks WHERE pr_commit_id=? AND refname=?', (commit_id, refname)) @@ -533,7 +590,7 @@ def thank_for_pr(c, repo, refname, commit_id, pirepo, msg_commit_id, config, dry logger.debug('Already thanked for this PR, skipping') return None - orig = git_get_message_from_pi(pirepo, msg_commit_id) + orig = git_get_message_from_pi(projpath, pi_shard, msg_commit_id) if orig is None: return None @@ -710,8 +767,8 @@ def send_thanks(repo, pitopdir, cmdconfig, nomail, dryrun): tycount = 0 for pirepo, settings in config.items(): - pirepo = get_pirepo_dir(pirepo, pitopdir) - logger.info('Grabbing PR commits from %s', pirepo) + projpath, maxshard = get_pirepo_dir(pirepo, pitopdir) + logger.info('Grabbing PR commits from %s', projpath) logger.debug('config follows') logger.debug(settings) @@ -721,18 +778,18 @@ def send_thanks(repo, pitopdir, cmdconfig, nomail, dryrun): except (IndexError, ValueError): pass - prs = get_all_prs(pirepo, cutoffdays=cutoffdays) - logger.debug('Found %s PRs in %s', len(prs), pirepo) + prs = get_all_prs(projpath, cutoffdays=cutoffdays) + logger.debug('Found %s PRs in %s', len(prs), projpath) un_prs = get_unprocessed_prs(c, prs, cutoffdays=cutoffdays) logger.debug('Of them, %s not already thanked for', len(un_prs)) - for pr_commit_id, msg_commit_id in un_prs: + for pi_shard, pr_commit_id, msg_commit_id in un_prs: logger.debug('Checking %s', pr_commit_id) # Is this pr_commit_id in the repo? 
lines = git_get_command_lines(repo, ['branch', '--contains', pr_commit_id]) if len(lines): refname = 'refs/heads/%s' % lines[0].split()[-1] logger.debug('Found %s in %s', pr_commit_id, refname) - sent_msgid = thank_for_pr(c, repo, refname, pr_commit_id, pirepo, + sent_msgid = thank_for_pr(c, repo, refname, pr_commit_id, projpath, pi_shard, msg_commit_id, settings, dryrun, nomail) if sent_msgid and not dryrun: tycount += 1 @@ -748,107 +805,6 @@ def send_thanks(repo, pitopdir, cmdconfig, nomail, dryrun): logger.info('No new thank-yous to send.') -def check_if_in_repo(repo, commit_id): - lines = git_get_command_lines(repo, ['cat-file', '-t', commit_id]) - if len(lines): - return True - return False - - -def show_unapplied(repo, pitopdir, cmdconfig, use_json): - # Do we have a thanks.db there yet? - dbpath = os.path.join(repo, 'thanks.db') - db_exists = os.path.isfile(dbpath) - dbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) - c = dbconn.cursor() - - if not db_exists: - logger.info('Looks like this repository is not used for tracking PRs') - return - - config = get_config_from_repo(repo, pitopdir, cmdconfig) - - unapplied = list() - cutoffdays = 30 - seen_commit_ids = list() - - for pirepodef, settings in config.items(): - pirepo = get_pirepo_dir(pirepodef, pitopdir) - - try: - cutoffdays = int(settings['cutoffdays']) - except (IndexError, ValueError): - pass - - for pr_commit_id, msg_commit_id in get_all_prs(pirepo, cutoffdays=cutoffdays): - if pr_commit_id in seen_commit_ids: - # skip if we've already seen this commit-id on some other list - continue - - c.execute('SELECT refname, sent_msgid FROM thanks WHERE pr_commit_id=?', - (pr_commit_id,)) - rows = c.fetchall() - if len(rows): - continue - - # Is this pr_commit_id in the repo? 
- if check_if_in_repo(repo, pr_commit_id): - continue - - orig = git_get_message_from_pi(pirepo, msg_commit_id) - if orig is None: - continue - - dest = email.utils.getaddresses(orig.get_all('to', [])) - dest += email.utils.getaddresses(orig.get_all('cc', [])) - targets = [chunk[1] for chunk in dest] - - if 'onlyifto' in settings: - # Only show unapplied pull requests matching "onlyifto" - if not to_or_cc_contains(targets, settings['onlyifto']): - continue - - origbody = get_plain_part(orig) - if origbody is None: - continue - - remoterepo, remoteref = get_remote_ref_from_body(origbody) - - values = { - 'commit-id': pr_commit_id, - 'listid': orig.get('List-Id', '(unknown-list)'), - 'msgid': orig.get('Message-Id', '(no message id)'), - 'from': orig.get('From', '(no from)'), - 'subject': orig.get('Subject', '(no subject)'), - 'date': orig.get('Date', '(no date)'), - 'remoteref': '%s %s' % (remoterepo, remoteref), - } - - unapplied.append(values) - seen_commit_ids.append(pr_commit_id) - - if not use_json: - print() - print('Subject: %s' % values['subject']) - print(' From: %s' % values['from']) - print(' Date: %s' % values['date']) - print(' List: %s' % values['listid']) - print(' MsgId: %s' % values['msgid']) - print(' Remote: %s' % values['remoteref']) - print(' Commit: %s' % values['commit-id']) - - if use_json: - jsout = json.dumps(unapplied, indent=4, sort_keys=True) - print(jsout) - else: - if len(unapplied): - print() - print('%s unapplied pull requests from the past %d days.' - % (len(unapplied), cutoffdays)) - else: - print('No unapplied pull requests from the past %d days.' 
% cutoffdays) - - if __name__ == '__main__': parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter @@ -857,10 +813,6 @@ if __name__ == '__main__': help='Check the Public Inbox ML repository for any new pull requests.') parser.add_argument('-m', '--mail-thankyous', dest='tyrepo', default=None, help='Check the repository and thank for any matching pulled PRs.') - parser.add_argument('-u', '--show-unapplied', dest='unapplied', default=None, - help='Show pull requests that have not yet been applied to this repository.') - parser.add_argument('-j', '--json-formatted', dest='json', action='store_true', default=False, - help='Show unapplied PRs in a json format (use with -u).') parser.add_argument('-t', '--pirepos-topdir', dest='topdir', default=None, help='Toplevel path where all public-inbox repos are (optional)') parser.add_argument('-c', '--config', dest='config', nargs='+', default=list(), @@ -873,6 +825,8 @@ if __name__ == '__main__': help='Do not mail anything, but store database entries.') parser.add_argument('-q', '--quiet', action='store_true', default=False, help='Only output errors to the stdout') + parser.add_argument('-v', '--verbose', action='store_true', default=False, + help='Output extra debugging information') cmdargs = parser.parse_args() @@ -893,6 +847,8 @@ if __name__ == '__main__': if cmdargs.quiet: ch.setLevel(logging.CRITICAL) + elif cmdargs.verbose: + ch.setLevel(logging.DEBUG) else: ch.setLevel(logging.INFO) @@ -903,6 +859,3 @@ if __name__ == '__main__': if cmdargs.tyrepo is not None: send_thanks(cmdargs.tyrepo, cmdargs.topdir, cmdargs.config, cmdargs.nomail, cmdargs.dryrun) - - if cmdargs.unapplied is not None: - show_unapplied(cmdargs.unapplied, cmdargs.topdir, cmdargs.config, cmdargs.json) |