#!/usr/bin/env python from __future__ import print_function import argparse import base64 import calendar import codecs import errno import json import logging import os import platform import random import re import select import socket import ssl import subprocess import sys import time from collections.abc import Generator from datetime import datetime from http.client import IncompleteRead from urllib.error import HTTPError, URLError from urllib.parse import urlencode, urlparse from urllib.request import HTTPRedirectHandler, Request, build_opener, urlopen try: from . import __version__ VERSION = __version__ except ImportError: VERSION = "unknown" from .graphql_queries import ( DISCUSSION_DETAIL_QUERY, DISCUSSION_LIST_QUERY, DISCUSSION_PAGE_SIZE, DISCUSSION_REPLIES_QUERY, ) FNULL = open(os.devnull, "w") FILE_URI_PREFIX = "file://" logger = logging.getLogger(__name__) class RepositoryUnavailableError(Exception): """Raised when a repository is unavailable due to legal reasons (e.g., DMCA takedown, TOS violation).""" def __init__(self, message, legal_url=None): super().__init__(message) self.legal_url = legal_url # Setup SSL context with fallback chain https_ctx = ssl.create_default_context() if https_ctx.get_ca_certs(): # Layer 1: Certificates pre-loaded from system (file-based) pass else: paths = ssl.get_default_verify_paths() if (paths.cafile and os.path.exists(paths.cafile)) or ( paths.capath and os.path.exists(paths.capath) ): # Layer 2: Cert paths exist, will be lazy-loaded on first use (directory-based) pass else: # Layer 3: Try certifi package as optional fallback try: import certifi https_ctx = ssl.create_default_context(cafile=certifi.where()) except ImportError: # All layers failed - no certificates available anywhere sys.exit( "\nERROR: No CA certificates found. Cannot connect to GitHub over SSL.\n\n" "Solutions you can explore:\n" " 1. pip install certifi\n" " 2. Alpine: apk add ca-certificates\n" " 3. Debian/Ubuntu: apt-get install ca-certificates\n\n" ) def logging_subprocess( popenargs, stdout_log_level=logging.DEBUG, stderr_log_level=logging.ERROR, **kwargs ): """ Variant of subprocess.call that accepts a logger instead of stdout/stderr, and logs stdout messages via logger.debug and stderr messages via logger.error. 
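    A sketch of typical usage (command and working directory are illustrative;
    extra keyword arguments are passed straight to subprocess.Popen):

        logging_subprocess(["git", "fetch", "--all"], cwd="/path/to/repo")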
""" child = subprocess.Popen( popenargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs ) if sys.platform == "win32": logger.info( "Windows operating system detected - no subprocess logging will be returned" ) log_level = {child.stdout: stdout_log_level, child.stderr: stderr_log_level} def check_io(): if sys.platform == "win32": return ready_to_read = select.select([child.stdout, child.stderr], [], [], 1000)[0] for io in ready_to_read: line = io.readline() if not logger: continue if not (io == child.stderr and not line): logger.log(log_level[io], line[:-1]) # keep checking stdout/stderr until the child exits while child.poll() is None: check_io() check_io() # check again to catch anything after the process exits rc = child.wait() if rc != 0: print("{} returned {}:".format(popenargs[0], rc), file=sys.stderr) print("\t", " ".join(popenargs), file=sys.stderr) return rc def mkdir_p(*args): for path in args: try: os.makedirs(path) except OSError as exc: # Python >2.5 if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise def mask_password(url, secret="*****"): parsed = urlparse(url) if not parsed.password: return url elif parsed.password == "x-oauth-basic": return url.replace(parsed.username, secret) return url.replace(parsed.password, secret) def non_negative_int(value): """Argparse type validator for non-negative integers.""" try: ivalue = int(value) except ValueError: raise argparse.ArgumentTypeError(f"'{value}' is not a valid integer") if ivalue < 0: raise argparse.ArgumentTypeError(f"{value} must be 0 or greater") return ivalue def parse_args(args=None): parser = argparse.ArgumentParser(description="Backup a github account") parser.add_argument("user", metavar="USER", type=str, help="github username") parser.add_argument( "-t", "--token", dest="token_classic", help="personal access, OAuth, or JSON Web token, or path to token (file://...)", ) # noqa parser.add_argument( "-f", "--token-fine", dest="token_fine", help="fine-grained personal access token (github_pat_....), or path to token (file://...)", ) # noqa parser.add_argument( "--token-from-gh", action="store_true", dest="token_from_gh", help="read token from GitHub CLI (gh auth token)", ) parser.add_argument( "-q", "--quiet", action="store_true", dest="quiet", help="supress log messages less severe than warning, e.g. 
info", ) parser.add_argument( "--as-app", action="store_true", dest="as_app", help="authenticate as github app instead of as a user.", ) parser.add_argument( "-o", "--output-directory", default=".", dest="output_directory", help="directory at which to backup the repositories", ) parser.add_argument( "-l", "--log-level", default="info", dest="log_level", help="log level to use (default: info, possible levels: debug, info, warning, error, critical)", ) parser.add_argument( "-i", "--incremental", action="store_true", dest="incremental", help="incremental backup", ) parser.add_argument( "--incremental-by-files", action="store_true", dest="incremental_by_files", help="incremental backup based on modification date of files", ) parser.add_argument( "--starred", action="store_true", dest="include_starred", help="include JSON output of starred repositories in backup", ) parser.add_argument( "--all-starred", action="store_true", dest="all_starred", help="include starred repositories in backup [*]", ) parser.add_argument( "--starred-skip-size-over", type=int, metavar="MB", dest="starred_skip_size_over", help="skip starred repositories larger than this size in MB", ) parser.add_argument( "--watched", action="store_true", dest="include_watched", help="include JSON output of watched repositories in backup", ) parser.add_argument( "--followers", action="store_true", dest="include_followers", help="include JSON output of followers in backup", ) parser.add_argument( "--following", action="store_true", dest="include_following", help="include JSON output of following users in backup", ) parser.add_argument( "--all", action="store_true", dest="include_everything", help="include everything in backup (not including [*])", ) parser.add_argument( "--issues", action="store_true", dest="include_issues", help="include issues in backup", ) parser.add_argument( "--issue-comments", action="store_true", dest="include_issue_comments", help="include issue comments in backup", ) parser.add_argument( "--issue-events", action="store_true", dest="include_issue_events", help="include issue events in backup", ) parser.add_argument( "--pulls", action="store_true", dest="include_pulls", help="include pull requests in backup", ) parser.add_argument( "--pull-comments", action="store_true", dest="include_pull_comments", help="include pull request review comments in backup", ) parser.add_argument( "--pull-reviews", action="store_true", dest="include_pull_reviews", help="include pull request reviews in backup", ) parser.add_argument( "--pull-commits", action="store_true", dest="include_pull_commits", help="include pull request commits in backup", ) parser.add_argument( "--pull-details", action="store_true", dest="include_pull_details", help="include more pull request details in backup [*]", ) parser.add_argument( "--labels", action="store_true", dest="include_labels", help="include labels in backup", ) parser.add_argument( "--hooks", action="store_true", dest="include_hooks", help="include hooks in backup (works only when authenticated)", ) # noqa parser.add_argument( "--milestones", action="store_true", dest="include_milestones", help="include milestones in backup", ) parser.add_argument( "--security-advisories", action="store_true", dest="include_security_advisories", help="include security advisories in backup", ) parser.add_argument( "--discussions", action="store_true", dest="include_discussions", help="include discussions in backup", ) parser.add_argument( "--repositories", action="store_true", dest="include_repository", 
help="include repository clone in backup", ) parser.add_argument( "--bare", action="store_true", dest="bare_clone", help="clone bare repositories" ) parser.add_argument( "--no-prune", action="store_true", dest="no_prune", help="disable prune option for git fetch", ) parser.add_argument( "--lfs", action="store_true", dest="lfs_clone", help="clone LFS repositories (requires Git LFS to be installed, https://git-lfs.github.com) [*]", ) parser.add_argument( "--wikis", action="store_true", dest="include_wiki", help="include wiki clone in backup", ) parser.add_argument( "--gists", action="store_true", dest="include_gists", help="include gists in backup [*]", ) parser.add_argument( "--starred-gists", action="store_true", dest="include_starred_gists", help="include starred gists in backup [*]", ) parser.add_argument( "--skip-archived", action="store_true", dest="skip_archived", help="skip project if it is archived", ) parser.add_argument( "--skip-existing", action="store_true", dest="skip_existing", help="skip project if a backup directory exists", ) parser.add_argument( "-L", "--languages", dest="languages", help="only allow these languages", nargs="*", ) parser.add_argument( "-N", "--name-regex", dest="name_regex", help="python regex to match names against", ) parser.add_argument( "-H", "--github-host", dest="github_host", help="GitHub Enterprise hostname" ) parser.add_argument( "-O", "--organization", action="store_true", dest="organization", help="whether or not this is an organization user", ) parser.add_argument( "-R", "--repository", dest="repository", help="name of repository to limit backup to", ) parser.add_argument( "-P", "--private", action="store_true", dest="private", help="include private repositories [*]", ) parser.add_argument( "-F", "--fork", action="store_true", dest="fork", help="include forked repositories [*]", ) parser.add_argument( "--prefer-ssh", action="store_true", help="Clone repositories using SSH instead of HTTPS", ) parser.add_argument( "-v", "--version", action="version", version="%(prog)s " + VERSION ) parser.add_argument( "--keychain-name", dest="osx_keychain_item_name", help="OSX ONLY: name field of password item in OSX keychain that holds the personal access or OAuth token", ) parser.add_argument( "--keychain-account", dest="osx_keychain_item_account", help="OSX ONLY: account field of password item in OSX keychain that holds the personal access or OAuth token", ) parser.add_argument( "--releases", action="store_true", dest="include_releases", help="include release information, not including assets or binaries", ) parser.add_argument( "--latest-releases", type=int, default=0, dest="number_of_latest_releases", help="include certain number of the latest releases; only applies if including releases", ) parser.add_argument( "--skip-prerelease", action="store_true", dest="skip_prerelease", help="skip prerelease and draft versions; only applies if including releases", ) parser.add_argument( "--assets", action="store_true", dest="include_assets", help="include assets alongside release information; only applies if including releases", ) parser.add_argument( "--skip-assets-on", dest="skip_assets_on", nargs="*", help="skip asset downloads for these repositories", ) parser.add_argument( "--attachments", action="store_true", dest="include_attachments", help="download user-attachments from issues, pull requests, and discussions", ) parser.add_argument( "--throttle-limit", dest="throttle_limit", type=int, default=0, help="start throttling of GitHub API requests after this 
amount of API requests remain", ) parser.add_argument( "--throttle-pause", dest="throttle_pause", type=float, default=30.0, help="wait this amount of seconds when API request throttling is active (default: 30.0, requires --throttle-limit to be set)", ) parser.add_argument( "--exclude", dest="exclude", help="names of repositories to exclude", nargs="*" ) parser.add_argument( "--retries", dest="max_retries", type=non_negative_int, default=5, help="maximum number of retries for API calls (default: 5)", ) return parser.parse_args(args) def get_auth(args, encode=True, for_git_cli=False): auth = None if args.osx_keychain_item_name: if not args.osx_keychain_item_account: raise Exception( "You must specify both name and account fields for osx keychain password items" ) else: if platform.system() != "Darwin": raise Exception("Keychain arguments are only supported on Mac OSX") try: with open(os.devnull, "w") as devnull: token = subprocess.check_output( [ "security", "find-generic-password", "-s", args.osx_keychain_item_name, "-a", args.osx_keychain_item_account, "-w", ], stderr=devnull, ).strip() token = token.decode("utf-8") auth = token + ":" + "x-oauth-basic" except subprocess.SubprocessError: raise Exception( "No password item matching the provided name and account could be found in the osx keychain." ) elif args.osx_keychain_item_account: raise Exception( "You must specify both name and account fields for osx keychain password items" ) elif args.token_fine: if args.token_fine.startswith(FILE_URI_PREFIX): args.token_fine = read_file_contents(args.token_fine) if args.token_fine.startswith("github_pat_"): auth = args.token_fine else: raise Exception( "Fine-grained token supplied does not look like a GitHub PAT" ) elif args.token_classic or args.token_from_gh: if args.token_from_gh: if args.as_app: raise Exception( "--token-from-gh cannot be used with --as-app; provide the app token with --token instead" ) args.token_classic = read_token_from_gh_cli(args) elif args.token_classic.startswith(FILE_URI_PREFIX): args.token_classic = read_file_contents(args.token_classic) if not args.as_app: auth = args.token_classic + ":" + "x-oauth-basic" else: if not for_git_cli: auth = args.token_classic else: auth = "x-access-token:" + args.token_classic if not auth: return None if not encode or args.token_fine is not None: return auth return base64.b64encode(auth.encode("ascii")) def get_github_api_host(args): if args.github_host: host = args.github_host + "/api/v3" else: host = "api.github.com" return host def get_github_graphql_url(args): if args.github_host: return "https://{0}/api/graphql".format(args.github_host) return "https://api.github.com/graphql" def get_graphql_auth(args): auth = get_auth(args, encode=False) if not auth: return None # GraphQL expects a bearer token. Classic tokens and keychain tokens use # "token:x-oauth-basic" for REST Basic auth, so strip the synthetic # password before sending the GraphQL Authorization header. 
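    # For illustration (hypothetical token value): classic and keychain auth
    # look like "ghp_exampletoken:x-oauth-basic" and become "ghp_exampletoken";
    # fine-grained and GitHub App tokens are passed through unchanged.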
if ( not getattr(args, "as_app", False) and getattr(args, "token_fine", None) is None and ":" in auth ): auth = auth.split(":", 1)[0] return auth def get_github_host(args): if args.github_host: host = args.github_host else: host = "github.com" return host def read_file_contents(file_uri): return open(file_uri[len(FILE_URI_PREFIX) :], "rt").readline().strip() def read_token_from_gh_cli(args): cached_token = getattr(args, "_token_from_gh_value", None) if cached_token: return cached_token command = ["gh", "auth", "token"] if args.github_host: command.extend(["--hostname", get_github_host(args)]) try: token = subprocess.check_output(command, stderr=subprocess.PIPE).decode( "utf-8" ).strip() except FileNotFoundError: raise Exception( "Unable to read token from GitHub CLI: 'gh' executable not found" ) except subprocess.CalledProcessError as e: stderr = e.stderr.decode("utf-8", errors="replace").strip() if stderr: raise Exception( "Unable to read token from GitHub CLI: {0}".format(stderr) ) raise Exception("Unable to read token from GitHub CLI") if not token: raise Exception("Unable to read token from GitHub CLI: token was empty") args._token_from_gh_value = token return token def get_github_repo_url(args, repository): if repository.get("is_gist"): if args.prefer_ssh: # The git_pull_url value is always https for gists, so we need to transform it to ssh form repo_url = re.sub( r"^https?:\/\/(.+)\/(.+)\.git$", r"git@\1:\2.git", repository["git_pull_url"], ) repo_url = re.sub( r"^git@gist\.", "git@", repo_url ) # strip gist subdomain for better hostkey compatibility else: repo_url = repository["git_pull_url"] return repo_url if args.prefer_ssh: return repository["ssh_url"] auth = get_auth(args, encode=False, for_git_cli=True) if auth: repo_url = "https://{0}@{1}/{2}/{3}.git".format( auth if args.token_fine is None else "oauth2:" + auth, get_github_host(args), repository["owner"]["login"], repository["name"], ) else: repo_url = repository["clone_url"] return repo_url def calculate_retry_delay(attempt, headers): """Calculate delay before next retry with exponential backoff.""" # Respect retry-after header if present if retry_after := headers.get("retry-after"): return int(retry_after) # Respect rate limit reset time if int(headers.get("x-ratelimit-remaining", 1)) < 1: reset_time = int(headers.get("x-ratelimit-reset", 0)) return max(10, reset_time - calendar.timegm(time.gmtime())) # Exponential backoff with jitter for server errors (1s base, 120s max) delay = min(1.0 * (2**attempt), 120.0) return delay + random.uniform(0, delay * 0.1) def retrieve_data(args, template, query_args=None, paginated=True, lazy=False): """ Fetch the data from GitHub API. Handle both single requests and pagination. Returns a list by default, or a generator when lazy=True so callers can stop before fetching every page. Handles throttling, retries, read errors, and DMCA takedowns. 
""" query_args = query_args or {} auth = get_auth(args, encode=not args.as_app) per_page = 100 def _extract_next_page_url(link_header): for link in link_header.split(","): if 'rel="next"' in link: return link[link.find("<") + 1 : link.find(">")] return None def fetch_all() -> Generator[dict, None, None]: def _extract_legal_url(response_body_bytes): """Extract DMCA/legal notice URL from GitHub API error response body.""" try: data = json.loads(response_body_bytes.decode("utf-8")) return data.get("block", {}).get("html_url") except Exception: return None next_url = None while True: # FIRST: Fetch response for attempt in range(args.max_retries + 1): request = _construct_request( per_page=per_page if paginated else None, query_args=query_args, template=next_url or template, auth=auth, as_app=args.as_app, fine=args.token_fine is not None, ) try: http_response = make_request_with_retry( request, auth, args.max_retries ) except HTTPError as exc: if exc.code == 451: legal_url = _extract_legal_url(exc.read()) raise RepositoryUnavailableError( f"Repository unavailable due to legal reasons (HTTP {exc.code})", legal_url=legal_url, ) elif exc.code == 403: # Rate-limit 403s (x-ratelimit-remaining=0) are retried # by make_request_with_retry — re-raise if exhausted. if int(exc.headers.get("x-ratelimit-remaining", 1)) < 1: raise # Only convert to RepositoryUnavailableError if GitHub # indicates a TOS/DMCA block (response contains "block" # key). Other 403s (permissions, scopes) should propagate. body = exc.read() try: data = json.loads(body.decode("utf-8")) except Exception: data = {} if "block" in data: raise RepositoryUnavailableError( "Repository access blocked (HTTP 403)", legal_url=data.get("block", {}).get("html_url"), ) raise else: raise # urlopen raises HTTPError for non-2xx, so only success gets here. # Guard against unexpected status codes from proxies, future Python # changes, or other edge cases we haven't considered. 
status = http_response.getcode() if status != 200: raise Exception( f"Unexpected HTTP {status} from {next_url or template} " f"(expected non-2xx to raise HTTPError)" ) # Parse JSON response try: response = json.loads(http_response.read().decode("utf-8")) break # Exit retry loop and handle the data returned except ( IncompleteRead, json.decoder.JSONDecodeError, TimeoutError, ) as e: logger.warning(f"{type(e).__name__} reading response") if attempt < args.max_retries: delay = calculate_retry_delay(attempt, {}) logger.warning( f"Retrying read in {delay:.1f}s (attempt {attempt + 1}/{args.max_retries + 1})" ) time.sleep(delay) continue # Next retry attempt else: logger.error( f"Failed to read response after {args.max_retries + 1} attempts for {next_url or template}" ) raise Exception( f"Failed to read response after {args.max_retries + 1} attempts for {next_url or template}" ) # SECOND: Process and paginate # Pause before next request if rate limit is low if ( remaining := int(http_response.headers.get("x-ratelimit-remaining", 0)) ) <= (args.throttle_limit or 0): if args.throttle_limit: logger.info( f"Throttling: {remaining} requests left, pausing {args.throttle_pause}s" ) time.sleep(args.throttle_pause) # Yield results if isinstance(response, list): yield from response elif isinstance(response, dict): yield response # Check for more pages if not paginated or not ( next_url := _extract_next_page_url( http_response.headers.get("Link", "") ) ): break # No more data if lazy: return fetch_all() return list(fetch_all()) def retrieve_graphql_data(args, query, variables=None, log_context=None): """Fetch data from GitHub's GraphQL API.""" auth = get_graphql_auth(args) if not auth: raise Exception("GitHub GraphQL API requires authentication") variables = variables or {} payload = json.dumps( {"query": query, "variables": variables}, ensure_ascii=False ).encode("utf-8") endpoint = get_github_graphql_url(args) for attempt in range(args.max_retries + 1): request = Request(endpoint, data=payload, method="POST") request.add_header("Accept", "application/json") request.add_header("Content-Type", "application/json") request.add_header("Authorization", "bearer " + auth) log_url = endpoint if log_context: log_url = "{0} ({1})".format(log_url, log_context) logger.info("Requesting {0}".format(log_url)) http_response = make_request_with_retry(request, auth, args.max_retries) status = http_response.getcode() if status != 200: raise Exception( f"Unexpected HTTP {status} from {endpoint} " f"(expected non-2xx to raise HTTPError)" ) try: response = json.loads(http_response.read().decode("utf-8")) except (IncompleteRead, json.decoder.JSONDecodeError, TimeoutError) as e: logger.warning(f"{type(e).__name__} reading GraphQL response") if attempt < args.max_retries: delay = calculate_retry_delay(attempt, {}) logger.warning( f"Retrying GraphQL read in {delay:.1f}s " f"(attempt {attempt + 1}/{args.max_retries + 1})" ) time.sleep(delay) continue raise Exception( f"Failed to read GraphQL response after {args.max_retries + 1} " f"attempts for {endpoint}" ) if ( remaining := int(http_response.headers.get("x-ratelimit-remaining", 0)) ) <= (args.throttle_limit or 0): if args.throttle_limit: logger.info( f"Throttling: {remaining} requests left, pausing {args.throttle_pause}s" ) time.sleep(args.throttle_pause) errors = response.get("errors") or [] if errors: if any(error.get("type") == "RATE_LIMITED" for error in errors): if attempt < args.max_retries: delay = calculate_retry_delay(attempt, http_response.headers) logger.warning( 
f"GraphQL rate limit hit, retrying in {delay:.1f}s " f"(attempt {attempt + 1}/{args.max_retries + 1})" ) time.sleep(delay) continue messages = "; ".join( error.get("message", str(error)) for error in errors ) raise Exception("GraphQL Error: {0}".format(messages)) return response.get("data", {}) raise Exception( f"GraphQL request failed after {args.max_retries + 1} attempts" ) # pragma: no cover def make_request_with_retry(request, auth, max_retries=5): """Make HTTP request with automatic retry for transient errors.""" def is_retryable_status(status_code, headers): # Server errors are always retryable if status_code in (500, 502, 503, 504): return True # Rate limit (403/429) is retryable if limit exhausted if status_code in (403, 429): return int(headers.get("x-ratelimit-remaining", 1)) < 1 return False for attempt in range(max_retries + 1): try: return urlopen(request, context=https_ctx) except HTTPError as exc: # HTTPError can be used as a response-like object if not is_retryable_status(exc.code, exc.headers): logger.error( f"API Error: {exc.code} {exc.reason} for {request.full_url}" ) raise # Non-retryable error if attempt >= max_retries: logger.error( f"HTTP {exc.code} failed after {max_retries + 1} attempts for {request.full_url}" ) raise delay = calculate_retry_delay(attempt, exc.headers) logger.warning( f"HTTP {exc.code} ({exc.reason}), retrying in {delay:.1f}s " f"(attempt {attempt + 1}/{max_retries + 1}) for {request.full_url}" ) if auth is None and exc.code in (403, 429): logger.info("Hint: Authenticate to raise your GitHub rate limit") time.sleep(delay) except (URLError, socket.error) as e: if attempt >= max_retries: logger.error( f"Connection error failed after {max_retries + 1} attempts: {e} for {request.full_url}" ) raise delay = calculate_retry_delay(attempt, {}) logger.warning( f"Connection error: {e}, retrying in {delay:.1f}s " f"(attempt {attempt + 1}/{max_retries + 1}) for {request.full_url}" ) time.sleep(delay) raise Exception( f"Request failed after {max_retries + 1} attempts" ) # pragma: no cover def _construct_request(per_page, query_args, template, auth, as_app=None, fine=False): # If template is already a full URL with query params (from Link header), use it directly if "?" in template and template.startswith("http"): request_url = template # Extract query string for logging querystring = template.split("?", 1)[1] else: # Build URL with query parameters all_query_args = {} if per_page: all_query_args["per_page"] = per_page if query_args: all_query_args.update(query_args) request_url = template if all_query_args: querystring = urlencode(all_query_args) request_url = template + "?" + querystring else: querystring = "" request = Request(request_url) if auth is not None: if not as_app: if fine: request.add_header("Authorization", "token " + auth) else: request.add_header("Authorization", "Basic ".encode("ascii") + auth) else: auth = auth.encode("ascii") request.add_header("Authorization", "token ".encode("ascii") + auth) log_url = template if "?" not in template else template.split("?")[0] if querystring: log_url += "?" + querystring logger.info("Requesting {}".format(log_url)) return request class S3HTTPRedirectHandler(HTTPRedirectHandler): """ A subclassed redirect handler for downloading Github assets from S3. urllib will add the Authorization header to the redirected request to S3, which will result in a 400, so we should remove said header on redirect. 
""" def redirect_request(self, req, fp, code, msg, headers, newurl): request = super(S3HTTPRedirectHandler, self).redirect_request( req, fp, code, msg, headers, newurl ) # Only delete Authorization header if it exists (attachments may not have it) if "Authorization" in request.headers: del request.headers["Authorization"] return request def download_file(url, path, auth, as_app=False, fine=False): # Skip downloading release assets if they already exist on disk so we don't redownload on every sync if os.path.exists(path): return request = _construct_request( per_page=None, query_args={}, template=url, auth=auth, as_app=as_app, fine=fine, ) request.add_header("Accept", "application/octet-stream") opener = build_opener(S3HTTPRedirectHandler) try: response = opener.open(request) chunk_size = 16 * 1024 with open(path, "wb") as f: while True: chunk = response.read(chunk_size) if not chunk: break f.write(chunk) except HTTPError as exc: # Gracefully handle 404 responses (and others) when downloading from S3 logger.warning( "Skipping download of asset {0} due to HTTPError: {1}".format( url, exc.reason ) ) except URLError as e: # Gracefully handle other URL errors logger.warning( "Skipping download of asset {0} due to URLError: {1}".format(url, e.reason) ) except socket.error as e: # Gracefully handle socket errors # TODO: Implement retry logic logger.warning( "Skipping download of asset {0} due to socker error: {1}".format( url, e.strerror ) ) def download_attachment_file(url, path, auth, as_app=False, fine=False): """Download attachment file directly (not via GitHub API). Similar to download_file() but for direct file URLs, not API endpoints. Attachment URLs (user-images, user-attachments) are direct downloads, not API endpoints, so we skip _construct_request() which adds API params. URL Format Support & Authentication Requirements: | URL Format | Auth Required | Notes | |----------------------------------------------|---------------|--------------------------| | github.com/user-attachments/assets/* | Private only | Modern format (2024+) | | github.com/user-attachments/files/* | Private only | Modern format (2024+) | | user-images.githubusercontent.com/* | No (public) | Legacy CDN, all eras | | private-user-images.githubusercontent.com/* | JWT in URL | Legacy private (5min) | | github.com/{owner}/{repo}/files/* | Repo filter | Old repo files | - Modern user-attachments: Requires GitHub token auth for private repos - Legacy public CDN: No auth needed/accepted (returns 400 with auth header) - Legacy private CDN: Uses JWT token embedded in URL, no GitHub token needed - Repo files: Filtered to current repository only during extraction Returns dict with metadata: - success: bool - http_status: int (200, 404, etc.) 
- content_type: str or None - original_filename: str or None (from Content-Disposition) - size_bytes: int or None - error: str or None """ import re from datetime import datetime, timezone metadata = { "url": url, "success": False, "http_status": None, "content_type": None, "original_filename": None, "size_bytes": None, "downloaded_at": datetime.now(timezone.utc).isoformat(), "error": None, } # Create simple request (no API query params) request = Request(url) request.add_header("Accept", "application/octet-stream") # Add authentication header only for modern github.com/user-attachments URLs # Legacy CDN URLs (user-images.githubusercontent.com) are public and don't need/accept auth # Private CDN URLs (private-user-images) use JWT tokens embedded in the URL if auth is not None and "github.com/user-attachments/" in url: if not as_app: if fine: # Fine-grained token: plain token with "token " prefix request.add_header("Authorization", "token " + auth) else: # Classic token: base64-encoded with "Basic " prefix request.add_header("Authorization", "Basic ".encode("ascii") + auth) else: # App authentication auth = auth.encode("ascii") request.add_header("Authorization", "token ".encode("ascii") + auth) # Reuse S3HTTPRedirectHandler from download_file() opener = build_opener(S3HTTPRedirectHandler) temp_path = path + ".temp" try: response = opener.open(request) metadata["http_status"] = response.getcode() # Extract Content-Type content_type = response.headers.get("Content-Type", "").split(";")[0].strip() if content_type: metadata["content_type"] = content_type # Extract original filename from Content-Disposition header # Format: attachment; filename=example.mov or attachment;filename="example.mov" content_disposition = response.headers.get("Content-Disposition", "") if content_disposition: # Match: filename=something or filename="something" or filename*=UTF-8''something match = re.search(r'filename\*?=["\']?([^"\';\r\n]+)', content_disposition) if match: original_filename = match.group(1).strip() # Handle RFC 5987 encoding: filename*=UTF-8''example.mov if "UTF-8''" in original_filename: original_filename = original_filename.split("UTF-8''")[1] metadata["original_filename"] = original_filename # Fallback: Extract filename from final URL after redirects # This handles user-attachments/assets URLs which redirect to S3 with filename.ext if not metadata["original_filename"]: from urllib.parse import urlparse, unquote final_url = response.geturl() parsed = urlparse(final_url) # Get filename from path (last component before query string) path_parts = parsed.path.split("/") if path_parts: # URL might be encoded, decode it filename_from_url = unquote(path_parts[-1]) # Only use if it has an extension if "." 
in filename_from_url: metadata["original_filename"] = filename_from_url # Download file to temporary location chunk_size = 16 * 1024 bytes_downloaded = 0 with open(temp_path, "wb") as f: while True: chunk = response.read(chunk_size) if not chunk: break f.write(chunk) bytes_downloaded += len(chunk) # Atomic rename to final location os.replace(temp_path, path) metadata["size_bytes"] = bytes_downloaded metadata["success"] = True except HTTPError as exc: metadata["http_status"] = exc.code metadata["error"] = str(exc.reason) logger.warning( "Skipping download of attachment {0} due to HTTPError: {1}".format( url, exc.reason ) ) except URLError as e: metadata["error"] = str(e.reason) logger.warning( "Skipping download of attachment {0} due to URLError: {1}".format( url, e.reason ) ) except socket.error as e: metadata["error"] = str(e.strerror) if hasattr(e, "strerror") else str(e) logger.warning( "Skipping download of attachment {0} due to socket error: {1}".format( url, e.strerror if hasattr(e, "strerror") else str(e) ) ) except Exception as e: metadata["error"] = str(e) logger.warning( "Skipping download of attachment {0} due to error: {1}".format(url, str(e)) ) # Clean up temp file if it was partially created if os.path.exists(temp_path): try: os.remove(temp_path) except Exception: pass return metadata def get_jwt_signed_url_via_markdown_api(url, token, repo_context): """Convert a user-attachments/assets URL to a JWT-signed URL via Markdown API. GitHub's Markdown API renders image URLs and returns HTML containing JWT-signed private-user-images.githubusercontent.com URLs that work without token authentication. This is a workaround for issue #477 where fine-grained PATs cannot download user-attachments URLs from private repos directly. Limitations: - Only works for /assets/ URLs (images) - Does NOT work for /files/ URLs (PDFs, text files, etc.) - JWT URLs expire after ~5 minutes Args: url: The github.com/user-attachments/assets/UUID URL token: Raw fine-grained PAT (github_pat_...) repo_context: Repository context as "owner/repo" Returns: str: JWT-signed URL from private-user-images.githubusercontent.com None: If conversion fails """ try: payload = json.dumps( {"text": f"![img]({url})", "mode": "gfm", "context": repo_context} ).encode("utf-8") request = Request("https://api.github.com/markdown", data=payload, method="POST") request.add_header("Authorization", f"token {token}") request.add_header("Content-Type", "application/json") request.add_header("Accept", "application/vnd.github+json") html = urlopen(request, context=https_ctx, timeout=30).read().decode("utf-8") # Parse JWT-signed URL from HTML response # Format: if match := re.search( r'src="(https://private-user-images\.githubusercontent\.com/[^"]+)"', html ): jwt_url = match.group(1) logger.debug("Converted attachment URL to JWT-signed URL via Markdown API") return jwt_url logger.debug("Markdown API response did not contain JWT-signed URL") return None except HTTPError as e: logger.debug( "Markdown API request failed with HTTP {0}: {1}".format(e.code, e.reason) ) return None except Exception as e: logger.debug("Markdown API request failed: {0}".format(str(e))) return None def extract_attachment_urls(item_data, issue_number=None, repository_full_name=None): """Extract GitHub-hosted attachment URLs from issue/PR/discussion body and comments. What qualifies as an attachment? There is no "attachment" concept in the GitHub API - it's a user behavior pattern we've identified through analysis of real-world repositories. 
We define attachments as: - User-uploaded files hosted on GitHub's CDN domains - Found outside of code blocks (not examples/documentation) - Matches known GitHub attachment URL patterns This intentionally captures bare URLs pasted by users, not just markdown/HTML syntax. Some false positives (example URLs in documentation) may occur - these fail gracefully with HTTP 404 and are logged in the manifest. Supported URL formats: - Modern: github.com/user-attachments/{assets,files}/* - Legacy: user-images.githubusercontent.com/* (including private-user-images) - Repo files: github.com/{owner}/{repo}/files/* (filtered to current repo) - Repo assets: github.com/{owner}/{repo}/assets/* (filtered to current repo) Repository filtering (repo files/assets only): - Direct match: URL is for current repository → included - Redirect match: URL redirects to current repository → included (handles renames/transfers) - Different repo: URL is for different repository → excluded Code block filtering: - Removes fenced code blocks (```) and inline code (`) before extraction - Prevents extracting URLs from code examples and documentation snippets Args: item_data: Issue or PR data dict issue_number: Issue/PR number for logging repository_full_name: Full repository name (owner/repo) for filtering repo-scoped URLs """ import re urls = [] # Define all GitHub attachment patterns # Stop at markdown punctuation: whitespace, ), `, ", >, < # Trailing sentence punctuation (. ! ? , ; : ' ") is stripped in post-processing patterns = [ r'https://github\.com/user-attachments/(?:assets|files)/[^\s\)`"<>]+', # Modern r'https://(?:private-)?user-images\.githubusercontent\.com/[^\s\)`"<>]+', # Legacy CDN ] # Add repo-scoped patterns (will be filtered by repository later) # These patterns match ANY repo, then we filter to current repo with redirect checking repo_files_pattern = r'https://github\.com/[^/]+/[^/]+/files/\d+/[^\s\)`"<>]+' repo_assets_pattern = r'https://github\.com/[^/]+/[^/]+/assets/\d+/[^\s\)`"<>]+' patterns.append(repo_files_pattern) patterns.append(repo_assets_pattern) def clean_url(url): """Remove trailing sentence and markdown punctuation that's not part of the URL.""" return url.rstrip(".!?,;:'\")") def remove_code_blocks(text): """Remove markdown code blocks (fenced and inline) from text. This prevents extracting URLs from code examples like: - Fenced code blocks: ```code``` - Inline code: `code` """ # Remove fenced code blocks first (```...```) # DOTALL flag makes . match newlines text = re.sub(r"```.*?```", "", text, flags=re.DOTALL) # Remove inline code (`...`) # Non-greedy match between backticks text = re.sub(r"`[^`]*`", "", text) return text def is_repo_scoped_url(url): """Check if URL is a repo-scoped attachment (files or assets).""" return bool( re.match(r"https://github\.com/[^/]+/[^/]+/(?:files|assets)/\d+/", url) ) def check_redirect_to_current_repo(url, current_repo): """Check if URL redirects to current repository. Returns True if: - URL is already for current repo - URL redirects (301/302) to current repo (handles renames/transfers) Returns False otherwise (URL is for a different repo). 
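        For example (hypothetical names): an attachment URL under
        "octocat/old-name" that now redirects (301) to "octocat/new-name" is
        kept when the current repository is "octocat/new-name"; a URL that
        resolves to an unrelated repository is dropped.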
""" # Extract owner/repo from URL match = re.match(r"https://github\.com/([^/]+)/([^/]+)/", url) if not match: return False url_owner, url_repo = match.groups() url_repo_full = f"{url_owner}/{url_repo}" # Direct match - no need to check redirect if url_repo_full.lower() == current_repo.lower(): return True # Different repo - check if it redirects to current repo # This handles repository transfers and renames try: import urllib.request import urllib.error # Make HEAD request with redirect following disabled # We need to manually handle redirects to see the Location header request = urllib.request.Request(url, method="HEAD") request.add_header("User-Agent", "python-github-backup") # Create opener that does NOT follow redirects class NoRedirectHandler(urllib.request.HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): return None # Don't follow redirects opener = urllib.request.build_opener(NoRedirectHandler) try: _ = opener.open(request, timeout=10) # Got 200 - URL works as-is but for different repo return False except urllib.error.HTTPError as e: # Check if it's a redirect (301, 302, 307, 308) if e.code in (301, 302, 307, 308): location = e.headers.get("Location", "") # Check if redirect points to current repo if location: redirect_match = re.match( r"https://github\.com/([^/]+)/([^/]+)/", location ) if redirect_match: redirect_owner, redirect_repo = redirect_match.groups() redirect_repo_full = f"{redirect_owner}/{redirect_repo}" return redirect_repo_full.lower() == current_repo.lower() return False except Exception: # On any error (timeout, network issue, etc.), be conservative # and exclude the URL to avoid downloading from wrong repos return False def extract_from_text(text): text_cleaned = remove_code_blocks(text or "") for pattern in patterns: found_urls = re.findall(pattern, text_cleaned) urls.extend([clean_url(url) for url in found_urls]) def extract_from_comments(comments): for comment in comments: extract_from_text(comment.get("body") or "") # GitHub Discussions support one level of replies. Issues and pull # requests don't have reply_data, so this is a no-op for them. extract_from_comments(comment.get("reply_data") or []) # Extract from body extract_from_text(item_data.get("body") or "") # Extract from issue comments and discussion comments if "comment_data" in item_data: extract_from_comments(item_data["comment_data"]) # Extract from PR regular comments if "comment_regular_data" in item_data: extract_from_comments(item_data["comment_regular_data"]) regex_urls = list(set(urls)) # dedupe # Filter repo-scoped URLs to current repository only # This handles repository transfers/renames via redirect checking if repository_full_name: filtered_urls = [] for url in regex_urls: if is_repo_scoped_url(url): # Check if URL belongs to current repo (or redirects to it) if check_redirect_to_current_repo(url, repository_full_name): filtered_urls.append(url) # else: skip URLs from other repositories else: # Non-repo-scoped URLs (user-attachments, CDN) - always include filtered_urls.append(url) regex_urls = filtered_urls return regex_urls def get_attachment_filename(url): """Get filename from attachment URL, handling all GitHub formats. Formats: - github.com/user-attachments/assets/{uuid} → uuid (add extension later) - github.com/user-attachments/files/{id}/{filename} → filename - github.com/{owner}/{repo}/files/{id}/{filename} → filename - user-images.githubusercontent.com/{user}/{hash}.{ext} → hash.ext - private-user-images.githubusercontent.com/...?jwt=... 
→ extract from path """ from urllib.parse import urlparse parsed = urlparse(url) path_parts = parsed.path.split("/") # Modern: /user-attachments/files/{id}/{filename} if "user-attachments/files" in parsed.path: return path_parts[-1] # Modern: /user-attachments/assets/{uuid} elif "user-attachments/assets" in parsed.path: return path_parts[-1] # extension added later via detect_and_add_extension # Repo files: /{owner}/{repo}/files/{id}/{filename} elif "/files/" in parsed.path and len(path_parts) >= 2: return path_parts[-1] # Legacy: user-images.githubusercontent.com/{user}/{hash-with-ext} elif "githubusercontent.com" in parsed.netloc: return path_parts[-1] # Already has extension usually # Fallback: use last path component return path_parts[-1] if path_parts[-1] else "unknown_attachment" def resolve_filename_collision(filepath): """Resolve filename collisions using counter suffix pattern. If filepath exists, returns a new filepath with counter suffix. Pattern: report.pdf → report_1.pdf → report_2.pdf Also protects against manifest.json collisions by treating it as reserved. Args: filepath: Full path to file that might exist Returns: filepath that doesn't collide (may be same as input if no collision) """ directory = os.path.dirname(filepath) filename = os.path.basename(filepath) # Protect manifest.json - it's a reserved filename if filename == "manifest.json": name, ext = os.path.splitext(filename) counter = 1 while True: new_filename = f"{name}_{counter}{ext}" new_filepath = os.path.join(directory, new_filename) if not os.path.exists(new_filepath): return new_filepath counter += 1 if not os.path.exists(filepath): return filepath name, ext = os.path.splitext(filename) counter = 1 while True: new_filename = f"{name}_{counter}{ext}" new_filepath = os.path.join(directory, new_filename) if not os.path.exists(new_filepath): return new_filepath counter += 1 def download_attachments( args, item_cwd, item_data, number, repository, item_type="issue" ): """Download user-attachments from issue/PR/discussion body and comments with manifest. 
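    Files are written to <item_cwd>/attachments/<number>/ alongside a
    manifest.json describing each download (paths per the code below).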
Args: args: Command line arguments item_cwd: Working directory (issue_cwd, pulls_cwd, or discussion_cwd) item_data: Issue, PR, or discussion data dict number: Issue, PR, or discussion number repository: Repository dict item_type: "issue", "pull", or "discussion" for logging/manifest """ import json from datetime import datetime, timezone item_type_display = { "issue": "issue", "pull": "pull request", "discussion": "discussion", }.get(item_type, item_type) urls = extract_attachment_urls( item_data, issue_number=number, repository_full_name=repository["full_name"] ) if not urls: return attachments_dir = os.path.join(item_cwd, "attachments", str(number)) manifest_path = os.path.join(attachments_dir, "manifest.json") # Load existing manifest to prevent duplicate downloads existing_urls = set() existing_metadata = [] if os.path.exists(manifest_path): try: with open(manifest_path, "r") as f: existing_manifest = json.load(f) all_metadata = existing_manifest.get("attachments", []) # Only skip URLs that were successfully downloaded OR failed with permanent errors # Retry transient failures (5xx, timeouts, network errors) for item in all_metadata: if item.get("success"): existing_urls.add(item["url"]) else: # Check if this is a permanent failure (don't retry) or transient (retry) http_status = item.get("http_status") if http_status in [404, 410, 451]: # Permanent failures - don't retry existing_urls.add(item["url"]) # Transient failures (5xx, auth errors, timeouts) will be retried existing_metadata = all_metadata except (json.JSONDecodeError, IOError): # If manifest is corrupted, re-download everything logger.warning( "Corrupted manifest for {0} #{1}, will re-download".format( item_type_display, number ) ) existing_urls = set() existing_metadata = [] # Filter to only new URLs new_urls = [url for url in urls if url not in existing_urls] if not new_urls and existing_urls: logger.debug( "Skipping attachments for {0} #{1} (all {2} already downloaded)".format( item_type_display, number, len(urls) ) ) return if new_urls: logger.info( "Downloading {0} new attachment(s) for {1} #{2}".format( len(new_urls), item_type_display, number ) ) mkdir_p(item_cwd, attachments_dir) # Collect metadata for manifest (start with existing) attachment_metadata_list = existing_metadata[:] for url in new_urls: filename = get_attachment_filename(url) filepath = os.path.join(attachments_dir, filename) # Issue #477: Fine-grained PATs cannot download user-attachments/assets # from private repos directly (404). Use Markdown API workaround to get # a JWT-signed URL. Only works for /assets/ (images), not /files/. needs_jwt = ( args.token_fine is not None and repository.get("private", False) and "github.com/user-attachments/assets/" in url ) if not needs_jwt: # NORMAL download path metadata = download_attachment_file( url, filepath, get_auth(args, encode=not args.as_app), as_app=args.as_app, fine=args.token_fine is not None, ) elif jwt_url := get_jwt_signed_url_via_markdown_api( url, args.token_fine, repository["full_name"] ): # JWT needed and extracted, download via JWT metadata = download_attachment_file( jwt_url, filepath, auth=None, as_app=False, fine=False ) metadata["url"] = url # Apply back the original URL metadata["jwt_workaround"] = True else: # Markdown API workaround failed - skip download we know will fail metadata = { "url": url, "success": False, "skipped_at": datetime.now(timezone.utc).isoformat(), "error": "Fine-grained token cannot download private repo attachments. " "Markdown API workaround failed. 
Use --token-classic instead.", } logger.warning( "Skipping attachment {0}: {1}".format(url, metadata["error"]) ) # If download succeeded but we got an extension from Content-Disposition, # we may need to rename the file to add the extension if metadata["success"] and metadata.get("original_filename"): original_ext = os.path.splitext(metadata["original_filename"])[1] current_ext = os.path.splitext(filepath)[1] # Add extension if not present if original_ext and current_ext != original_ext: final_filepath = filepath + original_ext # Check for collision again with new extension final_filepath = resolve_filename_collision(final_filepath) logger.debug( "Adding extension {0} to {1}".format(original_ext, filepath) ) # Rename to add extension (already atomic from download) try: os.replace(filepath, final_filepath) metadata["saved_as"] = os.path.basename(final_filepath) except Exception as e: logger.warning( "Could not add extension to {0}: {1}".format(filepath, str(e)) ) metadata["saved_as"] = os.path.basename(filepath) else: metadata["saved_as"] = os.path.basename(filepath) elif metadata["success"]: metadata["saved_as"] = os.path.basename(filepath) else: metadata["saved_as"] = None attachment_metadata_list.append(metadata) # Write manifest if attachment_metadata_list: manifest = { "item_number": number, "item_type": item_type, "issue_number": number, "issue_type": item_type, "repository": ( f"{args.user}/{args.repository}" if hasattr(args, "repository") and args.repository else args.user ), "manifest_updated_at": datetime.now(timezone.utc).isoformat(), "attachments": attachment_metadata_list, } manifest_path = os.path.join(attachments_dir, "manifest.json") with open(manifest_path + ".temp", "w") as f: json.dump(manifest, f, indent=2) os.replace(manifest_path + ".temp", manifest_path) # Atomic write logger.debug( "Wrote manifest for {0} #{1}: {2} attachments".format( item_type_display, number, len(attachment_metadata_list) ) ) def get_authenticated_user(args): template = "https://{0}/user".format(get_github_api_host(args)) data = retrieve_data(args, template, paginated=False) return data[0] def check_git_lfs_install(): exit_code = subprocess.call(["git", "lfs", "version"]) if exit_code != 0: raise Exception( "The argument --lfs requires you to have Git LFS installed.\nYou can get it from https://git-lfs.github.com." 
) def retrieve_repositories(args, authenticated_user): logger.info("Retrieving repositories") paginated = True if args.user == authenticated_user["login"]: # we must use the /user/repos API to be able to access private repos template = "https://{0}/user/repos".format(get_github_api_host(args)) else: if args.private and not args.organization: logger.warning( "Authenticated user is different from user being backed up, thus private repositories cannot be accessed" ) template = "https://{0}/users/{1}/repos".format( get_github_api_host(args), args.user ) if args.organization: template = "https://{0}/orgs/{1}/repos".format( get_github_api_host(args), args.user ) if args.repository: if "/" in args.repository: repo_path = args.repository else: repo_path = "{0}/{1}".format(args.user, args.repository) paginated = False template = "https://{0}/repos/{1}".format(get_github_api_host(args), repo_path) try: repos = retrieve_data(args, template, paginated=paginated) except RepositoryUnavailableError as e: logger.warning(f"Repository is unavailable: {e}") if e.legal_url: logger.warning(f"Legal notice: {e.legal_url}") return [] if args.all_starred: starred_template = "https://{0}/users/{1}/starred".format( get_github_api_host(args), args.user ) starred_repos = retrieve_data(args, starred_template) # flag each repo as starred for downstream processing for item in starred_repos: item.update({"is_starred": True}) repos.extend(starred_repos) if args.include_gists: gists_template = "https://{0}/users/{1}/gists".format( get_github_api_host(args), args.user ) gists = retrieve_data(args, gists_template) # flag each repo as a gist for downstream processing for item in gists: item.update({"is_gist": True}) repos.extend(gists) if args.include_starred_gists: if ( not authenticated_user.get("login") or args.user.lower() != authenticated_user["login"].lower() ): logger.warning( "Cannot retrieve starred gists for '%s'. 
GitHub only allows access to the authenticated user's starred gists.", args.user, ) else: starred_gists_template = "https://{0}/gists/starred".format( get_github_api_host(args) ) starred_gists = retrieve_data(args, starred_gists_template) # flag each repo as a starred gist for downstream processing for item in starred_gists: item.update({"is_gist": True, "is_starred": True}) repos.extend(starred_gists) return repos def filter_repositories(args, unfiltered_repositories): if args.repository: return unfiltered_repositories logger.info("Filtering repositories") repositories = [] for r in unfiltered_repositories: # gists can be anonymous, so need to safely check owner # Use case-insensitive comparison to match GitHub's case-insensitive username behavior owner_login = r.get("owner", {}).get("login", "") if owner_login.lower() == args.user.lower() or r.get("is_starred"): repositories.append(r) name_regex = None if args.name_regex: name_regex = re.compile(args.name_regex) languages = None if args.languages: languages = [x.lower() for x in args.languages] if not args.fork: repositories = [r for r in repositories if not r.get("fork")] if not args.private: repositories = [ r for r in repositories if not r.get("private") or r.get("public") ] if languages: repositories = [ r for r in repositories if r.get("language") and r.get("language").lower() in languages ] # noqa if name_regex: repositories = [ r for r in repositories if "name" not in r or name_regex.match(r["name"]) ] if args.skip_archived: repositories = [r for r in repositories if not r.get("archived")] if args.starred_skip_size_over is not None: if args.starred_skip_size_over <= 0: logger.warning("--starred-skip-size-over must be greater than 0, ignoring") else: size_limit_kb = args.starred_skip_size_over * 1024 filtered = [] for r in repositories: if r.get("is_starred") and r.get("size", 0) > size_limit_kb: size_mb = r.get("size", 0) / 1024 logger.info( "Skipping starred repo {0} ({1:.0f} MB) due to --starred-skip-size-over {2}".format( r.get("full_name", r.get("name")), size_mb, args.starred_skip_size_over, ) ) else: filtered.append(r) repositories = filtered if args.exclude: repositories = [ r for r in repositories if "name" not in r or r["name"] not in args.exclude ] return repositories INCREMENTAL_LAST_UPDATE_FILENAME = "last_update" INCREMENTAL_RESOURCE_DIRECTORIES = ("issues", "pulls") def get_repository_checkpoint_time(repository): timestamps = [ timestamp for timestamp in (repository.get("updated_at"), repository.get("pushed_at")) if timestamp ] if timestamps: return max(timestamps) return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.localtime()) def resource_backup_exists(resource_cwd): if not os.path.isdir(resource_cwd): return False ignored_names = { INCREMENTAL_LAST_UPDATE_FILENAME, PULL_REVIEWS_LAST_UPDATE_FILENAME, } for name in os.listdir(resource_cwd): if name in ignored_names or name.endswith(".temp"): continue return True return False def read_legacy_last_update(args, output_directory): if not args.incremental: return None, None last_update_path = os.path.join(output_directory, INCREMENTAL_LAST_UPDATE_FILENAME) if os.path.exists(last_update_path): return last_update_path, open(last_update_path).read().strip() return last_update_path, None def read_resource_last_update(args, resource_cwd, legacy_last_update=None): if not args.incremental: return None last_update_path = os.path.join(resource_cwd, INCREMENTAL_LAST_UPDATE_FILENAME) if os.path.exists(last_update_path): return open(last_update_path).read().strip() if 
legacy_last_update and resource_backup_exists(resource_cwd): return legacy_last_update return None def write_resource_last_update(args, resource_cwd, repository): if not args.incremental: return mkdir_p(resource_cwd) last_update_path = os.path.join(resource_cwd, INCREMENTAL_LAST_UPDATE_FILENAME) open(last_update_path, "w").write(get_repository_checkpoint_time(repository)) def iter_incremental_resource_dirs(output_directory): repositories_dir = os.path.join(output_directory, "repositories") if os.path.isdir(repositories_dir): for repository_name in os.listdir(repositories_dir): repo_cwd = os.path.join(repositories_dir, repository_name) if not os.path.isdir(repo_cwd): continue for resource_name in INCREMENTAL_RESOURCE_DIRECTORIES: yield os.path.join(repo_cwd, resource_name) starred_dir = os.path.join(output_directory, "starred") if os.path.isdir(starred_dir): for owner_name in os.listdir(starred_dir): owner_cwd = os.path.join(starred_dir, owner_name) if not os.path.isdir(owner_cwd): continue for repository_name in os.listdir(owner_cwd): repo_cwd = os.path.join(owner_cwd, repository_name) if not os.path.isdir(repo_cwd): continue for resource_name in INCREMENTAL_RESOURCE_DIRECTORIES: yield os.path.join(repo_cwd, resource_name) def has_unmigrated_incremental_resources(output_directory): for resource_cwd in iter_incremental_resource_dirs(output_directory): last_update_path = os.path.join( resource_cwd, INCREMENTAL_LAST_UPDATE_FILENAME ) if resource_backup_exists(resource_cwd) and not os.path.exists( last_update_path ): return True return False def remove_legacy_last_update_if_migrated( args, output_directory, legacy_last_update_path ): if not args.incremental or not legacy_last_update_path: return if not os.path.exists(legacy_last_update_path): return if has_unmigrated_incremental_resources(output_directory): logger.info( "Keeping legacy global last_update until all existing issue/pull " "backups have per-resource checkpoints" ) return os.remove(legacy_last_update_path) logger.info( "Removed legacy global last_update after migrating incremental checkpoints" ) def backup_repositories(args, output_directory, repositories): logger.info("Backing up repositories") repos_template = "https://{0}/repos".format(get_github_api_host(args)) legacy_last_update_path, legacy_last_update = read_legacy_last_update( args, output_directory ) incremental_resource_work_attempted = False for repository in repositories: if repository.get("is_gist"): repo_cwd = os.path.join(output_directory, "gists", repository["id"]) elif repository.get("is_starred"): # put starred repos in -o/starred/${owner}/${repo} to prevent collision of # any repositories with the same name repo_cwd = os.path.join( output_directory, "starred", repository["owner"]["login"], repository["name"], ) else: repo_cwd = os.path.join( output_directory, "repositories", repository["name"] ) repo_dir = os.path.join(repo_cwd, "repository") repo_url = get_github_repo_url(args, repository) include_gists = args.include_gists or args.include_starred_gists include_starred = args.all_starred and repository.get("is_starred") if ( (args.include_repository or args.include_everything) or (include_gists and repository.get("is_gist")) or include_starred ): repo_name = ( repository.get("name") if not repository.get("is_gist") else repository.get("id") ) fetch_repository( repo_name, repo_url, repo_dir, skip_existing=args.skip_existing, bare_clone=args.bare_clone, lfs_clone=args.lfs_clone, no_prune=args.no_prune, ) if repository.get("is_gist"): # dump gist information to a 
file as well output_file = "{0}/gist.json".format(repo_cwd) with codecs.open(output_file, "w", encoding="utf-8") as f: json_dump(repository, f) continue # don't try to back anything else for a gist; it doesn't exist try: download_wiki = args.include_wiki or args.include_everything if repository["has_wiki"] and download_wiki: fetch_repository( repository["name"], repo_url.replace(".git", ".wiki.git"), os.path.join(repo_cwd, "wiki"), skip_existing=args.skip_existing, bare_clone=args.bare_clone, lfs_clone=args.lfs_clone, no_prune=args.no_prune, ) if args.include_issues or args.include_everything: incremental_resource_work_attempted = True issue_cwd = os.path.join(repo_cwd, "issues") args.since = read_resource_last_update( args, issue_cwd, legacy_last_update ) backup_issues(args, repo_cwd, repository, repos_template) write_resource_last_update(args, issue_cwd, repository) if args.include_pulls or args.include_everything: incremental_resource_work_attempted = True pulls_cwd = os.path.join(repo_cwd, "pulls") args.since = read_resource_last_update( args, pulls_cwd, legacy_last_update ) backup_pulls(args, repo_cwd, repository, repos_template) write_resource_last_update(args, pulls_cwd, repository) if args.include_discussions or args.include_everything: backup_discussions(args, repo_cwd, repository) if args.include_milestones or args.include_everything: backup_milestones(args, repo_cwd, repository, repos_template) if args.include_security_advisories or ( args.include_everything and not repository.get("private", False) ): backup_security_advisories(args, repo_cwd, repository, repos_template) if args.include_labels or args.include_everything: backup_labels(args, repo_cwd, repository, repos_template) if args.include_hooks or args.include_everything: backup_hooks(args, repo_cwd, repository, repos_template) if args.include_releases or args.include_everything: backup_releases( args, repo_cwd, repository, repos_template, include_assets=args.include_assets or args.include_everything, ) except RepositoryUnavailableError as e: logger.warning(f"Repository {repository['full_name']} is unavailable: {e}") if e.legal_url: logger.warning(f"Legal notice: {e.legal_url}") logger.info(f"Skipping remaining resources for {repository['full_name']}") continue if incremental_resource_work_attempted: remove_legacy_last_update_if_migrated( args, output_directory, legacy_last_update_path ) def _repository_owner_name(repository): return repository["full_name"].split("/", 1) def _connection_nodes(connection): return [node for node in (connection or {}).get("nodes") or [] if node] def retrieve_discussion_summaries(args, repository, since=None): owner, name = _repository_owner_name(repository) after = None page = 1 summaries = [] newest_seen = None discussions_enabled = None total_count = 0 while True: data = retrieve_graphql_data( args, DISCUSSION_LIST_QUERY, { "owner": owner, "name": name, "after": after, "pageSize": DISCUSSION_PAGE_SIZE, }, log_context="discussion summaries {0} page {1}".format( repository["full_name"], page ), ) repository_data = data.get("repository") if repository_data is None: raise Exception( "Repository {0} not found in GraphQL response".format( repository["full_name"] ) ) discussions_enabled = repository_data.get("hasDiscussionsEnabled") if not discussions_enabled: return [], None, False, 0 discussions = repository_data.get("discussions") or {} total_count = discussions.get("totalCount", total_count) stop = False for discussion in _connection_nodes(discussions): updated_at = discussion.get("updatedAt") 
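            # Assumes DISCUSSION_LIST_QUERY returns discussions newest-first by
            # updatedAt: paging can then stop as soon as an item at or older than
            # the previous checkpoint appears, and newest_seen becomes the
            # candidate value for the next incremental checkpoint.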
if updated_at and (newest_seen is None or updated_at > newest_seen): newest_seen = updated_at if since and updated_at and updated_at <= since: stop = True break summaries.append(discussion) page_info = discussions.get("pageInfo") or {} if stop or not page_info.get("hasNextPage"): break after = page_info.get("endCursor") page += 1 return summaries, newest_seen, discussions_enabled, total_count def retrieve_discussion_comment_replies(args, comment_id, after=None, log_context=None): data = retrieve_graphql_data( args, DISCUSSION_REPLIES_QUERY, { "commentId": comment_id, "repliesCursor": after, "pageSize": DISCUSSION_PAGE_SIZE, }, log_context=log_context, ) node = data.get("node") or {} return node.get("replies") or {} def _discussion_comment_log_identifier(comment_node): return ( comment_node.get("databaseId") or comment_node.get("url") or comment_node.get("id") ) def _discussion_comment_with_replies( args, comment_node, repository_full_name=None, discussion_number=None ): replies_connection = comment_node.get("replies") or {} replies = _connection_nodes(replies_connection) reply_total_count = replies_connection.get("totalCount", len(replies)) page_info = replies_connection.get("pageInfo") or {} reply_page = 2 while page_info.get("hasNextPage"): log_context = None if repository_full_name and discussion_number is not None: log_context = "discussion {0}#{1} comment {2} replies page {3}".format( repository_full_name, discussion_number, _discussion_comment_log_identifier(comment_node), reply_page, ) replies_connection = retrieve_discussion_comment_replies( args, comment_node["id"], page_info.get("endCursor"), log_context=log_context, ) replies.extend(_connection_nodes(replies_connection)) page_info = replies_connection.get("pageInfo") or {} reply_page += 1 comment = {key: value for key, value in comment_node.items() if key != "replies"} comment["reply_count"] = reply_total_count comment["reply_data"] = replies return comment def retrieve_discussion(args, repository, number): owner, name = _repository_owner_name(repository) comments_cursor = None comments_page = 1 discussion_data = None comments = [] comment_total_count = 0 while True: data = retrieve_graphql_data( args, DISCUSSION_DETAIL_QUERY, { "owner": owner, "name": name, "number": number, "commentsCursor": comments_cursor, "pageSize": DISCUSSION_PAGE_SIZE, }, log_context="discussion {0}#{1} details/comments page {2}".format( repository["full_name"], number, comments_page ), ) repository_data = data.get("repository") or {} discussion = repository_data.get("discussion") if discussion is None: raise Exception( "Discussion #{0} not found in {1}".format( number, repository["full_name"] ) ) if discussion_data is None: discussion_data = { key: value for key, value in discussion.items() if key != "comments" } comments_connection = discussion.get("comments") or {} comment_total_count = comments_connection.get( "totalCount", comment_total_count ) for comment_node in _connection_nodes(comments_connection): comments.append( _discussion_comment_with_replies( args, comment_node, repository["full_name"], number ) ) page_info = comments_connection.get("pageInfo") or {} if not page_info.get("hasNextPage"): break comments_cursor = page_info.get("endCursor") comments_page += 1 discussion_data["comment_count"] = comment_total_count discussion_data["comment_data"] = comments return discussion_data def backup_discussions(args, repo_cwd, repository): discussion_cwd = os.path.join(repo_cwd, "discussions") if args.skip_existing and os.path.isdir(discussion_cwd): 
return if not get_graphql_auth(args): logger.info( "Skipping {0} discussions since GitHub GraphQL API requires authentication".format( repository["full_name"] ) ) return discussions_since = None discussion_last_update_path = os.path.join(discussion_cwd, "last_update") if args.incremental and os.path.exists(discussion_last_update_path): discussions_since = open(discussion_last_update_path).read().strip() logger.info("Retrieving {0} discussions".format(repository["full_name"])) try: ( summaries, newest_seen, discussions_enabled, total_count, ) = retrieve_discussion_summaries(args, repository, since=discussions_since) except Exception as e: logger.warning( "Unable to retrieve discussions for {0}, skipping: {1}".format( repository["full_name"], e ) ) return if not discussions_enabled: logger.info( "Discussions are not enabled for {0}, skipping".format( repository["full_name"] ) ) return mkdir_p(repo_cwd, discussion_cwd) if discussions_since: logger.info( "Saving {0} updated discussions to disk ({1} total)".format( len(summaries), total_count ) ) else: logger.info("Saving {0} discussions to disk".format(len(summaries))) written_count = 0 skipped_count = 0 had_errors = False for summary in summaries: number = summary["number"] discussion_file = os.path.join(discussion_cwd, "{0}.json".format(number)) if args.incremental_by_files and os.path.isfile(discussion_file): modified = os.path.getmtime(discussion_file) modified = datetime.fromtimestamp(modified).strftime("%Y-%m-%dT%H:%M:%SZ") if modified > summary["updatedAt"]: logger.info( "Skipping discussion {0} because it wasn't modified since last backup".format( number ) ) skipped_count += 1 continue try: discussion = retrieve_discussion(args, repository, number) except Exception as e: logger.warning( "Unable to retrieve discussion {0}#{1}, skipping: {2}".format( repository["full_name"], number, e ) ) had_errors = True continue if args.include_attachments: download_attachments( args, discussion_cwd, discussion, number, repository, item_type="discussion", ) if json_dump_if_changed(discussion, discussion_file): written_count += 1 if ( args.incremental and not had_errors and newest_seen and (not discussions_since or newest_seen > discussions_since) ): open(discussion_last_update_path, "w").write(newest_seen) attempted_count = len(summaries) - skipped_count if not summaries: logger.info("No discussions to save") elif attempted_count == 0: logger.info("{0} discussions skipped".format(skipped_count)) elif written_count == attempted_count: logger.info("Saved {0} discussions to disk".format(written_count)) elif written_count == 0: logger.info( "{0} discussions unchanged, skipped write".format(attempted_count) ) else: logger.info( "Saved {0} discussions to disk ({1} unchanged, {2} skipped)".format( written_count, attempted_count - written_count, skipped_count, ) ) def backup_issues(args, repo_cwd, repository, repos_template): has_issues_dir = os.path.isdir("{0}/issues/.git".format(repo_cwd)) if args.skip_existing and has_issues_dir: return logger.info("Retrieving {0} issues".format(repository["full_name"])) issue_cwd = os.path.join(repo_cwd, "issues") mkdir_p(repo_cwd, issue_cwd) issues = {} issues_skipped = 0 issues_skipped_message = "" _issue_template = "{0}/{1}/issues".format(repos_template, repository["full_name"]) should_include_pulls = args.include_pulls or args.include_everything issue_states = ["open", "closed"] for issue_state in issue_states: query_args = {"filter": "all", "state": issue_state} if args.since: query_args["since"] = args.since _issues 
= retrieve_data(args, _issue_template, query_args=query_args) for issue in _issues: # skip pull requests which are also returned as issues # if retrieving pull requests is requested as well if "pull_request" in issue and should_include_pulls: issues_skipped += 1 continue issues[issue["number"]] = issue if issues_skipped: issues_skipped_message = " (skipped {0} pull requests)".format(issues_skipped) logger.info( "Saving {0} issues to disk{1}".format( len(list(issues.keys())), issues_skipped_message ) ) comments_template = _issue_template + "/{0}/comments" events_template = _issue_template + "/{0}/events" for number, issue in list(issues.items()): issue_file = "{0}/{1}.json".format(issue_cwd, number) if args.incremental_by_files and os.path.isfile(issue_file): modified = os.path.getmtime(issue_file) modified = datetime.fromtimestamp(modified).strftime("%Y-%m-%dT%H:%M:%SZ") if modified > issue["updated_at"]: logger.info( "Skipping issue {0} because it wasn't modified since last backup".format( number ) ) continue if args.include_issue_comments or args.include_everything: template = comments_template.format(number) issues[number]["comment_data"] = retrieve_data(args, template) if args.include_issue_events or args.include_everything: template = events_template.format(number) issues[number]["event_data"] = retrieve_data(args, template) if args.include_attachments: download_attachments( args, issue_cwd, issues[number], number, repository, item_type="issue" ) with codecs.open(issue_file + ".temp", "w", encoding="utf-8") as f: json_dump(issue, f) os.replace(issue_file + ".temp", issue_file) # Atomic write PULL_OPTIONAL_DATA_KEYS = ( "comment_regular_data", "comment_data", "commit_data", "review_data", ) PULL_REVIEWS_LAST_UPDATE_FILENAME = "reviews_last_update" def read_json_file_if_exists(path): if not os.path.isfile(path): return None try: with codecs.open(path, "r", encoding="utf-8") as f: return json.load(f) except (OSError, UnicodeDecodeError, json.decoder.JSONDecodeError) as e: logger.debug("Error reading existing JSON file {0}: {1}".format(path, e)) return None def restore_existing_pull_optional_data(pull, existing_pull): if not existing_pull: return for key in PULL_OPTIONAL_DATA_KEYS: if key not in pull and key in existing_pull: pull[key] = existing_pull[key] def get_pull_reviews_since(args, pulls_cwd): args_since = getattr(args, "since", None) if not args.incremental: return args_since, None, None reviews_last_update_path = os.path.join( pulls_cwd, PULL_REVIEWS_LAST_UPDATE_FILENAME ) if not os.path.exists(reviews_last_update_path): # One-time backfill for existing incremental backups: if the user adds # --pull-reviews after a repository checkpoint already exists, the # repository-level checkpoint would otherwise skip old PRs forever. 
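        # Returning (None, None, path) forces a full pull-request pass on this
        # run; the reviews checkpoint file is then created once that pass
        # completes without review errors.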
return None, None, reviews_last_update_path reviews_since = open(reviews_last_update_path).read().strip() if args_since and reviews_since: return min(args_since, reviews_since), reviews_since, reviews_last_update_path return args_since or reviews_since, reviews_since, reviews_last_update_path def backup_pulls(args, repo_cwd, repository, repos_template): has_pulls_dir = os.path.isdir("{0}/pulls/.git".format(repo_cwd)) if args.skip_existing and has_pulls_dir: return logger.info("Retrieving {0} pull requests".format(repository["full_name"])) # noqa pulls_cwd = os.path.join(repo_cwd, "pulls") mkdir_p(repo_cwd, pulls_cwd) include_pull_reviews = args.include_pull_reviews or args.include_everything repository_since = getattr(args, "since", None) pulls_since = repository_since pull_reviews_since = None pull_reviews_last_update_path = None if include_pull_reviews: ( pulls_since, pull_reviews_since, pull_reviews_last_update_path, ) = get_pull_reviews_since(args, pulls_cwd) pulls = {} newest_pull_update = None _pulls_template = "{0}/{1}/pulls".format(repos_template, repository["full_name"]) _issue_template = "{0}/{1}/issues".format(repos_template, repository["full_name"]) query_args = { "filter": "all", "state": "all", "sort": "updated", "direction": "desc", } def track_newest_pull_update(pull): nonlocal newest_pull_update updated_at = pull.get("updated_at") if updated_at and ( newest_pull_update is None or updated_at > newest_pull_update ): newest_pull_update = updated_at def pull_is_due_for_repository_checkpoint(pull): return not repository_since or pull["updated_at"] > repository_since if not args.include_pull_details: pull_states = ["open", "closed"] for pull_state in pull_states: query_args["state"] = pull_state for pull in retrieve_data( args, _pulls_template, query_args=query_args, lazy=True ): track_newest_pull_update(pull) if pulls_since and pull["updated_at"] <= pulls_since: break if not pulls_since or pull["updated_at"] > pulls_since: pulls[pull["number"]] = pull else: for pull in retrieve_data( args, _pulls_template, query_args=query_args, lazy=True ): track_newest_pull_update(pull) if pulls_since and pull["updated_at"] <= pulls_since: break if not pulls_since or pull["updated_at"] > pulls_since: if pull_is_due_for_repository_checkpoint(pull): pulls[pull["number"]] = retrieve_data( args, _pulls_template + "/{}".format(pull["number"]), paginated=False, )[0] else: pulls[pull["number"]] = pull logger.info("Saving {0} pull requests to disk".format(len(list(pulls.keys())))) # Comments from pulls API are only _review_ comments # regular comments need to be fetched via issue API. 
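    # (regular issue-style comments are stored under "comment_regular_data")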
# For backwards compatibility with versions <= 0.41.0 # keep name "comment_data" for review comments comments_regular_template = _issue_template + "/{0}/comments" comments_template = _pulls_template + "/{0}/comments" commits_template = _pulls_template + "/{0}/commits" reviews_template = _pulls_template + "/{0}/reviews" pull_review_errors = False for number, pull in list(pulls.items()): pull_file = "{0}/{1}.json".format(pulls_cwd, number) existing_pull = read_json_file_if_exists(pull_file) needs_review_backfill = ( include_pull_reviews and (not existing_pull or "review_data" not in existing_pull) ) if args.incremental_by_files and os.path.isfile(pull_file): modified = os.path.getmtime(pull_file) modified = datetime.fromtimestamp(modified).strftime("%Y-%m-%dT%H:%M:%SZ") if modified > pull["updated_at"] and not needs_review_backfill: logger.info( "Skipping pull request {0} because it wasn't modified since last backup".format( number ) ) continue should_fetch_non_review_data = pull_is_due_for_repository_checkpoint(pull) if ( args.include_pull_comments or args.include_everything ) and should_fetch_non_review_data: template = comments_regular_template.format(number) pulls[number]["comment_regular_data"] = retrieve_data(args, template) template = comments_template.format(number) pulls[number]["comment_data"] = retrieve_data(args, template) if include_pull_reviews: template = reviews_template.format(number) try: pulls[number]["review_data"] = retrieve_data(args, template) except Exception as e: pull_review_errors = True logger.warning( "Unable to retrieve reviews for pull request {0}#{1}, skipping reviews: {2}".format( repository["full_name"], number, e ) ) if ( args.include_pull_commits or args.include_everything ) and should_fetch_non_review_data: template = commits_template.format(number) pulls[number]["commit_data"] = retrieve_data(args, template) if args.include_attachments: download_attachments( args, pulls_cwd, pulls[number], number, repository, item_type="pull" ) restore_existing_pull_optional_data(pull, existing_pull) with codecs.open(pull_file + ".temp", "w", encoding="utf-8") as f: json_dump(pull, f) os.replace(pull_file + ".temp", pull_file) # Atomic write if ( include_pull_reviews and args.incremental and pull_reviews_last_update_path and newest_pull_update and not pull_review_errors and (not pull_reviews_since or newest_pull_update > pull_reviews_since) ): open(pull_reviews_last_update_path, "w").write(newest_pull_update) def backup_milestones(args, repo_cwd, repository, repos_template): milestone_cwd = os.path.join(repo_cwd, "milestones") if args.skip_existing and os.path.isdir(milestone_cwd): return logger.info("Retrieving {0} milestones".format(repository["full_name"])) mkdir_p(repo_cwd, milestone_cwd) template = "{0}/{1}/milestones".format(repos_template, repository["full_name"]) query_args = {"state": "all"} _milestones = retrieve_data(args, template, query_args=query_args) milestones = {} for milestone in _milestones: milestones[milestone["number"]] = milestone written_count = 0 for number, milestone in list(milestones.items()): milestone_file = "{0}/{1}.json".format(milestone_cwd, number) if json_dump_if_changed(milestone, milestone_file): written_count += 1 total = len(milestones) if written_count == total: logger.info("Saved {0} milestones to disk".format(total)) elif written_count == 0: logger.info("{0} milestones unchanged, skipped write".format(total)) else: logger.info( "Saved {0} of {1} milestones to disk ({2} unchanged)".format( written_count, total, total - 
written_count ) ) def backup_security_advisories(args, repo_cwd, repository, repos_template): advisory_cwd = os.path.join(repo_cwd, "security-advisories") if args.skip_existing and os.path.isdir(advisory_cwd): return logger.info("Retrieving {0} security advisories".format(repository["full_name"])) template = "{0}/{1}/security-advisories".format( repos_template, repository["full_name"] ) try: _advisories = retrieve_data(args, template) except Exception as e: if "404" in str(e): logger.info("Security advisories are not available for this repository, skipping") return raise mkdir_p(repo_cwd, advisory_cwd) advisories = {} for advisory in _advisories: advisories[advisory["ghsa_id"]] = advisory written_count = 0 for ghsa_id, advisory in list(advisories.items()): advisory_file = "{0}/{1}.json".format(advisory_cwd, ghsa_id) if json_dump_if_changed(advisory, advisory_file): written_count += 1 total = len(advisories) if written_count == total: logger.info("Saved {0} security advisories to disk".format(total)) elif written_count == 0: logger.info("{0} security advisories unchanged, skipped write".format(total)) else: logger.info( "Saved {0} of {1} security advisories to disk ({2} unchanged)".format( written_count, total, total - written_count ) ) def backup_labels(args, repo_cwd, repository, repos_template): label_cwd = os.path.join(repo_cwd, "labels") output_file = "{0}/labels.json".format(label_cwd) template = "{0}/{1}/labels".format(repos_template, repository["full_name"]) _backup_data(args, "labels", template, output_file, label_cwd) def backup_hooks(args, repo_cwd, repository, repos_template): auth = get_auth(args) if not auth: logger.info("Skipping hooks since no authentication provided") return hook_cwd = os.path.join(repo_cwd, "hooks") output_file = "{0}/hooks.json".format(hook_cwd) template = "{0}/{1}/hooks".format(repos_template, repository["full_name"]) try: _backup_data(args, "hooks", template, output_file, hook_cwd) except Exception as e: if "404" in str(e): logger.info("Unable to read hooks, skipping") else: raise e def backup_releases(args, repo_cwd, repository, repos_template, include_assets=False): repository_fullname = repository["full_name"] # give release files somewhere to live & log intent release_cwd = os.path.join(repo_cwd, "releases") logger.info("Retrieving {0} releases".format(repository_fullname)) mkdir_p(repo_cwd, release_cwd) query_args = {} release_template = "{0}/{1}/releases".format(repos_template, repository_fullname) releases = retrieve_data(args, release_template, query_args=query_args) if args.skip_prerelease: releases = [r for r in releases if not r["prerelease"] and not r["draft"]] if args.number_of_latest_releases and args.number_of_latest_releases < len( releases ): releases.sort( key=lambda item: datetime.strptime( item["created_at"], "%Y-%m-%dT%H:%M:%SZ" ), reverse=True, ) releases = releases[: args.number_of_latest_releases] # Check if this repo should skip asset downloads (case-insensitive) skip_assets = False if include_assets: repo_name = repository.get("name", "").lower() repo_full_name = repository.get("full_name", "").lower() skip_repos = [r.lower() for r in (args.skip_assets_on or [])] skip_assets = repo_name in skip_repos or repo_full_name in skip_repos if skip_assets: logger.info( "Skipping assets for {0} ({1} releases) due to --skip-assets-on".format( repository.get("name"), len(releases) ) ) # for each release, store it written_count = 0 for release in releases: release_name = release["tag_name"] release_name_safe = release_name.replace("/", 
"__") output_filepath = os.path.join( release_cwd, "{0}.json".format(release_name_safe) ) if json_dump_if_changed(release, output_filepath): written_count += 1 if include_assets and not skip_assets: # The releases list API already includes release asset metadata. Use # it to avoid an extra /releases/{id}/assets request per release. # Keep a fallback for older/enterprise responses that might omit it. assets = release.get("assets") if assets is None: assets = retrieve_data(args, release["assets_url"]) if len(assets) > 0: # give release asset files somewhere to live & download them (not including source archives) release_assets_cwd = os.path.join(release_cwd, release_name_safe) mkdir_p(release_assets_cwd) for asset in assets: download_file( asset["url"], os.path.join(release_assets_cwd, asset["name"]), get_auth(args, encode=not args.as_app), as_app=args.as_app, fine=True if args.token_fine is not None else False, ) # Log the results total = len(releases) if written_count == total: logger.info("Saved {0} releases to disk".format(total)) elif written_count == 0: logger.info("{0} releases unchanged, skipped write".format(total)) else: logger.info( "Saved {0} of {1} releases to disk ({2} unchanged)".format( written_count, total, total - written_count ) ) def fetch_repository( name, remote_url, local_dir, skip_existing=False, bare_clone=False, lfs_clone=False, no_prune=False, ): if bare_clone: if os.path.exists(local_dir): clone_exists = ( subprocess.check_output( ["git", "rev-parse", "--is-bare-repository"], cwd=local_dir ) == b"true\n" ) else: clone_exists = False else: clone_exists = os.path.exists(os.path.join(local_dir, ".git")) if clone_exists and skip_existing: return masked_remote_url = mask_password(remote_url) initialized = subprocess.call( ["git", "ls-remote", remote_url], stdout=FNULL, stderr=FNULL ) if initialized == 128: if ".wiki.git" in remote_url: logger.info( "Skipping {0} wiki (wiki is enabled but has no content)".format(name) ) else: logger.info( "Skipping {0} (repository not accessible - may be empty, private, or credentials invalid)".format( name ) ) return if clone_exists: logger.info("Updating {0} in {1}".format(name, local_dir)) remotes = subprocess.check_output(["git", "remote", "show"], cwd=local_dir) remotes = [i.strip() for i in remotes.decode("utf-8").splitlines()] if "origin" not in remotes: git_command = ["git", "remote", "rm", "origin"] logging_subprocess(git_command, cwd=local_dir) git_command = ["git", "remote", "add", "origin", remote_url] logging_subprocess(git_command, cwd=local_dir) else: git_command = ["git", "remote", "set-url", "origin", remote_url] logging_subprocess(git_command, cwd=local_dir) git_command = ["git", "fetch", "--all", "--force", "--tags", "--prune"] if no_prune: git_command.pop() logging_subprocess(git_command, cwd=local_dir) if lfs_clone: git_command = ["git", "lfs", "fetch", "--all", "--prune"] if no_prune: git_command.pop() logging_subprocess(git_command, cwd=local_dir) else: logger.info( "Cloning {0} repository from {1} to {2}".format( name, masked_remote_url, local_dir ) ) if bare_clone: git_command = ["git", "clone", "--mirror", remote_url, local_dir] logging_subprocess(git_command) if lfs_clone: git_command = ["git", "lfs", "fetch", "--all", "--prune"] if no_prune: git_command.pop() logging_subprocess(git_command, cwd=local_dir) else: git_command = ["git", "clone", remote_url, local_dir] logging_subprocess(git_command) if lfs_clone: git_command = ["git", "lfs", "fetch", "--all", "--prune"] if no_prune: git_command.pop() 
logging_subprocess(git_command, cwd=local_dir) def backup_account(args, output_directory): account_cwd = os.path.join(output_directory, "account") if args.include_starred or args.include_everything: output_file = "{0}/starred.json".format(account_cwd) template = "https://{0}/users/{1}/starred".format( get_github_api_host(args), args.user ) _backup_data(args, "starred repositories", template, output_file, account_cwd) if args.include_watched or args.include_everything: output_file = "{0}/watched.json".format(account_cwd) template = "https://{0}/users/{1}/subscriptions".format( get_github_api_host(args), args.user ) _backup_data(args, "watched repositories", template, output_file, account_cwd) if args.include_followers or args.include_everything: output_file = "{0}/followers.json".format(account_cwd) template = "https://{0}/users/{1}/followers".format( get_github_api_host(args), args.user ) _backup_data(args, "followers", template, output_file, account_cwd) if args.include_following or args.include_everything: output_file = "{0}/following.json".format(account_cwd) template = "https://{0}/users/{1}/following".format( get_github_api_host(args), args.user ) _backup_data(args, "following", template, output_file, account_cwd) def _backup_data(args, name, template, output_file, output_directory): skip_existing = args.skip_existing if not skip_existing or not os.path.exists(output_file): logger.info("Retrieving {0} {1}".format(args.user, name)) mkdir_p(output_directory) data = retrieve_data(args, template) if json_dump_if_changed(data, output_file): logger.info("Saved {0} {1} to disk".format(len(data), name)) else: logger.info("{0} {1} unchanged, skipped write".format(len(data), name)) def json_dump(data, output_file): json.dump( data, output_file, ensure_ascii=False, sort_keys=True, indent=4, separators=(",", ": "), ) def json_dump_if_changed(data, output_file_path): """ Write JSON data to file only if content has changed. Compares the serialized JSON data with the existing file content and only writes if different. This prevents unnecessary file modification timestamp updates and disk writes. Uses atomic writes (temp file + rename) to prevent corruption if the process is interrupted during the write. Args: data: The data to serialize as JSON output_file_path: The path to the output file Returns: True if file was written (content changed or new file) False if write was skipped (content unchanged) """ # Serialize new data with consistent formatting matching json_dump() new_content = json.dumps( data, ensure_ascii=False, sort_keys=True, indent=4, separators=(",", ": "), ) # Check if file exists and compare content if os.path.exists(output_file_path): try: with codecs.open(output_file_path, "r", encoding="utf-8") as f: existing_content = f.read() if existing_content == new_content: logger.debug( "Content unchanged, skipping write: {0}".format(output_file_path) ) return False except (OSError, UnicodeDecodeError) as e: # If we can't read the existing file, write the new one logger.debug( "Error reading existing file {0}, will overwrite: {1}".format( output_file_path, e ) ) # Write the file atomically using temp file + rename temp_file = output_file_path + ".temp" with codecs.open(temp_file, "w", encoding="utf-8") as f: f.write(new_content) os.replace(temp_file, output_file_path) # Atomic write return True
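

# A minimal usage sketch for json_dump_if_changed(), assuming only the helper
# above and the standard library. The function below is hypothetical and is not
# referenced anywhere else in this module; it only illustrates the
# write-once / skip-unchanged contract.
def _example_json_dump_if_changed(directory):
    """Write the same payload twice; only the first call should write."""
    path = os.path.join(directory, "example.json")
    first = json_dump_if_changed({"answer": 42}, path)  # True: file created
    second = json_dump_if_changed({"answer": 42}, path)  # False: content unchanged
    return first, second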