chore: ci fixups and poetry update

This commit is contained in:
Andrew Herrington
2024-05-05 12:20:49 -05:00
parent cb862fa2c5
commit d0b986025e
24 changed files with 829 additions and 1026 deletions

View File

@@ -16,7 +16,7 @@ updates:
directory: "/docs"
schedule:
interval: daily
commit-message:
commit-message:
prefix: docs
- package-ecosystem: pip
directory: "/"
@@ -24,4 +24,3 @@ updates:
interval: daily
commit-message:
prefix: deps

View File

@@ -1,6 +1,6 @@
pip==23.1.2
nox==2023.4.22
nox-poetry==1.0.2
poetry==1.5.1
virtualenv==20.23.1
pip==24.0.0
nox==2024.4.15
nox-poetry==1.0.3
poetry==1.8.2
virtualenv==20.26.1
toml==0.10.2

View File

@@ -1,12 +1,35 @@
exclude: ".*tests\/fixtures.*"
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: check-yaml
- id: debug-statements
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.4.3
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
- repo: https://github.com/rhysd/actionlint
rev: v1.6.27
hooks:
- id: actionlint-docker
name: Actionlint
- repo: local
hooks:
- id: black
name: black
entry: black
- id: bandit
name: bandit
entry: bandit
language: system
types: [python]
require_serial: true
args: ["-c", "pyproject.toml"]
- id: check-added-large-files
name: Check for added large files
entry: check-added-large-files
@@ -27,38 +50,16 @@ repos:
language: system
types: [text]
stages: [commit, push, manual]
- id: flake8
name: flake8
entry: flake8
language: system
types: [python]
exclude: "^(test/*|examples/*|noxfile.py)"
require_serial: true
args: ["--config=.flake8"]
- id: pyupgrade
name: pyupgrade
description: Automatically upgrade syntax for newer versions.
entry: pyupgrade
language: system
types: [python]
args: [--py37-plus]
- id: reorder-python-imports
name: Reorder python imports
entry: reorder-python-imports
language: system
types: [python]
args: [--application-directories=src]
args: [--py311-plus]
- id: trailing-whitespace
name: Trim Trailing Whitespace
entry: trailing-whitespace-fixer
language: system
types: [text]
stages: [commit, push, manual]
- repo: https://github.com/pre-commit/mirrors-prettier
rev: v2.7.1
hooks:
- id: prettier
- repo: https://github.com/rhysd/actionlint
rev: v1.6.15
hooks:
- id: actionlint-docker

View File

@@ -1,4 +1,5 @@
"""Sphinx configuration."""
from datetime import datetime

View File

