mirror of
https://github.com/josegonzalez/python-github-backup.git
synced 2025-12-05 16:18:02 +01:00
Pass the -H or --github-host argument with a GitHub Enterprise hostname to back up from that GitHub Enterprise host. If no argument is passed, the backup is taken from github.com.
338 lines
14 KiB
Python
338 lines
14 KiB
Python
#!/usr/bin/env python
|
|
|
|
import argparse
|
|
import base64
|
|
import errno
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import select
|
|
import subprocess
|
|
import sys
|
|
import urllib
|
|
import urllib2
|
|
|
|
from github_backup import __version__
|
|
|
|
|
|
def log_error(message):
    """Write one or more messages to stderr and terminate the program.

    ``message`` is either a single string or an iterable of strings; each
    message is written on its own line.  This function never returns: it
    always ends with ``sys.exit(1)``.
    """
    # isinstance (instead of an exact type comparison) keeps str subclasses
    # and Python 2 unicode strings from being iterated character by character.
    if isinstance(message, (str, type(u''))):
        message = [message]

    for msg in message:
        sys.stderr.write("{0}\n".format(msg))

    sys.exit(1)
|
|
|
|
|
|
def log_info(message):
    """Write one or more informational messages to stdout.

    ``message`` is either a single string or an iterable of strings; each
    message is written on its own line.
    """
    # isinstance (instead of an exact type comparison) keeps str subclasses
    # and Python 2 unicode strings from being iterated character by character.
    if isinstance(message, (str, type(u''))):
        message = [message]

    for msg in message:
        sys.stdout.write("{0}\n".format(msg))
|
|
|
|
|
|
def logging_subprocess(popenargs, logger, stdout_log_level=logging.DEBUG,
                       stderr_log_level=logging.ERROR, **kwargs):
    """Run a command and forward its output to a logger, line by line.

    Variant of subprocess.call that accepts a logger instead of
    stdout/stderr.  Each line the child writes to stdout is passed to
    ``logger.log`` at ``stdout_log_level`` and each stderr line at
    ``stderr_log_level``, without the trailing newline.  When ``logger`` is
    falsy the output is still read (so the child cannot block on a full
    pipe) but discarded.

    Extra keyword arguments are handed to ``subprocess.Popen``.  Returns the
    child's exit status.
    """
    child = subprocess.Popen(popenargs, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, **kwargs)

    log_level = {child.stdout: stdout_log_level,
                 child.stderr: stderr_log_level}

    # Pipes that have not reported end-of-file yet.  A pipe at EOF is always
    # "readable", so it must leave this list or select() would spin on it.
    open_streams = [child.stdout, child.stderr]

    def check_io(timeout):
        """Read one line from every ready pipe; return True if any was ready."""
        ready_to_read = select.select(open_streams, [], [], timeout)[0]
        for stream in ready_to_read:
            line = stream.readline()
            if not line:
                # Empty read means EOF: stop watching this pipe.
                open_streams.remove(stream)
                continue
            if not logger:
                continue
            # Strip only a real trailing newline; a final line without one
            # must keep its last character.  Both spellings are listed so
            # byte and text pipes are handled alike.
            if line[-1:] in ('\n', b'\n'):
                line = line[:-1]
            logger.log(log_level[stream], line)
        return bool(ready_to_read)

    try:
        # keep checking stdout/stderr until the child exits
        while open_streams and child.poll() is None:
            check_io(1000)

        # Drain whatever is still buffered after the process exits.  The zero
        # timeout means this never blocks, even if another process inherited
        # the pipes and keeps them open.
        while open_streams and check_io(0):
            pass

        return child.wait()
    finally:
        child.stdout.close()
        child.stderr.close()
