mirror of
https://github.com/andrewthetechie/py-healthchecks.io.git
synced 2025-12-21 17:01:20 +01:00
Revert "chore: ci fixups and poetry update"
This reverts commit d0b986025e.
This commit is contained in:
1
.github/dependabot.yml
vendored
1
.github/dependabot.yml
vendored
@@ -24,3 +24,4 @@ updates:
|
|||||||
interval: daily
|
interval: daily
|
||||||
commit-message:
|
commit-message:
|
||||||
prefix: deps
|
prefix: deps
|
||||||
|
|
||||||
|
|||||||
10
.github/workflows/constraints.txt
vendored
10
.github/workflows/constraints.txt
vendored
@@ -1,6 +1,6 @@
|
|||||||
pip==24.0.0
|
pip==23.1.2
|
||||||
nox==2024.4.15
|
nox==2023.4.22
|
||||||
nox-poetry==1.0.3
|
nox-poetry==1.0.2
|
||||||
poetry==1.8.2
|
poetry==1.5.1
|
||||||
virtualenv==20.26.1
|
virtualenv==20.23.1
|
||||||
toml==0.10.2
|
toml==0.10.2
|
||||||
|
|||||||
@@ -1,35 +1,12 @@
|
|||||||
exclude: ".*tests\/fixtures.*"
|
|
||||||
repos:
|
repos:
|
||||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
|
||||||
rev: v4.6.0
|
|
||||||
hooks:
|
|
||||||
- id: check-yaml
|
|
||||||
- id: debug-statements
|
|
||||||
- id: end-of-file-fixer
|
|
||||||
- id: trailing-whitespace
|
|
||||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
|
||||||
# Ruff version.
|
|
||||||
rev: v0.4.3
|
|
||||||
hooks:
|
|
||||||
# Run the linter.
|
|
||||||
- id: ruff
|
|
||||||
args: [ --fix ]
|
|
||||||
# Run the formatter.
|
|
||||||
- id: ruff-format
|
|
||||||
- repo: https://github.com/rhysd/actionlint
|
|
||||||
rev: v1.6.27
|
|
||||||
hooks:
|
|
||||||
- id: actionlint-docker
|
|
||||||
name: Actionlint
|
|
||||||
- repo: local
|
- repo: local
|
||||||
hooks:
|
hooks:
|
||||||
- id: bandit
|
- id: black
|
||||||
name: bandit
|
name: black
|
||||||
entry: bandit
|
entry: black
|
||||||
language: system
|
language: system
|
||||||
types: [python]
|
types: [python]
|
||||||
require_serial: true
|
require_serial: true
|
||||||
args: ["-c", "pyproject.toml"]
|
|
||||||
- id: check-added-large-files
|
- id: check-added-large-files
|
||||||
name: Check for added large files
|
name: Check for added large files
|
||||||
entry: check-added-large-files
|
entry: check-added-large-files
|
||||||
@@ -50,16 +27,38 @@ repos:
|
|||||||
language: system
|
language: system
|
||||||
types: [text]
|
types: [text]
|
||||||
stages: [commit, push, manual]
|
stages: [commit, push, manual]
|
||||||
|
- id: flake8
|
||||||
|
name: flake8
|
||||||
|
entry: flake8
|
||||||
|
language: system
|
||||||
|
types: [python]
|
||||||
|
exclude: "^(test/*|examples/*|noxfile.py)"
|
||||||
|
require_serial: true
|
||||||
|
args: ["--config=.flake8"]
|
||||||
- id: pyupgrade
|
- id: pyupgrade
|
||||||
name: pyupgrade
|
name: pyupgrade
|
||||||
description: Automatically upgrade syntax for newer versions.
|
description: Automatically upgrade syntax for newer versions.
|
||||||
entry: pyupgrade
|
entry: pyupgrade
|
||||||
language: system
|
language: system
|
||||||
types: [python]
|
types: [python]
|
||||||
args: [--py311-plus]
|
args: [--py37-plus]
|
||||||
|
- id: reorder-python-imports
|
||||||
|
name: Reorder python imports
|
||||||
|
entry: reorder-python-imports
|
||||||
|
language: system
|
||||||
|
types: [python]
|
||||||
|
args: [--application-directories=src]
|
||||||
- id: trailing-whitespace
|
- id: trailing-whitespace
|
||||||
name: Trim Trailing Whitespace
|
name: Trim Trailing Whitespace
|
||||||
entry: trailing-whitespace-fixer
|
entry: trailing-whitespace-fixer
|
||||||
language: system
|
language: system
|
||||||
types: [text]
|
types: [text]
|
||||||
stages: [commit, push, manual]
|
stages: [commit, push, manual]
|
||||||
|
- repo: https://github.com/pre-commit/mirrors-prettier
|
||||||
|
rev: v2.7.1
|
||||||
|
hooks:
|
||||||
|
- id: prettier
|
||||||
|
- repo: https://github.com/rhysd/actionlint
|
||||||
|
rev: v1.6.15
|
||||||
|
hooks:
|
||||||
|
- id: actionlint-docker
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
"""Sphinx configuration."""
|
"""Sphinx configuration."""
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
"""Nox sessions."""
|
"""Nox sessions."""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import shlex
|
import shlex
|
||||||
import shutil
|
import shutil
|
||||||
@@ -24,7 +23,7 @@ except ImportError:
|
|||||||
|
|
||||||
|
|
||||||
package = "healthchecks_io"
|
package = "healthchecks_io"
|
||||||
python_versions = ["3.10", "3.11", "3.9", "3.8"]
|
python_versions = ["3.10", "3.11", "3.9", "3.8", "3.7"]
|
||||||
nox.needs_version = ">= 2021.6.6"
|
nox.needs_version = ">= 2021.6.6"
|
||||||
nox.options.sessions = (
|
nox.options.sessions = (
|
||||||
"pre-commit",
|
"pre-commit",
|
||||||
@@ -32,6 +31,7 @@ nox.options.sessions = (
|
|||||||
"safety",
|
"safety",
|
||||||
"mypy",
|
"mypy",
|
||||||
"tests",
|
"tests",
|
||||||
|
# "typeguard",
|
||||||
"xdoctest",
|
"xdoctest",
|
||||||
"docs-build",
|
"docs-build",
|
||||||
)
|
)
|
||||||
@@ -59,7 +59,7 @@ def activate_virtualenv_in_precommit_hooks(session: Session) -> None:
|
|||||||
Args:
|
Args:
|
||||||
session: The Session object.
|
session: The Session object.
|
||||||
"""
|
"""
|
||||||
assert session.bin is not None # nosec: B101
|
assert session.bin is not None # noqa: S101
|
||||||
|
|
||||||
# Only patch hooks containing a reference to this session's bindir. Support
|
# Only patch hooks containing a reference to this session's bindir. Support
|
||||||
# quoting rules for Python and bash, but strip the outermost quotes so we
|
# quoting rules for Python and bash, but strip the outermost quotes so we
|
||||||
|
|||||||
1340
poetry.lock
generated
1340
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -25,7 +25,7 @@ classifiers = [
|
|||||||
Changelog = "https://github.com/andrewthetechie/py-healthchecks.io/releases"
|
Changelog = "https://github.com/andrewthetechie/py-healthchecks.io/releases"
|
||||||
|
|
||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
python = "^3.8"
|
python = "^3.7"
|
||||||
pydantic = "^1.9.1"
|
pydantic = "^1.9.1"
|
||||||
httpx = ">=0.23.0,<0.25.0"
|
httpx = ">=0.23.0,<0.25.0"
|
||||||
croniter = "^1.1.0"
|
croniter = "^1.1.0"
|
||||||
@@ -67,9 +67,6 @@ source = ["healthchecks_io"]
|
|||||||
show_missing = true
|
show_missing = true
|
||||||
fail_under = 100
|
fail_under = 100
|
||||||
|
|
||||||
[tool.bandit]
|
|
||||||
exclude_dirs = ["tests", "noxfile.py", ".github/scripts", "test_errbot", "dist"]
|
|
||||||
|
|
||||||
[tool.mypy]
|
[tool.mypy]
|
||||||
strict = true
|
strict = true
|
||||||
warn_unreachable = true
|
warn_unreachable = true
|
||||||
@@ -87,4 +84,4 @@ addopts = "-n 4 --ignore examples --cov=healthchecks_io --cov-report xml:.covera
|
|||||||
|
|
||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
line-length = 120
|
line-length = 120
|
||||||
target-version = "py38"
|
target-version = "py37"
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
"""Py Healthchecks.Io."""
|
"""Py Healthchecks.Io."""
|
||||||
|
|
||||||
# set by poetry-dynamic-versioning
|
# set by poetry-dynamic-versioning
|
||||||
__version__ = "0.4.0" # noqa: E402
|
__version__ = "0.4.0" # noqa: E402
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
"""healthchecks_io clients."""
|
"""healthchecks_io clients."""
|
||||||
|
|
||||||
from .async_client import AsyncClient # noqa: F401
|
from .async_client import AsyncClient # noqa: F401
|
||||||
from .check_trap import CheckTrap # noqa: F401
|
from .check_trap import CheckTrap # noqa: F401
|
||||||
from .sync_client import Client # noqa: F401
|
from .sync_client import Client # noqa: F401
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
from abc import ABC
|
from abc import ABC
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
from typing import Dict
|
||||||
|
from typing import Optional
|
||||||
|
from typing import Union
|
||||||
from urllib.parse import parse_qsl
|
from urllib.parse import parse_qsl
|
||||||
from urllib.parse import ParseResult
|
from urllib.parse import ParseResult
|
||||||
from urllib.parse import unquote
|
from urllib.parse import unquote
|
||||||
@@ -54,7 +57,9 @@ class AbstractClient(ABC):
|
|||||||
"""Finalizer method is called by weakref.finalize when the object is dereferenced to do cleanup of clients."""
|
"""Finalizer method is called by weakref.finalize when the object is dereferenced to do cleanup of clients."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _get_api_request_url(self, path: str, params: dict[str, Any] | None = None) -> str:
|
def _get_api_request_url(
|
||||||
|
self, path: str, params: Optional[Dict[str, Any]] = None
|
||||||
|
) -> str:
|
||||||
"""Get a full request url for the healthchecks api.
|
"""Get a full request url for the healthchecks api.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -160,7 +165,9 @@ class AbstractClient(ABC):
|
|||||||
raise CheckNotFoundError(f"CHeck not found at {response.request.url}")
|
raise CheckNotFoundError(f"CHeck not found at {response.request.url}")
|
||||||
|
|
||||||
if response.status_code == 400:
|
if response.status_code == 400:
|
||||||
raise BadAPIRequestError(f"Bad request when requesting {response.request.url}. {response.text}")
|
raise BadAPIRequestError(
|
||||||
|
f"Bad request when requesting {response.request.url}. {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@@ -201,15 +208,21 @@ class AbstractClient(ABC):
|
|||||||
raise HCAPIRateLimitError(f"Rate limited on {response.request.url}")
|
raise HCAPIRateLimitError(f"Rate limited on {response.request.url}")
|
||||||
|
|
||||||
if response.status_code == 400:
|
if response.status_code == 400:
|
||||||
raise BadAPIRequestError(f"Bad request when requesting {response.request.url}. {response.text}")
|
raise BadAPIRequestError(
|
||||||
|
f"Bad request when requesting {response.request.url}. {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
if response.status_code == 409:
|
if response.status_code == 409:
|
||||||
raise NonUniqueSlugError(f"Bad request, slug conflict {response.request.url}. {response.text}")
|
raise NonUniqueSlugError(
|
||||||
|
f"Bad request, slug conflict {response.request.url}. {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _add_url_params(url: str, params: dict[str, str | int | bool], replace: bool = True) -> str:
|
def _add_url_params(
|
||||||
|
url: str, params: Dict[str, Union[str, int, bool]], replace: bool = True
|
||||||
|
) -> str:
|
||||||
"""Add GET params to provided URL being aware of existing.
|
"""Add GET params to provided URL being aware of existing.
|
||||||
|
|
||||||
:param url: string of target URL
|
:param url: string of target URL
|
||||||
@@ -242,7 +255,12 @@ class AbstractClient(ABC):
|
|||||||
# get all the duplicated keys from params and urlencode them, we'll concat this to the params string later
|
# get all the duplicated keys from params and urlencode them, we'll concat this to the params string later
|
||||||
duplicated_params = [x for x in params if x in parsed_get_args]
|
duplicated_params = [x for x in params if x in parsed_get_args]
|
||||||
# get all the args that aren't duplicated and add them to parsed_get_args
|
# get all the args that aren't duplicated and add them to parsed_get_args
|
||||||
parsed_get_args.update({key: parsed_params[key] for key in [x for x in params if x not in parsed_get_args]})
|
parsed_get_args.update(
|
||||||
|
{
|
||||||
|
key: parsed_params[key]
|
||||||
|
for key in [x for x in params if x not in parsed_get_args]
|
||||||
|
}
|
||||||
|
)
|
||||||
# if we have any duplicated parameters, urlencode them, we append them later
|
# if we have any duplicated parameters, urlencode them, we append them later
|
||||||
extra_parameters = (
|
extra_parameters = (
|
||||||
f"&{urlencode({key: params[key] for key in duplicated_params}, doseq=True)}"
|
f"&{urlencode({key: params[key] for key in duplicated_params}, doseq=True)}"
|
||||||
|
|||||||
@@ -1,7 +1,11 @@
|
|||||||
"""An async healthchecks.io client."""
|
"""An async healthchecks.io client."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
|
from typing import Dict
|
||||||
|
from typing import List
|
||||||
|
from typing import Optional
|
||||||
|
from typing import Tuple
|
||||||
|
from typing import Type
|
||||||
|
|
||||||
from httpx import AsyncClient as HTTPXAsyncClient
|
from httpx import AsyncClient as HTTPXAsyncClient
|
||||||
|
|
||||||
@@ -25,7 +29,7 @@ class AsyncClient(AbstractClient):
|
|||||||
api_url: str = "https://healthchecks.io/api/",
|
api_url: str = "https://healthchecks.io/api/",
|
||||||
ping_url: str = "https://hc-ping.com/",
|
ping_url: str = "https://hc-ping.com/",
|
||||||
api_version: int = 1,
|
api_version: int = 1,
|
||||||
client: HTTPXAsyncClient | None = None,
|
client: Optional[HTTPXAsyncClient] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""An AsyncClient can be used in code using asyncio to work with the Healthchecks.io api.
|
"""An AsyncClient can be used in code using asyncio to work with the Healthchecks.io api.
|
||||||
|
|
||||||
@@ -60,9 +64,9 @@ class AsyncClient(AbstractClient):
|
|||||||
|
|
||||||
async def __aexit__(
|
async def __aexit__(
|
||||||
self,
|
self,
|
||||||
exc_type: type[BaseException] | None,
|
exc_type: Optional[Type[BaseException]],
|
||||||
exc: BaseException | None,
|
exc: Optional[BaseException],
|
||||||
traceback: TracebackType | None,
|
traceback: Optional[TracebackType],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Context manager exit."""
|
"""Context manager exit."""
|
||||||
await self._afinalizer_method()
|
await self._afinalizer_method()
|
||||||
@@ -114,7 +118,7 @@ class AsyncClient(AbstractClient):
|
|||||||
)
|
)
|
||||||
return Check.from_api_result(response.json())
|
return Check.from_api_result(response.json())
|
||||||
|
|
||||||
async def get_checks(self, tags: list[str] | None = None) -> list[Check]:
|
async def get_checks(self, tags: Optional[List[str]] = None) -> List[Check]:
|
||||||
"""Get a list of checks from the healthchecks api.
|
"""Get a list of checks from the healthchecks api.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -209,7 +213,7 @@ class AsyncClient(AbstractClient):
|
|||||||
response = self.check_response(await self._client.delete(request_url))
|
response = self.check_response(await self._client.delete(request_url))
|
||||||
return Check.from_api_result(response.json())
|
return Check.from_api_result(response.json())
|
||||||
|
|
||||||
async def get_check_pings(self, check_id: str) -> list[CheckPings]:
|
async def get_check_pings(self, check_id: str) -> List[CheckPings]:
|
||||||
"""Returns a list of pings this check has received.
|
"""Returns a list of pings this check has received.
|
||||||
|
|
||||||
This endpoint returns pings in reverse order (most recent first),
|
This endpoint returns pings in reverse order (most recent first),
|
||||||
@@ -237,10 +241,10 @@ class AsyncClient(AbstractClient):
|
|||||||
async def get_check_flips(
|
async def get_check_flips(
|
||||||
self,
|
self,
|
||||||
check_id: str,
|
check_id: str,
|
||||||
seconds: int | None = None,
|
seconds: Optional[int] = None,
|
||||||
start: int | None = None,
|
start: Optional[int] = None,
|
||||||
end: int | None = None,
|
end: Optional[int] = None,
|
||||||
) -> list[CheckStatuses]:
|
) -> List[CheckStatuses]:
|
||||||
"""Returns a list of "flips" this check has experienced.
|
"""Returns a list of "flips" this check has experienced.
|
||||||
|
|
||||||
A flip is a change of status (from "down" to "up," or from "up" to "down").
|
A flip is a change of status (from "down" to "up," or from "up" to "down").
|
||||||
@@ -277,7 +281,7 @@ class AsyncClient(AbstractClient):
|
|||||||
response = self.check_response(await self._client.get(request_url))
|
response = self.check_response(await self._client.get(request_url))
|
||||||
return [CheckStatuses(**status_data) for status_data in response.json()]
|
return [CheckStatuses(**status_data) for status_data in response.json()]
|
||||||
|
|
||||||
async def get_integrations(self) -> list[Integration | None]:
|
async def get_integrations(self) -> List[Optional[Integration]]:
|
||||||
"""Returns a list of integrations belonging to the project.
|
"""Returns a list of integrations belonging to the project.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
@@ -293,7 +297,7 @@ class AsyncClient(AbstractClient):
|
|||||||
response = self.check_response(await self._client.get(request_url))
|
response = self.check_response(await self._client.get(request_url))
|
||||||
return [Integration.from_api_result(integration_dict) for integration_dict in response.json()["channels"]]
|
return [Integration.from_api_result(integration_dict) for integration_dict in response.json()["channels"]]
|
||||||
|
|
||||||
async def get_badges(self) -> dict[str, Badges]:
|
async def get_badges(self) -> Dict[str, Badges]:
|
||||||
"""Returns a dict of all tags in the project, with badge URLs for each tag.
|
"""Returns a dict of all tags in the project, with badge URLs for each tag.
|
||||||
|
|
||||||
Healthchecks.io provides badges in a few different formats:
|
Healthchecks.io provides badges in a few different formats:
|
||||||
@@ -321,7 +325,7 @@ class AsyncClient(AbstractClient):
|
|||||||
response = self.check_response(await self._client.get(request_url))
|
response = self.check_response(await self._client.get(request_url))
|
||||||
return {key: Badges.from_api_result(item) for key, item in response.json()["badges"].items()}
|
return {key: Badges.from_api_result(item) for key, item in response.json()["badges"].items()}
|
||||||
|
|
||||||
async def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
async def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Signals to Healthchecks.io that a job has completed successfully.
|
"""Signals to Healthchecks.io that a job has completed successfully.
|
||||||
|
|
||||||
Can also be used to indicate a continuously running process is still running and healthy.
|
Can also be used to indicate a continuously running process is still running and healthy.
|
||||||
@@ -355,7 +359,7 @@ class AsyncClient(AbstractClient):
|
|||||||
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
||||||
return (True if response.status_code == 200 else False, response.text)
|
return (True if response.status_code == 200 else False, response.text)
|
||||||
|
|
||||||
async def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
async def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Sends a "job has started!" message to Healthchecks.io.
|
"""Sends a "job has started!" message to Healthchecks.io.
|
||||||
|
|
||||||
Sending a "start" signal is optional, but it enables a few extra features:
|
Sending a "start" signal is optional, but it enables a few extra features:
|
||||||
@@ -391,7 +395,7 @@ class AsyncClient(AbstractClient):
|
|||||||
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
||||||
return (True if response.status_code == 200 else False, response.text)
|
return (True if response.status_code == 200 else False, response.text)
|
||||||
|
|
||||||
async def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
async def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Signals to Healthchecks.io that the job has failed.
|
"""Signals to Healthchecks.io that the job has failed.
|
||||||
|
|
||||||
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
||||||
@@ -425,7 +429,7 @@ class AsyncClient(AbstractClient):
|
|||||||
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
||||||
return (True if response.status_code == 200 else False, response.text)
|
return (True if response.status_code == 200 else False, response.text)
|
||||||
|
|
||||||
async def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
async def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Signals to Healthchecks.io that the job has failed.
|
"""Signals to Healthchecks.io that the job has failed.
|
||||||
|
|
||||||
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
"""CheckTrap is a context manager to wrap around python code to communicate results to a Healthchecks check."""
|
"""CheckTrap is a context manager to wrap around python code to communicate results to a Healthchecks check."""
|
||||||
|
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
|
from typing import List
|
||||||
|
from typing import Optional
|
||||||
|
from typing import Type
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
from .async_client import AsyncClient
|
from .async_client import AsyncClient
|
||||||
from .exceptions import PingFailedError
|
from .exceptions import PingFailedError
|
||||||
@@ -13,7 +16,7 @@ class CheckTrap:
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
client: Client | AsyncClient,
|
client: Union[Client, AsyncClient],
|
||||||
uuid: str = "",
|
uuid: str = "",
|
||||||
slug: str = "",
|
slug: str = "",
|
||||||
suppress_exceptions: bool = False,
|
suppress_exceptions: bool = False,
|
||||||
@@ -31,10 +34,10 @@ class CheckTrap:
|
|||||||
"""
|
"""
|
||||||
if uuid == "" and slug == "":
|
if uuid == "" and slug == "":
|
||||||
raise Exception("Must pass a slug or an uuid")
|
raise Exception("Must pass a slug or an uuid")
|
||||||
self.client: Client | AsyncClient = client
|
self.client: Union[Client, AsyncClient] = client
|
||||||
self.uuid: str = uuid
|
self.uuid: str = uuid
|
||||||
self.slug: str = slug
|
self.slug: str = slug
|
||||||
self.log_lines: list[str] = list()
|
self.log_lines: List[str] = list()
|
||||||
self.suppress_exceptions: bool = suppress_exceptions
|
self.suppress_exceptions: bool = suppress_exceptions
|
||||||
|
|
||||||
def add_log(self, line: str) -> None:
|
def add_log(self, line: str) -> None:
|
||||||
@@ -65,7 +68,9 @@ class CheckTrap:
|
|||||||
CheckTrap: self
|
CheckTrap: self
|
||||||
"""
|
"""
|
||||||
if isinstance(self.client, AsyncClient):
|
if isinstance(self.client, AsyncClient):
|
||||||
raise WrongClientError("You passed an AsyncClient, use this as an async context manager")
|
raise WrongClientError(
|
||||||
|
"You passed an AsyncClient, use this as an async context manager"
|
||||||
|
)
|
||||||
result = self.client.start_ping(uuid=self.uuid, slug=self.slug)
|
result = self.client.start_ping(uuid=self.uuid, slug=self.slug)
|
||||||
if not result[0]:
|
if not result[0]:
|
||||||
raise PingFailedError(result[1])
|
raise PingFailedError(result[1])
|
||||||
@@ -73,10 +78,10 @@ class CheckTrap:
|
|||||||
|
|
||||||
def __exit__(
|
def __exit__(
|
||||||
self,
|
self,
|
||||||
exc_type: type[BaseException] | None,
|
exc_type: Optional[Type[BaseException]],
|
||||||
exc: BaseException | None,
|
exc: Optional[BaseException],
|
||||||
traceback: TracebackType | None,
|
traceback: Optional[TracebackType],
|
||||||
) -> bool | None:
|
) -> Optional[bool]:
|
||||||
"""Exit the context manager.
|
"""Exit the context manager.
|
||||||
|
|
||||||
If there is an exception, add it to any log lines and send a fail ping.
|
If there is an exception, add it to any log lines and send a fail ping.
|
||||||
@@ -91,7 +96,9 @@ class CheckTrap:
|
|||||||
Optional[bool]: self.suppress_exceptions, if true will not raise any exceptions
|
Optional[bool]: self.suppress_exceptions, if true will not raise any exceptions
|
||||||
"""
|
"""
|
||||||
if exc_type is None:
|
if exc_type is None:
|
||||||
self.client.success_ping(self.uuid, self.slug, data="\n".join(self.log_lines))
|
self.client.success_ping(
|
||||||
|
self.uuid, self.slug, data="\n".join(self.log_lines)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
self.add_log(str(exc))
|
self.add_log(str(exc))
|
||||||
self.add_log(str(traceback))
|
self.add_log(str(traceback))
|
||||||
@@ -118,7 +125,9 @@ class CheckTrap:
|
|||||||
CheckTrap: self
|
CheckTrap: self
|
||||||
"""
|
"""
|
||||||
if isinstance(self.client, Client):
|
if isinstance(self.client, Client):
|
||||||
raise WrongClientError("You passed a sync Client, use this as a regular context manager")
|
raise WrongClientError(
|
||||||
|
"You passed a sync Client, use this as a regular context manager"
|
||||||
|
)
|
||||||
result = await self.client.start_ping(self.uuid, self.slug)
|
result = await self.client.start_ping(self.uuid, self.slug)
|
||||||
if not result[0]:
|
if not result[0]:
|
||||||
raise PingFailedError(result[1])
|
raise PingFailedError(result[1])
|
||||||
@@ -126,10 +135,10 @@ class CheckTrap:
|
|||||||
|
|
||||||
async def __aexit__(
|
async def __aexit__(
|
||||||
self,
|
self,
|
||||||
exc_type: type[BaseException] | None,
|
exc_type: Optional[Type[BaseException]],
|
||||||
exc: BaseException | None,
|
exc: Optional[BaseException],
|
||||||
traceback: TracebackType | None,
|
traceback: Optional[TracebackType],
|
||||||
) -> bool | None:
|
) -> Optional[bool]:
|
||||||
"""Exit the context manager.
|
"""Exit the context manager.
|
||||||
|
|
||||||
If there is an exception, add it to any log lines and send a fail ping.
|
If there is an exception, add it to any log lines and send a fail ping.
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
"""An async healthchecks.io client."""
|
"""An async healthchecks.io client."""
|
||||||
|
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
|
from typing import Dict
|
||||||
|
from typing import List
|
||||||
|
from typing import Optional
|
||||||
|
from typing import Tuple
|
||||||
|
from typing import Type
|
||||||
|
|
||||||
from httpx import Client as HTTPXClient
|
from httpx import Client as HTTPXClient
|
||||||
|
|
||||||
@@ -23,7 +27,7 @@ class Client(AbstractClient):
|
|||||||
api_url: str = "https://healthchecks.io/api/",
|
api_url: str = "https://healthchecks.io/api/",
|
||||||
ping_url: str = "https://hc-ping.com/",
|
ping_url: str = "https://hc-ping.com/",
|
||||||
api_version: int = 1,
|
api_version: int = 1,
|
||||||
client: HTTPXClient | None = None,
|
client: Optional[HTTPXClient] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""An AsyncClient can be used in code using asyncio to work with the Healthchecks.io api.
|
"""An AsyncClient can be used in code using asyncio to work with the Healthchecks.io api.
|
||||||
|
|
||||||
@@ -58,9 +62,9 @@ class Client(AbstractClient):
|
|||||||
|
|
||||||
def __exit__(
|
def __exit__(
|
||||||
self,
|
self,
|
||||||
exc_type: type[BaseException] | None,
|
exc_type: Optional[Type[BaseException]],
|
||||||
exc: BaseException | None,
|
exc: Optional[BaseException],
|
||||||
traceback: TracebackType | None,
|
traceback: Optional[TracebackType],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Context manager exit."""
|
"""Context manager exit."""
|
||||||
self._finalizer_method()
|
self._finalizer_method()
|
||||||
@@ -69,7 +73,7 @@ class Client(AbstractClient):
|
|||||||
"""Closes the httpx client."""
|
"""Closes the httpx client."""
|
||||||
self._client.close()
|
self._client.close()
|
||||||
|
|
||||||
def get_checks(self, tags: list[str] | None = None) -> list[checks.Check]:
|
def get_checks(self, tags: Optional[List[str]] = None) -> List[checks.Check]:
|
||||||
"""Get a list of checks from the healthchecks api.
|
"""Get a list of checks from the healthchecks api.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -204,7 +208,7 @@ class Client(AbstractClient):
|
|||||||
response = self.check_response(self._client.delete(request_url))
|
response = self.check_response(self._client.delete(request_url))
|
||||||
return checks.Check.from_api_result(response.json())
|
return checks.Check.from_api_result(response.json())
|
||||||
|
|
||||||
def get_check_pings(self, check_id: str) -> list[checks.CheckPings]:
|
def get_check_pings(self, check_id: str) -> List[checks.CheckPings]:
|
||||||
"""Returns a list of pings this check has received.
|
"""Returns a list of pings this check has received.
|
||||||
|
|
||||||
This endpoint returns pings in reverse order (most recent first),
|
This endpoint returns pings in reverse order (most recent first),
|
||||||
@@ -230,10 +234,10 @@ class Client(AbstractClient):
|
|||||||
def get_check_flips(
|
def get_check_flips(
|
||||||
self,
|
self,
|
||||||
check_id: str,
|
check_id: str,
|
||||||
seconds: int | None = None,
|
seconds: Optional[int] = None,
|
||||||
start: int | None = None,
|
start: Optional[int] = None,
|
||||||
end: int | None = None,
|
end: Optional[int] = None,
|
||||||
) -> list[checks.CheckStatuses]:
|
) -> List[checks.CheckStatuses]:
|
||||||
"""Returns a list of "flips" this check has experienced.
|
"""Returns a list of "flips" this check has experienced.
|
||||||
|
|
||||||
A flip is a change of status (from "down" to "up," or from "up" to "down").
|
A flip is a change of status (from "down" to "up," or from "up" to "down").
|
||||||
@@ -268,7 +272,7 @@ class Client(AbstractClient):
|
|||||||
response = self.check_response(self._client.get(request_url))
|
response = self.check_response(self._client.get(request_url))
|
||||||
return [checks.CheckStatuses(**status_data) for status_data in response.json()]
|
return [checks.CheckStatuses(**status_data) for status_data in response.json()]
|
||||||
|
|
||||||
def get_integrations(self) -> list[integrations.Integration | None]:
|
def get_integrations(self) -> List[Optional[integrations.Integration]]:
|
||||||
"""Returns a list of integrations belonging to the project.
|
"""Returns a list of integrations belonging to the project.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
@@ -286,7 +290,7 @@ class Client(AbstractClient):
|
|||||||
for integration_dict in response.json()["channels"]
|
for integration_dict in response.json()["channels"]
|
||||||
]
|
]
|
||||||
|
|
||||||
def get_badges(self) -> dict[str, badges.Badges]:
|
def get_badges(self) -> Dict[str, badges.Badges]:
|
||||||
"""Returns a dict of all tags in the project, with badge URLs for each tag.
|
"""Returns a dict of all tags in the project, with badge URLs for each tag.
|
||||||
|
|
||||||
Healthchecks.io provides badges in a few different formats:
|
Healthchecks.io provides badges in a few different formats:
|
||||||
@@ -313,7 +317,7 @@ class Client(AbstractClient):
|
|||||||
response = self.check_response(self._client.get(request_url))
|
response = self.check_response(self._client.get(request_url))
|
||||||
return {key: badges.Badges.from_api_result(item) for key, item in response.json()["badges"].items()}
|
return {key: badges.Badges.from_api_result(item) for key, item in response.json()["badges"].items()}
|
||||||
|
|
||||||
def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Signals to Healthchecks.io that a job has completed successfully.
|
"""Signals to Healthchecks.io that a job has completed successfully.
|
||||||
|
|
||||||
Can also be used to indicate a continuously running process is still running and healthy.
|
Can also be used to indicate a continuously running process is still running and healthy.
|
||||||
@@ -347,7 +351,7 @@ class Client(AbstractClient):
|
|||||||
response = self.check_ping_response(self._client.post(ping_url, content=data))
|
response = self.check_ping_response(self._client.post(ping_url, content=data))
|
||||||
return (True if response.status_code == 200 else False, response.text)
|
return (True if response.status_code == 200 else False, response.text)
|
||||||
|
|
||||||
def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Sends a "job has started!" message to Healthchecks.io.
|
"""Sends a "job has started!" message to Healthchecks.io.
|
||||||
|
|
||||||
Sending a "start" signal is optional, but it enables a few extra features:
|
Sending a "start" signal is optional, but it enables a few extra features:
|
||||||
@@ -383,7 +387,7 @@ class Client(AbstractClient):
|
|||||||
response = self.check_ping_response(self._client.post(ping_url, content=data))
|
response = self.check_ping_response(self._client.post(ping_url, content=data))
|
||||||
return (True if response.status_code == 200 else False, response.text)
|
return (True if response.status_code == 200 else False, response.text)
|
||||||
|
|
||||||
def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Signals to Healthchecks.io that the job has failed.
|
"""Signals to Healthchecks.io that the job has failed.
|
||||||
|
|
||||||
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
||||||
@@ -417,7 +421,7 @@ class Client(AbstractClient):
|
|||||||
response = self.check_ping_response(self._client.post(ping_url, content=data))
|
response = self.check_ping_response(self._client.post(ping_url, content=data))
|
||||||
return (True if response.status_code == 200 else False, response.text)
|
return (True if response.status_code == 200 else False, response.text)
|
||||||
|
|
||||||
def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
|
def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||||
"""Signals to Healthchecks.io that the job has failed.
|
"""Signals to Healthchecks.io that the job has failed.
|
||||||
|
|
||||||
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
"""Schemas for healthchecks_io."""
|
"""Schemas for healthchecks_io."""
|
||||||
|
|
||||||
from .badges import Badges
|
from .badges import Badges
|
||||||
from .checks import Check
|
from .checks import Check
|
||||||
from .checks import CheckCreate
|
from .checks import CheckCreate
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
https://healthchecks.io/docs/api/
|
https://healthchecks.io/docs/api/
|
||||||
"""
|
"""
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
@@ -17,7 +18,7 @@ class Badges(BaseModel):
|
|||||||
shields3: str
|
shields3: str
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_api_result(cls, badges_dict: dict[str, str]) -> "Badges":
|
def from_api_result(cls, badges_dict: Dict[str, str]) -> "Badges":
|
||||||
"""Converts a dictionary from the healthchecks api into a Badges object."""
|
"""Converts a dictionary from the healthchecks api into a Badges object."""
|
||||||
badges_dict["json_url"] = badges_dict["json"]
|
badges_dict["json_url"] = badges_dict["json"]
|
||||||
badges_dict["json3_url"] = badges_dict["json3"]
|
badges_dict["json3_url"] = badges_dict["json3"]
|
||||||
|
|||||||
@@ -2,10 +2,13 @@
|
|||||||
|
|
||||||
https://healthchecks.io/docs/api/
|
https://healthchecks.io/docs/api/
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import PurePath
|
from pathlib import PurePath
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
from typing import Dict
|
||||||
|
from typing import List
|
||||||
|
from typing import Optional
|
||||||
|
from typing import Union
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import pytz
|
import pytz
|
||||||
@@ -18,28 +21,28 @@ from pydantic import validator
|
|||||||
class Check(BaseModel):
|
class Check(BaseModel):
|
||||||
"""Schema for a check object, either from a readonly api request or a rw api request."""
|
"""Schema for a check object, either from a readonly api request or a rw api request."""
|
||||||
|
|
||||||
unique_key: str | None
|
unique_key: Optional[str]
|
||||||
name: str
|
name: str
|
||||||
slug: str
|
slug: str
|
||||||
tags: str | None
|
tags: Optional[str]
|
||||||
desc: str | None
|
desc: Optional[str]
|
||||||
grace: int
|
grace: int
|
||||||
n_pings: int
|
n_pings: int
|
||||||
status: str
|
status: str
|
||||||
last_ping: datetime | None
|
last_ping: Optional[datetime]
|
||||||
next_ping: datetime | None
|
next_ping: Optional[datetime]
|
||||||
manual_resume: bool
|
manual_resume: bool
|
||||||
methods: str | None
|
methods: Optional[str]
|
||||||
# healthchecks.io's api doesn't return a scheme so we cant use Pydantic AnyUrl here
|
# healthchecks.io's api doesn't return a scheme so we cant use Pydantic AnyUrl here
|
||||||
ping_url: str | None
|
ping_url: Optional[str]
|
||||||
update_url: str | None
|
update_url: Optional[str]
|
||||||
pause_url: str | None
|
pause_url: Optional[str]
|
||||||
channels: str | None
|
channels: Optional[str]
|
||||||
timeout: int | None
|
timeout: Optional[int]
|
||||||
uuid: str | None
|
uuid: Optional[str]
|
||||||
|
|
||||||
@validator("uuid", always=True)
|
@validator("uuid", always=True)
|
||||||
def validate_uuid(cls, value: str | None, values: dict[str, Any]) -> str | None: # noqa: B902
|
def validate_uuid(cls, value: Optional[str], values: Dict[str, Any]) -> Optional[str]: # noqa: B902
|
||||||
"""Tries to set the uuid from the ping_url.
|
"""Tries to set the uuid from the ping_url.
|
||||||
|
|
||||||
Will return none if a read only token is used because it cannot retrieve the UUID of a check
|
Will return none if a read only token is used because it cannot retrieve the UUID of a check
|
||||||
@@ -53,7 +56,7 @@ class Check(BaseModel):
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_api_result(cls, check_dict: dict[str, Any]) -> "Check":
|
def from_api_result(cls, check_dict: Dict[str, Any]) -> "Check":
|
||||||
"""Converts a dictionary from the healthchecks api into an Check object."""
|
"""Converts a dictionary from the healthchecks api into an Check object."""
|
||||||
return cls(**check_dict)
|
return cls(**check_dict)
|
||||||
|
|
||||||
@@ -61,33 +64,34 @@ class Check(BaseModel):
|
|||||||
class CheckCreate(BaseModel):
|
class CheckCreate(BaseModel):
|
||||||
"""Pydantic object for creating a check."""
|
"""Pydantic object for creating a check."""
|
||||||
|
|
||||||
name: str | None = Field("", description="Name of the check")
|
name: Optional[str] = Field("", description="Name of the check")
|
||||||
tags: str | None = Field("", description="String separated list of tags to apply")
|
tags: Optional[str] = Field("", description="String separated list of tags to apply")
|
||||||
desc: str | None = Field("", description="Description of the check")
|
desc: Optional[str] = Field("", description="Description of the check")
|
||||||
timeout: int | None = Field(
|
timeout: Optional[int] = Field(
|
||||||
86400,
|
86400,
|
||||||
description="The expected period of this check in seconds.",
|
description="The expected period of this check in seconds.",
|
||||||
gte=60,
|
gte=60,
|
||||||
lte=31536000,
|
lte=31536000,
|
||||||
)
|
)
|
||||||
grace: int | None = Field(
|
grace: Optional[int] = Field(
|
||||||
3600,
|
3600,
|
||||||
description="The grace period for this check in seconds.",
|
description="The grace period for this check in seconds.",
|
||||||
gte=60,
|
gte=60,
|
||||||
lte=31536000,
|
lte=31536000,
|
||||||
)
|
)
|
||||||
schedule: str | None = Field(
|
schedule: Optional[str] = Field(
|
||||||
None,
|
None,
|
||||||
description="A cron expression defining this check's schedule. "
|
description="A cron expression defining this check's schedule. "
|
||||||
"If you specify both timeout and schedule parameters, "
|
"If you specify both timeout and schedule parameters, "
|
||||||
"Healthchecks.io will create a Cron check and ignore the "
|
"Healthchecks.io will create a Cron check and ignore the "
|
||||||
"timeout value.",
|
"timeout value.",
|
||||||
)
|
)
|
||||||
tz: str | None = Field(
|
tz: Optional[str] = Field(
|
||||||
"UTC",
|
"UTC",
|
||||||
description="Server's timezone. This setting only has an effect " "in combination with the schedule parameter.",
|
description="Server's timezone. This setting only has an effect "
|
||||||
|
"in combination with the schedule parameter.",
|
||||||
)
|
)
|
||||||
manual_resume: bool | None = Field(
|
manual_resume: Optional[bool] = Field(
|
||||||
False,
|
False,
|
||||||
description="Controls whether a paused check automatically resumes "
|
description="Controls whether a paused check automatically resumes "
|
||||||
"when pinged (the default) or not. If set to false, a paused "
|
"when pinged (the default) or not. If set to false, a paused "
|
||||||
@@ -95,7 +99,7 @@ class CheckCreate(BaseModel):
|
|||||||
"If set to true, a paused check will ignore pings and stay "
|
"If set to true, a paused check will ignore pings and stay "
|
||||||
"paused until you manually resume it from the web dashboard.",
|
"paused until you manually resume it from the web dashboard.",
|
||||||
)
|
)
|
||||||
methods: str | None = Field(
|
methods: Optional[str] = Field(
|
||||||
"",
|
"",
|
||||||
description="Specifies the allowed HTTP methods for making "
|
description="Specifies the allowed HTTP methods for making "
|
||||||
"ping requests. Must be one of the two values: an empty "
|
"ping requests. Must be one of the two values: an empty "
|
||||||
@@ -103,7 +107,7 @@ class CheckCreate(BaseModel):
|
|||||||
"allow HEAD, GET, and POST requests. Set this field to "
|
"allow HEAD, GET, and POST requests. Set this field to "
|
||||||
"POST to allow only POST requests.",
|
"POST to allow only POST requests.",
|
||||||
)
|
)
|
||||||
channels: str | None = Field(
|
channels: Optional[str] = Field(
|
||||||
None,
|
None,
|
||||||
description="By default, this API call assigns no integrations"
|
description="By default, this API call assigns no integrations"
|
||||||
"to the newly created check. By default, this API call "
|
"to the newly created check. By default, this API call "
|
||||||
@@ -111,7 +115,7 @@ class CheckCreate(BaseModel):
|
|||||||
"To assign specific integrations, use a comma-separated list "
|
"To assign specific integrations, use a comma-separated list "
|
||||||
"of integration UUIDs.",
|
"of integration UUIDs.",
|
||||||
)
|
)
|
||||||
unique: list[str | None] | None = Field(
|
unique: Optional[List[Optional[str]]] = Field(
|
||||||
[],
|
[],
|
||||||
description="Enables upsert functionality. Before creating a check, "
|
description="Enables upsert functionality. Before creating a check, "
|
||||||
"Healthchecks.io looks for existing checks, filtered by fields listed "
|
"Healthchecks.io looks for existing checks, filtered by fields listed "
|
||||||
@@ -144,7 +148,7 @@ class CheckCreate(BaseModel):
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
@validator("unique")
|
@validator("unique")
|
||||||
def validate_unique(cls, value: list[str | None]) -> list[str | None]:
|
def validate_unique(cls, value: List[Optional[str]]) -> List[Optional[str]]:
|
||||||
"""Validate unique list."""
|
"""Validate unique list."""
|
||||||
for unique in value:
|
for unique in value:
|
||||||
if unique not in ("name", "tags", "timeout", "grace"):
|
if unique not in ("name", "tags", "timeout", "grace"):
|
||||||
@@ -157,32 +161,33 @@ class CheckCreate(BaseModel):
|
|||||||
class CheckUpdate(CheckCreate):
|
class CheckUpdate(CheckCreate):
|
||||||
"""Pydantic object for updating a check."""
|
"""Pydantic object for updating a check."""
|
||||||
|
|
||||||
name: str | None = Field(None, description="Name of the check")
|
name: Optional[str] = Field(None, description="Name of the check")
|
||||||
tags: str | None = Field(None, description="String separated list of tags to apply")
|
tags: Optional[str] = Field(None, description="String separated list of tags to apply")
|
||||||
timeout: int | None = Field(
|
timeout: Optional[int] = Field(
|
||||||
None,
|
None,
|
||||||
description="The expected period of this check in seconds.",
|
description="The expected period of this check in seconds.",
|
||||||
gte=60,
|
gte=60,
|
||||||
lte=31536000,
|
lte=31536000,
|
||||||
)
|
)
|
||||||
grace: int | None = Field(
|
grace: Optional[int] = Field(
|
||||||
None,
|
None,
|
||||||
description="The grace period for this check in seconds.",
|
description="The grace period for this check in seconds.",
|
||||||
gte=60,
|
gte=60,
|
||||||
lte=31536000,
|
lte=31536000,
|
||||||
)
|
)
|
||||||
schedule: str | None = Field(
|
schedule: Optional[str] = Field(
|
||||||
None,
|
None,
|
||||||
description="A cron expression defining this check's schedule. "
|
description="A cron expression defining this check's schedule. "
|
||||||
"If you specify both timeout and schedule parameters, "
|
"If you specify both timeout and schedule parameters, "
|
||||||
"Healthchecks.io will create a Cron check and ignore the "
|
"Healthchecks.io will create a Cron check and ignore the "
|
||||||
"timeout value.",
|
"timeout value.",
|
||||||
)
|
)
|
||||||
tz: str | None = Field(
|
tz: Optional[str] = Field(
|
||||||
None,
|
None,
|
||||||
description="Server's timezone. This setting only has an effect " "in combination with the schedule parameter.",
|
description="Server's timezone. This setting only has an effect "
|
||||||
|
"in combination with the schedule parameter.",
|
||||||
)
|
)
|
||||||
manual_resume: bool | None = Field(
|
manual_resume: Optional[bool] = Field(
|
||||||
None,
|
None,
|
||||||
description="Controls whether a paused check automatically resumes "
|
description="Controls whether a paused check automatically resumes "
|
||||||
"when pinged (the default) or not. If set to false, a paused "
|
"when pinged (the default) or not. If set to false, a paused "
|
||||||
@@ -190,7 +195,7 @@ class CheckUpdate(CheckCreate):
|
|||||||
"If set to true, a paused check will ignore pings and stay "
|
"If set to true, a paused check will ignore pings and stay "
|
||||||
"paused until you manually resume it from the web dashboard.",
|
"paused until you manually resume it from the web dashboard.",
|
||||||
)
|
)
|
||||||
methods: str | None = Field(
|
methods: Optional[str] = Field(
|
||||||
None,
|
None,
|
||||||
description="Specifies the allowed HTTP methods for making "
|
description="Specifies the allowed HTTP methods for making "
|
||||||
"ping requests. Must be one of the two values: an empty "
|
"ping requests. Must be one of the two values: an empty "
|
||||||
@@ -198,7 +203,7 @@ class CheckUpdate(CheckCreate):
|
|||||||
"allow HEAD, GET, and POST requests. Set this field to "
|
"allow HEAD, GET, and POST requests. Set this field to "
|
||||||
"POST to allow only POST requests.",
|
"POST to allow only POST requests.",
|
||||||
)
|
)
|
||||||
channels: str | None = Field(
|
channels: Optional[str] = Field(
|
||||||
None,
|
None,
|
||||||
description="By default, this API call assigns no integrations"
|
description="By default, this API call assigns no integrations"
|
||||||
"to the newly created check. By default, this API call "
|
"to the newly created check. By default, this API call "
|
||||||
@@ -206,7 +211,7 @@ class CheckUpdate(CheckCreate):
|
|||||||
"To assign specific integrations, use a comma-separated list "
|
"To assign specific integrations, use a comma-separated list "
|
||||||
"of integration UUIDs.",
|
"of integration UUIDs.",
|
||||||
)
|
)
|
||||||
unique: list[str | None] | None = Field(
|
unique: Optional[List[Optional[str]]] = Field(
|
||||||
None,
|
None,
|
||||||
description="Enables upsert functionality. Before creating a check, "
|
description="Enables upsert functionality. Before creating a check, "
|
||||||
"Healthchecks.io looks for existing checks, filtered by fields listed "
|
"Healthchecks.io looks for existing checks, filtered by fields listed "
|
||||||
@@ -228,10 +233,10 @@ class CheckPings(BaseModel):
|
|||||||
remote_addr: str
|
remote_addr: str
|
||||||
method: str
|
method: str
|
||||||
user_agent: str
|
user_agent: str
|
||||||
duration: float | None = None
|
duration: Optional[float] = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_api_result(cls, ping_dict: dict[str, str | int | datetime]) -> "CheckPings":
|
def from_api_result(cls, ping_dict: Dict[str, Union[str, int, datetime]]) -> "CheckPings":
|
||||||
"""Converts a dictionary from the healthchecks api into a CheckPings object."""
|
"""Converts a dictionary from the healthchecks api into a CheckPings object."""
|
||||||
ping_dict["number_of_pings"] = ping_dict["n"]
|
ping_dict["number_of_pings"] = ping_dict["n"]
|
||||||
ping_dict["user_agent"] = ping_dict["ua"]
|
ping_dict["user_agent"] = ping_dict["ua"]
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
https://healthchecks.io/docs/api/
|
https://healthchecks.io/docs/api/
|
||||||
"""
|
"""
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
@@ -14,6 +15,6 @@ class Integration(BaseModel):
|
|||||||
kind: str
|
kind: str
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_api_result(cls, integration_dict: dict[str, str]) -> "Integration":
|
def from_api_result(cls, integration_dict: Dict[str, str]) -> "Integration":
|
||||||
"""Converts a dictionary from the healthchecks api into an Integration object."""
|
"""Converts a dictionary from the healthchecks api into an Integration object."""
|
||||||
return cls(**integration_dict)
|
return cls(**integration_dict)
|
||||||
|
|||||||
@@ -13,7 +13,10 @@ from healthchecks_io import NonUniqueSlugError
|
|||||||
|
|
||||||
|
|
||||||
def test_abstract_add_url_params(test_abstract_client):
|
def test_abstract_add_url_params(test_abstract_client):
|
||||||
url = test_abstract_client._add_url_params("http://test.com/?test=test", {"test": "test2"})
|
|
||||||
|
url = test_abstract_client._add_url_params(
|
||||||
|
"http://test.com/?test=test", {"test": "test2"}
|
||||||
|
)
|
||||||
assert url == "http://test.com/?test=test2"
|
assert url == "http://test.com/?test=test2"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,9 @@ from healthchecks_io.client.exceptions import HCAPIError
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_acreate_check_200_context_manager(fake_check_api_result, respx_mock, test_async_client):
|
async def test_acreate_check_200_context_manager(
|
||||||
|
fake_check_api_result, respx_mock, test_async_client
|
||||||
|
):
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/")
|
checks_url = urljoin(test_async_client._api_url, "checks/")
|
||||||
respx_mock.post(checks_url).mock(
|
respx_mock.post(checks_url).mock(
|
||||||
return_value=Response(
|
return_value=Response(
|
||||||
@@ -41,7 +43,9 @@ async def test_acreate_check_200_context_manager(fake_check_api_result, respx_mo
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
async with test_async_client as test_client:
|
async with test_async_client as test_client:
|
||||||
check = await test_client.create_check(CheckCreate(name="test", tags="test", desc="test"))
|
check = await test_client.create_check(
|
||||||
|
CheckCreate(name="test", tags="test", desc="test")
|
||||||
|
)
|
||||||
assert check.name == "Backups"
|
assert check.name == "Backups"
|
||||||
|
|
||||||
|
|
||||||
@@ -72,7 +76,9 @@ async def test_acreate_check_200(fake_check_api_result, respx_mock, test_async_c
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
check = await test_async_client.create_check(CheckCreate(name="test", tags="test", desc="test"))
|
check = await test_async_client.create_check(
|
||||||
|
CheckCreate(name="test", tags="test", desc="test")
|
||||||
|
)
|
||||||
assert check.name == "Backups"
|
assert check.name == "Backups"
|
||||||
|
|
||||||
|
|
||||||
@@ -103,7 +109,9 @@ async def test_aupdate_check_200(fake_check_api_result, respx_mock, test_async_c
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
check = await test_async_client.update_check("test", CheckUpdate(name="test", desc="test"))
|
check = await test_async_client.update_check(
|
||||||
|
"test", CheckUpdate(name="test", desc="test")
|
||||||
|
)
|
||||||
assert check.name == "Backups"
|
assert check.name == "Backups"
|
||||||
|
|
||||||
|
|
||||||
@@ -112,7 +120,9 @@ async def test_aupdate_check_200(fake_check_api_result, respx_mock, test_async_c
|
|||||||
async def test_aget_checks_200(fake_check_api_result, respx_mock, test_async_client):
|
async def test_aget_checks_200(fake_check_api_result, respx_mock, test_async_client):
|
||||||
assert test_async_client._client is not None
|
assert test_async_client._client is not None
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/")
|
checks_url = urljoin(test_async_client._api_url, "checks/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
|
||||||
|
)
|
||||||
checks = await test_async_client.get_checks()
|
checks = await test_async_client.get_checks()
|
||||||
assert len(checks) == 1
|
assert len(checks) == 1
|
||||||
assert checks[0].name == fake_check_api_result["name"]
|
assert checks[0].name == fake_check_api_result["name"]
|
||||||
@@ -122,9 +132,13 @@ async def test_aget_checks_200(fake_check_api_result, respx_mock, test_async_cli
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_aget_checks_pass_in_client(fake_check_api_result, respx_mock):
|
async def test_aget_checks_pass_in_client(fake_check_api_result, respx_mock):
|
||||||
httpx_client = HTTPXAsyncClient()
|
httpx_client = HTTPXAsyncClient()
|
||||||
test_async_client = AsyncClient(api_key="test", api_url="http://localhost/api/", client=httpx_client)
|
test_async_client = AsyncClient(
|
||||||
|
api_key="test", api_url="http://localhost/api/", client=httpx_client
|
||||||
|
)
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/")
|
checks_url = urljoin(test_async_client._api_url, "checks/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
|
||||||
|
)
|
||||||
checks = await test_async_client.get_checks()
|
checks = await test_async_client.get_checks()
|
||||||
assert len(checks) == 1
|
assert len(checks) == 1
|
||||||
assert checks[0].name == fake_check_api_result["name"]
|
assert checks[0].name == fake_check_api_result["name"]
|
||||||
@@ -132,7 +146,9 @@ async def test_aget_checks_pass_in_client(fake_check_api_result, respx_mock):
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_aget_checks_exceptions(fake_check_api_result, respx_mock, test_async_client):
|
async def test_aget_checks_exceptions(
|
||||||
|
fake_check_api_result, respx_mock, test_async_client
|
||||||
|
):
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/")
|
checks_url = urljoin(test_async_client._api_url, "checks/")
|
||||||
# test exceptions
|
# test exceptions
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=401))
|
respx_mock.get(checks_url).mock(return_value=Response(status_code=401))
|
||||||
@@ -170,7 +186,9 @@ def test_finalizer_closes(test_async_client):
|
|||||||
async def test_aget_check_200(fake_check_api_result, respx_mock, test_async_client):
|
async def test_aget_check_200(fake_check_api_result, respx_mock, test_async_client):
|
||||||
assert test_async_client._client is not None
|
assert test_async_client._client is not None
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/test")
|
checks_url = urljoin(test_async_client._api_url, "checks/test")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_api_result)
|
||||||
|
)
|
||||||
check = await test_async_client.get_check(check_id="test")
|
check = await test_async_client.get_check(check_id="test")
|
||||||
assert check.name == fake_check_api_result["name"]
|
assert check.name == fake_check_api_result["name"]
|
||||||
|
|
||||||
@@ -189,7 +207,9 @@ async def test_acheck_get_404(respx_mock, test_async_client):
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_pause_check_200(fake_check_api_result, respx_mock, test_async_client):
|
async def test_pause_check_200(fake_check_api_result, respx_mock, test_async_client):
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/test/pause")
|
checks_url = urljoin(test_async_client._api_url, "checks/test/pause")
|
||||||
respx_mock.post(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
|
respx_mock.post(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_api_result)
|
||||||
|
)
|
||||||
check = await test_async_client.pause_check(check_id="test")
|
check = await test_async_client.pause_check(check_id="test")
|
||||||
assert check.name == fake_check_api_result["name"]
|
assert check.name == fake_check_api_result["name"]
|
||||||
|
|
||||||
@@ -209,7 +229,9 @@ async def test_acheck_pause_404(respx_mock, test_async_client):
|
|||||||
async def test_adelete_check_200(fake_check_api_result, respx_mock, test_async_client):
|
async def test_adelete_check_200(fake_check_api_result, respx_mock, test_async_client):
|
||||||
assert test_async_client._client is not None
|
assert test_async_client._client is not None
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/test")
|
checks_url = urljoin(test_async_client._api_url, "checks/test")
|
||||||
respx_mock.delete(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
|
respx_mock.delete(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_api_result)
|
||||||
|
)
|
||||||
check = await test_async_client.delete_check(check_id="test")
|
check = await test_async_client.delete_check(check_id="test")
|
||||||
assert check.name == fake_check_api_result["name"]
|
assert check.name == fake_check_api_result["name"]
|
||||||
|
|
||||||
@@ -225,9 +247,15 @@ async def test_adelete_pause404(respx_mock, test_async_client):
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_aget_check_pings_200(fake_check_pings_api_result, respx_mock, test_async_client):
|
async def test_aget_check_pings_200(
|
||||||
|
fake_check_pings_api_result, respx_mock, test_async_client
|
||||||
|
):
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/test/pings/")
|
checks_url = urljoin(test_async_client._api_url, "checks/test/pings/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"pings": fake_check_pings_api_result}))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(
|
||||||
|
status_code=200, json={"pings": fake_check_pings_api_result}
|
||||||
|
)
|
||||||
|
)
|
||||||
pings = await test_async_client.get_check_pings("test")
|
pings = await test_async_client.get_check_pings("test")
|
||||||
assert len(pings) == len(fake_check_pings_api_result)
|
assert len(pings) == len(fake_check_pings_api_result)
|
||||||
assert pings[0].type == fake_check_pings_api_result[0]["type"]
|
assert pings[0].type == fake_check_pings_api_result[0]["type"]
|
||||||
@@ -235,9 +263,13 @@ async def test_aget_check_pings_200(fake_check_pings_api_result, respx_mock, tes
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_aget_check_flips_200(fake_check_flips_api_result, respx_mock, test_async_client):
|
async def test_aget_check_flips_200(
|
||||||
|
fake_check_flips_api_result, respx_mock, test_async_client
|
||||||
|
):
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/test/flips/")
|
checks_url = urljoin(test_async_client._api_url, "checks/test/flips/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_flips_api_result)
|
||||||
|
)
|
||||||
flips = await test_async_client.get_check_flips("test")
|
flips = await test_async_client.get_check_flips("test")
|
||||||
assert len(flips) == len(fake_check_flips_api_result)
|
assert len(flips) == len(fake_check_flips_api_result)
|
||||||
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
||||||
@@ -245,9 +277,15 @@ async def test_aget_check_flips_200(fake_check_flips_api_result, respx_mock, tes
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_get_check_flips_params_200(fake_check_flips_api_result, respx_mock, test_async_client):
|
async def test_get_check_flips_params_200(
|
||||||
checks_url = urljoin(test_async_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1")
|
fake_check_flips_api_result, respx_mock, test_async_client
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
|
):
|
||||||
|
checks_url = urljoin(
|
||||||
|
test_async_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1"
|
||||||
|
)
|
||||||
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_flips_api_result)
|
||||||
|
)
|
||||||
flips = await test_async_client.get_check_flips("test", seconds=1, start=1, end=1)
|
flips = await test_async_client.get_check_flips("test", seconds=1, start=1, end=1)
|
||||||
assert len(flips) == len(fake_check_flips_api_result)
|
assert len(flips) == len(fake_check_flips_api_result)
|
||||||
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
||||||
@@ -255,7 +293,9 @@ async def test_get_check_flips_params_200(fake_check_flips_api_result, respx_moc
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_aget_check_flips_400(fake_check_flips_api_result, respx_mock, test_async_client):
|
async def test_aget_check_flips_400(
|
||||||
|
fake_check_flips_api_result, respx_mock, test_async_client
|
||||||
|
):
|
||||||
flips_url = urljoin(test_async_client._api_url, "checks/test/flips/")
|
flips_url = urljoin(test_async_client._api_url, "checks/test/flips/")
|
||||||
respx_mock.get(flips_url).mock(return_value=Response(status_code=400))
|
respx_mock.get(flips_url).mock(return_value=Response(status_code=400))
|
||||||
with pytest.raises(BadAPIRequestError):
|
with pytest.raises(BadAPIRequestError):
|
||||||
@@ -264,9 +304,13 @@ async def test_aget_check_flips_400(fake_check_flips_api_result, respx_mock, tes
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_aget_integrations(fake_integrations_api_result, respx_mock, test_async_client):
|
async def test_aget_integrations(
|
||||||
|
fake_integrations_api_result, respx_mock, test_async_client
|
||||||
|
):
|
||||||
channels_url = urljoin(test_async_client._api_url, "channels/")
|
channels_url = urljoin(test_async_client._api_url, "channels/")
|
||||||
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_integrations_api_result))
|
respx_mock.get(channels_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_integrations_api_result)
|
||||||
|
)
|
||||||
integrations = await test_async_client.get_integrations()
|
integrations = await test_async_client.get_integrations()
|
||||||
assert len(integrations) == len(fake_integrations_api_result["channels"])
|
assert len(integrations) == len(fake_integrations_api_result["channels"])
|
||||||
assert integrations[0].id == fake_integrations_api_result["channels"][0]["id"]
|
assert integrations[0].id == fake_integrations_api_result["channels"][0]["id"]
|
||||||
@@ -276,7 +320,9 @@ async def test_aget_integrations(fake_integrations_api_result, respx_mock, test_
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
async def test_aget_badges(fake_badges_api_result, respx_mock, test_async_client):
|
async def test_aget_badges(fake_badges_api_result, respx_mock, test_async_client):
|
||||||
channels_url = urljoin(test_async_client._api_url, "badges/")
|
channels_url = urljoin(test_async_client._api_url, "badges/")
|
||||||
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_badges_api_result))
|
respx_mock.get(channels_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_badges_api_result)
|
||||||
|
)
|
||||||
integrations = await test_async_client.get_badges()
|
integrations = await test_async_client.get_badges()
|
||||||
assert integrations.keys() == fake_badges_api_result["badges"].keys()
|
assert integrations.keys() == fake_badges_api_result["badges"].keys()
|
||||||
|
|
||||||
@@ -343,10 +389,14 @@ ping_test_parameters = [
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
@pytest.mark.parametrize("respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters)
|
@pytest.mark.parametrize(
|
||||||
|
"respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters
|
||||||
|
)
|
||||||
async def test_asuccess_ping(respx_mocker, tc, url, ping_method, method_kwargs):
|
async def test_asuccess_ping(respx_mocker, tc, url, ping_method, method_kwargs):
|
||||||
channels_url = urljoin(tc._ping_url, url)
|
channels_url = urljoin(tc._ping_url, url)
|
||||||
respx_mocker.post(channels_url).mock(return_value=Response(status_code=200, text="OK"))
|
respx_mocker.post(channels_url).mock(
|
||||||
|
return_value=Response(status_code=200, text="OK")
|
||||||
|
)
|
||||||
ping_method = getattr(tc, ping_method)
|
ping_method = getattr(tc, ping_method)
|
||||||
result = await ping_method(**method_kwargs)
|
result = await ping_method(**method_kwargs)
|
||||||
assert result[0] is True
|
assert result[0] is True
|
||||||
|
|||||||
@@ -76,6 +76,7 @@ async def test_check_trap_async_exception(respx_mock, test_async_client):
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_check_trap_wrong_client_error(test_client, test_async_client):
|
async def test_check_trap_wrong_client_error(test_client, test_async_client):
|
||||||
|
|
||||||
with pytest.raises(WrongClientError):
|
with pytest.raises(WrongClientError):
|
||||||
async with CheckTrap(test_client, uuid="test"):
|
async with CheckTrap(test_client, uuid="test"):
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -14,7 +14,9 @@ from healthchecks_io.client.exceptions import HCAPIError
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_create_check_200_context_manager(fake_check_api_result, respx_mock, test_client):
|
def test_create_check_200_context_manager(
|
||||||
|
fake_check_api_result, respx_mock, test_client
|
||||||
|
):
|
||||||
checks_url = urljoin(test_client._api_url, "checks/")
|
checks_url = urljoin(test_client._api_url, "checks/")
|
||||||
respx_mock.post(checks_url).mock(
|
respx_mock.post(checks_url).mock(
|
||||||
return_value=Response(
|
return_value=Response(
|
||||||
@@ -108,7 +110,9 @@ def test_update_check_200(fake_check_api_result, respx_mock, test_client):
|
|||||||
def test_get_checks_200(fake_check_api_result, respx_mock, test_client):
|
def test_get_checks_200(fake_check_api_result, respx_mock, test_client):
|
||||||
assert test_client._client is not None
|
assert test_client._client is not None
|
||||||
checks_url = urljoin(test_client._api_url, "checks/")
|
checks_url = urljoin(test_client._api_url, "checks/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
|
||||||
|
)
|
||||||
checks = test_client.get_checks()
|
checks = test_client.get_checks()
|
||||||
assert len(checks) == 1
|
assert len(checks) == 1
|
||||||
assert checks[0].name == fake_check_api_result["name"]
|
assert checks[0].name == fake_check_api_result["name"]
|
||||||
@@ -117,9 +121,13 @@ def test_get_checks_200(fake_check_api_result, respx_mock, test_client):
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_get_checks_pass_in_client(fake_check_api_result, respx_mock):
|
def test_get_checks_pass_in_client(fake_check_api_result, respx_mock):
|
||||||
httpx_client = HTTPXClient()
|
httpx_client = HTTPXClient()
|
||||||
test_client = Client(api_key="test", api_url="http://localhost/api/", client=httpx_client)
|
test_client = Client(
|
||||||
|
api_key="test", api_url="http://localhost/api/", client=httpx_client
|
||||||
|
)
|
||||||
checks_url = urljoin(test_client._api_url, "checks/")
|
checks_url = urljoin(test_client._api_url, "checks/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
|
||||||
|
)
|
||||||
checks = test_client.get_checks()
|
checks = test_client.get_checks()
|
||||||
assert len(checks) == 1
|
assert len(checks) == 1
|
||||||
assert checks[0].name == fake_check_api_result["name"]
|
assert checks[0].name == fake_check_api_result["name"]
|
||||||
@@ -161,7 +169,9 @@ def test_finalizer_closes(test_client):
|
|||||||
def test_get_check_200(fake_check_api_result, respx_mock, test_client):
|
def test_get_check_200(fake_check_api_result, respx_mock, test_client):
|
||||||
assert test_client._client is not None
|
assert test_client._client is not None
|
||||||
checks_url = urljoin(test_client._api_url, "checks/test")
|
checks_url = urljoin(test_client._api_url, "checks/test")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_api_result)
|
||||||
|
)
|
||||||
check = test_client.get_check(check_id="test")
|
check = test_client.get_check(check_id="test")
|
||||||
assert check.name == fake_check_api_result["name"]
|
assert check.name == fake_check_api_result["name"]
|
||||||
|
|
||||||
@@ -178,7 +188,9 @@ def test_check_get_404(respx_mock, test_client):
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_pause_check_200(fake_check_api_result, respx_mock, test_client):
|
def test_pause_check_200(fake_check_api_result, respx_mock, test_client):
|
||||||
checks_url = urljoin(test_client._api_url, "checks/test/pause")
|
checks_url = urljoin(test_client._api_url, "checks/test/pause")
|
||||||
respx_mock.post(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
|
respx_mock.post(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_api_result)
|
||||||
|
)
|
||||||
check = test_client.pause_check(check_id="test")
|
check = test_client.pause_check(check_id="test")
|
||||||
assert check.name == fake_check_api_result["name"]
|
assert check.name == fake_check_api_result["name"]
|
||||||
|
|
||||||
@@ -196,7 +208,9 @@ def test_check_pause_404(respx_mock, test_client):
|
|||||||
def test_delete_check_200(fake_check_api_result, respx_mock, test_client):
|
def test_delete_check_200(fake_check_api_result, respx_mock, test_client):
|
||||||
assert test_client._client is not None
|
assert test_client._client is not None
|
||||||
checks_url = urljoin(test_client._api_url, "checks/test")
|
checks_url = urljoin(test_client._api_url, "checks/test")
|
||||||
respx_mock.delete(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
|
respx_mock.delete(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_api_result)
|
||||||
|
)
|
||||||
check = test_client.delete_check(check_id="test")
|
check = test_client.delete_check(check_id="test")
|
||||||
assert check.name == fake_check_api_result["name"]
|
assert check.name == fake_check_api_result["name"]
|
||||||
|
|
||||||
@@ -212,7 +226,11 @@ def test_delete_pause404(respx_mock, test_client):
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_get_check_pings_200(fake_check_pings_api_result, respx_mock, test_client):
|
def test_get_check_pings_200(fake_check_pings_api_result, respx_mock, test_client):
|
||||||
checks_url = urljoin(test_client._api_url, "checks/test/pings/")
|
checks_url = urljoin(test_client._api_url, "checks/test/pings/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"pings": fake_check_pings_api_result}))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(
|
||||||
|
status_code=200, json={"pings": fake_check_pings_api_result}
|
||||||
|
)
|
||||||
|
)
|
||||||
pings = test_client.get_check_pings("test")
|
pings = test_client.get_check_pings("test")
|
||||||
assert len(pings) == len(fake_check_pings_api_result)
|
assert len(pings) == len(fake_check_pings_api_result)
|
||||||
assert pings[0].type == fake_check_pings_api_result[0]["type"]
|
assert pings[0].type == fake_check_pings_api_result[0]["type"]
|
||||||
@@ -221,16 +239,24 @@ def test_get_check_pings_200(fake_check_pings_api_result, respx_mock, test_clien
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_get_check_flips_200(fake_check_flips_api_result, respx_mock, test_client):
|
def test_get_check_flips_200(fake_check_flips_api_result, respx_mock, test_client):
|
||||||
checks_url = urljoin(test_client._api_url, "checks/test/flips/")
|
checks_url = urljoin(test_client._api_url, "checks/test/flips/")
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_flips_api_result)
|
||||||
|
)
|
||||||
flips = test_client.get_check_flips("test")
|
flips = test_client.get_check_flips("test")
|
||||||
assert len(flips) == len(fake_check_flips_api_result)
|
assert len(flips) == len(fake_check_flips_api_result)
|
||||||
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_get_check_flips_params_200(fake_check_flips_api_result, respx_mock, test_client):
|
def test_get_check_flips_params_200(
|
||||||
checks_url = urljoin(test_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1")
|
fake_check_flips_api_result, respx_mock, test_client
|
||||||
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
|
):
|
||||||
|
checks_url = urljoin(
|
||||||
|
test_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1"
|
||||||
|
)
|
||||||
|
respx_mock.get(checks_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_check_flips_api_result)
|
||||||
|
)
|
||||||
flips = test_client.get_check_flips("test", seconds=1, start=1, end=1)
|
flips = test_client.get_check_flips("test", seconds=1, start=1, end=1)
|
||||||
assert len(flips) == len(fake_check_flips_api_result)
|
assert len(flips) == len(fake_check_flips_api_result)
|
||||||
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
assert flips[0].up == fake_check_flips_api_result[0]["up"]
|
||||||
@@ -247,7 +273,9 @@ def test_get_check_flips_400(fake_check_flips_api_result, respx_mock, test_clien
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_get_integrations(fake_integrations_api_result, respx_mock, test_client):
|
def test_get_integrations(fake_integrations_api_result, respx_mock, test_client):
|
||||||
channels_url = urljoin(test_client._api_url, "channels/")
|
channels_url = urljoin(test_client._api_url, "channels/")
|
||||||
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_integrations_api_result))
|
respx_mock.get(channels_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_integrations_api_result)
|
||||||
|
)
|
||||||
integrations = test_client.get_integrations()
|
integrations = test_client.get_integrations()
|
||||||
assert len(integrations) == len(fake_integrations_api_result["channels"])
|
assert len(integrations) == len(fake_integrations_api_result["channels"])
|
||||||
assert integrations[0].id == fake_integrations_api_result["channels"][0]["id"]
|
assert integrations[0].id == fake_integrations_api_result["channels"][0]["id"]
|
||||||
@@ -256,7 +284,9 @@ def test_get_integrations(fake_integrations_api_result, respx_mock, test_client)
|
|||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
def test_get_badges(fake_badges_api_result, respx_mock, test_client):
|
def test_get_badges(fake_badges_api_result, respx_mock, test_client):
|
||||||
channels_url = urljoin(test_client._api_url, "badges/")
|
channels_url = urljoin(test_client._api_url, "badges/")
|
||||||
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_badges_api_result))
|
respx_mock.get(channels_url).mock(
|
||||||
|
return_value=Response(status_code=200, json=fake_badges_api_result)
|
||||||
|
)
|
||||||
integrations = test_client.get_badges()
|
integrations = test_client.get_badges()
|
||||||
assert integrations.keys() == fake_badges_api_result["badges"].keys()
|
assert integrations.keys() == fake_badges_api_result["badges"].keys()
|
||||||
|
|
||||||
@@ -322,10 +352,14 @@ ping_test_parameters = [
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.respx
|
@pytest.mark.respx
|
||||||
@pytest.mark.parametrize("respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters)
|
@pytest.mark.parametrize(
|
||||||
|
"respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters
|
||||||
|
)
|
||||||
def test_success_ping(respx_mocker, tc, url, ping_method, method_kwargs):
|
def test_success_ping(respx_mocker, tc, url, ping_method, method_kwargs):
|
||||||
channels_url = urljoin(tc._ping_url, url)
|
channels_url = urljoin(tc._ping_url, url)
|
||||||
respx_mocker.post(channels_url).mock(return_value=Response(status_code=200, text="OK"))
|
respx_mocker.post(channels_url).mock(
|
||||||
|
return_value=Response(status_code=200, text="OK")
|
||||||
|
)
|
||||||
ping_method = getattr(tc, ping_method)
|
ping_method = getattr(tc, ping_method)
|
||||||
result, text = ping_method(**method_kwargs)
|
result, text = ping_method(**method_kwargs)
|
||||||
assert result is True
|
assert result is True
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from typing import Dict
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@@ -9,7 +11,7 @@ from healthchecks_io.schemas import checks
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def fake_check_api_result() -> dict[str, str | int]:
|
def fake_check_api_result() -> Dict[str, Union[str, int]]:
|
||||||
yield {
|
yield {
|
||||||
"name": "Test Check",
|
"name": "Test Check",
|
||||||
"slug": "Test Check",
|
"slug": "Test Check",
|
||||||
@@ -31,7 +33,7 @@ def fake_check_api_result() -> dict[str, str | int]:
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def fake_check_ro_api_result() -> dict[str, str | int]:
|
def fake_check_ro_api_result() -> Dict[str, Union[str, int]]:
|
||||||
yield {
|
yield {
|
||||||
"name": "Test Check",
|
"name": "Test Check",
|
||||||
"slug": "Test Check",
|
"slug": "Test Check",
|
||||||
|
|||||||
@@ -37,19 +37,27 @@ def test_check_create_validators():
|
|||||||
|
|
||||||
# test validate_schedule
|
# test validate_schedule
|
||||||
with pytest.raises(ValidationError):
|
with pytest.raises(ValidationError):
|
||||||
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", schedule="no good")
|
check_create = checks.CheckCreate(
|
||||||
|
name="Test", tags="", desc="Test", schedule="no good"
|
||||||
|
)
|
||||||
|
|
||||||
# test validate_tz
|
# test validate_tz
|
||||||
with pytest.raises(ValidationError):
|
with pytest.raises(ValidationError):
|
||||||
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", tz="no good")
|
check_create = checks.CheckCreate(
|
||||||
|
name="Test", tags="", desc="Test", tz="no good"
|
||||||
|
)
|
||||||
|
|
||||||
# test validate_methods
|
# test validate_methods
|
||||||
with pytest.raises(ValidationError):
|
with pytest.raises(ValidationError):
|
||||||
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", methods="no good")
|
check_create = checks.CheckCreate(
|
||||||
|
name="Test", tags="", desc="Test", methods="no good"
|
||||||
|
)
|
||||||
|
|
||||||
# test validate_unique
|
# test validate_unique
|
||||||
with pytest.raises(ValidationError):
|
with pytest.raises(ValidationError):
|
||||||
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", unique=["no good"])
|
check_create = checks.CheckCreate(
|
||||||
|
name="Test", tags="", desc="Test", unique=["no good"]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_check_pings_from_api():
|
def test_check_pings_from_api():
|
||||||
|
|||||||
Reference in New Issue
Block a user