@@ -1,4 +1,5 @@
"""Nox sessions."""
import os
import shlex
import shutil
@@ -23,7 +24,7 @@ except ImportError:
package = "healthchecks_io"
python_versions = ["3.10", "3.11", "3.9", "3.8", "3.7"]
python_versions = ["3.10", "3.11", "3.9", "3.8"]
nox.needs_version = ">= 2021.6.6"
nox.options.sessions = (
"pre-commit",
@@ -31,7 +32,6 @@ nox.options.sessions = (
"safety",
"mypy",
"tests",
# "typeguard",
"xdoctest",
"docs-build",
)
@@ -59,7 +59,7 @@ def activate_virtualenv_in_precommit_hooks(session: Session) -> None:
Args:
session: The Session object.
"""
assert session.bin is not None # noqa: S101
assert session.bin is not None # nosec: B101
# Only patch hooks containing a reference to this session's bindir. Support
# quoting rules for Python and bash, but strip the outermost quotes so we

1340
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -25,7 +25,7 @@ classifiers = [
Changelog = "https://github.com/andrewthetechie/py-healthchecks.io/releases"
[tool.poetry.dependencies]
python = "^3.7"
python = "^3.8"
pydantic = "^1.9.1"
httpx = ">=0.23.0,<0.25.0"
croniter = "^1.1.0"
@@ -67,6 +67,9 @@ source = ["healthchecks_io"]
show_missing = true
fail_under = 100
[tool.bandit]
exclude_dirs = ["tests", "noxfile.py", ".github/scripts", "test_errbot", "dist"]
[tool.mypy]
strict = true
warn_unreachable = true
@@ -84,4 +87,4 @@ addopts = "-n 4 --ignore examples --cov=healthchecks_io --cov-report xml:.covera
[tool.ruff]
line-length = 120
target-version = "py37"
target-version = "py38"

View File

@@ -1,4 +1,5 @@
"""Py Healthchecks.Io."""
# set by poetry-dynamic-versioning
__version__ = "0.4.0" # noqa: E402

View File

@@ -1,4 +1,5 @@
"""healthchecks_io clients."""
from .async_client import AsyncClient # noqa: F401
from .check_trap import CheckTrap # noqa: F401
from .sync_client import Client # noqa: F401

View File

@@ -1,9 +1,6 @@
from abc import ABC
from abc import abstractmethod
from typing import Any
from typing import Dict
from typing import Optional
from typing import Union
from urllib.parse import parse_qsl
from urllib.parse import ParseResult
from urllib.parse import unquote
@@ -57,9 +54,7 @@ class AbstractClient(ABC):
"""Finalizer method is called by weakref.finalize when the object is dereferenced to do cleanup of clients."""
pass
def _get_api_request_url(
self, path: str, params: Optional[Dict[str, Any]] = None
) -> str:
def _get_api_request_url(self, path: str, params: dict[str, Any] | None = None) -> str:
"""Get a full request url for the healthchecks api.
Args:
@@ -165,9 +160,7 @@ class AbstractClient(ABC):
raise CheckNotFoundError(f"CHeck not found at {response.request.url}")
if response.status_code == 400:
raise BadAPIRequestError(
f"Bad request when requesting {response.request.url}. {response.text}"
)
raise BadAPIRequestError(f"Bad request when requesting {response.request.url}. {response.text}")
return response
@@ -208,21 +201,15 @@ class AbstractClient(ABC):
raise HCAPIRateLimitError(f"Rate limited on {response.request.url}")
if response.status_code == 400:
raise BadAPIRequestError(
f"Bad request when requesting {response.request.url}. {response.text}"
)
raise BadAPIRequestError(f"Bad request when requesting {response.request.url}. {response.text}")
if response.status_code == 409:
raise NonUniqueSlugError(
f"Bad request, slug conflict {response.request.url}. {response.text}"
)
raise NonUniqueSlugError(f"Bad request, slug conflict {response.request.url}. {response.text}")
return response
@staticmethod
def _add_url_params(
url: str, params: Dict[str, Union[str, int, bool]], replace: bool = True
) -> str:
def _add_url_params(url: str, params: dict[str, str | int | bool], replace: bool = True) -> str:
"""Add GET params to provided URL being aware of existing.
:param url: string of target URL
@@ -255,12 +242,7 @@ class AbstractClient(ABC):
# get all the duplicated keys from params and urlencode them, we'll concat this to the params string later
duplicated_params = [x for x in params if x in parsed_get_args]
# get all the args that aren't duplicated and add them to parsed_get_args
parsed_get_args.update(
{
key: parsed_params[key]
for key in [x for x in params if x not in parsed_get_args]
}
)
parsed_get_args.update({key: parsed_params[key] for key in [x for x in params if x not in parsed_get_args]})
# if we have any duplicated parameters, urlencode them, we append them later
extra_parameters = (
f"&{urlencode({key: params[key] for key in duplicated_params}, doseq=True)}"

View File

@@ -1,11 +1,7 @@
"""An async healthchecks.io client."""
import asyncio
from types import TracebackType
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from httpx import AsyncClient as HTTPXAsyncClient
@@ -29,7 +25,7 @@ class AsyncClient(AbstractClient):
api_url: str = "https://healthchecks.io/api/",
ping_url: str = "https://hc-ping.com/",
api_version: int = 1,
client: Optional[HTTPXAsyncClient] = None,
client: HTTPXAsyncClient | None = None,
) -> None:
"""An AsyncClient can be used in code using asyncio to work with the Healthchecks.io api.
@@ -64,9 +60,9 @@ class AsyncClient(AbstractClient):
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> None:
"""Context manager exit."""
await self._afinalizer_method()
@@ -118,7 +114,7 @@ class AsyncClient(AbstractClient):
)
return Check.from_api_result(response.json())
async def get_checks(self, tags: Optional[List[str]] = None) -> List[Check]:
async def get_checks(self, tags: list[str] | None = None) -> list[Check]:
"""Get a list of checks from the healthchecks api.
Args:
@@ -213,7 +209,7 @@ class AsyncClient(AbstractClient):
response = self.check_response(await self._client.delete(request_url))
return Check.from_api_result(response.json())
async def get_check_pings(self, check_id: str) -> List[CheckPings]:
async def get_check_pings(self, check_id: str) -> list[CheckPings]:
"""Returns a list of pings this check has received.
This endpoint returns pings in reverse order (most recent first),
@@ -241,10 +237,10 @@ class AsyncClient(AbstractClient):
async def get_check_flips(
self,
check_id: str,
seconds: Optional[int] = None,
start: Optional[int] = None,
end: Optional[int] = None,
) -> List[CheckStatuses]:
seconds: int | None = None,
start: int | None = None,
end: int | None = None,
) -> list[CheckStatuses]:
"""Returns a list of "flips" this check has experienced.
A flip is a change of status (from "down" to "up," or from "up" to "down").
@@ -281,7 +277,7 @@ class AsyncClient(AbstractClient):
response = self.check_response(await self._client.get(request_url))
return [CheckStatuses(**status_data) for status_data in response.json()]
async def get_integrations(self) -> List[Optional[Integration]]:
async def get_integrations(self) -> list[Integration | None]:
"""Returns a list of integrations belonging to the project.
Raises:
@@ -297,7 +293,7 @@ class AsyncClient(AbstractClient):
response = self.check_response(await self._client.get(request_url))
return [Integration.from_api_result(integration_dict) for integration_dict in response.json()["channels"]]
async def get_badges(self) -> Dict[str, Badges]:
async def get_badges(self) -> dict[str, Badges]:
"""Returns a dict of all tags in the project, with badge URLs for each tag.
Healthchecks.io provides badges in a few different formats:
@@ -325,7 +321,7 @@ class AsyncClient(AbstractClient):
response = self.check_response(await self._client.get(request_url))
return {key: Badges.from_api_result(item) for key, item in response.json()["badges"].items()}
async def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
async def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Signals to Healthchecks.io that a job has completed successfully.
Can also be used to indicate a continuously running process is still running and healthy.
@@ -359,7 +355,7 @@ class AsyncClient(AbstractClient):
response = self.check_ping_response(await self._client.post(ping_url, content=data))
return (True if response.status_code == 200 else False, response.text)
async def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
async def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Sends a "job has started!" message to Healthchecks.io.
Sending a "start" signal is optional, but it enables a few extra features:
@@ -395,7 +391,7 @@ class AsyncClient(AbstractClient):
response = self.check_ping_response(await self._client.post(ping_url, content=data))
return (True if response.status_code == 200 else False, response.text)
async def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
async def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Signals to Healthchecks.io that the job has failed.
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
@@ -429,7 +425,7 @@ class AsyncClient(AbstractClient):
response = self.check_ping_response(await self._client.post(ping_url, content=data))
return (True if response.status_code == 200 else False, response.text)
async def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
async def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Signals to Healthchecks.io that the job has failed.
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.

View File

@@ -1,9 +1,6 @@
"""CheckTrap is a context manager to wrap around python code to communicate results to a Healthchecks check."""
from types import TracebackType
from typing import List
from typing import Optional
from typing import Type
from typing import Union
from .async_client import AsyncClient
from .exceptions import PingFailedError
@@ -16,7 +13,7 @@ class CheckTrap:
def __init__(
self,
client: Union[Client, AsyncClient],
client: Client | AsyncClient,
uuid: str = "",
slug: str = "",
suppress_exceptions: bool = False,
@@ -34,10 +31,10 @@ class CheckTrap:
"""
if uuid == "" and slug == "":
raise Exception("Must pass a slug or an uuid")
self.client: Union[Client, AsyncClient] = client
self.client: Client | AsyncClient = client
self.uuid: str = uuid
self.slug: str = slug
self.log_lines: List[str] = list()
self.log_lines: list[str] = list()
self.suppress_exceptions: bool = suppress_exceptions
def add_log(self, line: str) -> None:
@@ -68,9 +65,7 @@ class CheckTrap:
CheckTrap: self
"""
if isinstance(self.client, AsyncClient):
raise WrongClientError(
"You passed an AsyncClient, use this as an async context manager"
)
raise WrongClientError("You passed an AsyncClient, use this as an async context manager")
result = self.client.start_ping(uuid=self.uuid, slug=self.slug)
if not result[0]:
raise PingFailedError(result[1])
@@ -78,10 +73,10 @@ class CheckTrap:
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> bool | None:
"""Exit the context manager.
If there is an exception, add it to any log lines and send a fail ping.
@@ -96,9 +91,7 @@ class CheckTrap:
Optional[bool]: self.suppress_exceptions, if true will not raise any exceptions
"""
if exc_type is None:
self.client.success_ping(
self.uuid, self.slug, data="\n".join(self.log_lines)
)
self.client.success_ping(self.uuid, self.slug, data="\n".join(self.log_lines))
else:
self.add_log(str(exc))
self.add_log(str(traceback))
@@ -125,9 +118,7 @@ class CheckTrap:
CheckTrap: self
"""
if isinstance(self.client, Client):
raise WrongClientError(
"You passed a sync Client, use this as a regular context manager"
)
raise WrongClientError("You passed a sync Client, use this as a regular context manager")
result = await self.client.start_ping(self.uuid, self.slug)
if not result[0]:
raise PingFailedError(result[1])
@@ -135,10 +126,10 @@ class CheckTrap:
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> bool | None:
"""Exit the context manager.
If there is an exception, add it to any log lines and send a fail ping.

View File

@@ -1,10 +1,6 @@
"""An async healthchecks.io client."""
from types import TracebackType
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from httpx import Client as HTTPXClient
@@ -27,7 +23,7 @@ class Client(AbstractClient):
api_url: str = "https://healthchecks.io/api/",
ping_url: str = "https://hc-ping.com/",
api_version: int = 1,
client: Optional[HTTPXClient] = None,
client: HTTPXClient | None = None,
) -> None:
"""An AsyncClient can be used in code using asyncio to work with the Healthchecks.io api.
@@ -62,9 +58,9 @@ class Client(AbstractClient):
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> None:
"""Context manager exit."""
self._finalizer_method()
@@ -73,7 +69,7 @@ class Client(AbstractClient):
"""Closes the httpx client."""
self._client.close()
def get_checks(self, tags: Optional[List[str]] = None) -> List[checks.Check]:
def get_checks(self, tags: list[str] | None = None) -> list[checks.Check]:
"""Get a list of checks from the healthchecks api.
Args:
@@ -208,7 +204,7 @@ class Client(AbstractClient):
response = self.check_response(self._client.delete(request_url))
return checks.Check.from_api_result(response.json())
def get_check_pings(self, check_id: str) -> List[checks.CheckPings]:
def get_check_pings(self, check_id: str) -> list[checks.CheckPings]:
"""Returns a list of pings this check has received.
This endpoint returns pings in reverse order (most recent first),
@@ -234,10 +230,10 @@ class Client(AbstractClient):
def get_check_flips(
self,
check_id: str,
seconds: Optional[int] = None,
start: Optional[int] = None,
end: Optional[int] = None,
) -> List[checks.CheckStatuses]:
seconds: int | None = None,
start: int | None = None,
end: int | None = None,
) -> list[checks.CheckStatuses]:
"""Returns a list of "flips" this check has experienced.
A flip is a change of status (from "down" to "up," or from "up" to "down").
@@ -272,7 +268,7 @@ class Client(AbstractClient):
response = self.check_response(self._client.get(request_url))
return [checks.CheckStatuses(**status_data) for status_data in response.json()]
def get_integrations(self) -> List[Optional[integrations.Integration]]:
def get_integrations(self) -> list[integrations.Integration | None]:
"""Returns a list of integrations belonging to the project.
Raises:
@@ -290,7 +286,7 @@ class Client(AbstractClient):
for integration_dict in response.json()["channels"]
]
def get_badges(self) -> Dict[str, badges.Badges]:
def get_badges(self) -> dict[str, badges.Badges]:
"""Returns a dict of all tags in the project, with badge URLs for each tag.
Healthchecks.io provides badges in a few different formats:
@@ -317,7 +313,7 @@ class Client(AbstractClient):
response = self.check_response(self._client.get(request_url))
return {key: badges.Badges.from_api_result(item) for key, item in response.json()["badges"].items()}
def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
def success_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Signals to Healthchecks.io that a job has completed successfully.
Can also be used to indicate a continuously running process is still running and healthy.
@@ -351,7 +347,7 @@ class Client(AbstractClient):
response = self.check_ping_response(self._client.post(ping_url, content=data))
return (True if response.status_code == 200 else False, response.text)
def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
def start_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Sends a "job has started!" message to Healthchecks.io.
Sending a "start" signal is optional, but it enables a few extra features:
@@ -387,7 +383,7 @@ class Client(AbstractClient):
response = self.check_ping_response(self._client.post(ping_url, content=data))
return (True if response.status_code == 200 else False, response.text)
def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
def fail_ping(self, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Signals to Healthchecks.io that the job has failed.
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.
@@ -421,7 +417,7 @@ class Client(AbstractClient):
response = self.check_ping_response(self._client.post(ping_url, content=data))
return (True if response.status_code == 200 else False, response.text)
def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> Tuple[bool, str]:
def exit_code_ping(self, exit_code: int, uuid: str = "", slug: str = "", data: str = "") -> tuple[bool, str]:
"""Signals to Healthchecks.io that the job has failed.
Actively signaling a failure minimizes the delay from your monitored service failing to you receiving an alert.