|
|
|
|
|
|
def mkdir_p(*args):
    """Create every given directory, including missing parents.

    Behaves like ``mkdir -p``: a directory that already exists is not an
    error.  Any other ``OSError`` (for example the path exists but is a
    regular file) is re-raised.
    """
    for directory in args:
        try:
            os.makedirs(directory)
        except OSError as err:  # Python >2.5
            already_a_directory = (err.errno == errno.EEXIST
                                   and os.path.isdir(directory))
            if not already_a_directory:
                raise
|
|
|
|
|
|
def parse_args():
    """Define the command-line interface and parse the process arguments.

    Returns the ``argparse.Namespace`` produced by ``parser.parse_args()``.
    """
    parser = argparse.ArgumentParser(
        description='Backup a github users account',
        prog='Github Backup')

    # (flags, keyword arguments) in the order they are registered, which is
    # also the order they appear in the generated --help output.
    argument_table = [
        (('user',),
         dict(metavar='USER', type=str, help='github username')),
        (('-u', '--username'),
         dict(dest='username', help='username for basic auth')),
        (('-p', '--password'),
         dict(dest='password', help='password for basic auth')),
        (('-t', '--token'),
         dict(dest='token', help='personal access or OAuth token')),
        (('-o', '--output-directory'),
         dict(default='.', dest='output_directory',
              help='directory at which to backup the repositories')),
        (('--starred',),
         dict(action='store_true', dest='include_starred',
              help='include starred repositories in backup')),
        (('--watched',),
         dict(action='store_true', dest='include_watched',
              help='include watched repositories in backup')),
        (('--all',),
         dict(action='store_true', dest='include_everything',
              help='include everything in backup')),
        (('--issues',),
         dict(action='store_true', dest='include_issues',
              help='include issues in backup')),
        (('--issue-comments',),
         dict(action='store_true', dest='include_issue_comments',
              help='include issue comments in backup')),
        (('--issue-events',),
         dict(action='store_true', dest='include_issue_events',
              help='include issue events in backup')),
        (('--repositories',),
         dict(action='store_true', dest='include_repository',
              help='include repository clone in backup')),
        (('--wikis',),
         dict(action='store_true', dest='include_wiki',
              help='include wiki clone in backup')),
        (('--skip-existing',),
         dict(action='store_true', dest='skip_existing',
              help='skip project if a backup directory exists')),
        (('-L', '--languages'),
         dict(dest='languages', help='only allow these languages',
              nargs='*')),
        (('-N', '--name-regex'),
         dict(dest='name_regex',
              help='python regex to match names against')),
        (('-H', '--github-host'),
         dict(dest='github_host', help='GitHub Enterprise hostname')),
        (('-O', '--organization'),
         dict(action='store_true', dest='organization',
              help='whether or not this is a query for an organization')),
        (('-R', '--repository'),
         dict(dest='repository',
              help='name of repository to limit backup to')),
        (('-P', '--private'),
         dict(action='store_true', dest='private',
              help='include private repositories')),
        (('-F', '--fork'),
         dict(action='store_true', dest='fork',
              help='include forked repositories')),
        (('-v', '--version'),
         dict(action='version', version='%(prog)s ' + __version__)),
    ]
    for flags, options in argument_table:
        parser.add_argument(*flags, **options)

    return parser.parse_args()
|
|
|
|
|
|
def get_auth(args):
    """Build the base64 credential for an HTTP Basic ``Authorization`` header.

    A token takes precedence and is encoded as ``<token>:x-oauth-basic``;
    otherwise username and password are encoded as ``<username>:<password>``.
    Supplying only one of username/password is reported through
    ``log_error``.  Returns ``None`` when no credentials were given.
    """
    if args.token:
        return base64.b64encode(args.token + ':' + 'x-oauth-basic')

    has_username = bool(args.username)
    has_password = bool(args.password)

    if has_username and has_password:
        return base64.b64encode(args.username + ':' + args.password)

    if has_username:
        log_error('You must specify a password for basic auth when specifying a username')
    elif has_password:
        log_error('You must specify a username for basic auth when specifying a password')

    return None
