mirror of
https://github.com/andrewthetechie/py-healthchecks.io.git
synced 2025-12-06 01:28:26 +01:00
feat: add python 3.11 support (#392)
* feat: add python 3.11 support - adds testing for python 3.11 - moves from black and flake8 to ruff for linting - Updates coverage output from tests * fix: fix dev dependency group * fix: add installing dev dependencies * ci: add toml * ci: simplify coverage * ci: update coverage
This commit is contained in:
@@ -42,9 +42,7 @@ class AsyncClient(AbstractClient):
|
||||
client (Optional[HTTPXAsyncClient], optional): A httpx.Asyncclient. If not
|
||||
passed in, one will be created for this object. Defaults to None.
|
||||
"""
|
||||
self._client: HTTPXAsyncClient = (
|
||||
HTTPXAsyncClient() if client is None else client
|
||||
)
|
||||
self._client: HTTPXAsyncClient = HTTPXAsyncClient() if client is None else client
|
||||
super().__init__(
|
||||
api_key=api_key,
|
||||
ping_key=ping_key,
|
||||
@@ -53,9 +51,7 @@ class AsyncClient(AbstractClient):
|
||||
api_version=api_version,
|
||||
)
|
||||
self._client.headers["X-Api-Key"] = self._api_key
|
||||
self._client.headers[
|
||||
"user-agent"
|
||||
] = f"py-healthchecks.io-async/{client_version}"
|
||||
self._client.headers["user-agent"] = f"py-healthchecks.io-async/{client_version}"
|
||||
self._client.headers["Content-type"] = "application/json"
|
||||
|
||||
async def __aenter__(self) -> "AsyncClient":
|
||||
@@ -97,9 +93,7 @@ class AsyncClient(AbstractClient):
|
||||
Check: check that was just created
|
||||
"""
|
||||
request_url = self._get_api_request_url("checks/")
|
||||
response = self.check_response(
|
||||
await self._client.post(request_url, json=new_check.dict(exclude_none=True))
|
||||
)
|
||||
response = self.check_response(await self._client.post(request_url, json=new_check.dict(exclude_none=True)))
|
||||
return Check.from_api_result(response.json())
|
||||
|
||||
async def update_check(self, uuid: str, update_check: CheckCreate) -> Check:
|
||||
@@ -143,16 +137,11 @@ class AsyncClient(AbstractClient):
|
||||
request_url = self._get_api_request_url("checks/")
|
||||
if tags is not None:
|
||||
for tag in tags:
|
||||
request_url = self._add_url_params(
|
||||
request_url, {"tag": tag}, replace=False
|
||||
)
|
||||
request_url = self._add_url_params(request_url, {"tag": tag}, replace=False)
|
||||
|
||||
response = self.check_response(await self._client.get(request_url))
|
||||
|
||||
return [
|
||||
Check.from_api_result(check_data)
|
||||
for check_data in response.json()["checks"]
|
||||
]
|
||||
return [Check.from_api_result(check_data) for check_data in response.json()["checks"]]
|
||||
|
||||
async def get_check(self, check_id: str) -> Check:
|
||||
"""Get a single check by id.
|
||||
@@ -247,10 +236,7 @@ class AsyncClient(AbstractClient):
|
||||
"""
|
||||
request_url = self._get_api_request_url(f"checks/{check_id}/pings/")
|
||||
response = self.check_response(await self._client.get(request_url))
|
||||
return [
|
||||
CheckPings.from_api_result(check_data)
|
||||
for check_data in response.json()["pings"]
|
||||
]
|
||||
return [CheckPings.from_api_result(check_data) for check_data in response.json()["pings"]]
|
||||
|
||||
async def get_check_flips(
|
||||
self,
|
||||
@@ -274,8 +260,10 @@ class AsyncClient(AbstractClient):
|
||||
Args:
|
||||
check_id (str): check uuid
|
||||
seconds (Optional[int], optional): Returns the flips from the last value seconds. Defaults to None.
|
||||
start (Optional[int], optional): Returns flips that are newer than the specified UNIX timestamp.. Defaults to None.
|
||||
end (Optional[int], optional): Returns flips that are older than the specified UNIX timestamp.. Defaults to None.
|
||||
start (Optional[int], optional): Returns flips that are newer than the specified UNIX timestamp.
|
||||
Defaults to None.
|
||||
end (Optional[int], optional): Returns flips that are older than the specified UNIX timestamp.
|
||||
Defaults to None.
|
||||
|
||||
Returns:
|
||||
List[CheckStatuses]: List of status flips for this check
|
||||
@@ -307,10 +295,7 @@ class AsyncClient(AbstractClient):
|
||||
"""
|
||||
request_url = self._get_api_request_url("channels/")
|
||||
response = self.check_response(await self._client.get(request_url))
|
||||
return [
|
||||
Integration.from_api_result(integration_dict)
|
||||
for integration_dict in response.json()["channels"]
|
||||
]
|
||||
return [Integration.from_api_result(integration_dict) for integration_dict in response.json()["channels"]]
|
||||
|
||||
async def get_badges(self) -> Dict[str, Badges]:
|
||||
"""Returns a dict of all tags in the project, with badge URLs for each tag.
|
||||
@@ -321,7 +306,8 @@ class AsyncClient(AbstractClient):
|
||||
shields: returns JSON in a Shields.io compatible format.
|
||||
In addition, badges have 2-state and 3-state variations:
|
||||
|
||||
svg, json, shields: reports two states: "up" and "down". It considers any checks in the grace period as still "up".
|
||||
svg, json, shields: reports two states: "up" and "down". It considers any checks in the grace period
|
||||
as still "up".
|
||||
svg3, json3, shields3: reports three states: "up", "late", and "down".
|
||||
|
||||
The response includes a special * entry: this pseudo-tag reports the overal status
|
||||
@@ -337,14 +323,9 @@ class AsyncClient(AbstractClient):
|
||||
"""
|
||||
request_url = self._get_api_request_url("badges/")
|
||||
response = self.check_response(await self._client.get(request_url))
|
||||
return {
|
||||
key: Badges.from_api_result(item)
|
||||
for key, item in response.json()["badges"].items()
|
||||
}
|
||||
return {key: Badges.from_api_result(item) for key, item in response.json()["badges"].items()}
|
||||
|
||||
async def success_ping(
|
||||
self, uuid: str = "", slug: str = "", data: str = ""
|
||||
) -> Tuple[bool, str]:
|
||||
async def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||
"""Signals to Healthchecks.io that a job has completed successfully.
|
||||
|
||||
Can also be used to indicate a continuously running process is still running and healthy.
|
||||
@@ -375,14 +356,10 @@ class AsyncClient(AbstractClient):
|
||||
Tuple[bool, str]: success (true or false) and the response text
|
||||
"""
|
||||
ping_url = self._get_ping_url(uuid, slug, "")
|
||||
response = self.check_ping_response(
|
||||
await self._client.post(ping_url, content=data)
|
||||
)
|
||||
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
||||
return (True if response.status_code == 200 else False, response.text)
|
||||
|
||||
async def start_ping(
|
||||
self, uuid: str = "", slug: str = "", data: str = ""
|
||||
) -> Tuple[bool, str]:
|
||||
async def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||
"""Sends a "job has started!" message to Healthchecks.io.
|
||||
|
||||
Sending a "start" signal is optional, but it enables a few extra features:
|
||||
@@ -415,14 +392,10 @@ class AsyncClient(AbstractClient):
|
||||
Tuple[bool, str]: success (true or false) and the response text
|
||||
"""
|
||||
ping_url = self._get_ping_url(uuid, slug, "/start")
|
||||
response = self.check_ping_response(
|
||||
await self._client.post(ping_url, content=data)
|
||||
)
|
||||
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
||||
return (True if response.status_code == 200 else False, response.text)
|
||||
|
||||
async def fail_ping(
|
||||
self, uuid: str = "", slug: str = "", data: str = ""
|
||||
) -> Tuple[bool, str]:
|
||||
async def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||
"""Signals to Healthchecks.io that the job has failed.
|
||||
|
||||
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
||||
@@ -453,14 +426,10 @@ class AsyncClient(AbstractClient):
|
||||
Tuple[bool, str]: success (true or false) and the response text
|
||||
"""
|
||||
ping_url = self._get_ping_url(uuid, slug, "/fail")
|
||||
response = self.check_ping_response(
|
||||
await self._client.post(ping_url, content=data)
|
||||
)
|
||||
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
||||
return (True if response.status_code == 200 else False, response.text)
|
||||
|
||||
async def exit_code_ping(
|
||||
self, exit_code: int, uuid: str = "", slug: str = "", data: str = ""
|
||||
) -> Tuple[bool, str]:
|
||||
async def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
|
||||
"""Signals to Healthchecks.io that the job has failed.
|
||||
|
||||
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
|
||||
@@ -492,7 +461,5 @@ class AsyncClient(AbstractClient):
|
||||
Tuple[bool, str]: success (true or false) and the response text
|
||||
"""
|
||||
ping_url = self._get_ping_url(uuid, slug, f"/{exit_code}")
|
||||
response = self.check_ping_response(
|
||||
await self._client.post(ping_url, content=data)
|
||||
)
|
||||
response = self.check_ping_response(await self._client.post(ping_url, content=data))
|
||||
return (True if response.status_code == 200 else False, response.text)
|
||||
|
||||
Reference in New Issue
Block a user