View File

@@ -1,4 +1,5 @@
"""Schemas for healthchecks_io."""
from .badges import Badges
from .checks import Check
from .checks import CheckCreate

View File

@@ -2,7 +2,6 @@
https://healthchecks.io/docs/api/
"""
from typing import Dict
from pydantic import BaseModel
@@ -18,7 +17,7 @@ class Badges(BaseModel):
shields3: str
@classmethod
def from_api_result(cls, badges_dict: Dict[str, str]) -> "Badges":
def from_api_result(cls, badges_dict: dict[str, str]) -> "Badges":
"""Converts a dictionary from the healthchecks api into a Badges object."""
badges_dict["json_url"] = badges_dict["json"]
badges_dict["json3_url"] = badges_dict["json3"]

View File

@@ -2,13 +2,10 @@
https://healthchecks.io/docs/api/
"""
from datetime import datetime
from pathlib import PurePath
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import urlparse
import pytz
@@ -21,28 +18,28 @@ from pydantic import validator
class Check(BaseModel):
"""Schema for a check object, either from a readonly api request or a rw api request."""
unique_key: Optional[str]
unique_key: str | None
name: str
slug: str
tags: Optional[str]
desc: Optional[str]
tags: str | None
desc: str | None
grace: int
n_pings: int
status: str
last_ping: Optional[datetime]
next_ping: Optional[datetime]
last_ping: datetime | None
next_ping: datetime | None
manual_resume: bool
methods: Optional[str]
methods: str | None
# healthchecks.io's api doesn't return a scheme so we cant use Pydantic AnyUrl here
ping_url: Optional[str]
update_url: Optional[str]
pause_url: Optional[str]
channels: Optional[str]
timeout: Optional[int]
uuid: Optional[str]
ping_url: str | None
update_url: str | None
pause_url: str | None
channels: str | None
timeout: int | None
uuid: str | None
@validator("uuid", always=True)
def validate_uuid(cls, value: Optional[str], values: Dict[str, Any]) -> Optional[str]: # noqa: B902
def validate_uuid(cls, value: str | None, values: dict[str, Any]) -> str | None: # noqa: B902
"""Tries to set the uuid from the ping_url.
Will return none if a read only token is used because it cannot retrieve the UUID of a check
@@ -56,7 +53,7 @@ class Check(BaseModel):
return value
@classmethod
def from_api_result(cls, check_dict: Dict[str, Any]) -> "Check":
def from_api_result(cls, check_dict: dict[str, Any]) -> "Check":
"""Converts a dictionary from the healthchecks api into an Check object."""
return cls(**check_dict)
@@ -64,34 +61,33 @@ class Check(BaseModel):
class CheckCreate(BaseModel):
"""Pydantic object for creating a check."""
name: Optional[str] = Field("", description="Name of the check")
tags: Optional[str] = Field("", description="String separated list of tags to apply")
desc: Optional[str] = Field("", description="Description of the check")
timeout: Optional[int] = Field(
name: str | None = Field("", description="Name of the check")
tags: str | None = Field("", description="String separated list of tags to apply")
desc: str | None = Field("", description="Description of the check")
timeout: int | None = Field(
86400,
description="The expected period of this check in seconds.",
gte=60,
lte=31536000,
)
grace: Optional[int] = Field(
grace: int | None = Field(
3600,
description="The grace period for this check in seconds.",
gte=60,
lte=31536000,
)
schedule: Optional[str] = Field(
schedule: str | None = Field(
None,
description="A cron expression defining this check's schedule. "
"If you specify both timeout and schedule parameters, "
"Healthchecks.io will create a Cron check and ignore the "
"timeout value.",
)
tz: Optional[str] = Field(
tz: str | None = Field(
"UTC",
description="Server's timezone. This setting only has an effect "
"in combination with the schedule parameter.",
description="Server's timezone. This setting only has an effect " "in combination with the schedule parameter.",
)
manual_resume: Optional[bool] = Field(
manual_resume: bool | None = Field(
False,
description="Controls whether a paused check automatically resumes "
"when pinged (the default) or not. If set to false, a paused "
@@ -99,7 +95,7 @@ class CheckCreate(BaseModel):
"If set to true, a paused check will ignore pings and stay "
"paused until you manually resume it from the web dashboard.",
)
methods: Optional[str] = Field(
methods: str | None = Field(
"",
description="Specifies the allowed HTTP methods for making "
"ping requests. Must be one of the two values: an empty "
@@ -107,7 +103,7 @@ class CheckCreate(BaseModel):
"allow HEAD, GET, and POST requests. Set this field to "
"POST to allow only POST requests.",
)
channels: Optional[str] = Field(
channels: str | None = Field(
None,
description="By default, this API call assigns no integrations"
"to the newly created check. By default, this API call "
@@ -115,7 +111,7 @@ class CheckCreate(BaseModel):
"To assign specific integrations, use a comma-separated list "
"of integration UUIDs.",
)
unique: Optional[List[Optional[str]]] = Field(
unique: list[str | None] | None = Field(
[],
description="Enables upsert functionality. Before creating a check, "
"Healthchecks.io looks for existing checks, filtered by fields listed "
@@ -148,7 +144,7 @@ class CheckCreate(BaseModel):
return value
@validator("unique")
def validate_unique(cls, value: List[Optional[str]]) -> List[Optional[str]]:
def validate_unique(cls, value: list[str | None]) -> list[str | None]:
"""Validate unique list."""
for unique in value:
if unique not in ("name", "tags", "timeout", "grace"):
@@ -161,33 +157,32 @@ class CheckCreate(BaseModel):
class CheckUpdate(CheckCreate):
"""Pydantic object for updating a check."""
name: Optional[str] = Field(None, description="Name of the check")
tags: Optional[str] = Field(None, description="String separated list of tags to apply")
timeout: Optional[int] = Field(
name: str | None = Field(None, description="Name of the check")
tags: str | None = Field(None, description="String separated list of tags to apply")
timeout: int | None = Field(
None,
description="The expected period of this check in seconds.",
gte=60,
lte=31536000,
)
grace: Optional[int] = Field(
grace: int | None = Field(
None,
description="The grace period for this check in seconds.",
gte=60,
lte=31536000,
)
schedule: Optional[str] = Field(
schedule: str | None = Field(
None,
description="A cron expression defining this check's schedule. "
"If you specify both timeout and schedule parameters, "
"Healthchecks.io will create a Cron check and ignore the "
"timeout value.",
)
tz: Optional[str] = Field(
tz: str | None = Field(
None,
description="Server's timezone. This setting only has an effect "
"in combination with the schedule parameter.",
description="Server's timezone. This setting only has an effect " "in combination with the schedule parameter.",
)
manual_resume: Optional[bool] = Field(
manual_resume: bool | None = Field(
None,
description="Controls whether a paused check automatically resumes "
"when pinged (the default) or not. If set to false, a paused "
@@ -195,7 +190,7 @@ class CheckUpdate(CheckCreate):
"If set to true, a paused check will ignore pings and stay "
"paused until you manually resume it from the web dashboard.",
)
methods: Optional[str] = Field(
methods: str | None = Field(
None,
description="Specifies the allowed HTTP methods for making "
"ping requests. Must be one of the two values: an empty "
@@ -203,7 +198,7 @@ class CheckUpdate(CheckCreate):
"allow HEAD, GET, and POST requests. Set this field to "
"POST to allow only POST requests.",
)
channels: Optional[str] = Field(
channels: str | None = Field(
None,
description="By default, this API call assigns no integrations"
"to the newly created check. By default, this API call "
@@ -211,7 +206,7 @@ class CheckUpdate(CheckCreate):
"To assign specific integrations, use a comma-separated list "
"of integration UUIDs.",
)
unique: Optional[List[Optional[str]]] = Field(
unique: list[str | None] | None = Field(
None,
description="Enables upsert functionality. Before creating a check, "
"Healthchecks.io looks for existing checks, filtered by fields listed "
@@ -233,10 +228,10 @@ class CheckPings(BaseModel):
remote_addr: str
method: str
user_agent: str
duration: Optional[float] = None
duration: float | None = None
@classmethod
def from_api_result(cls, ping_dict: Dict[str, Union[str, int, datetime]]) -> "CheckPings":
def from_api_result(cls, ping_dict: dict[str, str | int | datetime]) -> "CheckPings":
"""Converts a dictionary from the healthchecks api into a CheckPings object."""
ping_dict["number_of_pings"] = ping_dict["n"]
ping_dict["user_agent"] = ping_dict["ua"]

View File

@@ -2,7 +2,6 @@
https://healthchecks.io/docs/api/
"""
from typing import Dict
from pydantic import BaseModel
@@ -15,6 +14,6 @@ class Integration(BaseModel):
kind: str
@classmethod
def from_api_result(cls, integration_dict: Dict[str, str]) -> "Integration":
def from_api_result(cls, integration_dict: dict[str, str]) -> "Integration":
"""Converts a dictionary from the healthchecks api into an Integration object."""
return cls(**integration_dict)