|
|
|
|
def get_github_api_host(args):
    """Return the API host (and path prefix) to send requests to.

    With ``args.github_host`` set this is ``<host>/api/v3``; otherwise it is
    the public ``api.github.com``.
    """
    enterprise_host = args.github_host
    if not enterprise_host:
        return 'api.github.com'
    return enterprise_host + '/api/v3'
|
|
|
|
def get_github_ssh_host(args):
    """Return the host used in git-over-SSH URLs.

    This is ``args.github_host`` when one was given, else ``github.com``.
    """
    return args.github_host or 'github.com'
|
|
|
|
def retrieve_data(args, template, query_args=None, single_request=False):
    """Fetch JSON from an API URL, following pagination, and return a list.

    ``template`` is the URL without a query string.  Every request carries
    ``per_page`` and ``page`` parameters plus anything in ``query_args``.
    List responses are accumulated page by page until a page comes back
    with fewer than ``per_page`` items.  With ``single_request`` set, exactly
    one request is made and a dict response is returned wrapped in a list.

    Credentials come from ``get_auth(args)``.  Any problem (non-200 status,
    rate limit) is reported through ``log_error``, which ends the program.
    """
    auth = get_auth(args)
    per_page = 100
    page = 0
    data = []
    if not query_args:
        query_args = {}

    while True:
        page = page + 1
        # Caller-supplied arguments are applied last so they win if they
        # repeat one of the paging keys.
        params = {'per_page': per_page, 'page': page}
        params.update(query_args)
        querystring = urllib.urlencode(params)

        request = urllib2.Request(template + '?' + querystring)
        if auth is not None:
            request.add_header('Authorization', 'Basic ' + auth)

        try:
            r = urllib2.urlopen(request)
        except urllib2.HTTPError as exc:
            # urlopen raises for 4xx/5xx answers.  The exception doubles as a
            # response object (status, headers, body), so keep it and let the
            # checks below report the failure instead of dying on a traceback.
            r = exc

        errors = []
        status_ok = int(r.getcode()) == 200
        if not status_ok:
            errors.append('Bad response from api')

        # NOTE(review): this inspects the rate limit *ceiling*; the message
        # suggests X-RateLimit-Remaining may have been intended -- confirm
        # before changing, since that would also trip on the last allowed
        # (successful) request.
        if 'X-RateLimit-Limit' in r.headers and int(r.headers['X-RateLimit-Limit']) == 0:
            ratelimit_error = 'No more requests remaining'
            if auth is None:
                ratelimit_error = ratelimit_error + ', specify username/password or token to raise your github ratelimit'

            errors.append(ratelimit_error)

        if not status_ok:
            log_error(errors)

        body = r.read()
        r.close()
        response = json.loads(body)

        if not errors:
            if isinstance(response, list):
                data.extend(response)
                if len(response) < per_page:
                    break
            elif isinstance(response, dict) and single_request:
                data.append(response)
            # NOTE(review): any other response shape adds nothing and, unless
            # single_request is set, the loop simply requests the next page.

        if errors:
            log_error(errors)

        if single_request:
            break

    return data
|
|
|
|
|
|
def retrieve_repositories(args):
    """Fetch the repository list for the requested user or organization.

    When ``args.repository`` is set only that one repository is requested
    (as a single, unpaginated call); otherwise the user's repositories are
    listed, or the organization's when ``args.organization`` is set.
    Returns whatever ``retrieve_data`` returns.
    """
    log_info('Retrieving repositories')
    api_host = get_github_api_host(args)

    if args.repository:
        # A specific repository overrides the user/organization listing.
        endpoint = 'https://{0}/repos/{1}/{2}'.format(api_host, args.user, args.repository)
        return retrieve_data(args, endpoint, single_request=True)

    if args.organization:
        endpoint = 'https://{0}/orgs/{1}/repos'.format(api_host, args.user)
    else:
        endpoint = 'https://{0}/users/{1}/repos'.format(api_host, args.user)

    return retrieve_data(args, endpoint, single_request=False)
