Mirror of https://github.com/josegonzalez/python-github-backup.git (synced 2025-12-05 08:08:02 +01:00)
Compare commits (8 commits):

- 9ef496efad
- 42bfe6f79d
- 5af522a348
- 6dfba7a783
- 7551829677
- 72d35a9b94
- 3eae9d78ed
- 90ba839c7d
CHANGES.rst (37 changed lines)
```diff
@@ -1,9 +1,44 @@
 Changelog
 =========
 
-0.51.1 (2025-11-16)
--------------------
+0.51.3 (2025-11-18)
+------------------------
+- Test: Add pagination tests for cursor and page-based Link headers.
+  [Rodos]
+- Use cursor based pagination. [Helio Machado]
+
+
+0.51.2 (2025-11-16)
+-------------------
+
+Fix
+~~~
+- Improve CA certificate detection with fallback chain. [Rodos]
+
+  The previous implementation incorrectly assumed empty get_ca_certs()
+  meant broken SSL, causing false failures in GitHub Codespaces and other
+  directory-based cert systems where certificates exist but aren't pre-loaded.
+  It would then attempt to import certifi as a workaround, but certifi wasn't
+  listed in requirements.txt, causing the fallback to fail with ImportError
+  even though the system certificates would have worked fine.
+
+  This commit replaces the naive check with a layered fallback approach that
+  checks multiple certificate sources. First it checks for pre-loaded system
+  certs (file-based systems). Then it verifies system cert paths exist
+  (directory-based systems like Ubuntu/Debian/Codespaces). Finally it attempts
+  to use certifi as an optional fallback only if needed.
+
+  This approach eliminates hard dependencies (certifi is now optional), works
+  in GitHub Codespaces without any setup, and fails gracefully with clear hints
+  for resolution when SSL is actually broken rather than failing with
+  ModuleNotFoundError.
+
+  Fixes #444
+
+
+0.51.1 (2025-11-16)
+-------------------
 
 Fix
 ~~~
 
```
Version bump (module `__version__`):

```diff
@@ -1 +1 @@
-__version__ = "0.51.1"
+__version__ = "0.51.3"
```
github_backup/github_backup.py:

```diff
@@ -37,22 +37,33 @@ FNULL = open(os.devnull, "w")
 FILE_URI_PREFIX = "file://"
 logger = logging.getLogger(__name__)
 
+# Setup SSL context with fallback chain
 https_ctx = ssl.create_default_context()
-if not https_ctx.get_ca_certs():
-    import warnings
-
-    warnings.warn(
-        "\n\nYOUR DEFAULT CA CERTS ARE EMPTY.\n"
-        + "PLEASE POPULATE ANY OF:"
-        + "".join(
-            ["\n - " + x for x in ssl.get_default_verify_paths() if type(x) is str]
-        )
-        + "\n",
-        stacklevel=2,
-    )
-    import certifi
-
-    https_ctx = ssl.create_default_context(cafile=certifi.where())
+if https_ctx.get_ca_certs():
+    # Layer 1: Certificates pre-loaded from system (file-based)
+    pass
+else:
+    paths = ssl.get_default_verify_paths()
+    if (paths.cafile and os.path.exists(paths.cafile)) or (
+        paths.capath and os.path.exists(paths.capath)
+    ):
+        # Layer 2: Cert paths exist, will be lazy-loaded on first use (directory-based)
+        pass
+    else:
+        # Layer 3: Try certifi package as optional fallback
+        try:
+            import certifi
+
+            https_ctx = ssl.create_default_context(cafile=certifi.where())
+        except ImportError:
+            # All layers failed - no certificates available anywhere
+            sys.exit(
+                "\nERROR: No CA certificates found. Cannot connect to GitHub over SSL.\n\n"
+                "Solutions you can explore:\n"
+                " 1. pip install certifi\n"
+                " 2. Alpine: apk add ca-certificates\n"
+                " 3. Debian/Ubuntu: apt-get install ca-certificates\n\n"
+            )
 
 
 def logging_subprocess(
```
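For readers who want to see which layer would apply on their own machine, here is a minimal standalone sketch mirroring the fallback chain above; it is not part of the diff, and `describe_cert_source` is a hypothetical name:

```python
# Hypothetical probe mirroring the three-layer fallback above; not github-backup code.
import os
import ssl


def describe_cert_source():
    ctx = ssl.create_default_context()
    if ctx.get_ca_certs():
        # Layer 1: certs were pre-loaded from a file-based system store
        return "layer 1: pre-loaded system certs"
    paths = ssl.get_default_verify_paths()
    if (paths.cafile and os.path.exists(paths.cafile)) or (
        paths.capath and os.path.exists(paths.capath)
    ):
        # Layer 2: cert paths exist; OpenSSL loads them lazily on first use
        return "layer 2: system cert paths present (directory-based)"
    try:
        import certifi  # optional dependency

        # Layer 3: CA bundle shipped with the certifi package
        return "layer 3: certifi bundle at " + certifi.where()
    except ImportError:
        return "no CA certificates found; HTTPS verification will fail"


if __name__ == "__main__":
    print(describe_cert_source())
```

The remaining hunks in the same file implement the switch to cursor-based pagination.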
```diff
@@ -581,27 +592,26 @@ def retrieve_data_gen(args, template, query_args=None, single_request=False):
     auth = get_auth(args, encode=not args.as_app)
     query_args = get_query_args(query_args)
     per_page = 100
-    page = 0
+    next_url = None
 
     while True:
         if single_request:
-            request_page, request_per_page = None, None
+            request_per_page = None
         else:
-            page = page + 1
-            request_page, request_per_page = page, per_page
+            request_per_page = per_page
 
         request = _construct_request(
             request_per_page,
-            request_page,
             query_args,
-            template,
+            next_url or template,
             auth,
             as_app=args.as_app,
             fine=True if args.token_fine is not None else False,
         )  # noqa
-        r, errors = _get_response(request, auth, template)
+        r, errors = _get_response(request, auth, next_url or template)
 
         status_code = int(r.getcode())
 
         # Check if we got correct data
         try:
             response = json.loads(r.read().decode("utf-8"))
```
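Stripped of retries and auth, the loop now follows this shape: request a URL, yield the items, then continue at whatever URL the server handed back. A minimal sketch of the pattern, where `fetch_page` is a hypothetical stand-in for the request/response plumbing:

```python
# Sketch of the cursor-following loop adopted above; fetch_page is hypothetical.
def paginate(start_url, fetch_page):
    """Yield items from every page, following the server-supplied next URL."""
    next_url = None
    while True:
        items, next_url = fetch_page(next_url or start_url)
        yield from items
        if not next_url:
            break


# Tiny in-memory demo: two pages linked by an opaque cursor.
pages = {
    "https://api.test/items": ([1, 2], "https://api.test/items?after=c1"),
    "https://api.test/items?after=c1": ([3], None),
}
assert list(paginate("https://api.test/items", pages.get)) == [1, 2, 3]
```

The next hunk applies the same substitution to the retry path.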
```diff
@@ -633,15 +643,14 @@ def retrieve_data_gen(args, template, query_args=None, single_request=False):
             retries += 1
             time.sleep(5)
             request = _construct_request(
-                per_page,
-                page,
+                request_per_page,
                 query_args,
-                template,
+                next_url or template,
                 auth,
                 as_app=args.as_app,
                 fine=True if args.token_fine is not None else False,
             )  # noqa
-            r, errors = _get_response(request, auth, template)
+            r, errors = _get_response(request, auth, next_url or template)
 
             status_code = int(r.getcode())
             try:
```
```diff
@@ -671,7 +680,16 @@ def retrieve_data_gen(args, template, query_args=None, single_request=False):
         if type(response) is list:
             for resp in response:
                 yield resp
-            if len(response) < per_page:
+            # Parse Link header for next page URL (cursor-based pagination)
+            link_header = r.headers.get("Link", "")
+            next_url = None
+            if link_header:
+                # Parse Link header: <https://api.github.com/...?per_page=100&after=cursor>; rel="next"
+                for link in link_header.split(","):
+                    if 'rel="next"' in link:
+                        next_url = link[link.find("<") + 1:link.find(">")]
+                        break
+            if not next_url:
                 break
         elif type(response) is dict and single_request:
             yield response
```
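The parsing itself is plain string splitting on the standard Link header format. A standalone sketch of the same technique against a sample GitHub-style header; `parse_next_url` is a hypothetical name, not part of github-backup's API:

```python
# Standalone sketch of the Link-header parsing used above.
def parse_next_url(link_header):
    """Return the rel="next" target from a Link header, or None if absent."""
    for link in link_header.split(","):
        if 'rel="next"' in link:
            return link[link.find("<") + 1:link.find(">")]
    return None


header = (
    '<https://api.github.com/repos/o/r/issues?per_page=100&after=XYZ>; rel="next", '
    '<https://api.github.com/repos/o/r/issues?per_page=100>; rel="first"'
)
assert parse_next_url(header) == (
    "https://api.github.com/repos/o/r/issues?per_page=100&after=XYZ"
)
assert parse_next_url('<https://api.github.com/x?page=1>; rel="prev"') is None
```

Matching on the Link header instead of counting pages handles both cursor-based endpoints (issues) and classic page-numbered endpoints (pulls); the new tests below cover both.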
```diff
@@ -724,22 +742,27 @@ def _get_response(request, auth, template):
 
 
 def _construct_request(
-    per_page, page, query_args, template, auth, as_app=None, fine=False
+    per_page, query_args, template, auth, as_app=None, fine=False
 ):
-    all_query_args = {}
-    if per_page:
-        all_query_args["per_page"] = per_page
-    if page:
-        all_query_args["page"] = page
-    if query_args:
-        all_query_args.update(query_args)
-
-    request_url = template
-    if all_query_args:
-        querystring = urlencode(all_query_args)
-        request_url = template + "?" + querystring
+    # If template is already a full URL with query params (from Link header), use it directly
+    if "?" in template and template.startswith("http"):
+        request_url = template
+        # Extract query string for logging
+        querystring = template.split("?", 1)[1]
     else:
-        querystring = ""
+        # Build URL with query parameters
+        all_query_args = {}
+        if per_page:
+            all_query_args["per_page"] = per_page
+        if query_args:
+            all_query_args.update(query_args)
+
+        request_url = template
+        if all_query_args:
+            querystring = urlencode(all_query_args)
+            request_url = template + "?" + querystring
+        else:
+            querystring = ""
 
     request = Request(request_url)
     if auth is not None:
```
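The effect of the passthrough branch: a URL taken from a Link header already carries its pagination parameters, so it must not be re-encoded. A simplified standalone sketch of that rule; `build_url` is a hypothetical helper, not the real function:

```python
# Simplified sketch of the URL-building rule above; build_url is hypothetical.
from urllib.parse import urlencode


def build_url(template, per_page=None, query_args=None):
    # A full URL from a Link header already carries its query string.
    if "?" in template and template.startswith("http"):
        return template
    params = {}
    if per_page:
        params["per_page"] = per_page
    if query_args:
        params.update(query_args)
    if not params:
        return template
    return template + "?" + urlencode(params)


# Link-header URL passes through untouched.
assert build_url(
    "https://api.github.com/repos/o/r/issues?per_page=100&after=XYZ"
) == "https://api.github.com/repos/o/r/issues?per_page=100&after=XYZ"
# Bare template gets its parameters encoded in.
assert build_url(
    "https://api.github.com/repos/o/r/issues", per_page=100, query_args={"state": "all"}
) == "https://api.github.com/repos/o/r/issues?per_page=100&state=all"
```

The next hunk applies the matching fix to request logging, stripping the base URL and re-appending the extracted query string.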
```diff
@@ -755,7 +778,7 @@ def _construct_request(
             "Accept", "application/vnd.github.machine-man-preview+json"
         )
 
-    log_url = template
+    log_url = template if "?" not in template else template.split("?")[0]
     if querystring:
         log_url += "?" + querystring
     logger.info("Requesting {}".format(log_url))
```
```diff
@@ -832,8 +855,7 @@ def download_file(url, path, auth, as_app=False, fine=False):
         return
 
     request = _construct_request(
-        per_page=100,
-        page=1,
+        per_page=None,
         query_args={},
         template=url,
         auth=auth,
```
One file also drops its single line (the removed content is not shown in this view):

```diff
@@ -1 +0,0 @@
```
tests/test_pagination.py (new file, 153 lines)

```python
"""Tests for Link header pagination handling."""

import json
from unittest.mock import Mock, patch

import pytest

from github_backup import github_backup


class MockHTTPResponse:
    """Mock HTTP response for paginated API calls."""

    def __init__(self, data, link_header=None):
        self._content = json.dumps(data).encode("utf-8")
        self._link_header = link_header
        self._read = False
        self.reason = "OK"

    def getcode(self):
        return 200

    def read(self):
        if self._read:
            return b""
        self._read = True
        return self._content

    def get_header(self, name, default=None):
        """Mock method for headers.get()."""
        return self.headers.get(name, default)

    @property
    def headers(self):
        headers = {"x-ratelimit-remaining": "5000"}
        if self._link_header:
            headers["Link"] = self._link_header
        return headers


@pytest.fixture
def mock_args():
    """Mock args for retrieve_data_gen."""
    args = Mock()
    args.as_app = False
    args.token_fine = None
    args.token_classic = "fake_token"
    args.username = None
    args.password = None
    args.osx_keychain_item_name = None
    args.osx_keychain_item_account = None
    args.throttle_limit = None
    args.throttle_pause = 0
    return args


def test_cursor_based_pagination(mock_args):
    """Link header with 'after' cursor parameter works correctly."""

    # Simulate issues endpoint behavior: returns cursor in Link header
    responses = [
        # Issues endpoint returns 'after' cursor parameter (not 'page')
        MockHTTPResponse(
            data=[{"issue": i} for i in range(1, 101)],  # Page 1 contents
            link_header='<https://api.github.com/repos/owner/repo/issues?per_page=100&after=ABC123&page=2>; rel="next"',
        ),
        MockHTTPResponse(
            data=[{"issue": i} for i in range(101, 151)],  # Page 2 contents
            link_header=None,  # No Link header - signals end of pagination
        ),
    ]
    requests_made = []

    def mock_urlopen(request, *args, **kwargs):
        url = request.get_full_url()
        requests_made.append(url)
        return responses[len(requests_made) - 1]

    with patch("github_backup.github_backup.urlopen", side_effect=mock_urlopen):
        results = list(
            github_backup.retrieve_data_gen(
                mock_args, "https://api.github.com/repos/owner/repo/issues"
            )
        )

    # Verify all items retrieved and cursor was used in second request
    assert len(results) == 150
    assert len(requests_made) == 2
    assert "after=ABC123" in requests_made[1]


def test_page_based_pagination(mock_args):
    """Link header with 'page' parameter works correctly."""

    # Simulate pulls/repos endpoint behavior: returns page numbers in Link header
    responses = [
        # Pulls endpoint uses traditional 'page' parameter (not cursor)
        MockHTTPResponse(
            data=[{"pull": i} for i in range(1, 101)],  # Page 1 contents
            link_header='<https://api.github.com/repos/owner/repo/pulls?per_page=100&page=2>; rel="next"',
        ),
        MockHTTPResponse(
            data=[{"pull": i} for i in range(101, 181)],  # Page 2 contents
            link_header=None,  # No Link header - signals end of pagination
        ),
    ]
    requests_made = []

    def mock_urlopen(request, *args, **kwargs):
        url = request.get_full_url()
        requests_made.append(url)
        return responses[len(requests_made) - 1]

    with patch("github_backup.github_backup.urlopen", side_effect=mock_urlopen):
        results = list(
            github_backup.retrieve_data_gen(
                mock_args, "https://api.github.com/repos/owner/repo/pulls"
            )
        )

    # Verify all items retrieved and page parameter was used (not cursor)
    assert len(results) == 180
    assert len(requests_made) == 2
    assert "page=2" in requests_made[1]
    assert "after" not in requests_made[1]


def test_no_link_header_stops_pagination(mock_args):
    """Pagination stops when Link header is absent."""

    # Simulate endpoint with results that fit in a single page
    responses = [
        MockHTTPResponse(
            data=[{"label": i} for i in range(1, 51)],  # Page contents
            link_header=None,  # No Link header - signals end of pagination
        )
    ]
    requests_made = []

    def mock_urlopen(request, *args, **kwargs):
        requests_made.append(request.get_full_url())
        return responses[len(requests_made) - 1]

    with patch("github_backup.github_backup.urlopen", side_effect=mock_urlopen):
        results = list(
            github_backup.retrieve_data_gen(
                mock_args, "https://api.github.com/repos/owner/repo/labels"
            )
        )

    # Verify pagination stopped after first request
    assert len(results) == 50
    assert len(requests_made) == 1
```
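From a development checkout these tests should run in isolation with `pytest tests/test_pagination.py -v` (assuming pytest is installed); the mocks patch `github_backup.github_backup.urlopen`, so no network access is needed.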