Files
python-github-backup/github_backup/github_backup.py
Rodos a194fa48ce feat: Add attachment download support for issues and pull requests
Adds new --attachments flag that downloads user-uploaded files from
issue and PR bodies and comments. Key features:

- Determines attachment URLs
- Tracks downloads in manifest.json with metadata
- Supports --skip-existing to avoid re-downloading
- Handles filename collisions with counter suffix
- Smart retry logic for transient vs permanent failures
- Uses Content-Disposition for correct file extensions
2025-11-06 12:42:57 +11:00

2087 lines
73 KiB
Python

#!/usr/bin/env python
from __future__ import print_function
import argparse
import base64
import calendar
import codecs
import errno
import getpass
import json
import logging
import os
import platform
import re
import select
import socket
import ssl
import subprocess
import sys
import time
from datetime import datetime
from http.client import IncompleteRead
from urllib.error import HTTPError, URLError
from urllib.parse import quote as urlquote
from urllib.parse import urlencode, urlparse
from urllib.request import HTTPRedirectHandler, Request, build_opener, urlopen
# Resolve the package version; fall back when this module is loaded outside
# of its package (e.g. executed directly as a script).
try:
    from . import __version__
except ImportError:
    VERSION = "unknown"
else:
    VERSION = __version__

# Shared write handle on the null device.
FNULL = open(os.devnull, "w")
# Prefix marking a token argument as a path to a file that holds the token.
FILE_URI_PREFIX = "file://"
logger = logging.getLogger(__name__)

# TLS context used for API requests. If the platform trust store yields no CA
# certificates, warn the user and fall back to the bundle shipped by certifi.
https_ctx = ssl.create_default_context()
if not https_ctx.get_ca_certs():
    import warnings

    warnings.warn(
        "\n\nYOUR DEFAULT CA CERTS ARE EMPTY.\n"
        + "PLEASE POPULATE ANY OF:"
        + "".join(
            "\n - " + candidate
            for candidate in ssl.get_default_verify_paths()
            if isinstance(candidate, str)
        )
        + "\n",
        stacklevel=2,
    )
    import certifi

    https_ctx = ssl.create_default_context(cafile=certifi.where())
def logging_subprocess(
    popenargs, stdout_log_level=logging.DEBUG, stderr_log_level=logging.ERROR, **kwargs
):
    """
    Run a command like subprocess.call, forwarding its output to the logger.

    Lines read from the child's stdout are logged at ``stdout_log_level`` and
    lines from stderr at ``stderr_log_level``. Extra keyword arguments are
    passed straight to ``subprocess.Popen``. On Windows no output is
    collected at all. Returns the child's exit code; a non-zero code is also
    reported on this process's stderr together with the command line.
    """
    child = subprocess.Popen(
        popenargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
    )
    on_windows = sys.platform == "win32"
    if on_windows:
        logger.info(
            "Windows operating system detected - no subprocess logging will be returned"
        )

    level_for = {child.stdout: stdout_log_level, child.stderr: stderr_log_level}

    def check_io():
        # Read at most one line from each pipe that is currently readable.
        if on_windows:
            return
        readable, _, _ = select.select([child.stdout, child.stderr], [], [], 1000)
        for stream in readable:
            line = stream.readline()
            if not logger:
                continue
            # An empty read from stderr is dropped; everything else is logged
            # with its final character (the newline) removed.
            if line or stream != child.stderr:
                logger.log(level_for[stream], line[:-1])

    # Keep polling the pipes for as long as the child is alive ...
    while child.poll() is None:
        check_io()
    # ... and once more to pick up output produced right before it exited.
    check_io()

    rc = child.wait()
    if rc != 0:
        print("{} returned {}:".format(popenargs[0], rc), file=sys.stderr)
        print("\t", " ".join(popenargs), file=sys.stderr)
    return rc
def mkdir_p(*args):
    """Create every directory in ``args``, including missing parents.

    Directories that already exist are left alone. An ``OSError`` is raised
    when a path exists but is not a directory, or when creation fails for any
    other reason.
    """
    for path in args:
        # exist_ok only tolerates an existing *directory*; an existing file
        # at the same path still raises FileExistsError (an OSError).
        os.makedirs(path, exist_ok=True)
def mask_password(url, secret="*****"):
    """Return ``url`` with the credential in its userinfo replaced by ``secret``.

    URLs without a password are returned unchanged. When the password is the
    literal ``x-oauth-basic`` the token sits in the username position, so the
    username is masked instead. Only the ``user:password@`` part is rewritten;
    the same text appearing elsewhere in the URL (host, path, query) is left
    untouched.
    """
    parsed = urlparse(url)
    if not parsed.password:
        return url

    # Split the userinfo the same way urlparse does to derive .username and
    # .password, so the pieces line up with what was detected above.
    userinfo = parsed.netloc.rpartition("@")[0]
    username, _, password = userinfo.partition(":")
    if password == "x-oauth-basic":
        masked = secret + ":" + password
    else:
        masked = username + ":" + secret

    # The userinfo is the first "<userinfo>@" in the URL; replace just that one.
    return url.replace(userinfo + "@", masked + "@", 1)
def parse_args(args=None):
    """Build the command-line parser and parse ``args``.

    ``args`` is handed to ``ArgumentParser.parse_args`` unchanged, so ``None``
    means "use the process arguments". Returns the resulting namespace.
    Options are registered in the order they appear in ``--help``.
    """
    parser = argparse.ArgumentParser(description="Backup a github account")
    add = parser.add_argument

    def flag(*names, dest, text):
        # Boolean switch: False unless present on the command line.
        add(*names, action="store_true", dest=dest, help=text)

    # --- account and authentication ---
    add("user", metavar="USER", type=str, help="github username")
    add("-u", "--username", dest="username", help="username for basic auth")
    add(
        "-p",
        "--password",
        dest="password",
        help="password for basic auth. "
        "If a username is given but not a password, the "
        "password will be prompted for.",
    )
    add(
        "-t",
        "--token",
        dest="token_classic",
        help="personal access, OAuth, or JSON Web token, or path to token (file://...)",
    )  # noqa
    add(
        "-f",
        "--token-fine",
        dest="token_fine",
        help="fine-grained personal access token (github_pat_....), or path to token (file://...)",
    )  # noqa
    flag(
        "-q",
        "--quiet",
        dest="quiet",
        text="supress log messages less severe than warning, e.g. info",
    )
    flag(
        "--as-app",
        dest="as_app",
        text="authenticate as github app instead of as a user.",
    )

    # --- output and logging ---
    add(
        "-o",
        "--output-directory",
        default=".",
        dest="output_directory",
        help="directory at which to backup the repositories",
    )
    add(
        "-l",
        "--log-level",
        default="info",
        dest="log_level",
        help="log level to use (default: info, possible levels: debug, info, warning, error, critical)",
    )
    flag("-i", "--incremental", dest="incremental", text="incremental backup")
    flag(
        "--incremental-by-files",
        dest="incremental_by_files",
        text="incremental backup based on modification date of files",
    )

    # --- what to include ---
    flag(
        "--starred",
        dest="include_starred",
        text="include JSON output of starred repositories in backup",
    )
    flag(
        "--all-starred",
        dest="all_starred",
        text="include starred repositories in backup [*]",
    )
    flag(
        "--watched",
        dest="include_watched",
        text="include JSON output of watched repositories in backup",
    )
    flag(
        "--followers",
        dest="include_followers",
        text="include JSON output of followers in backup",
    )
    flag(
        "--following",
        dest="include_following",
        text="include JSON output of following users in backup",
    )
    flag(
        "--all",
        dest="include_everything",
        text="include everything in backup (not including [*])",
    )
    flag("--issues", dest="include_issues", text="include issues in backup")
    flag(
        "--issue-comments",
        dest="include_issue_comments",
        text="include issue comments in backup",
    )
    flag(
        "--issue-events",
        dest="include_issue_events",
        text="include issue events in backup",
    )
    flag("--pulls", dest="include_pulls", text="include pull requests in backup")
    flag(
        "--pull-comments",
        dest="include_pull_comments",
        text="include pull request review comments in backup",
    )
    flag(
        "--pull-commits",
        dest="include_pull_commits",
        text="include pull request commits in backup",
    )
    flag(
        "--pull-details",
        dest="include_pull_details",
        text="include more pull request details in backup [*]",
    )
    flag("--labels", dest="include_labels", text="include labels in backup")
    flag(
        "--hooks",
        dest="include_hooks",
        text="include hooks in backup (works only when authenticated)",
    )  # noqa
    flag(
        "--milestones",
        dest="include_milestones",
        text="include milestones in backup",
    )
    flag(
        "--repositories",
        dest="include_repository",
        text="include repository clone in backup",
    )
    flag("--bare", dest="bare_clone", text="clone bare repositories")
    flag(
        "--no-prune",
        dest="no_prune",
        text="disable prune option for git fetch",
    )
    flag(
        "--lfs",
        dest="lfs_clone",
        text="clone LFS repositories (requires Git LFS to be installed, https://git-lfs.github.com) [*]",
    )
    flag("--wikis", dest="include_wiki", text="include wiki clone in backup")
    flag("--gists", dest="include_gists", text="include gists in backup [*]")
    flag(
        "--starred-gists",
        dest="include_starred_gists",
        text="include starred gists in backup [*]",
    )

    # --- repository selection ---
    flag(
        "--skip-archived",
        dest="skip_archived",
        text="skip project if it is archived",
    )
    flag(
        "--skip-existing",
        dest="skip_existing",
        text="skip project if a backup directory exists",
    )
    add(
        "-L",
        "--languages",
        dest="languages",
        help="only allow these languages",
        nargs="*",
    )
    add(
        "-N",
        "--name-regex",
        dest="name_regex",
        help="python regex to match names against",
    )
    add("-H", "--github-host", dest="github_host", help="GitHub Enterprise hostname")
    flag(
        "-O",
        "--organization",
        dest="organization",
        text="whether or not this is an organization user",
    )
    add(
        "-R",
        "--repository",
        dest="repository",
        help="name of repository to limit backup to",
    )
    flag(
        "-P",
        "--private",
        dest="private",
        text="include private repositories [*]",
    )
    flag("-F", "--fork", dest="fork", text="include forked repositories [*]")
    # No explicit dest here: argparse derives "prefer_ssh" from the flag name.
    add(
        "--prefer-ssh",
        action="store_true",
        help="Clone repositories using SSH instead of HTTPS",
    )
    add("-v", "--version", action="version", version="%(prog)s " + VERSION)

    # --- macOS keychain ---
    add(
        "--keychain-name",
        dest="osx_keychain_item_name",
        help="OSX ONLY: name field of password item in OSX keychain that holds the personal access or OAuth token",
    )
    add(
        "--keychain-account",
        dest="osx_keychain_item_account",
        help="OSX ONLY: account field of password item in OSX keychain that holds the personal access or OAuth token",
    )

    # --- releases, assets, attachments ---
    flag(
        "--releases",
        dest="include_releases",
        text="include release information, not including assets or binaries",
    )
    add(
        "--latest-releases",
        type=int,
        default=0,
        dest="number_of_latest_releases",
        help="include certain number of the latest releases; only applies if including releases",
    )
    flag(
        "--skip-prerelease",
        dest="skip_prerelease",
        text="skip prerelease and draft versions; only applies if including releases",
    )
    flag(
        "--assets",
        dest="include_assets",
        text="include assets alongside release information; only applies if including releases",
    )
    flag(
        "--attachments",
        dest="include_attachments",
        text="download user-attachments from issues and pull requests",
    )

    # --- API throttling ---
    add(
        "--throttle-limit",
        dest="throttle_limit",
        type=int,
        default=0,
        help="start throttling of GitHub API requests after this amount of API requests remain",
    )
    add(
        "--throttle-pause",
        dest="throttle_pause",
        type=float,
        default=30.0,
        help="wait this amount of seconds when API request throttling is active (default: 30.0, requires --throttle-limit to be set)",
    )
    add("--exclude", dest="exclude", help="names of repositories to exclude", nargs="*")

    return parser.parse_args(args)