|
|
|
|
|
|
def filter_repositories(args, repositories):
    """Return the repositories that pass the command-line filters.

    Forks and private repositories are dropped unless ``args.fork`` /
    ``args.private`` ask for them.  ``args.languages`` (compared
    case-insensitively) and ``args.name_regex`` (applied with ``match``,
    i.e. anchored at the start of the name) narrow the list further.
    """
    log_info('Filtering repositories')

    pattern = re.compile(args.name_regex) if args.name_regex else None
    wanted_languages = None
    if args.languages:
        wanted_languages = [language.lower() for language in args.languages]

    # Each active filter is a predicate; they are applied one after another
    # over the whole list, in this fixed order.
    predicates = []
    if not args.fork:
        predicates.append(lambda repo: not repo['fork'])
    if not args.private:
        predicates.append(lambda repo: not repo['private'])
    if wanted_languages:
        predicates.append(
            lambda repo: repo['language'] and repo['language'].lower() in wanted_languages)
    if pattern:
        predicates.append(lambda repo: pattern.match(repo['name']))

    for keep in predicates:
        repositories = [repo for repo in repositories if keep(repo)]

    return repositories
|
|
|
|
|
|
def _clone_or_update(args, full_name, repo_cwd, kind, remote_url, branch):
    """Clone ``remote_url`` into ``repo_cwd/kind`` or pull if already cloned.

    ``kind`` is both the checkout directory name and the word used in the
    progress message ('repository' or 'wiki').  Returns False when the
    checkout exists and ``args.skip_existing`` is set, True otherwise.
    """
    exists = os.path.isdir('{0}/{1}/.git'.format(repo_cwd, kind))
    if args.skip_existing and exists:
        return False

    if exists:
        log_info('Updating {0} {1}'.format(full_name, kind))
        git_command = ["git", "pull", 'origin', branch]
        logging_subprocess(git_command, logger=None, cwd=os.path.join(repo_cwd, kind))
    else:
        log_info('Cloning {0} {1}'.format(full_name, kind))
        git_command = ["git", "clone", remote_url, kind]
        logging_subprocess(git_command, logger=None, cwd=repo_cwd)

    return True


def _backup_issues(args, repository, backup_cwd, repo_cwd, issue_template):
    """Download a repository's issues and write one JSON file per issue."""
    log_info('Retrieving {0} issues'.format(repository['full_name']))
    issue_cwd = os.path.join(repo_cwd, 'issues')
    mkdir_p(backup_cwd, repo_cwd, issue_cwd)

    issues = {}
    _issue_template = '{0}/{1}/issues'.format(issue_template, repository['full_name'])

    # Open and closed issues are requested separately and merged by number.
    for issue_state in ['open', 'closed']:
        query_args = {
            'filter': 'all',
            'state': issue_state
        }

        for issue in retrieve_data(args, _issue_template, query_args=query_args):
            issues[issue['number']] = issue

    log_info('Saving {0} issues to disk'.format(len(issues)))

    comments_template = _issue_template + '/{0}/comments'
    events_template = _issue_template + '/{0}/events'
    want_comments = args.include_issue_comments or args.include_everything
    want_events = args.include_issue_events or args.include_everything

    for number, issue in issues.items():
        if want_comments:
            issue['comment_data'] = retrieve_data(args, comments_template.format(number))
        if want_events:
            issue['event_data'] = retrieve_data(args, events_template.format(number))

        with open('{0}/{1}.json'.format(issue_cwd, number), 'w') as issue_file:
            json.dump(issue, issue_file, sort_keys=True, indent=4, separators=(',', ': '))