View File

@@ -13,10 +13,7 @@ from healthchecks_io import NonUniqueSlugError
def test_abstract_add_url_params(test_abstract_client):
url = test_abstract_client._add_url_params(
"http://test.com/?test=test", {"test": "test2"}
)
url = test_abstract_client._add_url_params("http://test.com/?test=test", {"test": "test2"})
assert url == "http://test.com/?test=test2"

View File

@@ -15,9 +15,7 @@ from healthchecks_io.client.exceptions import HCAPIError
@pytest.mark.asyncio
@pytest.mark.respx
async def test_acreate_check_200_context_manager(
fake_check_api_result, respx_mock, test_async_client
):
async def test_acreate_check_200_context_manager(fake_check_api_result, respx_mock, test_async_client):
checks_url = urljoin(test_async_client._api_url, "checks/")
respx_mock.post(checks_url).mock(
return_value=Response(
@@ -43,9 +41,7 @@ async def test_acreate_check_200_context_manager(
)
)
async with test_async_client as test_client:
check = await test_client.create_check(
CheckCreate(name="test", tags="test", desc="test")
)
check = await test_client.create_check(CheckCreate(name="test", tags="test", desc="test"))
assert check.name == "Backups"
@@ -76,9 +72,7 @@ async def test_acreate_check_200(fake_check_api_result, respx_mock, test_async_c
},
)
)
check = await test_async_client.create_check(
CheckCreate(name="test", tags="test", desc="test")
)
check = await test_async_client.create_check(CheckCreate(name="test", tags="test", desc="test"))
assert check.name == "Backups"
@@ -109,9 +103,7 @@ async def test_aupdate_check_200(fake_check_api_result, respx_mock, test_async_c
},
)
)
check = await test_async_client.update_check(
"test", CheckUpdate(name="test", desc="test")
)
check = await test_async_client.update_check("test", CheckUpdate(name="test", desc="test"))
assert check.name == "Backups"
@@ -120,9 +112,7 @@ async def test_aupdate_check_200(fake_check_api_result, respx_mock, test_async_c
async def test_aget_checks_200(fake_check_api_result, respx_mock, test_async_client):
assert test_async_client._client is not None
checks_url = urljoin(test_async_client._api_url, "checks/")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
checks = await test_async_client.get_checks()
assert len(checks) == 1
assert checks[0].name == fake_check_api_result["name"]
@@ -132,13 +122,9 @@ async def test_aget_checks_200(fake_check_api_result, respx_mock, test_async_cli
@pytest.mark.respx
async def test_aget_checks_pass_in_client(fake_check_api_result, respx_mock):
httpx_client = HTTPXAsyncClient()
test_async_client = AsyncClient(
api_key="test", api_url="http://localhost/api/", client=httpx_client
)
test_async_client = AsyncClient(api_key="test", api_url="http://localhost/api/", client=httpx_client)
checks_url = urljoin(test_async_client._api_url, "checks/")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
checks = await test_async_client.get_checks()
assert len(checks) == 1
assert checks[0].name == fake_check_api_result["name"]
@@ -146,9 +132,7 @@ async def test_aget_checks_pass_in_client(fake_check_api_result, respx_mock):
@pytest.mark.asyncio
@pytest.mark.respx
async def test_aget_checks_exceptions(
fake_check_api_result, respx_mock, test_async_client
):
async def test_aget_checks_exceptions(fake_check_api_result, respx_mock, test_async_client):
checks_url = urljoin(test_async_client._api_url, "checks/")
# test exceptions
respx_mock.get(checks_url).mock(return_value=Response(status_code=401))
@@ -186,9 +170,7 @@ def test_finalizer_closes(test_async_client):
async def test_aget_check_200(fake_check_api_result, respx_mock, test_async_client):
assert test_async_client._client is not None
checks_url = urljoin(test_async_client._api_url, "checks/test")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_api_result)
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
check = await test_async_client.get_check(check_id="test")
assert check.name == fake_check_api_result["name"]
@@ -207,9 +189,7 @@ async def test_acheck_get_404(respx_mock, test_async_client):
@pytest.mark.respx
async def test_pause_check_200(fake_check_api_result, respx_mock, test_async_client):
checks_url = urljoin(test_async_client._api_url, "checks/test/pause")
respx_mock.post(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_api_result)
)
respx_mock.post(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
check = await test_async_client.pause_check(check_id="test")
assert check.name == fake_check_api_result["name"]
@@ -229,9 +209,7 @@ async def test_acheck_pause_404(respx_mock, test_async_client):
async def test_adelete_check_200(fake_check_api_result, respx_mock, test_async_client):
assert test_async_client._client is not None
checks_url = urljoin(test_async_client._api_url, "checks/test")
respx_mock.delete(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_api_result)
)
respx_mock.delete(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
check = await test_async_client.delete_check(check_id="test")
assert check.name == fake_check_api_result["name"]
@@ -247,15 +225,9 @@ async def test_adelete_pause404(respx_mock, test_async_client):
@pytest.mark.asyncio
@pytest.mark.respx
async def test_aget_check_pings_200(
fake_check_pings_api_result, respx_mock, test_async_client
):
async def test_aget_check_pings_200(fake_check_pings_api_result, respx_mock, test_async_client):
checks_url = urljoin(test_async_client._api_url, "checks/test/pings/")
respx_mock.get(checks_url).mock(
return_value=Response(
status_code=200, json={"pings": fake_check_pings_api_result}
)
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"pings": fake_check_pings_api_result}))
pings = await test_async_client.get_check_pings("test")
assert len(pings) == len(fake_check_pings_api_result)
assert pings[0].type == fake_check_pings_api_result[0]["type"]
@@ -263,13 +235,9 @@ async def test_aget_check_pings_200(
@pytest.mark.asyncio
@pytest.mark.respx
async def test_aget_check_flips_200(
fake_check_flips_api_result, respx_mock, test_async_client
):
async def test_aget_check_flips_200(fake_check_flips_api_result, respx_mock, test_async_client):
checks_url = urljoin(test_async_client._api_url, "checks/test/flips/")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_flips_api_result)
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
flips = await test_async_client.get_check_flips("test")
assert len(flips) == len(fake_check_flips_api_result)
assert flips[0].up == fake_check_flips_api_result[0]["up"]
@@ -277,15 +245,9 @@ async def test_aget_check_flips_200(
@pytest.mark.asyncio
@pytest.mark.respx
async def test_get_check_flips_params_200(
fake_check_flips_api_result, respx_mock, test_async_client
):
checks_url = urljoin(
test_async_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1"
)
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_flips_api_result)
)
async def test_get_check_flips_params_200(fake_check_flips_api_result, respx_mock, test_async_client):
checks_url = urljoin(test_async_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1")
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
flips = await test_async_client.get_check_flips("test", seconds=1, start=1, end=1)
assert len(flips) == len(fake_check_flips_api_result)
assert flips[0].up == fake_check_flips_api_result[0]["up"]
@@ -293,9 +255,7 @@ async def test_get_check_flips_params_200(
@pytest.mark.asyncio
@pytest.mark.respx
async def test_aget_check_flips_400(
fake_check_flips_api_result, respx_mock, test_async_client
):
async def test_aget_check_flips_400(fake_check_flips_api_result, respx_mock, test_async_client):
flips_url = urljoin(test_async_client._api_url, "checks/test/flips/")
respx_mock.get(flips_url).mock(return_value=Response(status_code=400))
with pytest.raises(BadAPIRequestError):
@@ -304,13 +264,9 @@ async def test_aget_check_flips_400(
@pytest.mark.asyncio
@pytest.mark.respx
async def test_aget_integrations(
fake_integrations_api_result, respx_mock, test_async_client
):
async def test_aget_integrations(fake_integrations_api_result, respx_mock, test_async_client):
channels_url = urljoin(test_async_client._api_url, "channels/")
respx_mock.get(channels_url).mock(
return_value=Response(status_code=200, json=fake_integrations_api_result)
)
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_integrations_api_result))
integrations = await test_async_client.get_integrations()
assert len(integrations) == len(fake_integrations_api_result["channels"])
assert integrations[0].id == fake_integrations_api_result["channels"][0]["id"]
@@ -320,9 +276,7 @@ async def test_aget_integrations(
@pytest.mark.respx
async def test_aget_badges(fake_badges_api_result, respx_mock, test_async_client):
channels_url = urljoin(test_async_client._api_url, "badges/")
respx_mock.get(channels_url).mock(
return_value=Response(status_code=200, json=fake_badges_api_result)
)
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_badges_api_result))
integrations = await test_async_client.get_badges()
assert integrations.keys() == fake_badges_api_result["badges"].keys()
@@ -389,14 +343,10 @@ ping_test_parameters = [
@pytest.mark.asyncio
@pytest.mark.respx
@pytest.mark.parametrize(
"respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters
)
@pytest.mark.parametrize("respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters)
async def test_asuccess_ping(respx_mocker, tc, url, ping_method, method_kwargs):
channels_url = urljoin(tc._ping_url, url)
respx_mocker.post(channels_url).mock(
return_value=Response(status_code=200, text="OK")
)
respx_mocker.post(channels_url).mock(return_value=Response(status_code=200, text="OK"))
ping_method = getattr(tc, ping_method)
result = await ping_method(**method_kwargs)
assert result[0] is True

View File

@@ -76,7 +76,6 @@ async def test_check_trap_async_exception(respx_mock, test_async_client):
@pytest.mark.asyncio
async def test_check_trap_wrong_client_error(test_client, test_async_client):
with pytest.raises(WrongClientError):
async with CheckTrap(test_client, uuid="test"):
pass

View File

@@ -14,9 +14,7 @@ from healthchecks_io.client.exceptions import HCAPIError
@pytest.mark.respx
def test_create_check_200_context_manager(
fake_check_api_result, respx_mock, test_client
):
def test_create_check_200_context_manager(fake_check_api_result, respx_mock, test_client):
checks_url = urljoin(test_client._api_url, "checks/")
respx_mock.post(checks_url).mock(
return_value=Response(
@@ -110,9 +108,7 @@ def test_update_check_200(fake_check_api_result, respx_mock, test_client):
def test_get_checks_200(fake_check_api_result, respx_mock, test_client):
assert test_client._client is not None
checks_url = urljoin(test_client._api_url, "checks/")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
checks = test_client.get_checks()
assert len(checks) == 1
assert checks[0].name == fake_check_api_result["name"]
@@ -121,13 +117,9 @@ def test_get_checks_200(fake_check_api_result, respx_mock, test_client):
@pytest.mark.respx
def test_get_checks_pass_in_client(fake_check_api_result, respx_mock):
httpx_client = HTTPXClient()
test_client = Client(
api_key="test", api_url="http://localhost/api/", client=httpx_client
)
test_client = Client(api_key="test", api_url="http://localhost/api/", client=httpx_client)
checks_url = urljoin(test_client._api_url, "checks/")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json={"checks": [fake_check_api_result]})
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"checks": [fake_check_api_result]}))
checks = test_client.get_checks()
assert len(checks) == 1
assert checks[0].name == fake_check_api_result["name"]
@@ -169,9 +161,7 @@ def test_finalizer_closes(test_client):
def test_get_check_200(fake_check_api_result, respx_mock, test_client):
assert test_client._client is not None
checks_url = urljoin(test_client._api_url, "checks/test")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_api_result)
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
check = test_client.get_check(check_id="test")
assert check.name == fake_check_api_result["name"]
@@ -188,9 +178,7 @@ def test_check_get_404(respx_mock, test_client):
@pytest.mark.respx
def test_pause_check_200(fake_check_api_result, respx_mock, test_client):
checks_url = urljoin(test_client._api_url, "checks/test/pause")
respx_mock.post(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_api_result)
)
respx_mock.post(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
check = test_client.pause_check(check_id="test")
assert check.name == fake_check_api_result["name"]
@@ -208,9 +196,7 @@ def test_check_pause_404(respx_mock, test_client):
def test_delete_check_200(fake_check_api_result, respx_mock, test_client):
assert test_client._client is not None
checks_url = urljoin(test_client._api_url, "checks/test")
respx_mock.delete(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_api_result)
)
respx_mock.delete(checks_url).mock(return_value=Response(status_code=200, json=fake_check_api_result))
check = test_client.delete_check(check_id="test")
assert check.name == fake_check_api_result["name"]
@@ -226,11 +212,7 @@ def test_delete_pause404(respx_mock, test_client):
@pytest.mark.respx
def test_get_check_pings_200(fake_check_pings_api_result, respx_mock, test_client):
checks_url = urljoin(test_client._api_url, "checks/test/pings/")
respx_mock.get(checks_url).mock(
return_value=Response(
status_code=200, json={"pings": fake_check_pings_api_result}
)
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json={"pings": fake_check_pings_api_result}))
pings = test_client.get_check_pings("test")
assert len(pings) == len(fake_check_pings_api_result)
assert pings[0].type == fake_check_pings_api_result[0]["type"]
@@ -239,24 +221,16 @@ def test_get_check_pings_200(fake_check_pings_api_result, respx_mock, test_clien
@pytest.mark.respx
def test_get_check_flips_200(fake_check_flips_api_result, respx_mock, test_client):
checks_url = urljoin(test_client._api_url, "checks/test/flips/")
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_flips_api_result)
)
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
flips = test_client.get_check_flips("test")
assert len(flips) == len(fake_check_flips_api_result)
assert flips[0].up == fake_check_flips_api_result[0]["up"]
@pytest.mark.respx
def test_get_check_flips_params_200(
fake_check_flips_api_result, respx_mock, test_client
):
checks_url = urljoin(
test_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1"
)
respx_mock.get(checks_url).mock(
return_value=Response(status_code=200, json=fake_check_flips_api_result)
)
def test_get_check_flips_params_200(fake_check_flips_api_result, respx_mock, test_client):
checks_url = urljoin(test_client._api_url, "checks/test/flips/?seconds=1&start=1&end=1")
respx_mock.get(checks_url).mock(return_value=Response(status_code=200, json=fake_check_flips_api_result))
flips = test_client.get_check_flips("test", seconds=1, start=1, end=1)
assert len(flips) == len(fake_check_flips_api_result)
assert flips[0].up == fake_check_flips_api_result[0]["up"]
@@ -273,9 +247,7 @@ def test_get_check_flips_400(fake_check_flips_api_result, respx_mock, test_clien
@pytest.mark.respx
def test_get_integrations(fake_integrations_api_result, respx_mock, test_client):
channels_url = urljoin(test_client._api_url, "channels/")
respx_mock.get(channels_url).mock(
return_value=Response(status_code=200, json=fake_integrations_api_result)
)
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_integrations_api_result))
integrations = test_client.get_integrations()
assert len(integrations) == len(fake_integrations_api_result["channels"])
assert integrations[0].id == fake_integrations_api_result["channels"][0]["id"]
@@ -284,9 +256,7 @@ def test_get_integrations(fake_integrations_api_result, respx_mock, test_client)
@pytest.mark.respx
def test_get_badges(fake_badges_api_result, respx_mock, test_client):
channels_url = urljoin(test_client._api_url, "badges/")
respx_mock.get(channels_url).mock(
return_value=Response(status_code=200, json=fake_badges_api_result)
)
respx_mock.get(channels_url).mock(return_value=Response(status_code=200, json=fake_badges_api_result))
integrations = test_client.get_badges()
assert integrations.keys() == fake_badges_api_result["badges"].keys()
@@ -352,14 +322,10 @@ ping_test_parameters = [
@pytest.mark.respx
@pytest.mark.parametrize(
"respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters
)
@pytest.mark.parametrize("respx_mocker, tc, url, ping_method, method_kwargs", ping_test_parameters)
def test_success_ping(respx_mocker, tc, url, ping_method, method_kwargs):
channels_url = urljoin(tc._ping_url, url)
respx_mocker.post(channels_url).mock(
return_value=Response(status_code=200, text="OK")
)
respx_mocker.post(channels_url).mock(return_value=Response(status_code=200, text="OK"))
ping_method = getattr(tc, ping_method)
result, text = ping_method(**method_kwargs)
assert result is True

View File

@@ -1,6 +1,4 @@
from datetime import datetime
from typing import Dict
from typing import Union
import pytest
@@ -11,7 +9,7 @@ from healthchecks_io.schemas import checks
@pytest.fixture
def fake_check_api_result() -> Dict[str, Union[str, int]]:
def fake_check_api_result() -> dict[str, str | int]:
yield {
"name": "Test Check",
"slug": "Test Check",
@@ -33,7 +31,7 @@ def fake_check_api_result() -> Dict[str, Union[str, int]]:
@pytest.fixture
def fake_check_ro_api_result() -> Dict[str, Union[str, int]]:
def fake_check_ro_api_result() -> dict[str, str | int]:
yield {
"name": "Test Check",
"slug": "Test Check",

View File

@@ -37,27 +37,19 @@ def test_check_create_validators():
# test validate_schedule
with pytest.raises(ValidationError):
check_create = checks.CheckCreate(
name="Test", tags="", desc="Test", schedule="no good"
)
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", schedule="no good")
# test validate_tz
with pytest.raises(ValidationError):
check_create = checks.CheckCreate(
name="Test", tags="", desc="Test", tz="no good"
)
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", tz="no good")
# test validate_methods
with pytest.raises(ValidationError):
check_create = checks.CheckCreate(
name="Test", tags="", desc="Test", methods="no good"
)
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", methods="no good")
# test validate_unique
with pytest.raises(ValidationError):
check_create = checks.CheckCreate(
name="Test", tags="", desc="Test", unique=["no good"]
)
check_create = checks.CheckCreate(name="Test", tags="", desc="Test", unique=["no good"])
def test_check_pings_from_api():