def get_auth(args, encode=True, for_git_cli=False):
    """Derive the credential string for the configured authentication method.

    Sources are tried in this order: macOS keychain item, fine-grained token,
    classic token, username/password. Token arguments starting with the
    ``file://`` prefix are replaced on ``args`` by the contents read from
    that file.

    Args:
        args: parsed command-line namespace.
        encode: when true (and no fine-grained token is set) the credential
            is returned base64-encoded as bytes; when false it is returned as
            text, with a basic-auth password URL-quoted.
        for_git_cli: for app authentication with a classic token, prefix the
            token with ``x-access-token:``.

    Returns:
        ``None`` when no credential is configured, otherwise the credential.

    Raises:
        Exception: on incomplete or unsupported keychain arguments, a failed
            keychain lookup, a fine-grained token that does not start with
            ``github_pat_``, or a password given without a username.
    """
    keychain_name = args.osx_keychain_item_name
    keychain_account = args.osx_keychain_item_account
    auth = None

    if keychain_name:
        if not keychain_account:
            raise Exception(
                "You must specify both name and account fields for osx keychain password items"
            )
        if platform.system() != "Darwin":
            raise Exception("Keychain arguments are only supported on Mac OSX")
        lookup = [
            "security",
            "find-generic-password",
            "-s",
            keychain_name,
            "-a",
            keychain_account,
            "-w",
        ]
        try:
            with open(os.devnull, "w") as devnull:
                token = subprocess.check_output(lookup, stderr=devnull).strip()
                token = token.decode("utf-8")
                auth = token + ":" + "x-oauth-basic"
        except subprocess.SubprocessError:
            raise Exception(
                "No password item matching the provided name and account could be found in the osx keychain."
            )
    elif keychain_account:
        raise Exception(
            "You must specify both name and account fields for osx keychain password items"
        )
    elif args.token_fine:
        if args.token_fine.startswith(FILE_URI_PREFIX):
            args.token_fine = read_file_contents(args.token_fine)
        if not args.token_fine.startswith("github_pat_"):
            raise Exception(
                "Fine-grained token supplied does not look like a GitHub PAT"
            )
        auth = args.token_fine
    elif args.token_classic:
        if args.token_classic.startswith(FILE_URI_PREFIX):
            args.token_classic = read_file_contents(args.token_classic)
        if not args.as_app:
            auth = args.token_classic + ":" + "x-oauth-basic"
        elif for_git_cli:
            auth = "x-access-token:" + args.token_classic
        else:
            auth = args.token_classic
    elif args.username:
        if not args.password:
            args.password = getpass.getpass()
        # The raw password is used when the result gets base64-encoded; the
        # text form is URL-quoted instead.
        password = args.password if encode else urlquote(args.password)
        auth = args.username + ":" + password
    elif args.password:
        raise Exception("You must specify a username for basic auth")

    if not auth:
        return None
    if encode and args.token_fine is None:
        return base64.b64encode(auth.encode("ascii"))
    return auth
def get_github_api_host(args):
    """Return the API host: ``<github_host>/api/v3`` when a custom host is set, else ``api.github.com``."""
    return args.github_host + "/api/v3" if args.github_host else "api.github.com"
def get_github_host(args):
    """Return the configured GitHub hostname, defaulting to ``github.com``."""
    return args.github_host if args.github_host else "github.com"
def read_file_contents(file_uri):
    """Return the stripped first line of the file named by a ``file://`` URI.

    The leading ``FILE_URI_PREFIX`` is cut off by length to obtain the path.
    The file is closed before returning.
    """
    path = file_uri[len(FILE_URI_PREFIX) :]
    with open(path, "rt") as handle:
        return handle.readline().strip()
def get_github_repo_url(args, repository):
    """Pick the URL used to clone ``repository``.

    Gists use their ``git_pull_url`` (rewritten to SSH form when
    ``args.prefer_ssh`` is set). Regular repositories use ``ssh_url`` when SSH
    is preferred; otherwise an HTTPS URL with embedded credentials when
    authentication is configured, falling back to ``clone_url``.
    """
    if repository.get("is_gist"):
        if not args.prefer_ssh:
            return repository["git_pull_url"]
        # The git_pull_url value is always https for gists, so it has to be
        # transformed into ssh form.
        ssh_form = re.sub(
            r"^https?:\/\/(.+)\/(.+)\.git$",
            r"git@\1:\2.git",
            repository["git_pull_url"],
        )
        # Strip the gist subdomain for better hostkey compatibility.
        return re.sub(r"^git@gist\.", "git@", ssh_form)

    if args.prefer_ssh:
        return repository["ssh_url"]

    auth = get_auth(args, encode=False, for_git_cli=True)
    if not auth:
        return repository["clone_url"]

    # Fine-grained tokens are passed as the password of the "oauth2" user.
    credentials = auth if args.token_fine is None else "oauth2:" + auth
    return "https://{0}@{1}/{2}/{3}.git".format(
        credentials,
        get_github_host(args),
        repository["owner"]["login"],
        repository["name"],
    )
def _decode_json_response(r):
    """Read ``r`` and parse it as UTF-8 JSON.

    Returns ``(payload, read_error)``. On an incomplete read, invalid JSON or
    a timeout a warning is logged and ``(None, True)`` is returned.
    """
    try:
        return json.loads(r.read().decode("utf-8")), False
    except IncompleteRead:
        logger.warning("Incomplete read error detected")
    except json.decoder.JSONDecodeError:
        logger.warning("JSON decode error detected")
    except TimeoutError:
        logger.warning("Timeout error detected")
    return None, True