def backup_repositories(args, output_directory, repositories):
    """Back up each repository's git data, wiki and issues as requested.

    Everything goes under ``<output_directory>/repositories/<name>/`` in the
    sub-directories ``repository``, ``wiki`` and ``issues``.  Which parts are
    backed up is controlled by the ``include_*`` flags on ``args``.  With
    ``args.skip_existing`` set, finding an existing checkout skips the rest
    of that project.
    """
    log_info('Backing up repositories')
    issue_template = 'https://{0}/repos'.format(get_github_api_host(args))
    wiki_template = "git@{0}:{1}.wiki.git"
    backup_cwd = os.path.join(output_directory, 'repositories')

    want_repository = args.include_repository or args.include_everything
    want_wiki = args.include_wiki or args.include_everything
    want_issues = args.include_issues or args.include_everything

    for repository in repositories:
        repo_cwd = os.path.join(backup_cwd, repository['name'])

        if want_repository:
            mkdir_p(backup_cwd, repo_cwd)
            # Pull the repository's own default branch; a hard-coded 'master'
            # fails for repositories whose default branch has another name.
            branch = repository.get('default_branch') or 'master'
            if not _clone_or_update(args, repository['full_name'], repo_cwd,
                                    'repository', repository['clone_url'], branch):
                continue

        if repository['has_wiki'] and want_wiki:
            mkdir_p(backup_cwd, repo_cwd)
            wiki_url = wiki_template.format(get_github_ssh_host(args), repository['full_name'])
            if not _clone_or_update(args, repository['full_name'], repo_cwd,
                                    'wiki', wiki_url, 'master'):
                continue

        if want_issues:
            # NOTE(review): nothing in this file creates issues/.git, so this
            # skip check looks like it can never be true -- confirm intent.
            if args.skip_existing and os.path.isdir('{0}/issues/.git'.format(repo_cwd)):
                continue

            _backup_issues(args, repository, backup_cwd, repo_cwd, issue_template)
|
|
|
|
|
|
def backup_account(args, output_directory):
    """Save the user's starred and watched repository lists as JSON.

    The files ``starred.json`` and ``watched.json`` are written under
    ``<output_directory>/account``.  Each list is fetched only when its flag
    (or ``args.include_everything``) is set, and with ``args.skip_existing``
    an already existing file is left untouched.
    """
    account_cwd = os.path.join(output_directory, 'account')

    # One row per account listing:
    # (flag, output path, "retrieving" message, API URL, "writing" message)
    listings = (
        (args.include_starred,
         '{0}/starred.json',
         'Retrieving {0} starred repositories',
         "https://{0}/users/{1}/starred",
         'Writing {0} starred repositories'),
        (args.include_watched,
         '{0}/watched.json',
         'Retrieving {0} watched repositories',
         "https://{0}/users/{1}/subscriptions",
         'Writing {0} watched repositories'),
    )

    for requested, path_template, retrieving, url_template, writing in listings:
        if not (requested or args.include_everything):
            continue

        destination = path_template.format(account_cwd)
        if args.skip_existing and os.path.exists(destination):
            continue

        log_info(retrieving.format(args.user))
        mkdir_p(account_cwd)

        entries = retrieve_data(args, url_template.format(get_github_api_host(args), args.user))
        log_info(writing.format(len(entries)))
        with open(destination, 'w') as listing_file:
            json.dump(entries, listing_file, sort_keys=True, indent=4, separators=(',', ': '))
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments and run the full backup.

    The output directory must already exist; otherwise the problem is
    reported through ``log_error``.
    """
    args = parse_args()

    destination = os.path.realpath(args.output_directory)
    if not os.path.isdir(destination):
        log_error('Specified output directory is not a directory: {0}'.format(destination))

    log_info('Backing up user {0} to {1}'.format(args.user, destination))

    selected = filter_repositories(args, retrieve_repositories(args))
    backup_repositories(args, destination, selected)
    backup_account(args, destination)
|
|
|
|
|
|
# Run the backup only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|