Improve validation, correlation IDs, and client safeguards

This commit is contained in:
latinogino
2026-01-02 17:52:07 +01:00
parent 59e91efd30
commit 7356d6c409
9 changed files with 501 additions and 80 deletions

View File

@@ -53,6 +53,31 @@ class Config(BaseSettings):
default=8080,
)
allow_ref_autogen: bool = Field(
description="Allow automatic generation of reference fields when missing",
default=False,
)
ref_autogen_prefix: str = Field(
description="Prefix to use when auto-generating references",
default="AUTO",
)
debug_mode: bool = Field(
description="Enable verbose debug logging without exposing secrets",
default=False,
)
max_retries: int = Field(
description="Maximum retries for transient HTTP errors",
default=2,
)
retry_backoff_seconds: float = Field(
description="Base backoff (seconds) for retry strategy",
default=0.5,
)
@field_validator("dolibarr_url")
@classmethod
def validate_dolibarr_url(cls, v: str) -> str:

View File

@@ -1,8 +1,11 @@
"""Professional Dolibarr API client with comprehensive CRUD operations."""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4
import aiohttp
from aiohttp import ClientSession, ClientTimeout
@@ -20,6 +23,10 @@ class DolibarrAPIError(Exception):
super().__init__(self.message)
class DolibarrValidationError(DolibarrAPIError):
"""Raised for client-side validation failures before hitting the API."""
class DolibarrClient:
"""Professional Dolibarr API client with comprehensive functionality."""
@@ -30,9 +37,15 @@ class DolibarrClient:
self.api_key = config.api_key
self.session: Optional[ClientSession] = None
self.logger = logging.getLogger(__name__)
self.debug_mode = getattr(config, "debug_mode", False)
self.allow_ref_autogen = getattr(config, "allow_ref_autogen", False)
self.ref_autogen_prefix = getattr(config, "ref_autogen_prefix", "AUTO")
self.max_retries = getattr(config, "max_retries", 2)
self.retry_backoff_seconds = getattr(config, "retry_backoff_seconds", 0.5)
# Configure timeout
self.timeout = ClientTimeout(total=30, connect=10)
self.logger.setLevel(config.log_level)
async def __aenter__(self):
"""Async context manager entry."""
@@ -104,6 +117,113 @@ class DolibarrClient:
return f"{base}/{endpoint}"
def _mask_api_key(self) -> str:
"""Return a masked representation of the API key for logging."""
if not self.api_key:
return "<not-set>"
if len(self.api_key) <= 6:
return "*" * len(self.api_key)
return f"{self.api_key[:2]}***{self.api_key[-2:]}"
@staticmethod
def _now_iso() -> str:
"""Return current UTC timestamp in ISO format with Z suffix."""
return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
@staticmethod
def _generate_correlation_id() -> str:
"""Create a unique correlation identifier."""
return str(uuid4())
def _build_validation_error(
self,
endpoint: str,
missing_fields: Optional[List[str]] = None,
invalid_fields: Optional[List[Dict[str, str]]] = None,
message: str = "Validation failed",
status: int = 400,
) -> Dict[str, Any]:
"""Build a structured validation error response."""
return {
"error": "Bad Request",
"status": status,
"message": message,
"missing_fields": missing_fields or [],
"invalid_fields": invalid_fields or [],
"endpoint": f"/{endpoint.lstrip('/')}",
"timestamp": self._now_iso(),
}
def _build_internal_error(self, endpoint: str, message: str, correlation_id: str) -> Dict[str, Any]:
"""Build a structured internal server error response."""
return {
"error": "Internal Server Error",
"status": 500,
"message": message,
"correlation_id": correlation_id,
"endpoint": f"/{endpoint.lstrip('/')}",
"timestamp": self._now_iso(),
}
def _apply_aliases(self, payload: Dict[str, Any], aliases: Dict[str, List[str]]) -> None:
"""Promote alias fields to canonical names."""
for target, options in aliases.items():
if target not in payload:
for alias in options:
if alias in payload and payload[alias] not in (None, ""):
payload[target] = payload.pop(alias)
break
def _validate_payload(
self,
endpoint: str,
payload: Dict[str, Any],
required_fields: List[str],
aliases: Optional[Dict[str, List[str]]] = None,
numeric_positive: Optional[List[str]] = None,
enum_fields: Optional[Dict[str, List[Any]]] = None,
) -> Dict[str, Any]:
"""Validate payload before sending to Dolibarr and optionally auto-generate refs."""
aliases = aliases or {}
numeric_positive = numeric_positive or []
enum_fields = enum_fields or {}
self._apply_aliases(payload, aliases)
missing_fields = [
field
for field in required_fields
if field not in payload or payload[field] in (None, "")
]
invalid_fields: List[Dict[str, str]] = []
for field in numeric_positive:
if field in payload and isinstance(payload[field], (int, float)) and payload[field] < 0:
invalid_fields.append({"field": field, "message": "must be a positive number"})
for field, values in enum_fields.items():
if field in payload and payload[field] not in values:
invalid_fields.append({"field": field, "message": f"must be one of {values}"})
if "ref" in missing_fields and self.allow_ref_autogen:
payload["ref"] = f"{self.ref_autogen_prefix}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
missing_fields = [f for f in missing_fields if f != "ref"]
if missing_fields or invalid_fields:
error_data = self._build_validation_error(
endpoint=endpoint,
missing_fields=missing_fields,
invalid_fields=invalid_fields,
)
raise DolibarrValidationError(
message=error_data["message"],
status_code=error_data["status"],
response_data=error_data,
)
return payload
async def _make_request(
self,
method: str,
@@ -117,76 +237,155 @@ class DolibarrClient:
url = self._build_url(endpoint)
try:
self.logger.debug(f"Making {method} request to {url}")
kwargs = {
"params": params or {},
}
if data and method.upper() in ["POST", "PUT"]:
kwargs["json"] = data
async with self.session.request(method, url, **kwargs) as response:
response_text = await response.text()
# Log response for debugging
self.logger.debug(f"Response status: {response.status}")
self.logger.debug(f"Response text: {response_text[:500]}...")
# Try to parse JSON response
try:
response_data = json.loads(response_text) if response_text else {}
except json.JSONDecodeError:
response_data = {"raw_response": response_text}
# Handle error responses
if response.status >= 400:
error_msg = f"HTTP {response.status}: {response.reason}"
if isinstance(response_data, dict):
if "error" in response_data:
error_details = response_data["error"]
if isinstance(error_details, dict):
error_msg = error_details.get("message", error_msg)
if "code" in error_details:
error_msg = f"{error_msg} (Code: {error_details['code']})"
else:
error_msg = str(error_details)
elif "message" in response_data:
error_msg = response_data["message"]
raise DolibarrAPIError(
message=error_msg,
status_code=response.status,
response_data=response_data
last_exception: Optional[Exception] = None
for attempt in range(self.max_retries + 1):
try:
if self.debug_mode:
self.logger.debug(
"Making %s request to %s with params=%s payload_keys=%s api_key=%s",
method,
url,
params or {},
list((data or {}).keys()),
self._mask_api_key(),
)
return response_data
kwargs = {
"params": params or {},
}
except aiohttp.ClientError as e:
# For status endpoint, try alternative URL if first attempt fails
if endpoint == "status" and not url.endswith("/api/status"):
try:
# Try with /api/index.php/setup/modules as alternative
alt_url = f"{self.base_url}/setup/modules"
self.logger.debug(f"Status failed, trying alternative: {alt_url}")
if data and method.upper() in ["POST", "PUT"]:
kwargs["json"] = data
async with self.session.request(method, url, **kwargs) as response:
response_text = await response.text()
async with self.session.get(alt_url) as response:
if response.status == 200:
# Return a status-like response
return {
"success": 1,
"dolibarr_version": "API Available",
"api_version": "1.0"
}
except:
pass
raise DolibarrAPIError(f"HTTP client error: {endpoint}")
except Exception as e:
if isinstance(e, DolibarrAPIError):
# Log response for debugging without leaking secrets
if self.debug_mode:
self.logger.debug("Response status: %s", response.status)
self.logger.debug("Response body (truncated): %s", response_text[:500])
# Try to parse JSON response
try:
response_data = json.loads(response_text) if response_text else {}
except json.JSONDecodeError:
response_data = {"raw_response": response_text}
# Handle error responses
if response.status >= 400:
if response.status == 400:
missing = []
invalid: List[Dict[str, str]] = []
if isinstance(response_data, dict):
if "missing_fields" in response_data:
missing = response_data.get("missing_fields") or []
if "invalid_fields" in response_data:
invalid = response_data.get("invalid_fields") or []
# Heuristic: derive missing ref from message
if not missing and isinstance(response_data.get("error"), str):
if "ref" in response_data.get("error").lower():
missing.append("ref")
if not missing and "message" in response_data and "ref" in str(response_data["message"]).lower():
missing.append("ref")
error_data = self._build_validation_error(
endpoint=endpoint,
missing_fields=missing,
invalid_fields=invalid,
message="Validation failed",
)
raise DolibarrValidationError(
message=error_data["message"],
status_code=400,
response_data=error_data,
)
if response.status >= 500:
correlation_id = self._generate_correlation_id()
internal_error = self._build_internal_error(
endpoint=endpoint,
message=response_data.get("message", f"An unexpected error occurred while processing {endpoint}"),
correlation_id=correlation_id,
)
self.logger.error(
"Server error %s for %s (correlation_id=%s): %s",
response.status,
endpoint,
correlation_id,
response_text[:500],
)
raise DolibarrAPIError(
message=internal_error["message"],
status_code=response.status,
response_data=internal_error,
)
error_msg = f"HTTP {response.status}: {response.reason}"
if isinstance(response_data, dict):
if "message" in response_data:
error_msg = response_data["message"]
elif "error" in response_data and isinstance(response_data["error"], str):
error_msg = response_data["error"]
raise DolibarrAPIError(
message=error_msg,
status_code=response.status,
response_data=response_data,
)
return response_data
except aiohttp.ClientError as e:
last_exception = e
if endpoint == "status" and not url.endswith("/api/status"):
try:
alt_url = f"{self.base_url}/setup/modules"
self.logger.debug(f"Status failed, trying alternative: {alt_url}")
async with self.session.get(alt_url) as response:
if response.status == 200:
return {
"success": 1,
"dolibarr_version": "API Available",
"api_version": "1.0"
}
except Exception as alt_exc: # pylint: disable=broad-except
last_exception = alt_exc
if attempt < self.max_retries and isinstance(e, aiohttp.ClientResponseError) and e.status in {502, 503, 504}:
backoff = self.retry_backoff_seconds * (2 ** attempt)
await asyncio.sleep(backoff)
continue
break
except DolibarrAPIError:
raise
raise DolibarrAPIError(f"Unexpected error: {str(e)}")
except Exception as e: # pylint: disable=broad-except
last_exception = e
break
if isinstance(last_exception, DolibarrAPIError):
raise last_exception
if isinstance(last_exception, Exception):
correlation_id = self._generate_correlation_id()
internal_error = self._build_internal_error(
endpoint=endpoint,
message=str(last_exception),
correlation_id=correlation_id,
)
self.logger.error(
"Unexpected error during %s %s (correlation_id=%s): %s",
method,
endpoint,
correlation_id,
last_exception,
)
raise DolibarrAPIError(
message=internal_error["message"],
status_code=500,
response_data=internal_error,
) from last_exception
raise DolibarrAPIError(f"HTTP client error: {endpoint}")
# ============================================================================
# SYSTEM ENDPOINTS
@@ -359,6 +558,14 @@ class DolibarrClient:
) -> Dict[str, Any]:
"""Create a new product or service."""
payload = self._merge_payload(data, **kwargs)
payload = self._validate_payload(
endpoint="products",
payload=payload,
required_fields=["ref", "label", "type", "price"],
aliases={"label": ["name"]},
numeric_positive=["price"],
enum_fields={"type": ["product", "service", 0, 1]},
)
result = await self.request("POST", "products", data=payload)
return self._extract_identifier(result)
@@ -414,6 +621,12 @@ class DolibarrClient:
if "product_type" in line:
line["product_type"] = line["product_type"]
payload = self._validate_payload(
endpoint="invoices",
payload=payload,
required_fields=["socid"],
)
result = await self.request("POST", "invoices", data=payload)
return self._extract_identifier(result)
@@ -573,6 +786,12 @@ class DolibarrClient:
async def create_project(self, data: Optional[Dict[str, Any]] = None, **kwargs) -> Dict[str, Any]:
"""Create a new project."""
payload = self._merge_payload(data, **kwargs)
payload = self._validate_payload(
endpoint="projects",
payload=payload,
required_fields=["ref", "name", "socid"],
aliases={"name": ["title"]},
)
result = await self.request("POST", "projects", data=payload)
return self._extract_identifier(result)

View File

@@ -4,6 +4,8 @@ import asyncio
import json
import sys
import logging
import uuid
from datetime import datetime
from contextlib import asynccontextmanager
# Import MCP components
@@ -1377,12 +1379,24 @@ async def handle_call_tool(name: str, arguments: dict):
return [TextContent(type="text", text=json.dumps(result, indent=2))]
except DolibarrAPIError as e:
error_result = {"error": f"Dolibarr API Error: {str(e)}", "type": "api_error"}
return [TextContent(type="text", text=json.dumps(error_result, indent=2))]
error_payload = e.response_data or {
"error": "Dolibarr API Error",
"status": e.status_code or 500,
"message": str(e),
"timestamp": datetime.utcnow().isoformat() + "Z",
}
return [TextContent(type="text", text=json.dumps(error_payload, indent=2))]
except Exception as e:
error_result = {"error": f"Tool execution failed: {str(e)}", "type": "internal_error"}
print(f"🔥 Tool execution error: {e}", file=sys.stderr) # Debug logging
correlation_id = str(uuid.uuid4())
error_result = {
"error": "Internal Server Error",
"status": 500,
"message": f"Tool execution failed: {str(e)}",
"correlation_id": correlation_id,
"timestamp": datetime.utcnow().isoformat() + "Z",
}
print(f"🔥 Tool execution error ({correlation_id}): {e}", file=sys.stderr) # Debug logging
return [TextContent(type="text", text=json.dumps(error_result, indent=2))]