def retrieve_data_gen(args, template, query_args=None, single_request=False):
    """Yield the items returned by a GitHub API endpoint, following pagination.

    Args:
        args: parsed command-line namespace (authentication and throttling
            settings are read from it).
        template: URL of the endpoint to request.
        query_args: optional dict of extra query parameters.
        single_request: when true, issue exactly one request without
            pagination parameters and yield the response dict itself.

    Yields:
        Each element of every list response, or the single dict response
        when ``single_request`` is set.

    Raises:
        Exception: when a response has a non-200 status, cannot be read or
            parsed after the retries, or the request layer reported errors.
    """
    auth = get_auth(args, encode=not args.as_app)
    query_args = get_query_args(query_args)
    per_page = 100
    page = 0
    fine = args.token_fine is not None

    while True:
        if single_request:
            request_page, request_per_page = None, None
        else:
            page = page + 1
            request_page, request_per_page = page, per_page

        request = _construct_request(
            request_per_page,
            request_page,
            query_args,
            template,
            auth,
            as_app=args.as_app,
            fine=fine,
        )  # noqa
        r, errors = _get_response(request, auth, template)
        status_code = int(r.getcode())
        response, read_error = _decode_json_response(r)

        # be gentle with API request limit and throttle requests if remaining requests getting low
        limit_remaining = int(r.headers.get("x-ratelimit-remaining", 0))
        if args.throttle_limit and limit_remaining <= args.throttle_limit:
            logger.info(
                "API request limit hit: {} requests left, pausing further requests for {}s".format(
                    limit_remaining, args.throttle_pause
                )
            )
            time.sleep(args.throttle_pause)

        retries = 0
        while retries < 3 and (status_code == 502 or read_error):
            logger.warning("API request failed. Retrying in 5 seconds")
            retries += 1
            time.sleep(5)
            # Rebuild the request with the same pagination values as the
            # first attempt, so a retry asks for exactly the same resource.
            request = _construct_request(
                request_per_page,
                request_page,
                query_args,
                template,
                auth,
                as_app=args.as_app,
                fine=fine,
            )  # noqa
            r, errors = _get_response(request, auth, template)
            status_code = int(r.getcode())
            response, read_error = _decode_json_response(r)

        if status_code != 200:
            errors.append(
                "API request returned HTTP {0}: {1}".format(status_code, r.reason)
            )
            raise Exception(", ".join(errors))

        if read_error:
            errors.append(
                "API request problem reading response for {0}".format(request)
            )
            raise Exception(", ".join(errors))

        if len(errors) == 0:
            if type(response) is list:
                for resp in response:
                    yield resp
                # A short page means there is nothing left to fetch.
                if len(response) < per_page:
                    break
            elif type(response) is dict and single_request:
                yield response

        if len(errors) > 0:
            raise Exception(", ".join(errors))

        if single_request:
            break
def retrieve_data(args, template, query_args=None, single_request=False):
    """Collect everything ``retrieve_data_gen`` yields for this request into a list."""
    collected = []
    collected.extend(retrieve_data_gen(args, template, query_args, single_request))
    return collected
def get_query_args(query_args=None):
    """Return ``query_args`` as given, or a new empty dict when it is falsy."""
    return query_args or {}
def _get_response(request, auth, template):
    """Open ``request``, retrying on rate limiting and connection problems.

    Returns ``(response, errors)``. For an HTTP error that is not retried the
    ``HTTPError`` object itself is returned as the response. URL and socket
    errors are retried through ``_request_url_error`` with a budget of 3.
    """
    retries_left = 3
    errors = []
    # Requests are made in a loop so that rate-limited and timed-out calls
    # can be delayed and repeated.
    while True:
        try:
            r = urlopen(request, context=https_ctx)
        except HTTPError as exc:
            errors, retry = _request_http_error(exc, auth, errors)  # noqa
            r = exc
            if retry:
                continue
        except URLError as e:
            logger.warning(e.reason)
            retry, retries_left = _request_url_error(template, retries_left)
            if not retry:
                raise
            continue
        except socket.error as e:
            logger.warning(e.strerror)
            retry, retries_left = _request_url_error(template, retries_left)
            if not retry:
                raise
            continue
        return r, errors
def _construct_request(
    per_page, page, query_args, template, auth, as_app=None, fine=False
):
    """Build the ``Request`` for ``template`` with query string and auth headers.

    Truthy ``per_page`` and ``page`` values become query parameters, then
    ``query_args`` is merged on top. With ``auth`` set, the Authorization
    header is a ``token`` header for app or fine-grained authentication and a
    ``Basic`` header otherwise; app authentication also sets the
    machine-man preview Accept header. The final URL is logged at info level.
    """
    params = {}
    if per_page:
        params["per_page"] = per_page
    if page:
        params["page"] = page
    if query_args:
        params.update(query_args)

    request_url = template + "?" + urlencode(params) if params else template
    request = Request(request_url)

    if auth is not None:
        if as_app:
            app_token = auth.encode("ascii")
            request.add_header("Authorization", "token ".encode("ascii") + app_token)
            request.add_header(
                "Accept", "application/vnd.github.machine-man-preview+json"
            )
        elif fine:
            request.add_header("Authorization", "token " + auth)
        else:
            request.add_header("Authorization", "Basic ".encode("ascii") + auth)

    logger.info("Requesting {}".format(request_url))
    return request
def _request_http_error(exc, auth, errors):
# HTTPError behaves like a Response so we can
# check the status code and headers to see exactly
# what failed.
should_continue = False
headers = exc.headers
limit_remaining = int(headers.get("x-ratelimit-remaining", 0))
if exc.code == 403 and limit_remaining < 1:
# The X-RateLimit-Reset header includes a
# timestamp telling us when the limit will reset
# so we can calculate how long to wait rather
# than inefficiently polling:
gm_now = calendar.timegm(time.gmtime())
reset = int(headers.get("x-ratelimit-reset", 0)) or gm_now
# We'll never sleep for less than 10 seconds:
delta = max(10, reset - gm_now)
limit = headers.get("x-ratelimit-limit")
logger.warning(
"Exceeded rate limit of {} requests; waiting {} seconds to reset".format(
limit, delta
)
) # noqa
if auth is None:
logger.info("Hint: Authenticate to raise your GitHub rate limit")
time.sleep(delta)
should_continue = True
return errors, should_continue
def _request_url_error(template, retry_timeout):
    """Consume one retry for a timed-out request.

    Logs the timeout and returns ``(True, remaining)`` while the budget is not
    exhausted; raises ``Exception`` once ``retry_timeout`` drops below zero.
    """
    # A connection that timed out may be retried a few times, so that one
    # slow request does not abort the rest of the backup.
    logger.info("'{}' timed out".format(template))
    remaining = retry_timeout - 1
    if remaining < 0:
        raise Exception("'{}' timed out to much, skipping!".format(template))
    return True, remaining
