Files
py-healthchecks.io/src/healthchecks_io/client/_abstract.py
2024-05-05 13:39:25 -05:00

270 lines
10 KiB
Python

from abc import ABC
from abc import abstractmethod
from typing import Any
from typing import Dict
from typing import Optional
from typing import Union
from urllib.parse import parse_qsl
from urllib.parse import ParseResult
from urllib.parse import unquote
from urllib.parse import urlencode
from urllib.parse import urljoin
from urllib.parse import urlparse
from weakref import finalize
from httpx import Response
from .exceptions import BadAPIRequestError
from .exceptions import CheckNotFoundError
from .exceptions import HCAPIAuthError
from .exceptions import HCAPIError
from .exceptions import HCAPIRateLimitError
from .exceptions import NonUniqueSlugError
class AbstractClient(ABC):
    """Abstract base holding the URL building and response checking shared by clients.

    Concrete clients must implement ``_finalizer_method``. ``is_closed`` reads
    ``self._client``, which this class never assigns; presumably subclasses set
    it before use (confirm against the concrete clients).
    """

    def __init__(
        self,
        api_key: str = "",
        ping_key: str = "",
        api_url: str = "https://healthchecks.io/api/",
        ping_url: str = "https://hc-ping.com/",
        api_version: int = 1,
    ) -> None:
        """An AbstractClient that other clients can implement.

        Args:
            api_key (str): Healthchecks.io API key. Defaults to an empty string.
            ping_key (str): Healthchecks.io Ping key. Defaults to an empty string.
            api_url (str): API URL. Defaults to "https://healthchecks.io/api/".
            ping_url (str): Ping API url. Defaults to "https://hc-ping.com/".
            api_version (int): Version of the api to use. Defaults to 1.
        """
        self._api_key = api_key
        self._ping_key = ping_key

        # urljoin replaces the last path segment unless the base ends with a
        # slash, so both base URLs are normalised to end with one.
        if not api_url.endswith("/"):
            api_url = f"{api_url}/"
        if not ping_url.endswith("/"):
            ping_url = f"{ping_url}/"

        self._api_url = urljoin(api_url, f"v{api_version}/")
        self._ping_url = ping_url

        # NOTE(review): the weakref.finalize documentation warns that the
        # callback should not be a bound method of the tracked object, because
        # the bound method holds a reference that keeps the object alive. As
        # written, the finalizer is therefore expected to run only at
        # interpreter exit or when invoked explicitly. Changing this would
        # alter the contract subclasses implement, so it is left as is.
        self._finalizer = finalize(self, self._finalizer_method)

    @abstractmethod
    def _finalizer_method(self) -> None:  # pragma: no cover
        """Clean up the client; registered with weakref.finalize in ``__init__``."""
        pass

    def _get_api_request_url(self, path: str, params: Optional[Dict[str, Any]] = None) -> str:
        """Get a full request url for the healthchecks api.

        Args:
            path (str): Path to request from, joined onto the versioned API url
            params (Optional[Dict[str, Any]], optional): URL Parameters. Defaults to None.

        Returns:
            str: url
        """
        url = urljoin(self._api_url, path)
        if params is None:
            return url
        return self._add_url_params(url, params)

    def _get_ping_url(self, uuid: str, slug: str, endpoint: str) -> str:
        """Get a url for sending a ping.

        Can take either a UUID or a Slug, but not both.

        Args:
            uuid (str): uuid of a check
            slug (str): slug of a check
            endpoint (str): Endpoint to request

        Raises:
            BadAPIRequestError: Raised if you pass a uuid and a slug, if you pass
                neither, or if pinging by a slug and do not have a ping key set

        Returns:
            str: url for this
        """
        has_uuid = uuid != ""
        has_slug = slug != ""

        # Exactly one of the two identifiers must be supplied.
        if has_uuid == has_slug:
            raise BadAPIRequestError("Must pass a uuid or a slug")

        if has_slug and self._ping_key == "":
            raise BadAPIRequestError("If pinging by slug, must have a ping key set")

        if has_uuid:
            return self._get_ping_url_uuid(uuid, endpoint)
        return self._get_ping_url_slug(slug, endpoint)

    def _get_ping_url_uuid(self, uuid: str, endpoint: str) -> str:
        """Get a ping url for a check with a uuid.

        Args:
            uuid (str): uuid of a check
            endpoint (str): Endpoint to request; appended directly after the uuid

        Returns:
            str: ping url
        """
        return urljoin(self._ping_url, f"{uuid}{endpoint}")

    def _get_ping_url_slug(self, slug: str, endpoint: str) -> str:
        """Get a ping url for a check with a slug.

        Args:
            slug (str): slug of a check
            endpoint (str): Endpoint to request; appended directly after the slug

        Returns:
            str: ping url
        """
        return urljoin(self._ping_url, f"{self._ping_key}/{slug}{endpoint}")

    @property
    def is_closed(self) -> bool:
        """Is the client closed?

        Returns:
            bool: is the client closed
        """
        return self._client.is_closed  # type: ignore

    @staticmethod
    def check_response(response: Response) -> Response:
        """Checks a healthchecks.io response.

        Args:
            response (Response): a response from the healthchecks.io api

        Raises:
            HCAPIAuthError: Raised when status_code == 401 or 403
            HCAPIError: Raised when status_code is 5xx
            CheckNotFoundError: Raised when status_code is 404
            BadAPIRequestError: Raised when status_code is 400
            HCAPIRateLimitError: Raised when status code is 429

        Returns:
            Response: the passed in response object
        """
        status = response.status_code

        if status in (401, 403):
            raise HCAPIAuthError("Auth failure when getting checks")

        if 500 <= status < 600:
            raise HCAPIError(
                f"Error when reaching out to HC API at {response.request.url}. "
                f"Status Code {response.status_code}. Response {response.text}"
            )

        if status == 429:
            raise HCAPIRateLimitError(f"Rate limited on {response.request.url}")

        if status == 404:
            raise CheckNotFoundError(f"CHeck not found at {response.request.url}")

        if status == 400:
            raise BadAPIRequestError(f"Bad request when requesting {response.request.url}. {response.text}")

        return response

    @staticmethod
    def check_ping_response(response: Response) -> Response:
        """Checks a healthchecks.io ping response.

        Args:
            response (Response): a response from the healthchecks.io api

        Raises:
            HCAPIAuthError: Raised when status_code == 401 or 403
            HCAPIError: Raised when status_code is 5xx
            CheckNotFoundError: Raised when status_code is 404 or response text has "not found" in it
            BadAPIRequestError: Raised when status_code is 400
            HCAPIRateLimitError: Raised when status code is 429 or response text has "rate limited" in it
            NonUniqueSlugError: Raised when status code is 409.

        Returns:
            Response: the passed in response object
        """
        status = response.status_code

        if status in (401, 403):
            raise HCAPIAuthError("Auth failure when pinging")

        if 500 <= status < 600:
            raise HCAPIError(
                f"Error when reaching out to HC API at {response.request.url}. "
                f"Status Code {response.status_code}. Response {response.text}"
            )

        # ping api docs say it can return a 200 with not found for a not found check.
        # in my testing, its always a 404, but this will cover what the docs say
        # https://healthchecks.io/docs/http_api/
        if status == 404 or "not found" in response.text:
            raise CheckNotFoundError(f"CHeck not found at {response.request.url}")

        if "rate limited" in response.text or status == 429:
            raise HCAPIRateLimitError(f"Rate limited on {response.request.url}")

        if status == 400:
            raise BadAPIRequestError(f"Bad request when requesting {response.request.url}. {response.text}")

        if status == 409:
            raise NonUniqueSlugError(f"Bad request, slug conflict {response.request.url}. {response.text}")

        return response

    @staticmethod
    def _add_url_params(url: str, params: Dict[str, Union[str, int, bool]], replace: bool = True) -> str:
        """Add GET params to the provided URL, keeping the ones it already has.

        Every value in ``params`` is converted with ``str()`` before encoding,
        so ``False`` is sent as ``False``. Existing parameters are kept in
        their original order, including keys that occur more than once.
        Existing parameters with a blank value are dropped, which is the
        ``parse_qsl`` default.

        Args:
            url (str): target URL, possibly already carrying a query string
            params (Dict[str, Union[str, int, bool]]): params to add
            replace (bool): If True, a key that already exists in the URL takes the new
                value (in the position of its first occurrence, later occurrences are
                removed). If False, the new value is appended as an additional
                occurrence of that key. Defaults to True.

        Returns:
            str: the updated URL

        Example:
            ``_add_url_params("https://example.com/test?answers=true", {"answers": False, "data": "x"})``
            returns ``"https://example.com/test?answers=False&data=x"``.
        """
        parsed_url = urlparse(url)

        # Parse the still-encoded query: parse_qsl decodes each name and value
        # itself. Decoding the whole URL up front would turn an encoded "&",
        # "=" or "#" inside a value into a delimiter and corrupt the query.
        # The pairs stay in a list so that repeated keys (e.g. several "tag"
        # filters) are not collapsed into one.
        existing_pairs = parse_qsl(parsed_url.query)

        # we want all string values
        new_values = {key: str(value) for key, value in params.items()}

        if replace:
            pending = dict(new_values)
            merged = []
            for key, value in existing_pairs:
                if key not in new_values:
                    merged.append((key, value))
                elif key in pending:
                    # First occurrence of a replaced key takes the new value.
                    merged.append((key, pending.pop(key)))
                # Further occurrences of a replaced key are dropped.
            # Keys that were not in the URL yet go at the end.
            merged.extend(pending.items())
        else:
            existing_keys = {key for key, _ in existing_pairs}
            brand_new = [(key, value) for key, value in new_values.items() if key not in existing_keys]
            repeated = [(key, value) for key, value in new_values.items() if key in existing_keys]
            # Same ordering as before: existing, then new keys, then repeats.
            merged = existing_pairs + brand_new + repeated

        # Rebuild the URL with only the query component swapped out.
        return ParseResult(
            parsed_url.scheme,
            parsed_url.netloc,
            parsed_url.path,
            parsed_url.params,
            urlencode(merged),
            parsed_url.fragment,
        ).geturl()