mirror of
https://github.com/josegonzalez/python-github-backup.git
synced 2025-12-05 16:18:02 +01:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9ef496efad | ||
|
|
42bfe6f79d | ||
|
|
5af522a348 | ||
|
|
6dfba7a783 | ||
|
|
7551829677 | ||
|
|
72d35a9b94 | ||
|
|
3eae9d78ed | ||
|
|
90ba839c7d | ||
|
|
1ec0820936 | ||
|
|
ca463e5cd4 | ||
|
|
1750d0eff1 | ||
|
|
e4d1c78993 | ||
|
|
7a9455db88 | ||
|
|
a98ff7f23d | ||
|
|
7b78f06a68 | ||
|
|
56db3ff0e8 | ||
|
|
5c9c20f6ee | ||
|
|
c8c585cbb5 |
33
.github/workflows/test.yml
vendored
Normal file
33
.github/workflows/test.yml
vendored
Normal file
@@ -0,0 +1,33 @@
|
||||
---
|
||||
name: "test"
|
||||
|
||||
# yamllint disable-line rule:truthy
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- "*"
|
||||
push:
|
||||
branches:
|
||||
- "main"
|
||||
- "master"
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: test
|
||||
runs-on: ubuntu-24.04
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
cache: "pip"
|
||||
- run: pip install -r release-requirements.txt
|
||||
- run: pytest tests/ -v
|
||||
125
CHANGES.rst
125
CHANGES.rst
@@ -1,9 +1,132 @@
|
||||
Changelog
|
||||
=========
|
||||
|
||||
0.51.0 (2025-11-06)
|
||||
0.51.3 (2025-11-18)
|
||||
-------------------
|
||||
------------------------
|
||||
- Test: Add pagination tests for cursor and page-based Link headers.
|
||||
[Rodos]
|
||||
- Use cursor based pagination. [Helio Machado]
|
||||
|
||||
|
||||
0.51.2 (2025-11-16)
|
||||
-------------------
|
||||
|
||||
Fix
|
||||
~~~
|
||||
- Improve CA certificate detection with fallback chain. [Rodos]
|
||||
|
||||
The previous implementation incorrectly assumed empty get_ca_certs()
|
||||
meant broken SSL, causing false failures in GitHub Codespaces and other
|
||||
directory-based cert systems where certificates exist but aren't pre-loaded.
|
||||
It would then attempt to import certifi as a workaround, but certifi wasn't
|
||||
listed in requirements.txt, causing the fallback to fail with ImportError
|
||||
even though the system certificates would have worked fine.
|
||||
|
||||
This commit replaces the naive check with a layered fallback approach that
|
||||
checks multiple certificate sources. First it checks for pre-loaded system
|
||||
certs (file-based systems). Then it verifies system cert paths exist
|
||||
(directory-based systems like Ubuntu/Debian/Codespaces). Finally it attempts
|
||||
to use certifi as an optional fallback only if needed.
|
||||
|
||||
This approach eliminates hard dependencies (certifi is now optional), works
|
||||
in GitHub Codespaces without any setup, and fails gracefully with clear hints
|
||||
for resolution when SSL is actually broken rather than failing with
|
||||
ModuleNotFoundError.
|
||||
|
||||
Fixes #444
|
||||
|
||||
|
||||
0.51.1 (2025-11-16)
|
||||
-------------------
|
||||
|
||||
Fix
|
||||
~~~
|
||||
- Prevent duplicate attachment downloads. [Rodos]
|
||||
|
||||
Fixes bug where attachments were downloaded multiple times with
|
||||
incremented filenames (file.mov, file_1.mov, file_2.mov) when
|
||||
running backups without --skip-existing flag.
|
||||
|
||||
I should not have used the --skip-existing flag for attachments,
|
||||
it did not do what I thought it did.
|
||||
|
||||
The correct approach is to always use the manifest to guide what
|
||||
has already been downloaded and what now needs to be done.
|
||||
|
||||
Other
|
||||
~~~~~
|
||||
- Chore(deps): bump certifi in the python-packages group.
|
||||
[dependabot[bot]]
|
||||
|
||||
Bumps the python-packages group with 1 update: [certifi](https://github.com/certifi/python-certifi).
|
||||
|
||||
|
||||
Updates `certifi` from 2025.10.5 to 2025.11.12
|
||||
- [Commits](https://github.com/certifi/python-certifi/compare/2025.10.05...2025.11.12)
|
||||
|
||||
---
|
||||
updated-dependencies:
|
||||
- dependency-name: certifi
|
||||
dependency-version: 2025.11.12
|
||||
dependency-type: direct:production
|
||||
update-type: version-update:semver-minor
|
||||
dependency-group: python-packages
|
||||
...
|
||||
- Test: Add pytest infrastructure and attachment tests. [Rodos]
|
||||
|
||||
In making my last fix to attachments, I found it challenging not
|
||||
having tests to ensure there was no regression.
|
||||
|
||||
Added pytest with minimal setup and isolated configuration. Created
|
||||
a separate test workflow to keep tests isolated from linting.
|
||||
|
||||
Tests cover the key elements of the attachment logic:
|
||||
- URL extraction from issue bodies
|
||||
- Filename extraction from different URL types
|
||||
- Filename collision resolution
|
||||
- Manifest duplicate prevention
|
||||
- Chore(deps): bump black in the python-packages group.
|
||||
[dependabot[bot]]
|
||||
|
||||
Bumps the python-packages group with 1 update: [black](https://github.com/psf/black).
|
||||
|
||||
|
||||
Updates `black` from 25.9.0 to 25.11.0
|
||||
- [Release notes](https://github.com/psf/black/releases)
|
||||
- [Changelog](https://github.com/psf/black/blob/main/CHANGES.md)
|
||||
- [Commits](https://github.com/psf/black/compare/25.9.0...25.11.0)
|
||||
|
||||
---
|
||||
updated-dependencies:
|
||||
- dependency-name: black
|
||||
dependency-version: 25.11.0
|
||||
dependency-type: direct:production
|
||||
update-type: version-update:semver-minor
|
||||
dependency-group: python-packages
|
||||
...
|
||||
- Chore(deps): bump docutils in the python-packages group.
|
||||
[dependabot[bot]]
|
||||
|
||||
Bumps the python-packages group with 1 update: [docutils](https://github.com/rtfd/recommonmark).
|
||||
|
||||
|
||||
Updates `docutils` from 0.22.2 to 0.22.3
|
||||
- [Changelog](https://github.com/readthedocs/recommonmark/blob/master/CHANGELOG.md)
|
||||
- [Commits](https://github.com/rtfd/recommonmark/commits)
|
||||
|
||||
---
|
||||
updated-dependencies:
|
||||
- dependency-name: docutils
|
||||
dependency-version: 0.22.3
|
||||
dependency-type: direct:production
|
||||
update-type: version-update:semver-patch
|
||||
dependency-group: python-packages
|
||||
...
|
||||
|
||||
|
||||
0.51.0 (2025-11-06)
|
||||
-------------------
|
||||
|
||||
Fix
|
||||
~~~
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "0.51.0"
|
||||
__version__ = "0.51.3"
|
||||
|
||||
@@ -37,22 +37,33 @@ FNULL = open(os.devnull, "w")
|
||||
FILE_URI_PREFIX = "file://"
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Setup SSL context with fallback chain
|
||||
https_ctx = ssl.create_default_context()
|
||||
if not https_ctx.get_ca_certs():
|
||||
import warnings
|
||||
if https_ctx.get_ca_certs():
|
||||
# Layer 1: Certificates pre-loaded from system (file-based)
|
||||
pass
|
||||
else:
|
||||
paths = ssl.get_default_verify_paths()
|
||||
if (paths.cafile and os.path.exists(paths.cafile)) or (
|
||||
paths.capath and os.path.exists(paths.capath)
|
||||
):
|
||||
# Layer 2: Cert paths exist, will be lazy-loaded on first use (directory-based)
|
||||
pass
|
||||
else:
|
||||
# Layer 3: Try certifi package as optional fallback
|
||||
try:
|
||||
import certifi
|
||||
|
||||
warnings.warn(
|
||||
"\n\nYOUR DEFAULT CA CERTS ARE EMPTY.\n"
|
||||
+ "PLEASE POPULATE ANY OF:"
|
||||
+ "".join(
|
||||
["\n - " + x for x in ssl.get_default_verify_paths() if type(x) is str]
|
||||
)
|
||||
+ "\n",
|
||||
stacklevel=2,
|
||||
)
|
||||
import certifi
|
||||
|
||||
https_ctx = ssl.create_default_context(cafile=certifi.where())
|
||||
https_ctx = ssl.create_default_context(cafile=certifi.where())
|
||||
except ImportError:
|
||||
# All layers failed - no certificates available anywhere
|
||||
sys.exit(
|
||||
"\nERROR: No CA certificates found. Cannot connect to GitHub over SSL.\n\n"
|
||||
"Solutions you can explore:\n"
|
||||
" 1. pip install certifi\n"
|
||||
" 2. Alpine: apk add ca-certificates\n"
|
||||
" 3. Debian/Ubuntu: apt-get install ca-certificates\n\n"
|
||||
)
|
||||
|
||||
|
||||
def logging_subprocess(
|
||||
@@ -581,27 +592,26 @@ def retrieve_data_gen(args, template, query_args=None, single_request=False):
|
||||
auth = get_auth(args, encode=not args.as_app)
|
||||
query_args = get_query_args(query_args)
|
||||
per_page = 100
|
||||
page = 0
|
||||
next_url = None
|
||||
|
||||
while True:
|
||||
if single_request:
|
||||
request_page, request_per_page = None, None
|
||||
request_per_page = None
|
||||
else:
|
||||
page = page + 1
|
||||
request_page, request_per_page = page, per_page
|
||||
request_per_page = per_page
|
||||
|
||||
request = _construct_request(
|
||||
request_per_page,
|
||||
request_page,
|
||||
query_args,
|
||||
template,
|
||||
next_url or template,
|
||||
auth,
|
||||
as_app=args.as_app,
|
||||
fine=True if args.token_fine is not None else False,
|
||||
) # noqa
|
||||
r, errors = _get_response(request, auth, template)
|
||||
r, errors = _get_response(request, auth, next_url or template)
|
||||
|
||||
status_code = int(r.getcode())
|
||||
|
||||
# Check if we got correct data
|
||||
try:
|
||||
response = json.loads(r.read().decode("utf-8"))
|
||||
@@ -633,15 +643,14 @@ def retrieve_data_gen(args, template, query_args=None, single_request=False):
|
||||
retries += 1
|
||||
time.sleep(5)
|
||||
request = _construct_request(
|
||||
per_page,
|
||||
page,
|
||||
request_per_page,
|
||||
query_args,
|
||||
template,
|
||||
next_url or template,
|
||||
auth,
|
||||
as_app=args.as_app,
|
||||
fine=True if args.token_fine is not None else False,
|
||||
) # noqa
|
||||
r, errors = _get_response(request, auth, template)
|
||||
r, errors = _get_response(request, auth, next_url or template)
|
||||
|
||||
status_code = int(r.getcode())
|
||||
try:
|
||||
@@ -671,7 +680,16 @@ def retrieve_data_gen(args, template, query_args=None, single_request=False):
|
||||
if type(response) is list:
|
||||
for resp in response:
|
||||
yield resp
|
||||
if len(response) < per_page:
|
||||
# Parse Link header for next page URL (cursor-based pagination)
|
||||
link_header = r.headers.get("Link", "")
|
||||
next_url = None
|
||||
if link_header:
|
||||
# Parse Link header: <https://api.github.com/...?per_page=100&after=cursor>; rel="next"
|
||||
for link in link_header.split(","):
|
||||
if 'rel="next"' in link:
|
||||
next_url = link[link.find("<") + 1:link.find(">")]
|
||||
break
|
||||
if not next_url:
|
||||
break
|
||||
elif type(response) is dict and single_request:
|
||||
yield response
|
||||
@@ -724,22 +742,27 @@ def _get_response(request, auth, template):
|
||||
|
||||
|
||||
def _construct_request(
|
||||
per_page, page, query_args, template, auth, as_app=None, fine=False
|
||||
per_page, query_args, template, auth, as_app=None, fine=False
|
||||
):
|
||||
all_query_args = {}
|
||||
if per_page:
|
||||
all_query_args["per_page"] = per_page
|
||||
if page:
|
||||
all_query_args["page"] = page
|
||||
if query_args:
|
||||
all_query_args.update(query_args)
|
||||
|
||||
request_url = template
|
||||
if all_query_args:
|
||||
querystring = urlencode(all_query_args)
|
||||
request_url = template + "?" + querystring
|
||||
# If template is already a full URL with query params (from Link header), use it directly
|
||||
if "?" in template and template.startswith("http"):
|
||||
request_url = template
|
||||
# Extract query string for logging
|
||||
querystring = template.split("?", 1)[1]
|
||||
else:
|
||||
querystring = ""
|
||||
# Build URL with query parameters
|
||||
all_query_args = {}
|
||||
if per_page:
|
||||
all_query_args["per_page"] = per_page
|
||||
if query_args:
|
||||
all_query_args.update(query_args)
|
||||
|
||||
request_url = template
|
||||
if all_query_args:
|
||||
querystring = urlencode(all_query_args)
|
||||
request_url = template + "?" + querystring
|
||||
else:
|
||||
querystring = ""
|
||||
|
||||
request = Request(request_url)
|
||||
if auth is not None:
|
||||
@@ -755,7 +778,7 @@ def _construct_request(
|
||||
"Accept", "application/vnd.github.machine-man-preview+json"
|
||||
)
|
||||
|
||||
log_url = template
|
||||
log_url = template if "?" not in template else template.split("?")[0]
|
||||
if querystring:
|
||||
log_url += "?" + querystring
|
||||
logger.info("Requesting {}".format(log_url))
|
||||
@@ -832,8 +855,7 @@ def download_file(url, path, auth, as_app=False, fine=False):
|
||||
return
|
||||
|
||||
request = _construct_request(
|
||||
per_page=100,
|
||||
page=1,
|
||||
per_page=None,
|
||||
query_args={},
|
||||
template=url,
|
||||
auth=auth,
|
||||
@@ -919,12 +941,6 @@ def download_attachment_file(url, path, auth, as_app=False, fine=False):
|
||||
"error": None,
|
||||
}
|
||||
|
||||
if os.path.exists(path):
|
||||
metadata["success"] = True
|
||||
metadata["http_status"] = 200 # Assume success if already exists
|
||||
metadata["size_bytes"] = os.path.getsize(path)
|
||||
return metadata
|
||||
|
||||
# Create simple request (no API query params)
|
||||
request = Request(url)
|
||||
request.add_header("Accept", "application/octet-stream")
|
||||
@@ -1337,10 +1353,10 @@ def download_attachments(
|
||||
attachments_dir = os.path.join(item_cwd, "attachments", str(number))
|
||||
manifest_path = os.path.join(attachments_dir, "manifest.json")
|
||||
|
||||
# Load existing manifest if skip_existing is enabled
|
||||
# Load existing manifest to prevent duplicate downloads
|
||||
existing_urls = set()
|
||||
existing_metadata = []
|
||||
if args.skip_existing and os.path.exists(manifest_path):
|
||||
if os.path.exists(manifest_path):
|
||||
try:
|
||||
with open(manifest_path, "r") as f:
|
||||
existing_manifest = json.load(f)
|
||||
@@ -1395,9 +1411,6 @@ def download_attachments(
|
||||
filename = get_attachment_filename(url)
|
||||
filepath = os.path.join(attachments_dir, filename)
|
||||
|
||||
# Check for collision BEFORE downloading
|
||||
filepath = resolve_filename_collision(filepath)
|
||||
|
||||
# Download and get metadata
|
||||
metadata = download_attachment_file(
|
||||
url,
|
||||
|
||||
6
pytest.ini
Normal file
6
pytest.ini
Normal file
@@ -0,0 +1,6 @@
|
||||
[pytest]
|
||||
testpaths = tests
|
||||
python_files = test_*.py
|
||||
python_classes = Test*
|
||||
python_functions = test_*
|
||||
addopts = -v
|
||||
@@ -1,13 +1,14 @@
|
||||
autopep8==2.3.2
|
||||
black==25.9.0
|
||||
black==25.11.0
|
||||
bleach==6.3.0
|
||||
certifi==2025.10.5
|
||||
certifi==2025.11.12
|
||||
charset-normalizer==3.4.4
|
||||
click==8.3.0
|
||||
colorama==0.4.6
|
||||
docutils==0.22.2
|
||||
docutils==0.22.3
|
||||
flake8==7.3.0
|
||||
gitchangelog==3.0.4
|
||||
pytest==8.3.3
|
||||
idna==3.11
|
||||
importlib-metadata==8.7.0
|
||||
jaraco.classes==3.4.0
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
|
||||
1
tests/__init__.py
Normal file
1
tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for python-github-backup."""
|
||||
353
tests/test_attachments.py
Normal file
353
tests/test_attachments.py
Normal file
@@ -0,0 +1,353 @@
|
||||
"""Behavioral tests for attachment functionality."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from github_backup import github_backup
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def attachment_test_setup(tmp_path):
|
||||
"""Fixture providing setup and helper for attachment download tests."""
|
||||
from unittest.mock import patch
|
||||
|
||||
issue_cwd = tmp_path / "issues"
|
||||
issue_cwd.mkdir()
|
||||
|
||||
# Mock args
|
||||
args = Mock()
|
||||
args.as_app = False
|
||||
args.token_fine = None
|
||||
args.token_classic = None
|
||||
args.username = None
|
||||
args.password = None
|
||||
args.osx_keychain_item_name = None
|
||||
args.osx_keychain_item_account = None
|
||||
args.user = "testuser"
|
||||
args.repository = "testrepo"
|
||||
|
||||
repository = {"full_name": "testuser/testrepo"}
|
||||
|
||||
def call_download(issue_data, issue_number=123):
|
||||
"""Call download_attachments with mocked HTTP downloads.
|
||||
|
||||
Returns list of URLs that were actually downloaded.
|
||||
"""
|
||||
downloaded_urls = []
|
||||
|
||||
def mock_download(url, path, auth, as_app, fine):
|
||||
downloaded_urls.append(url)
|
||||
return {
|
||||
"success": True,
|
||||
"saved_as": os.path.basename(path),
|
||||
"url": url,
|
||||
}
|
||||
|
||||
with patch(
|
||||
"github_backup.github_backup.download_attachment_file",
|
||||
side_effect=mock_download,
|
||||
):
|
||||
github_backup.download_attachments(
|
||||
args, str(issue_cwd), issue_data, issue_number, repository
|
||||
)
|
||||
|
||||
return downloaded_urls
|
||||
|
||||
return {
|
||||
"issue_cwd": str(issue_cwd),
|
||||
"args": args,
|
||||
"repository": repository,
|
||||
"call_download": call_download,
|
||||
}
|
||||
|
||||
|
||||
class TestURLExtraction:
|
||||
"""Test URL extraction with realistic issue content."""
|
||||
|
||||
def test_mixed_urls(self):
|
||||
issue_data = {
|
||||
"body": """
|
||||
## Bug Report
|
||||
|
||||
When uploading files, I see this error. Here's a screenshot:
|
||||
https://github.com/user-attachments/assets/abc123def456
|
||||
|
||||
The logs show: https://github.com/user-attachments/files/789/error-log.txt
|
||||
|
||||
This is similar to https://github.com/someorg/somerepo/issues/42 but different.
|
||||
|
||||
You can also see the video at https://user-images.githubusercontent.com/12345/video-demo.mov
|
||||
|
||||
Here's how to reproduce:
|
||||
```bash
|
||||
# Don't extract this example URL:
|
||||
curl https://github.com/user-attachments/assets/example999
|
||||
```
|
||||
|
||||
More info at https://docs.example.com/guide
|
||||
|
||||
Also see this inline code `https://github.com/user-attachments/files/111/inline.pdf` should not extract.
|
||||
|
||||
Final attachment: https://github.com/user-attachments/files/222/report.pdf.
|
||||
""",
|
||||
"comment_data": [
|
||||
{
|
||||
"body": "Here's another attachment: https://private-user-images.githubusercontent.com/98765/secret.png?jwt=token123"
|
||||
},
|
||||
{
|
||||
"body": """
|
||||
Example code:
|
||||
```python
|
||||
url = "https://github.com/user-attachments/assets/code-example"
|
||||
```
|
||||
But this is real: https://github.com/user-attachments/files/333/actual.zip
|
||||
"""
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
# Extract URLs
|
||||
urls = github_backup.extract_attachment_urls(issue_data)
|
||||
|
||||
expected_urls = [
|
||||
"https://github.com/user-attachments/assets/abc123def456",
|
||||
"https://github.com/user-attachments/files/789/error-log.txt",
|
||||
"https://user-images.githubusercontent.com/12345/video-demo.mov",
|
||||
"https://github.com/user-attachments/files/222/report.pdf",
|
||||
"https://private-user-images.githubusercontent.com/98765/secret.png?jwt=token123",
|
||||
"https://github.com/user-attachments/files/333/actual.zip",
|
||||
]
|
||||
|
||||
assert set(urls) == set(expected_urls)
|
||||
|
||||
def test_trailing_punctuation_stripped(self):
|
||||
"""URLs with trailing punctuation should have punctuation stripped."""
|
||||
issue_data = {
|
||||
"body": """
|
||||
See this file: https://github.com/user-attachments/files/1/doc.pdf.
|
||||
And this one (https://github.com/user-attachments/files/2/image.png).
|
||||
Check it out! https://github.com/user-attachments/files/3/data.csv!
|
||||
"""
|
||||
}
|
||||
|
||||
urls = github_backup.extract_attachment_urls(issue_data)
|
||||
|
||||
expected = [
|
||||
"https://github.com/user-attachments/files/1/doc.pdf",
|
||||
"https://github.com/user-attachments/files/2/image.png",
|
||||
"https://github.com/user-attachments/files/3/data.csv",
|
||||
]
|
||||
assert set(urls) == set(expected)
|
||||
|
||||
def test_deduplication_across_body_and_comments(self):
|
||||
"""Same URL in body and comments should only appear once."""
|
||||
duplicate_url = "https://github.com/user-attachments/assets/abc123"
|
||||
|
||||
issue_data = {
|
||||
"body": f"First mention: {duplicate_url}",
|
||||
"comment_data": [
|
||||
{"body": f"Second mention: {duplicate_url}"},
|
||||
{"body": f"Third mention: {duplicate_url}"},
|
||||
],
|
||||
}
|
||||
|
||||
urls = github_backup.extract_attachment_urls(issue_data)
|
||||
|
||||
assert set(urls) == {duplicate_url}
|
||||
|
||||
|
||||
class TestFilenameExtraction:
|
||||
"""Test filename extraction from different URL types."""
|
||||
|
||||
def test_modern_assets_url(self):
|
||||
"""Modern assets URL returns UUID."""
|
||||
url = "https://github.com/user-attachments/assets/abc123def456"
|
||||
filename = github_backup.get_attachment_filename(url)
|
||||
assert filename == "abc123def456"
|
||||
|
||||
def test_modern_files_url(self):
|
||||
"""Modern files URL returns filename."""
|
||||
url = "https://github.com/user-attachments/files/12345/report.pdf"
|
||||
filename = github_backup.get_attachment_filename(url)
|
||||
assert filename == "report.pdf"
|
||||
|
||||
def test_legacy_cdn_url(self):
|
||||
"""Legacy CDN URL returns filename with extension."""
|
||||
url = "https://user-images.githubusercontent.com/123456/abc-def.png"
|
||||
filename = github_backup.get_attachment_filename(url)
|
||||
assert filename == "abc-def.png"
|
||||
|
||||
def test_private_cdn_url(self):
|
||||
"""Private CDN URL returns filename."""
|
||||
url = "https://private-user-images.githubusercontent.com/98765/secret.png?jwt=token123"
|
||||
filename = github_backup.get_attachment_filename(url)
|
||||
assert filename == "secret.png"
|
||||
|
||||
def test_repo_files_url(self):
|
||||
"""Repo-scoped files URL returns filename."""
|
||||
url = "https://github.com/owner/repo/files/789/document.txt"
|
||||
filename = github_backup.get_attachment_filename(url)
|
||||
assert filename == "document.txt"
|
||||
|
||||
|
||||
class TestFilenameCollision:
|
||||
"""Test filename collision resolution."""
|
||||
|
||||
def test_collision_behavior(self):
|
||||
"""Test filename collision resolution with real files."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# No collision - file doesn't exist
|
||||
result = github_backup.resolve_filename_collision(
|
||||
os.path.join(tmpdir, "report.pdf")
|
||||
)
|
||||
assert result == os.path.join(tmpdir, "report.pdf")
|
||||
|
||||
# Create the file, now collision exists
|
||||
Path(os.path.join(tmpdir, "report.pdf")).touch()
|
||||
result = github_backup.resolve_filename_collision(
|
||||
os.path.join(tmpdir, "report.pdf")
|
||||
)
|
||||
assert result == os.path.join(tmpdir, "report_1.pdf")
|
||||
|
||||
# Create report_1.pdf too
|
||||
Path(os.path.join(tmpdir, "report_1.pdf")).touch()
|
||||
result = github_backup.resolve_filename_collision(
|
||||
os.path.join(tmpdir, "report.pdf")
|
||||
)
|
||||
assert result == os.path.join(tmpdir, "report_2.pdf")
|
||||
|
||||
def test_manifest_reserved(self):
|
||||
"""manifest.json is always treated as reserved."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Even if manifest.json doesn't exist, should get manifest_1.json
|
||||
result = github_backup.resolve_filename_collision(
|
||||
os.path.join(tmpdir, "manifest.json")
|
||||
)
|
||||
assert result == os.path.join(tmpdir, "manifest_1.json")
|
||||
|
||||
|
||||
class TestManifestDuplicatePrevention:
|
||||
"""Test that manifest prevents duplicate downloads (the bug fix)."""
|
||||
|
||||
def test_manifest_filters_existing_urls(self, attachment_test_setup):
|
||||
"""URLs in manifest are not re-downloaded."""
|
||||
setup = attachment_test_setup
|
||||
|
||||
# Create manifest with existing URLs
|
||||
attachments_dir = os.path.join(setup["issue_cwd"], "attachments", "123")
|
||||
os.makedirs(attachments_dir)
|
||||
manifest_path = os.path.join(attachments_dir, "manifest.json")
|
||||
|
||||
manifest = {
|
||||
"attachments": [
|
||||
{
|
||||
"url": "https://github.com/user-attachments/assets/old1",
|
||||
"success": True,
|
||||
"saved_as": "old1.pdf",
|
||||
},
|
||||
{
|
||||
"url": "https://github.com/user-attachments/assets/old2",
|
||||
"success": True,
|
||||
"saved_as": "old2.pdf",
|
||||
},
|
||||
]
|
||||
}
|
||||
with open(manifest_path, "w") as f:
|
||||
json.dump(manifest, f)
|
||||
|
||||
# Issue data with 2 old URLs and 1 new URL
|
||||
issue_data = {
|
||||
"body": """
|
||||
Old: https://github.com/user-attachments/assets/old1
|
||||
Old: https://github.com/user-attachments/assets/old2
|
||||
New: https://github.com/user-attachments/assets/new1
|
||||
"""
|
||||
}
|
||||
|
||||
downloaded_urls = setup["call_download"](issue_data)
|
||||
|
||||
# Should only download the NEW URL (old ones filtered by manifest)
|
||||
assert len(downloaded_urls) == 1
|
||||
assert downloaded_urls[0] == "https://github.com/user-attachments/assets/new1"
|
||||
|
||||
def test_no_manifest_downloads_all(self, attachment_test_setup):
|
||||
"""Without manifest, all URLs should be downloaded."""
|
||||
setup = attachment_test_setup
|
||||
|
||||
# Issue data with 2 URLs
|
||||
issue_data = {
|
||||
"body": """
|
||||
https://github.com/user-attachments/assets/url1
|
||||
https://github.com/user-attachments/assets/url2
|
||||
"""
|
||||
}
|
||||
|
||||
downloaded_urls = setup["call_download"](issue_data)
|
||||
|
||||
# Should download ALL URLs (no manifest to filter)
|
||||
assert len(downloaded_urls) == 2
|
||||
assert set(downloaded_urls) == {
|
||||
"https://github.com/user-attachments/assets/url1",
|
||||
"https://github.com/user-attachments/assets/url2",
|
||||
}
|
||||
|
||||
def test_manifest_skips_permanent_failures(self, attachment_test_setup):
|
||||
"""Manifest skips permanent failures (404, 410) but retries transient (503)."""
|
||||
setup = attachment_test_setup
|
||||
|
||||
# Create manifest with different failure types
|
||||
attachments_dir = os.path.join(setup["issue_cwd"], "attachments", "123")
|
||||
os.makedirs(attachments_dir)
|
||||
manifest_path = os.path.join(attachments_dir, "manifest.json")
|
||||
|
||||
manifest = {
|
||||
"attachments": [
|
||||
{
|
||||
"url": "https://github.com/user-attachments/assets/success",
|
||||
"success": True,
|
||||
"saved_as": "success.pdf",
|
||||
},
|
||||
{
|
||||
"url": "https://github.com/user-attachments/assets/notfound",
|
||||
"success": False,
|
||||
"http_status": 404,
|
||||
},
|
||||
{
|
||||
"url": "https://github.com/user-attachments/assets/gone",
|
||||
"success": False,
|
||||
"http_status": 410,
|
||||
},
|
||||
{
|
||||
"url": "https://github.com/user-attachments/assets/unavailable",
|
||||
"success": False,
|
||||
"http_status": 503,
|
||||
},
|
||||
]
|
||||
}
|
||||
with open(manifest_path, "w") as f:
|
||||
json.dump(manifest, f)
|
||||
|
||||
# Issue data has all 4 URLs
|
||||
issue_data = {
|
||||
"body": """
|
||||
https://github.com/user-attachments/assets/success
|
||||
https://github.com/user-attachments/assets/notfound
|
||||
https://github.com/user-attachments/assets/gone
|
||||
https://github.com/user-attachments/assets/unavailable
|
||||
"""
|
||||
}
|
||||
|
||||
downloaded_urls = setup["call_download"](issue_data)
|
||||
|
||||
# Should only retry 503 (transient failure)
|
||||
# Success, 404, and 410 should be skipped
|
||||
assert len(downloaded_urls) == 1
|
||||
assert (
|
||||
downloaded_urls[0]
|
||||
== "https://github.com/user-attachments/assets/unavailable"
|
||||
)
|
||||
153
tests/test_pagination.py
Normal file
153
tests/test_pagination.py
Normal file
@@ -0,0 +1,153 @@
|
||||
"""Tests for Link header pagination handling."""
|
||||
|
||||
import json
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from github_backup import github_backup
|
||||
|
||||
|
||||
class MockHTTPResponse:
|
||||
"""Mock HTTP response for paginated API calls."""
|
||||
|
||||
def __init__(self, data, link_header=None):
|
||||
self._content = json.dumps(data).encode("utf-8")
|
||||
self._link_header = link_header
|
||||
self._read = False
|
||||
self.reason = "OK"
|
||||
|
||||
def getcode(self):
|
||||
return 200
|
||||
|
||||
def read(self):
|
||||
if self._read:
|
||||
return b""
|
||||
self._read = True
|
||||
return self._content
|
||||
|
||||
def get_header(self, name, default=None):
|
||||
"""Mock method for headers.get()."""
|
||||
return self.headers.get(name, default)
|
||||
|
||||
@property
|
||||
def headers(self):
|
||||
headers = {"x-ratelimit-remaining": "5000"}
|
||||
if self._link_header:
|
||||
headers["Link"] = self._link_header
|
||||
return headers
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_args():
|
||||
"""Mock args for retrieve_data_gen."""
|
||||
args = Mock()
|
||||
args.as_app = False
|
||||
args.token_fine = None
|
||||
args.token_classic = "fake_token"
|
||||
args.username = None
|
||||
args.password = None
|
||||
args.osx_keychain_item_name = None
|
||||
args.osx_keychain_item_account = None
|
||||
args.throttle_limit = None
|
||||
args.throttle_pause = 0
|
||||
return args
|
||||
|
||||
|
||||
def test_cursor_based_pagination(mock_args):
|
||||
"""Link header with 'after' cursor parameter works correctly."""
|
||||
|
||||
# Simulate issues endpoint behavior: returns cursor in Link header
|
||||
responses = [
|
||||
# Issues endpoint returns 'after' cursor parameter (not 'page')
|
||||
MockHTTPResponse(
|
||||
data=[{"issue": i} for i in range(1, 101)], # Page 1 contents
|
||||
link_header='<https://api.github.com/repos/owner/repo/issues?per_page=100&after=ABC123&page=2>; rel="next"',
|
||||
),
|
||||
MockHTTPResponse(
|
||||
data=[{"issue": i} for i in range(101, 151)], # Page 2 contents
|
||||
link_header=None, # No Link header - signals end of pagination
|
||||
),
|
||||
]
|
||||
requests_made = []
|
||||
|
||||
def mock_urlopen(request, *args, **kwargs):
|
||||
url = request.get_full_url()
|
||||
requests_made.append(url)
|
||||
return responses[len(requests_made) - 1]
|
||||
|
||||
with patch("github_backup.github_backup.urlopen", side_effect=mock_urlopen):
|
||||
results = list(
|
||||
github_backup.retrieve_data_gen(
|
||||
mock_args, "https://api.github.com/repos/owner/repo/issues"
|
||||
)
|
||||
)
|
||||
|
||||
# Verify all items retrieved and cursor was used in second request
|
||||
assert len(results) == 150
|
||||
assert len(requests_made) == 2
|
||||
assert "after=ABC123" in requests_made[1]
|
||||
|
||||
|
||||
def test_page_based_pagination(mock_args):
|
||||
"""Link header with 'page' parameter works correctly."""
|
||||
|
||||
# Simulate pulls/repos endpoint behavior: returns page numbers in Link header
|
||||
responses = [
|
||||
# Pulls endpoint uses traditional 'page' parameter (not cursor)
|
||||
MockHTTPResponse(
|
||||
data=[{"pull": i} for i in range(1, 101)], # Page 1 contents
|
||||
link_header='<https://api.github.com/repos/owner/repo/pulls?per_page=100&page=2>; rel="next"',
|
||||
),
|
||||
MockHTTPResponse(
|
||||
data=[{"pull": i} for i in range(101, 181)], # Page 2 contents
|
||||
link_header=None, # No Link header - signals end of pagination
|
||||
),
|
||||
]
|
||||
requests_made = []
|
||||
|
||||
def mock_urlopen(request, *args, **kwargs):
|
||||
url = request.get_full_url()
|
||||
requests_made.append(url)
|
||||
return responses[len(requests_made) - 1]
|
||||
|
||||
with patch("github_backup.github_backup.urlopen", side_effect=mock_urlopen):
|
||||
results = list(
|
||||
github_backup.retrieve_data_gen(
|
||||
mock_args, "https://api.github.com/repos/owner/repo/pulls"
|
||||
)
|
||||
)
|
||||
|
||||
# Verify all items retrieved and page parameter was used (not cursor)
|
||||
assert len(results) == 180
|
||||
assert len(requests_made) == 2
|
||||
assert "page=2" in requests_made[1]
|
||||
assert "after" not in requests_made[1]
|
||||
|
||||
|
||||
def test_no_link_header_stops_pagination(mock_args):
|
||||
"""Pagination stops when Link header is absent."""
|
||||
|
||||
# Simulate endpoint with results that fit in a single page
|
||||
responses = [
|
||||
MockHTTPResponse(
|
||||
data=[{"label": i} for i in range(1, 51)], # Page contents
|
||||
link_header=None, # No Link header - signals end of pagination
|
||||
)
|
||||
]
|
||||
requests_made = []
|
||||
|
||||
def mock_urlopen(request, *args, **kwargs):
|
||||
requests_made.append(request.get_full_url())
|
||||
return responses[len(requests_made) - 1]
|
||||
|
||||
with patch("github_backup.github_backup.urlopen", side_effect=mock_urlopen):
|
||||
results = list(
|
||||
github_backup.retrieve_data_gen(
|
||||
mock_args, "https://api.github.com/repos/owner/repo/labels"
|
||||
)
|
||||
)
|
||||
|
||||
# Verify pagination stopped after first request
|
||||
assert len(results) == 50
|
||||
assert len(requests_made) == 1
|
||||
Reference in New Issue
Block a user