class S3HTTPRedirectHandler(HTTPRedirectHandler):
    """
    Redirect handler used when downloading GitHub assets that live on S3.

    urllib copies the Authorization header onto the redirected request, and
    S3 answers such a request with a 400, so the header is dropped from every
    redirected request.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        redirected = super().redirect_request(req, fp, code, msg, headers, newurl)
        # The header may be absent (attachment requests can be
        # unauthenticated), so remove it without requiring it to exist.
        redirected.headers.pop("Authorization", None)
        return redirected
def download_file(url, path, auth, as_app=False, fine=False):
    """Download the asset at ``url`` to ``path`` unless ``path`` already exists.

    HTTP, URL and socket errors are logged as warnings and the asset is
    skipped. When a download does not complete, any partially written file is
    removed; otherwise the existence check at the top would treat the
    truncated file as already downloaded on every later run.

    Args:
        url: asset URL; the request is built by ``_construct_request``.
        path: destination file path.
        auth: credential passed on to ``_construct_request`` (may be None).
        as_app: authenticate as a GitHub app.
        fine: ``auth`` is a fine-grained token.
    """
    # Skip downloading release assets if they already exist on disk so we don't redownload on every sync
    if os.path.exists(path):
        return

    request = _construct_request(
        per_page=100,
        page=1,
        query_args={},
        template=url,
        auth=auth,
        as_app=as_app,
        fine=fine,
    )
    request.add_header("Accept", "application/octet-stream")
    opener = build_opener(S3HTTPRedirectHandler)

    completed = False
    try:
        response = opener.open(request)
        try:
            chunk_size = 16 * 1024
            with open(path, "wb") as f:
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    f.write(chunk)
        finally:
            response.close()
        completed = True
    except HTTPError as exc:
        # Gracefully handle 404 responses (and others) when downloading from S3
        logger.warning(
            "Skipping download of asset {0} due to HTTPError: {1}".format(
                url, exc.reason
            )
        )
    except URLError as e:
        # Gracefully handle other URL errors
        logger.warning(
            "Skipping download of asset {0} due to URLError: {1}".format(url, e.reason)
        )
    except socket.error as e:
        # Gracefully handle socket errors
        # TODO: Implement retry logic
        logger.warning(
            "Skipping download of asset {0} due to socket error: {1}".format(
                url, e.strerror
            )
        )
    finally:
        # path did not exist when we started, so whatever is there after a
        # failed transfer is our own incomplete output.
        if not completed and os.path.exists(path):
            try:
                os.remove(path)
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove partial download {0}: {1}".format(
                        path, cleanup_error
                    )
                )
def _content_disposition_filename(content_disposition):
    """Return the filename named by a Content-Disposition header value, or None.

    Parsing is delegated to the standard library's header parser, which
    understands plain (``filename=x``), quoted (``filename="x"``) and
    RFC 2231/5987 extended (``filename*=UTF-8''x``) parameters.
    """
    from email.message import Message

    header = Message()
    header["Content-Disposition"] = content_disposition
    filename = header.get_filename()
    if not filename:
        return None
    filename = filename.strip()
    # Tolerate non-standard single-quoted values: filename='x'
    if len(filename) >= 2 and filename[0] == filename[-1] == "'":
        filename = filename[1:-1]
    return filename or None


def download_attachment_file(url, path, auth, as_app=False, fine=False):
    """Download attachment file directly (not via GitHub API).

    Similar to download_file() but for direct file URLs, not API endpoints.
    Attachment URLs (user-images, user-attachments) are direct downloads,
    not API endpoints, so we skip _construct_request() which adds API params.

    URL Format Support & Authentication Requirements:

    | URL Format                                   | Auth Required | Notes                    |
    |----------------------------------------------|---------------|--------------------------|
    | github.com/user-attachments/assets/*         | Private only  | Modern format (2024+)    |
    | github.com/user-attachments/files/*          | Private only  | Modern format (2024+)    |
    | user-images.githubusercontent.com/*          | No (public)   | Legacy CDN, all eras     |
    | private-user-images.githubusercontent.com/*  | JWT in URL    | Legacy private (5min)    |
    | github.com/{owner}/{repo}/files/*            | Repo filter   | Old repo files           |

    - Modern user-attachments: Requires GitHub token auth for private repos
    - Legacy public CDN: No auth needed/accepted (returns 400 with auth header)
    - Legacy private CDN: Uses JWT token embedded in URL, no GitHub token needed
    - Repo files: Filtered to current repository only during extraction

    The Authorization header is attached only when the URL's scheme is https,
    its host is exactly github.com and its path starts with
    /user-attachments/, so the token is never sent to another host. If the
    download fails, any partially written file at ``path`` is removed so a
    later run does not mistake it for a finished download.

    Returns dict with metadata:
    - url: str (the requested URL)
    - success: bool
    - http_status: int (200, 404, etc.) or None
    - content_type: str or None
    - original_filename: str or None (from Content-Disposition, else final URL)
    - size_bytes: int or None
    - downloaded_at: str (UTC ISO-8601 timestamp of this call)
    - error: str or None
    """
    from datetime import datetime, timezone
    from urllib.parse import unquote, urlparse

    metadata = {
        "url": url,
        "success": False,
        "http_status": None,
        "content_type": None,
        "original_filename": None,
        "size_bytes": None,
        "downloaded_at": datetime.now(timezone.utc).isoformat(),
        "error": None,
    }

    if os.path.exists(path):
        metadata["success"] = True
        metadata["http_status"] = 200  # Assume success if already exists
        metadata["size_bytes"] = os.path.getsize(path)
        return metadata

    # Create simple request (no API query params)
    request = Request(url)
    request.add_header("Accept", "application/octet-stream")

    # Add authentication header only for modern github.com/user-attachments URLs
    # Legacy CDN URLs (user-images.githubusercontent.com) are public and don't need/accept auth
    # Private CDN URLs (private-user-images) use JWT tokens embedded in the URL
    target = urlparse(url)
    is_user_attachment = (
        target.scheme == "https"
        and target.hostname == "github.com"
        and target.path.startswith("/user-attachments/")
    )
    if auth is not None and is_user_attachment:
        if not as_app:
            if fine:
                # Fine-grained token: plain token with "token " prefix
                request.add_header("Authorization", "token " + auth)
            else:
                # Classic token: base64-encoded with "Basic " prefix
                request.add_header("Authorization", "Basic ".encode("ascii") + auth)
        else:
            # App authentication
            auth = auth.encode("ascii")
            request.add_header("Authorization", "token ".encode("ascii") + auth)

    # Reuse S3HTTPRedirectHandler from download_file()
    opener = build_opener(S3HTTPRedirectHandler)

    try:
        response = opener.open(request)
        try:
            metadata["http_status"] = response.getcode()

            # Extract Content-Type (without parameters such as charset)
            content_type = (
                response.headers.get("Content-Type", "").split(";")[0].strip()
            )
            if content_type:
                metadata["content_type"] = content_type

            # Extract original filename from Content-Disposition header
            content_disposition = response.headers.get("Content-Disposition", "")
            if content_disposition:
                metadata["original_filename"] = _content_disposition_filename(
                    content_disposition
                )

            # Fallback: Extract filename from final URL after redirects
            # This handles user-attachments/assets URLs which redirect to S3 with filename.ext
            if not metadata["original_filename"]:
                final_path = urlparse(response.geturl()).path
                # Last path component; the URL might be encoded, decode it
                filename_from_url = unquote(final_path.split("/")[-1])
                # Only use if it has an extension
                if "." in filename_from_url:
                    metadata["original_filename"] = filename_from_url

            # Download file
            chunk_size = 16 * 1024
            bytes_downloaded = 0
            with open(path, "wb") as f:
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    f.write(chunk)
                    bytes_downloaded += len(chunk)
        finally:
            response.close()

        metadata["size_bytes"] = bytes_downloaded
        metadata["success"] = True
    except HTTPError as exc:
        metadata["http_status"] = exc.code
        metadata["error"] = str(exc.reason)
        logger.warning(
            "Skipping download of attachment {0} due to HTTPError: {1}".format(
                url, exc.reason
            )
        )
    except URLError as e:
        metadata["error"] = str(e.reason)
        logger.warning(
            "Skipping download of attachment {0} due to URLError: {1}".format(
                url, e.reason
            )
        )
    except socket.error as e:
        # strerror is None for some socket errors; fall back to the full text
        reason = e.strerror or str(e)
        metadata["error"] = reason
        logger.warning(
            "Skipping download of attachment {0} due to socket error: {1}".format(
                url, reason
            )
        )
    except Exception as e:
        # Deliberate best-effort: one bad attachment must not stop the backup
        metadata["error"] = str(e)
        logger.warning(
            "Skipping download of attachment {0} due to error: {1}".format(url, str(e))
        )
    finally:
        # path did not exist when we started, so a file left behind by a
        # failed transfer is our own incomplete output.
        if not metadata["success"] and os.path.exists(path):
            try:
                os.remove(path)
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove partial download {0}: {1}".format(
                        path, cleanup_error
                    )
                )

    return metadata
def extract_attachment_urls(item_data, issue_number=None, repository_full_name=None):
"""Extract GitHub-hosted attachment URLs from issue/PR body and comments.
What qualifies as an attachment?
There is no "attachment" concept in the GitHub API - it's a user behavior pattern
we've identified through analysis of real-world repositories. We define attachments as:
- User-uploaded files hosted on GitHub's CDN domains
- Found outside of code blocks (not examples/documentation)
- Matches known GitHub attachment URL patterns
This intentionally captures bare URLs pasted by users, not just markdown/HTML syntax.
Some false positives (example URLs in documentation) may occur - these fail gracefully
with HTTP 404 and are logged in the manifest.
Supported URL formats:
- Modern: github.com/user-attachments/{assets,files}/*
- Legacy: user-images.githubusercontent.com/* (including private-user-images)
- Repo files: github.com/{owner}/{repo}/files/* (filtered to current repo)
- Repo assets: github.com/{owner}/{repo}/assets/* (filtered to current repo)
Repository filtering (repo files/assets only):
- Direct match: URL is for current repository → included
- Redirect match: URL redirects to current repository → included (handles renames/transfers)
- Different repo: URL is for different repository → excluded
Code block filtering:
- Removes fenced code blocks (```) and inline code (`) before extraction
- Prevents extracting URLs from code examples and documentation snippets
Args:
item_data: Issue or PR data dict
issue_number: Issue/PR number for logging
repository_full_name: Full repository name (owner/repo) for filtering repo-scoped URLs
"""
import re
urls = []
# Define all GitHub attachment patterns
# Stop at markdown punctuation: whitespace, ), `, ", >, <
# Trailing sentence punctuation (. ! ? , ; : ' ") is stripped in post-processing
patterns = [
r'https://github\.com/user-attachments/(?:assets|files)/[^\s\)`"<>]+', # Modern
r'https://(?:private-)?user-images\.githubusercontent\.com/[^\s\)`"<>]+', # Legacy CDN
]
# Add repo-scoped patterns (will be filtered by repository later)
# These patterns match ANY repo, then we filter to current repo with redirect checking
repo_files_pattern = r'https://github\.com/[^/]+/[^/]+/files/\d+/[^\s\)`"<>]+'
repo_assets_pattern = r'https://github\.com/[^/]+/[^/]+/assets/\d+/[^\s\)`"<>]+'
patterns.append(repo_files_pattern)
patterns.append(repo_assets_pattern)
def clean_url(url):
"""Remove trailing sentence and markdown punctuation that's not part of the URL."""
return url.rstrip(".!?,;:'\")")
def remove_code_blocks(text):
"""Remove markdown code blocks (fenced and inline) from text.
This prevents extracting URLs from code examples like:
- Fenced code blocks: ```code```
- Inline code: `code`
"""
# Remove fenced code blocks first (```...```)
# DOTALL flag makes . match newlines
text = re.sub(r"```.*?```", "", text, flags=re.DOTALL)
# Remove inline code (`...`)
# Non-greedy match between backticks
text = re.sub(r"`[^`]*`", "", text)
return text
def is_repo_scoped_url(url):
"""Check if URL is a repo-scoped attachment (files or assets)."""
return bool(
re.match(r"https://github\.com/[^/]+/[^/]+/(?:files|assets)/\d+/", url)
)
def check_redirect_to_current_repo(url, current_repo):
"""Check if URL redirects to current repository.
Returns True if:
- URL is already for current repo
- URL redirects (301/302) to current repo (handles renames/transfers)
Returns False otherwise (URL is for a different repo).
"""
# Extract owner/repo from URL
match = re.match(r"https://github\.com/([^/]+)/([^/]+)/", url)
if not match:
return False
url_owner, url_repo = match.groups()
url_repo_full = f"{url_owner}/{url_repo}"
# Direct match - no need to check redirect
if url_repo_full.lower() == current_repo.lower():
return True
# Different repo - check if it redirects to current repo
# This handles repository transfers and renames
try:
import urllib.request
import urllib.error
# Make HEAD request with redirect following disabled
# We need to manually handle redirects to see the Location header
request = urllib.request.Request(url, method="HEAD")
request.add_header("User-Agent", "python-github-backup")
# Create opener that does NOT follow redirects
class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
def redirect_request(self, req, fp, code, msg, headers, newurl):
return None # Don't follow redirects
opener = urllib.request.build_opener(NoRedirectHandler)
try:
_ = opener.open(request, timeout=10)
# Got 200 - URL works as-is but for different repo
return False
except urllib.error.HTTPError as e:
# Check if it's a redirect (301, 302, 307, 308)
if e.code in (301, 302, 307, 308):
location = e.headers.get("Location", "")
# Check if redirect points to current repo
if location:
redirect_match = re.match(
r"https://github\.com/([^/]+)/([^/]+)/", location
)
if redirect_match:
redirect_owner, redirect_repo = redirect_match.groups()
redirect_repo_full = f"{redirect_owner}/{redirect_repo}"
return redirect_repo_full.lower() == current_repo.lower()
return False
except Exception:
# On any error (timeout, network issue, etc.), be conservative
# and exclude the URL to avoid downloading from wrong repos
return False
# Extract from body
body = item_data.get("body") or ""
# Remove code blocks before searching for URLs
body_cleaned = remove_code_blocks(body)
for pattern in patterns:
found_urls = re.findall(pattern, body_cleaned)
urls.extend([clean_url(url) for url in found_urls])
# Extract from issue comments
if "comment_data" in item_data:
for comment in item_data["comment_data"]:
comment_body = comment.get("body") or ""
# Remove code blocks before searching for URLs
comment_cleaned = remove_code_blocks(comment_body)
for pattern in patterns:
found_urls = re.findall(pattern, comment_cleaned)
urls.extend([clean_url(url) for url in found_urls])
# Extract from PR regular comments
if "comment_regular_data" in item_data:
for comment in item_data["comment_regular_data"]:
comment_body = comment.get("body") or ""
# Remove code blocks before searching for URLs
comment_cleaned = remove_code_blocks(comment_body)
for pattern in patterns:
found_urls = re.findall(pattern, comment_cleaned)
urls.extend([clean_url(url) for url in found_urls])
regex_urls = list(set(urls)) # dedupe
# Filter repo-scoped URLs to current repository only
# This handles repository transfers/renames via redirect checking
if repository_full_name:
filtered_urls = []
for url in regex_urls:
if is_repo_scoped_url(url):
# Check if URL belongs to current repo (or redirects to it)
if check_redirect_to_current_repo(url, repository_full_name):
filtered_urls.append(url)
# else: skip URLs from other repositories
else:
# Non-repo-scoped URLs (user-attachments, CDN) - always include
filtered_urls.append(url)
regex_urls = filtered_urls
return regex_urls
def extract_and_apply_extension(filepath, original_filename):
    """Rename a downloaded file so it carries the extension of its original name.

    Args:
        filepath: Current file path (may have no extension)
        original_filename: Original filename from Content-Disposition (has extension)

    Returns:
        The path the file lives at after the call. This is ``filepath``
        unchanged when there is nothing to do (no original filename, file
        missing, no extension, extension already present) or when the rename
        fails; otherwise it is the new path with the extension appended, plus a
        ``_N`` counter if the plain target name was already taken.
    """
    if not original_filename or not os.path.exists(filepath):
        return filepath

    original_ext = os.path.splitext(original_filename)[1]
    if not original_ext:
        return filepath

    # Compare case-insensitively so "photo.png" is not turned into
    # "photo.png.PNG" when the reported name uses a different case.
    current_ext = os.path.splitext(filepath)[1]
    if current_ext.lower() == original_ext.lower():
        return filepath

    # os.rename() silently replaces an existing target on POSIX, which would
    # destroy a different, previously saved file. Pick a free name instead.
    new_filepath = filepath + original_ext
    counter = 1
    while os.path.exists(new_filepath):
        new_filepath = "{0}_{1}{2}".format(filepath, counter, original_ext)
        counter += 1

    try:
        os.rename(filepath, new_filepath)
    except (OSError, ValueError) as e:
        # Best effort: keep the file under its current name if renaming fails.
        logger.warning("Could not rename {0}: {1}".format(filepath, str(e)))
        return filepath

    logger.debug("Renamed {0} to {1}".format(filepath, new_filepath))
    return new_filepath
def get_attachment_filename(url):
    """Get filename from attachment URL, handling all GitHub formats.

    Formats:
    - github.com/user-attachments/assets/{uuid} → uuid (add extension later)
    - github.com/user-attachments/files/{id}/{filename} → filename
    - github.com/{owner}/{repo}/files/{id}/{filename} → filename
    - user-images.githubusercontent.com/{user}/{hash}.{ext} → hash.ext
    - private-user-images.githubusercontent.com/...?jwt=... → extract from path

    In every format the filename is the last component of the URL path (the
    query string is ignored).

    Returns:
        The last path component, or "unknown_attachment" when that component
        is empty (path ends in "/") or is "." / "..", neither of which can be
        used as a file name inside the attachments directory.
    """
    from urllib.parse import urlparse

    filename = urlparse(url).path.rsplit("/", 1)[-1]

    # An empty name would make the target path the attachments directory
    # itself; "." and ".." would resolve outside the intended file location.
    if filename in ("", ".", ".."):
        return "unknown_attachment"
    return filename
def resolve_filename_collision(filepath):
    """Return a path that does not clash with an existing file.

    A free path is returned as-is. Otherwise a counter is inserted before the
    extension until an unused name is found:
    report.pdf → report_1.pdf → report_2.pdf

    The name manifest.json is reserved, so it always receives a counter
    suffix even when no such file exists yet.

    Args:
        filepath: Full path to file that might exist

    Returns:
        filepath that doesn't collide (may be same as input if no collision)
    """
    directory, filename = os.path.split(filepath)

    is_reserved = filename == "manifest.json"
    if not is_reserved and not os.path.exists(filepath):
        return filepath

    stem, suffix = os.path.splitext(filename)
    index = 1
    candidate = os.path.join(directory, f"{stem}_{index}{suffix}")
    while os.path.exists(candidate):
        index += 1
        candidate = os.path.join(directory, f"{stem}_{index}{suffix}")
    return candidate
def download_attachments(
    args, item_cwd, item_data, number, repository, item_type="issue"
):
    """Download user-attachments from issue/PR body and comments with manifest.

    Files are stored in ``<item_cwd>/attachments/<number>/`` next to a
    ``manifest.json`` that holds one metadata record per attachment URL.

    With ``args.skip_existing`` an existing manifest is consulted: URLs that
    were downloaded successfully, or that failed with HTTP 404/410/451, are
    not fetched again; every other recorded failure is retried and its old
    record is replaced by the new one.

    Args:
        args: Command line arguments
        item_cwd: Working directory (issue_cwd or pulls_cwd)
        item_data: Issue or PR data dict
        number: Issue or PR number
        repository: Repository dict
        item_type: "issue" or "pull" for logging/manifest
    """
    import json
    from datetime import datetime, timezone

    item_type_display = "issue" if item_type == "issue" else "pull request"

    urls = extract_attachment_urls(
        item_data, issue_number=number, repository_full_name=repository["full_name"]
    )
    if not urls:
        return

    attachments_dir = os.path.join(item_cwd, "attachments", str(number))
    manifest_path = os.path.join(attachments_dir, "manifest.json")

    # Load existing manifest if skip_existing is enabled
    existing_urls = set()
    existing_metadata = []
    if args.skip_existing and os.path.exists(manifest_path):
        try:
            with open(manifest_path, "r") as f:
                existing_manifest = json.load(f)
            all_metadata = existing_manifest.get("attachments", [])
            for item in all_metadata:
                # Skip URLs that succeeded or failed permanently (404/410/451).
                # Transient failures (5xx, auth errors, timeouts) are retried.
                if item.get("success") or item.get("http_status") in (404, 410, 451):
                    existing_urls.add(item["url"])
            existing_metadata = all_metadata
        except (ValueError, OSError, AttributeError, KeyError, TypeError):
            # Unreadable, non-JSON or structurally unexpected manifest:
            # treat it as corrupted and re-download everything.
            logger.warning(
                "Corrupted manifest for {0} #{1}, will re-download".format(
                    item_type_display, number
                )
            )
            existing_urls = set()
            existing_metadata = []

    # Filter to only new URLs. ``urls`` is non-empty here, so an empty result
    # means every URL is already covered by the manifest.
    new_urls = [url for url in urls if url not in existing_urls]
    if not new_urls:
        logger.debug(
            "Skipping attachments for {0} #{1} (all {2} already downloaded)".format(
                item_type_display, number, len(urls)
            )
        )
        return

    logger.info(
        "Downloading {0} new attachment(s) for {1} #{2}".format(
            len(new_urls), item_type_display, number
        )
    )
    mkdir_p(item_cwd, attachments_dir)

    # Start from the existing records, but drop stale records of URLs that are
    # about to be retried so the manifest keeps a single entry per URL.
    retried_urls = set(new_urls)
    attachment_metadata_list = [
        entry for entry in existing_metadata if entry.get("url") not in retried_urls
    ]

    for url in new_urls:
        filename = get_attachment_filename(url)
        # Check for collision BEFORE downloading
        filepath = resolve_filename_collision(os.path.join(attachments_dir, filename))

        metadata = download_attachment_file(
            url,
            filepath,
            get_auth(args, encode=not args.as_app),
            as_app=args.as_app,
            fine=args.token_fine is not None,
        )

        if metadata["success"]:
            final_filepath = filepath
            if metadata.get("original_filename"):
                # extract_and_apply_extension() performs the rename itself and
                # returns the path the file now lives at. Running collision
                # resolution on that path afterwards would always see the file
                # that was just renamed and record a "_1" name that does not
                # exist on disk, so the returned path is used as-is.
                final_filepath = extract_and_apply_extension(
                    filepath, metadata["original_filename"]
                )
            metadata["saved_as"] = os.path.basename(final_filepath)
        else:
            metadata["saved_as"] = None

        attachment_metadata_list.append(metadata)

    # Write manifest
    if attachment_metadata_list:
        manifest = {
            "issue_number": number,
            "issue_type": item_type,
            "repository": f"{args.user}/{args.repository}"
            if hasattr(args, "repository") and args.repository
            else args.user,
            "manifest_updated_at": datetime.now(timezone.utc).isoformat(),
            "attachments": attachment_metadata_list,
        }
        with open(manifest_path, "w") as f:
            json.dump(manifest, f, indent=2)
        logger.debug(
            "Wrote manifest for {0} #{1}: {2} attachments".format(
                item_type_display, number, len(attachment_metadata_list)
            )
        )
def get_authenticated_user(args):
    """Request ``https://<api host>/user`` and return the first result entry."""
    user_endpoint = "https://{0}/user".format(get_github_api_host(args))
    response = retrieve_data(args, user_endpoint, single_request=True)
    return response[0]
def check_git_lfs_install():
    """Verify that ``git lfs version`` can be run successfully.

    Raises:
        Exception: if the command exits with a non-zero status, or if the
            ``git`` executable cannot be started at all.
    """
    try:
        lfs_available = subprocess.call(["git", "lfs", "version"]) == 0
    except OSError:
        # git itself could not be launched (e.g. not on PATH). Report it with
        # the same actionable message instead of a raw FileNotFoundError.
        lfs_available = False

    if not lfs_available:
        raise Exception(
            "The argument --lfs requires you to have Git LFS installed.\nYou can get it from https://git-lfs.github.com."
        )
def retrieve_repositories(args, authenticated_user):
    """Collect the repositories (and optionally starred repos / gists) to back up.

    The listing endpoint depends on the arguments: ``/user/repos`` when
    ``args.user`` is the authenticated login, ``/users/<user>/repos``
    otherwise, ``/orgs/<user>/repos`` when ``args.organization`` is set, and
    ``/repos/<owner>/<name>`` as a single request when ``args.repository`` is
    given. Entries added for ``args.all_starred``, ``args.include_gists`` and
    ``args.include_starred_gists`` are tagged with ``is_starred`` / ``is_gist``
    for downstream processing.

    Returns:
        The list returned by ``retrieve_data`` for the main listing, extended
        with any additional tagged entries.
    """
    logger.info("Retrieving repositories")
    api_host = get_github_api_host(args)

    def fetch_flagged(url, flags):
        # Retrieve a paginated listing and tag each entry with ``flags``.
        entries = retrieve_data(args, url, single_request=False)
        for entry in entries:
            entry.update(flags)
        return entries

    if args.user == authenticated_user["login"]:
        # we must use the /user/repos API to be able to access private repos
        listing_url = "https://{0}/user/repos".format(api_host)
    else:
        if args.private and not args.organization:
            logger.warning(
                "Authenticated user is different from user being backed up, thus private repositories cannot be accessed"
            )
        listing_url = "https://{0}/users/{1}/repos".format(api_host, args.user)

    if args.organization:
        listing_url = "https://{0}/orgs/{1}/repos".format(api_host, args.user)

    only_one = False
    if args.repository:
        # An "owner/name" value is used verbatim; a bare name belongs to args.user.
        repo_path = (
            args.repository
            if "/" in args.repository
            else "{0}/{1}".format(args.user, args.repository)
        )
        only_one = True
        listing_url = "https://{0}/repos/{1}".format(api_host, repo_path)

    repos = retrieve_data(args, listing_url, single_request=only_one)

    if args.all_starred:
        starred_url = "https://{0}/users/{1}/starred".format(api_host, args.user)
        repos.extend(fetch_flagged(starred_url, {"is_starred": True}))

    if args.include_gists:
        gists_url = "https://{0}/users/{1}/gists".format(api_host, args.user)
        repos.extend(fetch_flagged(gists_url, {"is_gist": True}))

    if args.include_starred_gists:
        starred_gists_url = "https://{0}/gists/starred".format(api_host)
        repos.extend(
            fetch_flagged(starred_gists_url, {"is_gist": True, "is_starred": True})
        )

    return repos
def filter_repositories(args, unfiltered_repositories):
    """Apply the command-line selection filters to a repository listing.

    When ``args.repository`` is set the input is returned untouched. Otherwise
    only entries owned by ``args.user`` or flagged ``is_starred`` are kept, and
    the result is narrowed further by ``args.fork``, ``args.private``,
    ``args.languages``, ``args.name_regex``, ``args.skip_archived`` and
    ``args.exclude``. Entries without a ``name`` key always pass the name-based
    filters.

    Returns:
        The filtered list of repository dicts.
    """
    if args.repository:
        return unfiltered_repositories
    logger.info("Filtering repositories")

    def owner_login(repo):
        # gists can be anonymous, so need to safely check owner; this also
        # tolerates an "owner" key that is present but null.
        return (repo.get("owner") or {}).get("login")

    repositories = [
        r
        for r in unfiltered_repositories
        if owner_login(r) == args.user or r.get("is_starred")
    ]

    name_regex = re.compile(args.name_regex) if args.name_regex else None
    # A set gives constant-time membership tests for the language filter.
    languages = {x.lower() for x in args.languages} if args.languages else None

    if not args.fork:
        repositories = [r for r in repositories if not r.get("fork")]
    if not args.private:
        repositories = [
            r for r in repositories if not r.get("private") or r.get("public")
        ]
    if languages:
        repositories = [
            r
            for r in repositories
            if r.get("language") and r.get("language").lower() in languages
        ]
    if name_regex:
        repositories = [
            r for r in repositories if "name" not in r or name_regex.match(r["name"])
        ]
    if args.skip_archived:
        repositories = [r for r in repositories if not r.get("archived")]
    if args.exclude:
        repositories = [
            r for r in repositories if "name" not in r or r["name"] not in args.exclude
        ]

    return repositories
def backup_repositories(args, output_directory, repositories):
    """Back up every repository (or gist) in ``repositories``.

    For each entry this clones/updates the git repository and, depending on
    the ``args.include_*`` flags, its wiki, issues, pull requests, milestones,
    labels, hooks and releases. Gists only get their clone and a ``gist.json``.

    With ``args.incremental`` the timestamp stored in
    ``<output_directory>/last_update`` is loaded into ``args.since`` before the
    run and replaced afterwards with the newest ``updated_at``/``pushed_at``
    seen, or with the current UTC time when no repository supplied one.

    Args:
        args: Command line arguments; ``args.since`` is set by this function.
        output_directory: Root directory of the backup.
        repositories: Repository/gist dicts as returned by the API.
    """
    logger.info("Backing up repositories")
    repos_template = "https://{0}/repos".format(get_github_api_host(args))

    args.since = None
    if args.incremental:
        last_update_path = os.path.join(output_directory, "last_update")
        if os.path.exists(last_update_path):
            # Use a context manager so the handle is closed deterministically.
            with open(last_update_path) as f:
                args.since = f.read().strip()

    include_gists = args.include_gists or args.include_starred_gists

    last_update = "0000-00-00T00:00:00Z"
    for repository in repositories:
        if "updated_at" in repository and repository["updated_at"] > last_update:
            last_update = repository["updated_at"]
        elif "pushed_at" in repository and repository["pushed_at"] > last_update:
            last_update = repository["pushed_at"]

        if repository.get("is_gist"):
            repo_cwd = os.path.join(output_directory, "gists", repository["id"])
        elif repository.get("is_starred"):
            # put starred repos in -o/starred/${owner}/${repo} to prevent collision of
            # any repositories with the same name
            repo_cwd = os.path.join(
                output_directory,
                "starred",
                repository["owner"]["login"],
                repository["name"],
            )
        else:
            repo_cwd = os.path.join(
                output_directory, "repositories", repository["name"]
            )

        repo_dir = os.path.join(repo_cwd, "repository")
        repo_url = get_github_repo_url(args, repository)

        if (args.include_repository or args.include_everything) or (
            include_gists and repository.get("is_gist")
        ):
            repo_name = (
                repository.get("name")
                if not repository.get("is_gist")
                else repository.get("id")
            )
            fetch_repository(
                repo_name,
                repo_url,
                repo_dir,
                skip_existing=args.skip_existing,
                bare_clone=args.bare_clone,
                lfs_clone=args.lfs_clone,
                no_prune=args.no_prune,
            )

            if repository.get("is_gist"):
                # dump gist information to a file as well
                output_file = "{0}/gist.json".format(repo_cwd)
                with codecs.open(output_file, "w", encoding="utf-8") as f:
                    json_dump(repository, f)

                continue  # don't try to back anything else for a gist; it doesn't exist

        download_wiki = args.include_wiki or args.include_everything
        if repository["has_wiki"] and download_wiki:
            # Only rewrite the trailing ".git": a blanket str.replace() also
            # mangles ".git" inside the host or repository name (for example
            # "user.github.io.git").
            if repo_url.endswith(".git"):
                wiki_url = repo_url[: -len(".git")] + ".wiki.git"
            else:
                wiki_url = repo_url.replace(".git", ".wiki.git")
            fetch_repository(
                repository["name"],
                wiki_url,
                os.path.join(repo_cwd, "wiki"),
                skip_existing=args.skip_existing,
                bare_clone=args.bare_clone,
                lfs_clone=args.lfs_clone,
                no_prune=args.no_prune,
            )

        if args.include_issues or args.include_everything:
            backup_issues(args, repo_cwd, repository, repos_template)

        if args.include_pulls or args.include_everything:
            backup_pulls(args, repo_cwd, repository, repos_template)

        if args.include_milestones or args.include_everything:
            backup_milestones(args, repo_cwd, repository, repos_template)

        if args.include_labels or args.include_everything:
            backup_labels(args, repo_cwd, repository, repos_template)

        if args.include_hooks or args.include_everything:
            backup_hooks(args, repo_cwd, repository, repos_template)

        if args.include_releases or args.include_everything:
            backup_releases(
                args,
                repo_cwd,
                repository,
                repos_template,
                include_assets=args.include_assets or args.include_everything,
            )

    if args.incremental:
        if last_update == "0000-00-00T00:00:00Z":
            # The value carries a "Z" (UTC) suffix and is compared with the
            # API's UTC timestamps, so it must be generated from UTC rather
            # than from local time.
            last_update = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

        with open(last_update_path, "w") as f:
            f.write(last_update)
def backup_issues(args, repo_cwd, repository, repos_template):
    """Save every issue of ``repository`` as ``<repo_cwd>/issues/<number>.json``.

    Open and closed issues are listed (restricted by ``args.since`` when set).
    Entries that are pull requests are skipped when pull requests are backed
    up as well. Depending on the flags, comments, events and attachments are
    fetched per issue before the JSON file is written.

    Args:
        args: Command line arguments
        repo_cwd: Backup directory of the repository
        repository: Repository dict (``full_name`` is used)
        repos_template: Base URL of the ``/repos`` API
    """
    has_issues_dir = os.path.isdir("{0}/issues/.git".format(repo_cwd))
    if args.skip_existing and has_issues_dir:
        return

    logger.info("Retrieving {0} issues".format(repository["full_name"]))
    issue_cwd = os.path.join(repo_cwd, "issues")
    mkdir_p(repo_cwd, issue_cwd)

    issues = {}
    issues_skipped = 0
    issues_skipped_message = ""
    _issue_template = "{0}/{1}/issues".format(repos_template, repository["full_name"])

    should_include_pulls = args.include_pulls or args.include_everything
    for issue_state in ("open", "closed"):
        query_args = {"filter": "all", "state": issue_state}
        if args.since:
            query_args["since"] = args.since

        for issue in retrieve_data(args, _issue_template, query_args=query_args):
            # skip pull requests which are also returned as issues
            # if retrieving pull requests is requested as well
            if "pull_request" in issue and should_include_pulls:
                issues_skipped += 1
                continue

            issues[issue["number"]] = issue

    if issues_skipped:
        issues_skipped_message = " (skipped {0} pull requests)".format(issues_skipped)

    logger.info(
        "Saving {0} issues to disk{1}".format(len(issues), issues_skipped_message)
    )
    comments_template = _issue_template + "/{0}/comments"
    events_template = _issue_template + "/{0}/events"
    for number, issue in issues.items():
        issue_file = "{0}/{1}.json".format(issue_cwd, number)
        if args.incremental_by_files and os.path.isfile(issue_file):
            # The API's "updated_at" is UTC, so the file's mtime has to be
            # rendered in UTC too; formatting local time with a "Z" suffix
            # shifts the comparison by the machine's UTC offset.
            modified = time.strftime(
                "%Y-%m-%dT%H:%M:%SZ", time.gmtime(os.path.getmtime(issue_file))
            )
            if modified > issue["updated_at"]:
                logger.info("Skipping issue {0} because it wasn't modified since last backup".format(number))
                continue

        if args.include_issue_comments or args.include_everything:
            template = comments_template.format(number)
            issue["comment_data"] = retrieve_data(args, template)
        if args.include_issue_events or args.include_everything:
            template = events_template.format(number)
            issue["event_data"] = retrieve_data(args, template)
        if args.include_attachments:
            download_attachments(
                args, issue_cwd, issue, number, repository, item_type="issue"
            )

        with codecs.open(issue_file + ".temp", "w", encoding="utf-8") as f:
            json_dump(issue, f)
        # Unlike json_dump, this is atomic. os.replace() also overwrites an
        # existing target on Windows, where os.rename() would raise.
        os.replace(issue_file + ".temp", issue_file)
def backup_pulls(args, repo_cwd, repository, repos_template):
    """Save every pull request of ``repository`` as ``<repo_cwd>/pulls/<number>.json``.

    Pull requests are listed sorted by ``updated`` descending, so listing stops
    at the first entry older than ``args.since``. With
    ``args.include_pull_details`` each pull request is re-fetched individually.
    Depending on the flags, comments, commits and attachments are fetched per
    pull request before the JSON file is written.

    Args:
        args: Command line arguments
        repo_cwd: Backup directory of the repository
        repository: Repository dict (``full_name`` is used)
        repos_template: Base URL of the ``/repos`` API
    """
    has_pulls_dir = os.path.isdir("{0}/pulls/.git".format(repo_cwd))
    if args.skip_existing and has_pulls_dir:
        return

    logger.info("Retrieving {0} pull requests".format(repository["full_name"]))  # noqa
    pulls_cwd = os.path.join(repo_cwd, "pulls")
    mkdir_p(repo_cwd, pulls_cwd)

    pulls = {}
    _pulls_template = "{0}/{1}/pulls".format(repos_template, repository["full_name"])
    _issue_template = "{0}/{1}/issues".format(repos_template, repository["full_name"])
    query_args = {
        "filter": "all",
        "state": "all",
        "sort": "updated",
        "direction": "desc",
    }

    if not args.include_pull_details:
        for pull_state in ("open", "closed"):
            query_args["state"] = pull_state
            _pulls = retrieve_data_gen(args, _pulls_template, query_args=query_args)
            for pull in _pulls:
                # Results are newest-first: everything after this is older too.
                if args.since and pull["updated_at"] < args.since:
                    break
                pulls[pull["number"]] = pull
    else:
        _pulls = retrieve_data_gen(args, _pulls_template, query_args=query_args)
        for pull in _pulls:
            if args.since and pull["updated_at"] < args.since:
                break
            pulls[pull["number"]] = retrieve_data(
                args,
                _pulls_template + "/{}".format(pull["number"]),
                single_request=True,
            )[0]

    logger.info("Saving {0} pull requests to disk".format(len(pulls)))
    # Comments from pulls API are only _review_ comments
    # regular comments need to be fetched via issue API.
    # For backwards compatibility with versions <= 0.41.0
    # keep name "comment_data" for review comments
    comments_regular_template = _issue_template + "/{0}/comments"
    comments_template = _pulls_template + "/{0}/comments"
    commits_template = _pulls_template + "/{0}/commits"
    for number, pull in pulls.items():
        pull_file = "{0}/{1}.json".format(pulls_cwd, number)
        if args.incremental_by_files and os.path.isfile(pull_file):
            # The API's "updated_at" is UTC, so the file's mtime has to be
            # rendered in UTC too; formatting local time with a "Z" suffix
            # shifts the comparison by the machine's UTC offset.
            modified = time.strftime(
                "%Y-%m-%dT%H:%M:%SZ", time.gmtime(os.path.getmtime(pull_file))
            )
            if modified > pull["updated_at"]:
                logger.info("Skipping pull request {0} because it wasn't modified since last backup".format(number))
                continue

        if args.include_pull_comments or args.include_everything:
            template = comments_regular_template.format(number)
            pull["comment_regular_data"] = retrieve_data(args, template)
            template = comments_template.format(number)
            pull["comment_data"] = retrieve_data(args, template)
        if args.include_pull_commits or args.include_everything:
            template = commits_template.format(number)
            pull["commit_data"] = retrieve_data(args, template)
        if args.include_attachments:
            download_attachments(
                args, pulls_cwd, pull, number, repository, item_type="pull"
            )

        with codecs.open(pull_file + ".temp", "w", encoding="utf-8") as f:
            json_dump(pull, f)
        # Unlike json_dump, this is atomic. os.replace() also overwrites an
        # existing target on Windows, where os.rename() would raise.
        os.replace(pull_file + ".temp", pull_file)
def backup_milestones(args, repo_cwd, repository, repos_template):
    """Write each milestone (any state) to ``<repo_cwd>/milestones/<number>.json``.

    Nothing is done when ``args.skip_existing`` is set and the milestones
    directory already exists.
    """
    milestone_cwd = os.path.join(repo_cwd, "milestones")
    already_backed_up = os.path.isdir(milestone_cwd)
    if args.skip_existing and already_backed_up:
        return

    logger.info("Retrieving {0} milestones".format(repository["full_name"]))
    mkdir_p(repo_cwd, milestone_cwd)

    listing_url = "{0}/{1}/milestones".format(repos_template, repository["full_name"])
    fetched = retrieve_data(args, listing_url, query_args={"state": "all"})

    # Key by milestone number; a repeated number keeps the last entry seen.
    by_number = {entry["number"]: entry for entry in fetched}

    logger.info("Saving {0} milestones to disk".format(len(by_number)))
    for number, milestone in by_number.items():
        target = "{0}/{1}.json".format(milestone_cwd, number)
        with codecs.open(target, "w", encoding="utf-8") as f:
            json_dump(milestone, f)
def backup_labels(args, repo_cwd, repository, repos_template):
    """Delegate to ``_backup_data`` to store labels in ``<repo_cwd>/labels/labels.json``."""
    label_cwd = os.path.join(repo_cwd, "labels")
    _backup_data(
        args,
        "labels",
        "{0}/{1}/labels".format(repos_template, repository["full_name"]),
        "{0}/labels.json".format(label_cwd),
        label_cwd,
    )
def backup_hooks(args, repo_cwd, repository, repos_template):
    """Store the repository's hooks in ``<repo_cwd>/hooks/hooks.json``.

    Skipped when ``get_auth`` yields nothing. An error whose text contains
    "404" is logged and ignored; any other error propagates.
    """
    if not get_auth(args):
        logger.info("Skipping hooks since no authentication provided")
        return

    hook_cwd = os.path.join(repo_cwd, "hooks")
    hooks_url = "{0}/{1}/hooks".format(repos_template, repository["full_name"])
    hooks_file = "{0}/hooks.json".format(hook_cwd)

    try:
        _backup_data(args, "hooks", hooks_url, hooks_file, hook_cwd)
    except Exception as e:
        if "404" not in str(e):
            raise
        logger.info("Unable to read hooks, skipping")
def backup_releases(args, repo_cwd, repository, repos_template, include_assets=False):
    """Write each release to ``<repo_cwd>/releases/<tag>.json`` and optionally its assets.

    ``args.skip_prerelease`` drops prereleases and drafts;
    ``args.number_of_latest_releases`` keeps only the most recently created
    ones. "/" in a tag name is replaced by "__" for the file name. With
    ``include_assets`` the assets of each release are downloaded into
    ``<repo_cwd>/releases/<tag>/``.
    """
    full_name = repository["full_name"]

    # give release files somewhere to live & log intent
    release_cwd = os.path.join(repo_cwd, "releases")
    logger.info("Retrieving {0} releases".format(full_name))
    mkdir_p(repo_cwd, release_cwd)

    listing_url = "{0}/{1}/releases".format(repos_template, full_name)
    releases = retrieve_data(args, listing_url, query_args={})

    if args.skip_prerelease:
        releases = [r for r in releases if not (r["prerelease"] or r["draft"])]

    def created_at(item):
        return datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%SZ")

    limit = args.number_of_latest_releases
    if limit and limit < len(releases):
        releases.sort(key=created_at, reverse=True)
        releases = releases[:limit]
        logger.info("Saving the latest {0} releases to disk".format(len(releases)))
    else:
        logger.info("Saving {0} releases to disk".format(len(releases)))

    # for each release, store it
    for release in releases:
        safe_tag = release["tag_name"].replace("/", "__")
        release_json = os.path.join(release_cwd, "{0}.json".format(safe_tag))
        with codecs.open(release_json, "w+", encoding="utf-8") as f:
            json_dump(release, f)

        if not include_assets:
            continue

        assets = retrieve_data(args, release["assets_url"])
        if len(assets) == 0:
            continue

        # give release asset files somewhere to live & download them (not including source archives)
        release_assets_cwd = os.path.join(release_cwd, safe_tag)
        mkdir_p(release_assets_cwd)
        for asset in assets:
            asset_path = os.path.join(release_assets_cwd, asset["name"])
            download_file(
                asset["url"],
                asset_path,
                get_auth(args, encode=not args.as_app),
                as_app=args.as_app,
                fine=args.token_fine is not None,
            )
def fetch_repository(
    name,
    remote_url,
    local_dir,
    skip_existing=False,
    bare_clone=False,
    lfs_clone=False,
    no_prune=False,
):
    """Clone ``remote_url`` into ``local_dir`` or update the clone already there.

    Args:
        name: Repository name, used for log messages only.
        remote_url: Git remote URL (may embed credentials; masked in logs).
        local_dir: Target directory of the clone.
        skip_existing: Return without doing anything if a clone already exists.
        bare_clone: Use ``git clone --mirror`` and detect existing clones via
            ``git rev-parse --is-bare-repository``.
        lfs_clone: Also fetch Git LFS objects.
        no_prune: Omit ``--prune`` from the fetch commands.
    """
    if bare_clone:
        clone_exists = False
        if os.path.exists(local_dir):
            try:
                clone_exists = (
                    subprocess.check_output(
                        ["git", "rev-parse", "--is-bare-repository"], cwd=local_dir
                    )
                    == b"true\n"
                )
            except subprocess.CalledProcessError:
                # The directory exists but git does not recognise it as a
                # repository: treat it as "no clone yet" instead of crashing.
                clone_exists = False
    else:
        clone_exists = os.path.exists(os.path.join(local_dir, ".git"))

    if clone_exists and skip_existing:
        return

    masked_remote_url = mask_password(remote_url)

    # Pass an argument list instead of a shell string so that characters in
    # the URL can never be interpreted by a shell.
    initialized = subprocess.call(
        ["git", "ls-remote", remote_url], stdout=FNULL, stderr=FNULL
    )
    if initialized == 128:
        logger.info(
            "Skipping {0} ({1}) since it's not initialized".format(
                name, masked_remote_url
            )
        )
        return

    prune_args = [] if no_prune else ["--prune"]

    if clone_exists:
        logger.info("Updating {0} in {1}".format(name, local_dir))

        remotes = subprocess.check_output(["git", "remote", "show"], cwd=local_dir)
        remotes = [i.strip() for i in remotes.decode("utf-8").splitlines()]

        if "origin" not in remotes:
            # There is no origin to remove here, so it is simply added.
            git_command = ["git", "remote", "add", "origin", remote_url]
        else:
            git_command = ["git", "remote", "set-url", "origin", remote_url]
        logging_subprocess(git_command, cwd=local_dir)

        git_command = ["git", "fetch", "--all", "--force", "--tags"] + prune_args
        logging_subprocess(git_command, cwd=local_dir)
        if lfs_clone:
            git_command = ["git", "lfs", "fetch", "--all"] + prune_args
            logging_subprocess(git_command, cwd=local_dir)
    else:
        logger.info(
            "Cloning {0} repository from {1} to {2}".format(
                name, masked_remote_url, local_dir
            )
        )
        if bare_clone:
            git_command = ["git", "clone", "--mirror", remote_url, local_dir]
            logging_subprocess(git_command)
            if lfs_clone:
                git_command = ["git", "lfs", "fetch", "--all"] + prune_args
                logging_subprocess(git_command, cwd=local_dir)
        else:
            if lfs_clone:
                git_command = ["git", "lfs", "clone", remote_url, local_dir]
            else:
                git_command = ["git", "clone", remote_url, local_dir]
            logging_subprocess(git_command)
def backup_account(args, output_directory):
    """Back up account-level listings into ``<output_directory>/account``.

    Each listing (starred, watched, followers, following) is written through
    ``_backup_data`` when its own ``args.include_*`` flag or
    ``args.include_everything`` is set.
    """
    account_cwd = os.path.join(output_directory, "account")

    # (flag on args, label for logging, output file pattern, API URL pattern)
    sections = (
        (
            "include_starred",
            "starred repositories",
            "{0}/starred.json",
            "https://{0}/users/{1}/starred",
        ),
        (
            "include_watched",
            "watched repositories",
            "{0}/watched.json",
            "https://{0}/users/{1}/subscriptions",
        ),
        (
            "include_followers",
            "followers",
            "{0}/followers.json",
            "https://{0}/users/{1}/followers",
        ),
        (
            "include_following",
            "following",
            "{0}/following.json",
            "https://{0}/users/{1}/following",
        ),
    )

    for flag, label, file_pattern, url_pattern in sections:
        if not (getattr(args, flag) or args.include_everything):
            continue
        output_file = file_pattern.format(account_cwd)
        template = url_pattern.format(get_github_api_host(args), args.user)
        _backup_data(args, label, template, output_file, account_cwd)
def _backup_data(args, name, template, output_file, output_directory):
    """Retrieve ``template`` and dump the result as JSON to ``output_file``.

    Does nothing when ``args.skip_existing`` is set and ``output_file`` already
    exists. ``name`` is only used in log messages.
    """
    if args.skip_existing and os.path.exists(output_file):
        return

    logger.info("Retrieving {0} {1}".format(args.user, name))
    mkdir_p(output_directory)
    payload = retrieve_data(args, template)

    logger.info("Writing {0} {1} to disk".format(len(payload), name))
    with codecs.open(output_file, "w", encoding="utf-8") as f:
        json_dump(payload, f)
def json_dump(data, output_file):
    """Serialize ``data`` as JSON into the open text file ``output_file``.

    Output uses sorted keys, 4-space indentation, "," / ": " separators, and
    keeps non-ASCII characters unescaped.
    """
    dump_options = dict(
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(",", ": "),
    )
    json.dump(data, output_file, **dump_options)