aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2019-03-13 18:50:39 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2019-03-13 18:50:39 -0400
commit9bc20f52a85fb8d27528973dc69a4acb8702fc85 (patch)
tree639b8a9baf3a78ff0a7acbdd870a6f1eb78c47f5
parent4040fb4f18e1e8d9f164593ee657e65f1326dd44 (diff)
downloadkorg-helpers-9bc20f52a85fb8d27528973dc69a4acb8702fc85.tar.gz
Properly roll over between public-inbox shards
This does a much better job of handling shard roll-over.

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rwxr-xr-xpr-tracker-bot.py301
1 files changed, 127 insertions, 174 deletions
diff --git a/pr-tracker-bot.py b/pr-tracker-bot.py
index 3c876a6..4ac9287 100755
--- a/pr-tracker-bot.py
+++ b/pr-tracker-bot.py
@@ -31,12 +31,11 @@ import sqlite3
import logging
import re
import glob
-import json
from fcntl import lockf, LOCK_EX, LOCK_NB
from string import Template
-DB_VERSION = 1
+DB_VERSION = 2
# Case doesn't matter
PULL_SUBJECT_RE = [
@@ -72,19 +71,53 @@ def make_msgid(idstring=None, domain='kernel.org'):
return '<%d.%d.%d%s@%s>' % (timeval, pid, randint, idstring, domain)
+def db_migrate_1_to_2(projpath):
+ pirepo, maxshard = get_pirepo_dir(projpath, None)
+ old_dbpath = os.path.join(projpath, '{0}.git'.format(maxshard), 'prs.db')
+ dbconn_old = sqlite3.connect(old_dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
+ c_old = dbconn_old.cursor()
+ # Get all values from old db
+ pr_rows = c_old.execute('SELECT * FROM prs').fetchall()
+ h_rows = c_old.execute('SELECT * FROM heads').fetchall()
+ dbconn_old.close()
+
+ dbpath = os.path.join(projpath, 'prtracker.db')
+ dbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
+ c = dbconn.cursor()
+ db_init_pr_sqlite_db(c)
+
+ for row in pr_rows:
+ c.execute('INSERT INTO prs VALUES(?,?,?,?,?,?)', [maxshard]+list(row))
+
+ for row in h_rows:
+ c.execute('INSERT INTO heads VALUES(?,?,?)', [maxshard] + list(row))
+
+ dbconn.commit()
+ dbconn.close()
+
+
def db_save_meta(c):
c.execute('DELETE FROM meta')
c.execute('''INSERT INTO meta VALUES(?)''', (DB_VERSION,))
-def db_save_repo_heads(c, heads):
- c.execute('DELETE FROM heads')
+def db_save_repo_heads(c, heads, shard=None):
+ if shard is None:
+ c.execute('DELETE FROM heads')
+ for refname, commit_id in heads:
+ c.execute('''INSERT INTO heads VALUES(?,?)''', (refname, commit_id))
+ return
+
+ c.execute('DELETE FROM heads WHERE pi_shard=?', (shard,))
for refname, commit_id in heads:
- c.execute('''INSERT INTO heads VALUES(?,?)''', (refname, commit_id))
+ c.execute('''INSERT INTO heads VALUES(?,?,?)''', (shard, refname, commit_id))
-def db_get_repo_heads(c):
- return c.execute('SELECT refname, commit_id FROM heads').fetchall()
+def db_get_repo_heads(c, shard=None):
+ if shard is None:
+ return c.execute('SELECT refname, commit_id FROM heads').fetchall()
+ return c.execute('SELECT refname, commit_id FROM heads WHERE pi_shard=?',
+ (shard,)).fetchall()
def db_init_common_sqlite_db(c):
@@ -93,19 +126,20 @@ def db_init_common_sqlite_db(c):
version INTEGER
)''')
db_save_meta(c)
- c.execute('''
- CREATE TABLE heads (
- refname TEXT,
- commit_id TEXT
- )''')
def db_init_pr_sqlite_db(c):
logger.info('Initializing new sqlite3 db with metadata version %s', DB_VERSION)
db_init_common_sqlite_db(c)
-
+ c.execute('''
+ CREATE TABLE heads (
+ pi_shard INTEGER,
+ refname TEXT,
+ commit_id TEXT
+ )''')
c.execute('''
CREATE TABLE prs (
+ pi_shard INTEGER,
msg_commit_id TEXT UNIQUE,
msg_id TEXT UNIQUE,
subject TEXT,
@@ -118,6 +152,11 @@ def db_init_thanks_sqlite_db(c):
logger.info('Initializing new sqlite3 db with metadata version %s', DB_VERSION)
db_init_common_sqlite_db(c)
c.execute('''
+ CREATE TABLE heads (
+ refname TEXT,
+ commit_id TEXT
+ )''')
+ c.execute('''
CREATE TABLE thanks (
pr_commit_id TEXT, --- pi.prs.pr_commit_id
refname TEXT,
@@ -166,24 +205,11 @@ def git_get_repo_heads(gitdir):
return refs
-def git_get_message_from_pi(pirepo, commit_id):
+def git_get_message_from_pi(projpath, shard, commit_id):
+ pirepo = os.path.join(projpath, '{0}.git'.format(shard))
full_email = git_run_command(pirepo, ['show', '%s:m' % commit_id])
if not len(full_email):
- logger.debug('%s not found in %s, attempting to find in the previous shard',
- commit_id, pirepo)
- shardrepo = os.path.basename(pirepo)
- shard = int(shardrepo.split('.')[0])
- if shard <= 0:
- # Well, we tried
- return None
- prevpirepo = os.path.join(os.path.dirname(pirepo),
- '{0}.git'.format(shard-1))
- logger.debug('previous shard is in %s', prevpirepo)
- full_email = git_run_command(prevpirepo, ['show', '%s:m' % commit_id])
- if not len(full_email):
- # it's *not* going to be in the shard before that, so just
- # give up at this point -- we assume it got deleted
- return None
+ return None
msg = email.message_from_string(full_email.encode('utf-8'))
return msg
@@ -239,7 +265,7 @@ def get_remote_ref_from_body(body):
return repo, ref
-def record_pr_data(msg_commit_id, msg, c):
+def record_pr_data(shard, msg_commit_id, msg, c):
body = get_plain_part(msg)
if body is None:
return False
@@ -278,8 +304,8 @@ def record_pr_data(msg_commit_id, msg, c):
logger.debug('Found a new PR: %s', subject)
logger.debug(' commit_id: %s', pr_commit_id)
try:
- c.execute('INSERT INTO prs VALUES(?, ?, ?, ?, ?)',
- (msg_commit_id, msg_id, subject, pr_commit_id, received))
+ c.execute('INSERT INTO prs VALUES(?, ?, ?, ?, ?, ?)',
+ (shard, msg_commit_id, msg_id, subject, pr_commit_id, received))
return True
except sqlite3.IntegrityError:
logger.debug('Got integrity-error for %s', msg_id)
@@ -310,7 +336,11 @@ def git_get_new_revs(gitdir, db_heads, git_heads):
# No changes in this head
continue
- rev_range = '%s..%s' % (db_commit_id, git_commit_id)
+ if len(db_commit_id):
+ rev_range = '%s..%s' % (db_commit_id, git_commit_id)
+ else:
+ rev_range = git_commit_id
+
lines = git_get_command_lines(gitdir, ['rev-list', '--pretty=oneline',
'--reverse', rev_range, refname])
if not lines:
@@ -334,55 +364,82 @@ def git_get_merge_id(repo, commit_id):
def get_pirepo_dir(projpath, topdir):
- # Find the largest #.git in a public-inbox sharded repo
- # TODO: handle roll-over between shards
if topdir:
projpath = os.path.join(topdir, projpath)
- # if the path does not end with .git, find the latest shard
- pirepo = projpath
- if pirepo[-4:] != '.git':
- subs = os.listdir(projpath)
- at = 0
- while True:
- at += 1
- if '{0}.git'.format(at) not in subs:
- break
+ projpath = projpath.rstrip('/')
- pirepo = os.path.join(projpath, '{0}.git'.format(at-1))
+ # drop the #.git if we find it
+ if projpath[-4:] == '.git':
+ projpath = os.path.dirname(projpath)
- return pirepo
+ subs = os.listdir(projpath)
+ at = 0
+ while True:
+ if '{0}.git'.format(at+1) not in subs:
+ return projpath, at
+ at += 1
def parse_pull_requests(pirepo, topdir, dryrun):
- pirepo = get_pirepo_dir(pirepo, topdir)
+ projpath, maxshard = get_pirepo_dir(pirepo, topdir)
+ pirepo = os.path.join(projpath, '{0}.git'.format(maxshard))
+ logger.debug('pirepo=%s', pirepo)
git_heads = git_get_repo_heads(pirepo)
if not git_heads:
logger.critical('Could not get the latest ref in %s', pirepo)
sys.exit(1)
try:
- lockfh = open(os.path.join(pirepo, '.prtracker.lock'), 'w')
+ lockfh = open(os.path.join(projpath, 'prtracker.lock'), 'w')
lockf(lockfh, LOCK_EX | LOCK_NB)
except IOError:
logger.debug('Could not obtain an exclusive lock, assuming another process is running.')
return
- # Do we have a prs.db there yet?
- dbpath = os.path.join(pirepo, 'prs.db')
+ # Do we have a prtracker.db there yet?
+ dbpath = os.path.join(projpath, 'prtracker.db')
db_exists = os.path.isfile(dbpath)
+ if not db_exists:
+ # Do we have a prs.db in the latest shard (metadata 1.0)
+ old_dbpath = os.path.join(pirepo, 'prs.db')
+ if os.path.exists(old_dbpath):
+ db_migrate_1_to_2(projpath)
+ db_exists = True
+
dbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
c = dbconn.cursor()
if not db_exists:
# initialize it once we get the latest ref
db_init_pr_sqlite_db(c)
- db_save_repo_heads(c, git_heads)
+ db_save_repo_heads(c, git_heads, maxshard)
# Exit early
dbconn.commit()
return
- db_heads = db_get_repo_heads(c)
+ db_heads = db_get_repo_heads(c, maxshard)
+ if not len(db_heads) and maxshard == 0:
+ logger.info('Not sure what happened, but I did not find any db heads and maxshard=0')
+ return
+
+ if not len(db_heads):
+ # This should only happen when we have rolled over
+ # between shards, so we need to first finish the
+ # old shard and then prep next run from starting from
+ # the new shard
+ # Prep the next run with the new shard. We use empty
+ # space so that rev-list formulates a ..foo rev-list
+ # that returns the revs from the beginning of history
+ logger.info('Preparing for shard roll-over to %s.git', maxshard)
+ db_save_repo_heads(c, (('refs/heads/master', ''),), maxshard)
+ # Now let's finish with the previous shard
+ maxshard -= 1
+ # Refresh these
+ pirepo = os.path.join(projpath, '{0}.git'.format(maxshard))
+ logger.info('Finishing previous shard %s', pirepo)
+ db_heads = db_get_repo_heads(c, maxshard)
+ git_heads = git_get_repo_heads(pirepo)
newrevs = git_get_new_revs(pirepo, db_heads, git_heads)
if not newrevs:
@@ -397,15 +454,15 @@ def parse_pull_requests(pirepo, topdir, dryrun):
for subject_re in PULL_SUBJECT_RE:
if subject_re.match(subject):
logger.debug('potential match: "%s"', subject)
- msg = git_get_message_from_pi(pirepo, commit_id)
- if msg is not None and record_pr_data(commit_id, msg, c):
+ msg = git_get_message_from_pi(projpath, maxshard, commit_id)
+ if msg is not None and record_pr_data(maxshard, commit_id, msg, c):
new_prs += 1
logger.info('Started tracking: %s', subject)
if not new_prs:
logger.info('No new PRs found.')
- db_save_repo_heads(c, git_heads)
+ db_save_repo_heads(c, git_heads, maxshard)
if not dryrun:
dbconn.commit()
@@ -474,8 +531,8 @@ def get_all_thanked_prs(c, cutoffdays=30):
return sent_prs
-def get_all_prs(pirepo, cutoffdays=30):
- dbpath = os.path.join(pirepo, 'prs.db')
+def get_all_prs(projpath, cutoffdays=30):
+ dbpath = os.path.join(projpath, 'prtracker.db')
if not os.path.isfile(dbpath):
# The DB does not exist, and we don't create it
# from this location.
@@ -485,7 +542,7 @@ def get_all_prs(pirepo, cutoffdays=30):
prdbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
prc = prdbconn.cursor()
rows = prc.execute('''
- SELECT pr_commit_id, msg_commit_id
+ SELECT pi_shard, pr_commit_id, msg_commit_id
FROM prs
WHERE received >= datetime('now', ?)
''', ('-%d day' % cutoffdays,)).fetchall()
@@ -524,7 +581,7 @@ def to_or_cc_contains(tocc, addrs):
return False
-def thank_for_pr(c, repo, refname, commit_id, pirepo, msg_commit_id, config, dryrun, nomail):
+def thank_for_pr(c, repo, refname, commit_id, projpath, pi_shard, msg_commit_id, config, dryrun, nomail):
# Make sure we haven't thanked for it already
c.execute('SELECT sent_msgid FROM thanks WHERE pr_commit_id=? AND refname=?',
(commit_id, refname))
@@ -533,7 +590,7 @@ def thank_for_pr(c, repo, refname, commit_id, pirepo, msg_commit_id, config, dry
logger.debug('Already thanked for this PR, skipping')
return None
- orig = git_get_message_from_pi(pirepo, msg_commit_id)
+ orig = git_get_message_from_pi(projpath, pi_shard, msg_commit_id)
if orig is None:
return None
@@ -710,8 +767,8 @@ def send_thanks(repo, pitopdir, cmdconfig, nomail, dryrun):
tycount = 0
for pirepo, settings in config.items():
- pirepo = get_pirepo_dir(pirepo, pitopdir)
- logger.info('Grabbing PR commits from %s', pirepo)
+ projpath, maxshard = get_pirepo_dir(pirepo, pitopdir)
+ logger.info('Grabbing PR commits from %s', projpath)
logger.debug('config follows')
logger.debug(settings)
@@ -721,18 +778,18 @@ def send_thanks(repo, pitopdir, cmdconfig, nomail, dryrun):
except (IndexError, ValueError):
pass
- prs = get_all_prs(pirepo, cutoffdays=cutoffdays)
- logger.debug('Found %s PRs in %s', len(prs), pirepo)
+ prs = get_all_prs(projpath, cutoffdays=cutoffdays)
+ logger.debug('Found %s PRs in %s', len(prs), projpath)
un_prs = get_unprocessed_prs(c, prs, cutoffdays=cutoffdays)
logger.debug('Of them, %s not already thanked for', len(un_prs))
- for pr_commit_id, msg_commit_id in un_prs:
+ for pi_shard, pr_commit_id, msg_commit_id in un_prs:
logger.debug('Checking %s', pr_commit_id)
# Is this pr_commit_id in the repo?
lines = git_get_command_lines(repo, ['branch', '--contains', pr_commit_id])
if len(lines):
refname = 'refs/heads/%s' % lines[0].split()[-1]
logger.debug('Found %s in %s', pr_commit_id, refname)
- sent_msgid = thank_for_pr(c, repo, refname, pr_commit_id, pirepo,
+ sent_msgid = thank_for_pr(c, repo, refname, pr_commit_id, projpath, pi_shard,
msg_commit_id, settings, dryrun, nomail)
if sent_msgid and not dryrun:
tycount += 1
@@ -748,107 +805,6 @@ def send_thanks(repo, pitopdir, cmdconfig, nomail, dryrun):
logger.info('No new thank-yous to send.')
-def check_if_in_repo(repo, commit_id):
- lines = git_get_command_lines(repo, ['cat-file', '-t', commit_id])
- if len(lines):
- return True
- return False
-
-
-def show_unapplied(repo, pitopdir, cmdconfig, use_json):
- # Do we have a thanks.db there yet?
- dbpath = os.path.join(repo, 'thanks.db')
- db_exists = os.path.isfile(dbpath)
- dbconn = sqlite3.connect(dbpath, sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
- c = dbconn.cursor()
-
- if not db_exists:
- logger.info('Looks like this repository is not used for tracking PRs')
- return
-
- config = get_config_from_repo(repo, pitopdir, cmdconfig)
-
- unapplied = list()
- cutoffdays = 30
- seen_commit_ids = list()
-
- for pirepodef, settings in config.items():
- pirepo = get_pirepo_dir(pirepodef, pitopdir)
-
- try:
- cutoffdays = int(settings['cutoffdays'])
- except (IndexError, ValueError):
- pass
-
- for pr_commit_id, msg_commit_id in get_all_prs(pirepo, cutoffdays=cutoffdays):
- if pr_commit_id in seen_commit_ids:
- # skip if we've already seen this commit-id on some other list
- continue
-
- c.execute('SELECT refname, sent_msgid FROM thanks WHERE pr_commit_id=?',
- (pr_commit_id,))
- rows = c.fetchall()
- if len(rows):
- continue
-
- # Is this pr_commit_id in the repo?
- if check_if_in_repo(repo, pr_commit_id):
- continue
-
- orig = git_get_message_from_pi(pirepo, msg_commit_id)
- if orig is None:
- continue
-
- dest = email.utils.getaddresses(orig.get_all('to', []))
- dest += email.utils.getaddresses(orig.get_all('cc', []))
- targets = [chunk[1] for chunk in dest]
-
- if 'onlyifto' in settings:
- # Only show unapplied pull requests matching "onlyifto"
- if not to_or_cc_contains(targets, settings['onlyifto']):
- continue
-
- origbody = get_plain_part(orig)
- if origbody is None:
- continue
-
- remoterepo, remoteref = get_remote_ref_from_body(origbody)
-
- values = {
- 'commit-id': pr_commit_id,
- 'listid': orig.get('List-Id', '(unknown-list)'),
- 'msgid': orig.get('Message-Id', '(no message id)'),
- 'from': orig.get('From', '(no from)'),
- 'subject': orig.get('Subject', '(no subject)'),
- 'date': orig.get('Date', '(no date)'),
- 'remoteref': '%s %s' % (remoterepo, remoteref),
- }
-
- unapplied.append(values)
- seen_commit_ids.append(pr_commit_id)
-
- if not use_json:
- print()
- print('Subject: %s' % values['subject'])
- print(' From: %s' % values['from'])
- print(' Date: %s' % values['date'])
- print(' List: %s' % values['listid'])
- print(' MsgId: %s' % values['msgid'])
- print(' Remote: %s' % values['remoteref'])
- print(' Commit: %s' % values['commit-id'])
-
- if use_json:
- jsout = json.dumps(unapplied, indent=4, sort_keys=True)
- print(jsout)
- else:
- if len(unapplied):
- print()
- print('%s unapplied pull requests from the past %d days.'
- % (len(unapplied), cutoffdays))
- else:
- print('No unapplied pull requests from the past %d days.' % cutoffdays)
-
-
if __name__ == '__main__':
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
@@ -857,10 +813,6 @@ if __name__ == '__main__':
help='Check the Public Inbox ML repository for any new pull requests.')
parser.add_argument('-m', '--mail-thankyous', dest='tyrepo', default=None,
help='Check the repository and thank for any matching pulled PRs.')
- parser.add_argument('-u', '--show-unapplied', dest='unapplied', default=None,
- help='Show pull requests that have not yet been applied to this repository.')
- parser.add_argument('-j', '--json-formatted', dest='json', action='store_true', default=False,
- help='Show unapplied PRs in a json format (use with -u).')
parser.add_argument('-t', '--pirepos-topdir', dest='topdir', default=None,
help='Toplevel path where all public-inbox repos are (optional)')
parser.add_argument('-c', '--config', dest='config', nargs='+', default=list(),
@@ -873,6 +825,8 @@ if __name__ == '__main__':
help='Do not mail anything, but store database entries.')
parser.add_argument('-q', '--quiet', action='store_true', default=False,
help='Only output errors to the stdout')
+ parser.add_argument('-v', '--verbose', action='store_true', default=False,
+ help='Output extra debugging information')
cmdargs = parser.parse_args()
@@ -893,6 +847,8 @@ if __name__ == '__main__':
if cmdargs.quiet:
ch.setLevel(logging.CRITICAL)
+ elif cmdargs.verbose:
+ ch.setLevel(logging.DEBUG)
else:
ch.setLevel(logging.INFO)
@@ -903,6 +859,3 @@ if __name__ == '__main__':
if cmdargs.tyrepo is not None:
send_thanks(cmdargs.tyrepo, cmdargs.topdir, cmdargs.config, cmdargs.nomail, cmdargs.dryrun)
-
- if cmdargs.unapplied is not None:
- show_unapplied(cmdargs.unapplied, cmdargs.topdir, cmdargs.config, cmdargs.json)