refactor(lbc): modernize typing and exports across client and models

This commit is contained in:
etienne-hd
2026-04-29 13:53:19 +02:00
parent ea459409c0
commit cbdf3d7821
16 changed files with 420 additions and 254 deletions

View File

@@ -1,2 +1,29 @@
from .client import Client
from .model import *
from .model import (
Proxy,
Search,
Ad,
User,
OwnerType,
AdType,
Sort,
Department,
Region,
Category,
City,
)
__all__ = [
"Client",
"Proxy",
"Search",
"Ad",
"User",
"OwnerType",
"AdType",
"Sort",
"Department",
"Region",
"Category",
"City",
]

View File

@@ -1,49 +1,54 @@
from typing import Optional, Union
from curl_cffi import BrowserTypeLiteral
import curl_cffi
from .mixin import (
SessionMixin,
SearchMixin,
UserMixin,
AdMixin
)
from .mixin import SessionMixin, SearchMixin, UserMixin, AdMixin
from .model import Proxy
from .exceptions import DatadomeError, RequestError, NotFoundError
class Client(
SessionMixin,
SearchMixin,
UserMixin,
AdMixin
):
def __init__(self, proxy: Optional[Proxy] = None, impersonate: BrowserTypeLiteral = None,
request_verify: bool = True, timeout: float = 30.0, max_retries: int = 5):
class Client(SessionMixin, SearchMixin, UserMixin, AdMixin):
def __init__(
self,
proxy: Proxy | None = None,
impersonate: BrowserTypeLiteral = None,
request_verify: bool = True,
timeout: float = 30.0,
max_retries: int = 5,
):
"""
Initializes a Leboncoin Client instance with optional proxy, browser impersonation, and SSL verification settings.
If no `impersonate` value is provided, a random browser type will be selected among common options.
Args:
proxy (Optional[Proxy], optional): Proxy configuration to use for the client. If provided, it will be applied to all requests. Defaults to None.
proxy (Proxy | None, optional): Proxy configuration to use for the client. If provided, it will be applied to all requests. Defaults to None.
impersonate (BrowserTypeLiteral, optional): Browser type to impersonate for requests (e.g., "firefox", "chrome", "edge", "safari", "safari_ios", "chrome_android"). If None, a random browser type will be chosen.
request_verify (bool, optional): Whether to verify SSL certificates when sending requests. Set to False to disable SSL verification (not recommended for production). Defaults to True.
timeout (int, optional): Maximum time in seconds to wait for a request before timing out. Defaults to 30.
timeout (float, optional): Maximum time in seconds to wait for a request before timing out. Defaults to 30.
max_retries (int, optional): Maximum number of times to retry a request in case of failure (403 error). Defaults to 5.
"""
super().__init__(proxy=proxy, impersonate=impersonate, request_verify=request_verify)
super().__init__(
proxy=proxy, impersonate=impersonate, request_verify=request_verify
)
self.request_verify = request_verify
self.timeout = timeout
self.max_retries = max_retries
def _fetch(self, method: str, url: str, payload: Optional[dict] = None, max_retries: int = -1) -> dict:
def _fetch(
self,
method: str,
url: str,
payload: dict | None = None,
max_retries: int = -1,
) -> dict:
"""
Internal method to send an HTTP request using the configured session.
Args:
method (str): HTTP method to use (e.g., "GET", "POST").
url (str): Full URL of the API endpoint.
payload (Optional[dict], optional): JSON payload to send with the request. Used for POST/PUT methods. Defaults to None.
payload (dict | None, optional): JSON payload to send with the request. Used for POST/PUT methods. Defaults to None.
timeout (int, optional): Timeout for the request, in seconds. Defaults to 30.
max_retries (int, optional): Number of times to retry the request in case of failure. Defaults to 5.
@@ -57,24 +62,36 @@ class Client(
if max_retries == -1:
max_retries = self.max_retries
response = self.session.request(
response: curl_cffi.Response = self.session.request(
method=method,
url=url,
url=url,
json=payload,
verify=self.request_verify,
timeout=self.timeout
timeout=self.timeout,
)
if response.ok:
return response.json()
elif response.status_code == 403:
if max_retries > 0:
self.session = self._init_session(proxy=self._proxy, impersonate=self._impersonate, request_verify=self.request_verify) # Re-init session
return self._fetch(method=method, url=url, payload=payload, max_retries=max_retries - 1)
self.session = self._init_session(
proxy=self._proxy,
impersonate=self._impersonate,
request_verify=self.request_verify,
) # Re-init session
return self._fetch(
method=method, url=url, payload=payload, max_retries=max_retries - 1
)
if self.proxy:
raise DatadomeError(f"Access blocked by Datadome: your proxy appears to have a poor reputation, try to change it.")
raise DatadomeError(
"Access blocked by Datadome: your proxy appears to have a poor reputation, try to change it."
)
else:
raise DatadomeError(f"Access blocked by Datadome: your activity was flagged as suspicious. Please avoid sending excessive requests.")
raise DatadomeError(
"Access blocked by Datadome: your activity was flagged as suspicious. Please avoid sending excessive requests."
)
elif response.status_code == 404 or response.status_code == 410:
raise NotFoundError(f"Unable to find ad or user.")
raise NotFoundError("Unable to find ad or user.")
else:
raise RequestError(f"Request failed with status code {response.status_code}.")
raise RequestError(
f"Request failed with status code {response.status_code}."
)

View File

@@ -1,14 +1,18 @@
class LBCError(Exception):
"""Base exception for all errors raised by the LBC client."""
class InvalidValue(LBCError):
"""Raised when a provided value is invalid or improperly formatted."""
class RequestError(LBCError):
"""Raised when an HTTP request fails with a non-success status code."""
class DatadomeError(RequestError):
"""Raised when access is blocked by Datadome anti-bot protection."""
class NotFoundError(LBCError):
"""Raised when a user or ad is not found."""

View File

@@ -1,4 +1,11 @@
from .session import SessionMixin
from .search import SearchMixin
from .user import UserMixin
from .ad import AdMixin
from .ad import AdMixin
__all__ = [
"SessionMixin",
"SearchMixin",
"UserMixin",
"AdMixin"
]

View File

@@ -1,9 +1,8 @@
from typing import Union
from ..model import Ad
class AdMixin:
def get_ad(self, ad_id: Union[str, int]) -> Ad:
def get_ad(self, ad_id: str | int) -> Ad:
"""
Retrieve detailed information about a classified ad using its ID.
@@ -17,5 +16,8 @@ class AdMixin:
Returns:
Ad: An `Ad` object containing the parsed ad information.
"""
body = self._fetch(method="GET", url=f"https://api.leboncoin.fr/api/adfinder/v1/classified/{ad_id}")
return Ad._build(raw=body, client=self)
body = self._fetch(
method="GET",
url=f"https://api.leboncoin.fr/api/adfinder/v1/classified/{ad_id}",
)
return Ad._build(raw=body, client=self)

View File

@@ -1,24 +1,27 @@
from typing import Optional, Union, List
from ..model import Category, Sort, Region, Department, City, AdType, OwnerType, Search
from ..utils import build_search_payload_with_args, build_search_payload_with_url
class SearchMixin:
def search(
self,
url: Optional[str] = None,
text: Optional[str] = None,
url: str | None = None,
text: str | None = None,
category: Category = Category.TOUTES_CATEGORIES,
sort: Sort = Sort.RELEVANCE,
locations: Optional[Union[List[Union[Region, Department, City]], Union[Region, Department, City]]] = None,
limit: int = 35,
limit_alu: int = 3,
page: int = 1,
locations: list[Region, Department, City]
| Region
| Department
| City
| None = None,
limit: int = 35,
limit_alu: int = 3,
page: int = 1,
ad_type: AdType = AdType.OFFER,
owner_type: Optional[OwnerType] = None,
shippable: Optional[bool] = None,
owner_type: OwnerType | None = None,
shippable: bool | None = None,
search_in_title_only: bool = False,
**kwargs
**kwargs,
) -> Search:
"""
Perform a classified ads search on Leboncoin with the specified criteria.
@@ -28,7 +31,7 @@ class SearchMixin:
- Or use the individual parameters (`text`, `category`, `locations`, etc.) to construct a custom search.
Args:
url (Optional[str], optional): A full Leboncoin search URL. If provided, all other parameters will be ignored and the search will replicate the results from the URL.
url (Optional[str], optional): A full Leboncoin search URL. If provided, all other parameters will be ignored and the search will replicate the results from the URL.
text (Optional[str], optional): Search keywords. If None, returns all matching ads without filtering by keyword. Defaults to None.
category (Category, optional): Category to search in. Defaults to Category.TOUTES_CATEGORIES.
sort (Sort, optional): Sorting method for results (e.g., relevance, date, price). Defaults to Sort.RELEVANCE.
@@ -46,15 +49,24 @@ class SearchMixin:
Search: A `Search` object containing the parsed search results.
"""
if url:
payload = build_search_payload_with_url(
url=url, limit=limit, page=page
)
payload = build_search_payload_with_url(url=url, limit=limit, page=page)
else:
payload = build_search_payload_with_args(
text=text, category=category, sort=sort, locations=locations,
limit=limit, limit_alu=limit_alu, page=page, ad_type=ad_type,
owner_type=owner_type, shippable=shippable, search_in_title_only=search_in_title_only, **kwargs
text=text,
category=category,
sort=sort,
locations=locations,
limit=limit,
limit_alu=limit_alu,
page=page,
ad_type=ad_type,
owner_type=owner_type,
shippable=shippable,
search_in_title_only=search_in_title_only,
**kwargs,
)
body = self._fetch(method="POST", url="https://api.leboncoin.fr/finder/search", payload=payload)
return Search._build(raw=body, client=self)
body = self._fetch(
method="POST", url="https://api.leboncoin.fr/finder/search", payload=payload
)
return Search._build(raw=body, client=self)

View File

@@ -1,14 +1,21 @@
from curl_cffi import requests, BrowserTypeLiteral
from typing import Optional
import random
import uuid
from ..model import Proxy
class SessionMixin:
def __init__(self, proxy: Optional[Proxy] = None, impersonate: BrowserTypeLiteral = None,
request_verify: bool = True, **kwargs):
self.session = self._init_session(proxy=proxy, impersonate=impersonate, request_verify=request_verify)
def __init__(
self,
proxy: Proxy | None = None,
impersonate: BrowserTypeLiteral = None,
request_verify: bool = True,
**kwargs,
):
self.session = self._init_session(
proxy=proxy, impersonate=impersonate, request_verify=request_verify
)
self._proxy = proxy
self._impersonate = impersonate
super().__init__(**kwargs)
@@ -19,22 +26,90 @@ class SessionMixin:
# LBC;<OS>;<OS_VERSION>;<MODEL>;phone;<DEVICE_ID>;wifi;<APP_VERSION>
os = random.choice(["iOS", "Android"])
if os == "iOS":
os_version = random.choice(["18.0", "18.1", "18.2", "18.3", "18.4", "18.5", "18.6", "18.7", "18.7.3",
"26.0", "26.1", "26.2"])
os_version = random.choice(
[
"18.0",
"18.1",
"18.2",
"18.3",
"18.4",
"18.5",
"18.6",
"18.7",
"18.7.3",
"26.0",
"26.1",
"26.2",
]
)
model = "iPhone"
device_id = str(uuid.uuid4())
app_version = random.choice(["101.45.0", "101.44.0", "101.43.1", "101.43.0", "101.42.1", "101.42.0", "101.41.0", "101.40.0", "101.39.0", "101.38.0"])
app_version = random.choice(
[
"101.45.0",
"101.44.0",
"101.43.1",
"101.43.0",
"101.42.1",
"101.42.0",
"101.41.0",
"101.40.0",
"101.39.0",
"101.38.0",
]
)
else:
os_version = random.choice(["11", "12", "13", "14", "15"])
model = random.choice(["SM-G991B", "SM-G996B", "SM-G998B", "SM-S911B", "SM-S916B", "SM-S918B", "SM-A505F", "SM-A546B", "SM-A137F", "SM-M336B",
"Pixel 5", "Pixel 6", "Pixel 6a", "Pixel 7", "Pixel 7 Pro", "Pixel 8", "Pixel 8 Pro",
"Mi 10", "Mi 11", "Mi 11 Lite", "Redmi Note 10", "Redmi Note 11", "Redmi Note 12", "POCO F3", "POCO F4", "POCO X3 Pro",
"ONEPLUS A6003", "ONEPLUS A6013", "ONEPLUS A5000", "ONEPLUS A5010", "OnePlus 8", "OnePlus 9", "OnePlus 10 Pro", "OnePlus Nord"])
model = random.choice(
[
"SM-G991B",
"SM-G996B",
"SM-G998B",
"SM-S911B",
"SM-S916B",
"SM-S918B",
"SM-A505F",
"SM-A546B",
"SM-A137F",
"SM-M336B",
"Pixel 5",
"Pixel 6",
"Pixel 6a",
"Pixel 7",
"Pixel 7 Pro",
"Pixel 8",
"Pixel 8 Pro",
"Mi 10",
"Mi 11",
"Mi 11 Lite",
"Redmi Note 10",
"Redmi Note 11",
"Redmi Note 12",
"POCO F3",
"POCO F4",
"POCO X3 Pro",
"ONEPLUS A6003",
"ONEPLUS A6013",
"ONEPLUS A5000",
"ONEPLUS A5010",
"OnePlus 8",
"OnePlus 9",
"OnePlus 10 Pro",
"OnePlus Nord",
]
)
device_id = uuid.uuid4().hex[:16]
app_version = random.choice(["100.85.2", "100.84.1", "100.83.1", "100.82.0", "100.81.1"])
app_version = random.choice(
["100.85.2", "100.84.1", "100.83.1", "100.82.0", "100.81.1"]
)
return f"LBC;{os};{os_version};{model};phone;{device_id};wifi;{app_version}"
def _init_session(self, proxy: Optional[Proxy] = None, impersonate: BrowserTypeLiteral = None, request_verify: bool = True) -> requests.Session:
def _init_session(
self,
proxy: Proxy | None = None,
impersonate: BrowserTypeLiteral = None,
request_verify: bool = True,
) -> requests.Session:
"""
Initializes an HTTP session with optional proxy configuration and browser impersonation.
@@ -42,57 +117,44 @@ class SessionMixin:
Args:
proxy (Optional[Proxy], optional): Proxy configuration to use for the session. If provided, it will be applied to both HTTP and HTTPS traffic. Defaults to None.
impersonate (BrowserTypeLiteral, optional): Browser type to impersonate for requests (e.g., "firefox", "chrome", "edge", "safari", "safari_ios", "chrome_android"). If None, a random browser type will be chosen.
impersonate (BrowserTypeLiteral, optional): Browser type to impersonate for requests (e.g., "firefox", "chrome", "edge", "safari", "safari_ios", "chrome_android"). If None, a random browser type will be chosen.
request_verify (bool, optional): Whether to verify SSL certificates for HTTPS requests. Defaults to True.
Returns:
requests.Session: A configured session instance ready to send requests.
"""
if impersonate == None: # Pick a random browser client
if impersonate is None: # Pick a random browser client
impersonate: BrowserTypeLiteral = random.choice(
[
"safari",
"safari_ios",
"chrome_android",
"firefox"
]
["safari", "safari_ios", "chrome_android", "firefox"]
)
session = requests.Session(
impersonate=impersonate
)
session = requests.Session(impersonate=impersonate)
session.headers.update(
{
'User-Agent': self._generate_user_agent(),
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
"User-Agent": self._generate_user_agent(),
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
}
)
if proxy:
session.proxies = {
"http": proxy.url,
"https": proxy.url
}
session.proxies = {"http": proxy.url, "https": proxy.url}
session.get("https://www.leboncoin.fr/", verify=request_verify) # Init cookies
session.get("https://www.leboncoin.fr/", verify=request_verify) # Init cookies
return session
@property
def proxy(self) -> Proxy:
return self._proxy
@proxy.setter
def proxy(self, value: Proxy):
if value:
if isinstance(value, Proxy):
self.session.proxies = {
"http": value.url,
"https": value.url
}
self.session.proxies = {"http": value.url, "https": value.url}
else:
raise TypeError("Proxy must be an instance of the lbc.Proxy")
else:
self.session.proxies = {}
self._proxy = value
self._proxy = value

View File

@@ -1,6 +1,7 @@
from ..model import User
from ..exceptions import NotFoundError
class UserMixin:
def get_user(self, user_id: str) -> User:
"""
@@ -15,12 +16,18 @@ class UserMixin:
Returns:
User: A `User` object containing the parsed user information.
"""
user_data = self._fetch(method="GET", url=f"https://api.leboncoin.fr/api/user-card/v2/{user_id}/infos")
user_data = self._fetch(
method="GET",
url=f"https://api.leboncoin.fr/api/user-card/v2/{user_id}/infos",
)
pro_data = None
if user_data.get("account_type") == "pro":
try:
pro_data = self._fetch(method="GET", url=f"https://api.leboncoin.fr/api/onlinestores/v2/users/{user_id}?fields=all")
pro_data = self._fetch(
method="GET",
url=f"https://api.leboncoin.fr/api/onlinestores/v2/users/{user_id}?fields=all",
)
except NotFoundError:
pass # Some professional users may not have a Leboncoin page.
return User._build(user_data=user_data, pro_data=pro_data)
pass # Some professional users may not have a Leboncoin page.
return User._build(user_data=user_data, pro_data=pro_data)

View File

@@ -2,5 +2,19 @@ from .proxy import Proxy
from .search import Search
from .ad import Ad
from .user import User
from .enums import *
from .city import City
from .enums import OwnerType, AdType, Sort, Department, Region, Category
from .city import City
__all__ = [
"Proxy",
"Search",
"Ad",
"User",
"OwnerType",
"AdType",
"Sort",
"Department",
"Region",
"Category",
"City",
]

View File

@@ -1,8 +1,9 @@
from dataclasses import dataclass
from typing import List, Any, Optional
from typing import Any
from .user import User
@dataclass
class Location:
country_id: str
@@ -19,17 +20,19 @@ class Location:
provider: str
is_shape: bool
@dataclass
class Attribute:
key: str
key_label: Optional[str]
key_label: str | None
value: str
value_label: str
values: List[str]
values_label: Optional[List[str]]
value_label_reader: Optional[str]
values: list[str]
values_label: list[str] | None
value_label_reader: str | None
generic: bool
@dataclass
class Ad:
id: int
@@ -45,11 +48,11 @@ class Ad:
ad_type: str
url: str
price: float
images: List[str]
attributes: List[Attribute]
images: list[str]
attributes: list[Attribute]
location: Location
has_phone: bool
favorites: int # Unvailaible on Ad from Search
favorites: int # Unavailable on Ad from Search
_client: Any
_user_id: str
@@ -57,7 +60,7 @@ class Ad:
@staticmethod
def _build(raw: dict, client: Any) -> "Ad":
attributes: List[Attribute] = []
attributes: list[Attribute] = []
for raw_attribute in raw.get("attributes", []):
attributes.append(
Attribute(
@@ -68,10 +71,10 @@ class Ad:
values=raw_attribute.get("values"),
values_label=raw_attribute.get("values_label"),
value_label_reader=raw_attribute.get("value_label_reader"),
generic=raw_attribute.get("generic")
generic=raw_attribute.get("generic"),
)
)
raw_location: dict = raw.get("location", {})
location = Location(
country_id=raw_location.get("country_id"),
@@ -86,9 +89,9 @@ class Ad:
lng=raw_location.get("lng"),
source=raw_location.get("source"),
provider=raw_location.get("provider"),
is_shape=raw_location.get("is_shape")
is_shape=raw_location.get("is_shape"),
)
raw_owner: dict = raw.get("owner", {})
return Ad(
id=raw.get("list_id"),
@@ -111,15 +114,15 @@ class Ad:
favorites=raw.get("counters", {}).get("favorites"),
_client=client,
_user_id=raw_owner.get("user_id"),
_user=None
_user=None,
)
@property
def title(self) -> str:
return self.subject
@property
def user(self) -> User:
if self._user is None:
self._user = self._client.get_user(user_id=self._user_id)
return self._user
return self._user

View File

@@ -1,9 +1,8 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class City:
lat: float
lng: float
radius: int = 10_000
city: Optional[str] = None
city: str | None = None

View File

@@ -1,14 +1,17 @@
from enum import Enum
class OwnerType(Enum):
PRO = "pro"
PRIVATE = "private"
ALL = "all"
class AdType(Enum):
OFFER = "offer"
DEMAND = "demand"
class Sort(Enum):
RELEVANCE = ("relevance", None)
NEWEST = ("time", "desc")
@@ -16,6 +19,7 @@ class Sort(Enum):
EXPENSIVE = ("price", "asc")
CHEAPEST = ("price", "desc")
class Department(Enum):
BAS_RHIN = ("1", "ALSACE", "67", "BAS_RHIN")
HAUT_RHIN = ("1", "ALSACE", "68", "HAUT_RHIN")
@@ -97,7 +101,12 @@ class Department(Enum):
CHARENTE_MARITIME = ("20", "POITOU_CHARENTES", "17", "CHARENTE_MARITIME")
DEUX_SEVRES = ("20", "POITOU_CHARENTES", "79", "DEUX_SEVRES")
VIENNE = ("20", "POITOU_CHARENTES", "86", "VIENNE")
ALPES_DE_HAUTE_PROVENCE = ("21", "PROVENCE_ALPES_COTE_DAZUR", "4", "ALPES_DE_HAUTE_PROVENCE")
ALPES_DE_HAUTE_PROVENCE = (
"21",
"PROVENCE_ALPES_COTE_DAZUR",
"4",
"ALPES_DE_HAUTE_PROVENCE",
)
HAUTES_ALPES = ("21", "PROVENCE_ALPES_COTE_DAZUR", "5", "HAUTES_ALPES")
ALPES_MARITIMES = ("21", "PROVENCE_ALPES_COTE_DAZUR", "6", "ALPES_MARITIMES")
BOUCHES_DU_RHONE = ("21", "PROVENCE_ALPES_COTE_DAZUR", "13", "BOUCHES_DU_RHONE")
@@ -112,6 +121,7 @@ class Department(Enum):
SAVOIE = ("22", "RHONE_ALPES", "73", "SAVOIE")
HAUTE_SAVOIE = ("22", "RHONE_ALPES", "74", "HAUTE_SAVOIE")
class Region(Enum):
ALSACE = ("1", "ALSACE")
AQUITAINE = ("2", "AQUITAINE")
@@ -148,6 +158,7 @@ class Region(Enum):
RHONE_ALPES = ("22", "RHONE_ALPES")
REUNION = ("26", "REUNION")
class Category(Enum):
TOUTES_CATEGORIES = "0"
EMPLOI = "71"
@@ -263,4 +274,4 @@ class Category(Enum):
SERVICES_AUTRES_SERVICES = "34"
DONS = "1000"
DIVERS = "37"
DIVERS_AUTRES = "38"
DIVERS_AUTRES = "38"

View File

@@ -1,12 +1,11 @@
from dataclasses import dataclass
from typing import Union, Optional
@dataclass
class Proxy:
host: str
port: Union[str, int]
username: Optional[str] = None
password: Optional[str] = None
port: str | int
username: str | None = None
password: str | None = None
scheme: str = "http"
@property
@@ -14,4 +13,4 @@ class Proxy:
if self.username and self.password:
return f"{self.scheme}://{self.username}:{self.password}@{self.host}:{self.port}"
else:
return f"{self.scheme}://{self.host}:{self.port}"
return f"{self.scheme}://{self.host}:{self.port}"

View File

@@ -1,8 +1,9 @@
from dataclasses import dataclass
from typing import List, Any
from typing import Any
from .ad import Ad
@dataclass
class Search:
total: int
@@ -13,14 +14,11 @@ class Search:
total_inactive: int
total_shippable: int
max_pages: int
ads: List[Ad]
ads: list[Ad]
@staticmethod
def _build(raw: dict, client: Any) -> "Search":
ads: List[Ad] = [
Ad._build(raw=ad, client=client)
for ad in raw.get("ads", [])
]
ads: list[Ad] = [Ad._build(raw=ad, client=client) for ad in raw.get("ads", [])]
return Search(
total=raw.get("total"),
@@ -32,4 +30,4 @@ class Search:
total_shippable=raw.get("total_shippable"),
max_pages=raw.get("max_pages"),
ads=ads,
)
)

View File

@@ -1,5 +1,4 @@
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class Reply:
@@ -9,6 +8,7 @@ class Reply:
rate: int
reply_time_text: str
@dataclass
class Presence:
status: str
@@ -16,11 +16,13 @@ class Presence:
last_activity: str
enabled: bool
@dataclass
class Badge:
type: str
name: str
@dataclass
class Feedback:
overall_score: float
@@ -39,6 +41,7 @@ class Feedback:
def score(self) -> float:
return self.overall_score * 5 if self.overall_score else None
@dataclass
class Location:
address: str
@@ -56,6 +59,7 @@ class Location:
department_label: str
country: str
@dataclass
class Review:
author_name: str
@@ -63,6 +67,7 @@ class Review:
text: str
review_time: str
@dataclass
class Rating:
rating_value: int
@@ -71,7 +76,8 @@ class Rating:
source_display: str
retrieval_time: str
url: str
reviews: List[Review]
reviews: list[Review]
@dataclass
class Pro:
@@ -93,6 +99,7 @@ class Pro:
website_url: str
rating: Rating
@dataclass
class User:
id: str
@@ -103,15 +110,15 @@ class User:
profile_picture: str
reply: Reply
presence: Presence
badges: List[Badge]
badges: list[Badge]
total_ads: int
store_id: int
account_type: str
description: str
pro: Optional[Pro]
pro: Pro | None
@staticmethod
def _build(user_data: dict, pro_data: Optional[dict]) -> "User":
def _build(user_data: dict, pro_data: dict | None) -> "User":
raw_feedback = user_data.get("feedback", {})
feedback = Feedback(
overall_score=raw_feedback.get("overall_score"),
@@ -120,11 +127,15 @@ class User:
conformity=raw_feedback.get("category_scores", {}).get("CONFORMITY"),
package=raw_feedback.get("category_scores", {}).get("PACKAGE"),
product=raw_feedback.get("category_scores", {}).get("PRODUCT"),
recommendation=raw_feedback.get("category_scores", {}).get("RECOMMENDATION"),
recommendation=raw_feedback.get("category_scores", {}).get(
"RECOMMENDATION"
),
respect=raw_feedback.get("category_scores", {}).get("RESPECT"),
transaction=raw_feedback.get("category_scores", {}).get("TRANSACTION"),
user_attention=raw_feedback.get("category_scores", {}).get("USER_ATTENTION"),
received_count=raw_feedback.get("received_count")
user_attention=raw_feedback.get("category_scores", {}).get(
"USER_ATTENTION"
),
received_count=raw_feedback.get("received_count"),
)
raw_reply = user_data.get("reply", {})
@@ -133,7 +144,7 @@ class User:
text=raw_reply.get("text"),
rate_text=raw_reply.get("rate_text"),
rate=raw_reply.get("rate"),
reply_time_text=raw_reply.get("reply_time_text")
reply_time_text=raw_reply.get("reply_time_text"),
)
raw_presence = user_data.get("presence", {})
@@ -141,7 +152,7 @@ class User:
status=raw_presence.get("status"),
presence_text=raw_presence.get("presence_text"),
last_activity=raw_presence.get("last_activity"),
enabled=raw_presence.get("enabled")
enabled=raw_presence.get("enabled"),
)
badges = [
@@ -166,7 +177,7 @@ class User:
region_label=raw_pro_location.get("region_label"),
department=raw_pro_location.get("department"),
department_label=raw_pro_location.get("dpt_label"),
country=raw_pro_location.get("country")
country=raw_pro_location.get("country"),
)
raw_pro_rating = pro_data.get("rating", {})
@@ -175,7 +186,7 @@ class User:
author_name=review.get("author_name"),
rating_value=review.get("rating_value"),
text=review.get("text"),
review_time=review.get("review_time")
review_time=review.get("review_time"),
)
for review in raw_pro_rating.get("reviews", [])
]
@@ -187,12 +198,12 @@ class User:
source_display=raw_pro_rating.get("source_display"),
retrieval_time=raw_pro_rating.get("retrieval_time"),
url=raw_pro_rating.get("url"),
reviews=pro_rating_reviews
reviews=pro_rating_reviews,
)
pro_owner = pro_data.get("owner", {})
pro_brand = pro_data.get("brand", {})
pro_information = pro_data.get("information", {})
pro_information = pro_data.get("information", {})
pro = Pro(
online_store_id=pro_data.get("online_store_id"),
online_store_name=pro_data.get("online_store_name"),
@@ -210,7 +221,7 @@ class User:
description=pro_information.get("description"),
opening_hours=pro_information.get("opening_hours"),
website_url=pro_information.get("website_url"),
rating=pro_rating
rating=pro_rating,
)
return User(
@@ -227,9 +238,9 @@ class User:
store_id=user_data.get("store_id"),
account_type=user_data.get("account_type"),
description=user_data.get("description"),
pro=pro
pro=pro,
)
@property
def is_pro(self):
return self.account_type == "pro"
return self.account_type == "pro"

View File

@@ -1,25 +1,18 @@
from typing import Optional, Union, List
from .model import Category, AdType, OwnerType, Sort, Region, Department, City
from .exceptions import InvalidValue
def build_search_payload_with_url(
url: str,
limit: int = 35,
limit_alu: int = 3,
page: int = 1
url: str, limit: int = 35, limit_alu: int = 3, page: int = 1
):
def build_area(area_values: list[str]) -> dict:
area = {
"lat": float(area_values[0]),
"lng": float(area_values[1])
}
area = {"lat": float(area_values[0]), "lng": float(area_values[1])}
if len(area_values) >= 3:
area["default_radius"] = int(area_values[2])
if len(area_values) >= 4:
area["radius"] = int(area_values[3])
return area
payload = {
"filters": {},
"limit": limit,
@@ -27,71 +20,76 @@ def build_search_payload_with_url(
"offset": limit * (page - 1),
"disable_total": True,
"extend": True,
"listing_source": "direct-search" if page == 1 else "pagination"
}
"listing_source": "direct-search" if page == 1 else "pagination",
}
args: List[str] = url.split("?")[1].split("&")
args: list[str] = url.split("?")[1].split("&")
for arg in args:
key, value = arg.split("=") # e.g: real_estate_type 3,4 / square 300-400 / category 9
key, value = arg.split(
"="
) # e.g: real_estate_type 3,4 / square 300-400 / category 9
match key:
case "text":
payload["filters"]["keywords"] = {
"text": value
}
payload["filters"]["keywords"] = {"text": value}
case "category":
payload["filters"]["category"] = {
"id": value
}
payload["filters"]["category"] = {"id": value}
case "locations":
payload["filters"]["location"] = {
"locations": []
}
payload["filters"]["location"] = {"locations": []}
locations = value.split(",")
for location in locations:
location_parts = location.split("__") # City ['Paris', '48.86023250788424_2.339006433295173_9256'], Department ['d_69'], Region ['r_18'] or Place ['p_give a star if you like it!', '0.1234567891234_-0.1234567891234567_5000_5500']
location_parts = location.split(
"__"
) # City ['Paris', '48.86023250788424_2.339006433295173_9256'], Department ['d_69'], Region ['r_18'] or Place ['p_give a star if you like it!', '0.1234567891234_-0.1234567891234567_5000_5500']
prefix_parts = location_parts[0].split("_")
if len(prefix_parts[0]) == 1: # Department ['d', '1'], Region ['r', '1'], or Place ['p', 'give a star if you like it!']
location_id = prefix_parts[1] # Department '1', Region '1' or Place 'give a star if you like it!'
if (
len(prefix_parts[0]) == 1
): # Department ['d', '1'], Region ['r', '1'], or Place ['p', 'give a star if you like it!']
location_id = prefix_parts[
1
] # Department '1', Region '1' or Place 'give a star if you like it!'
match prefix_parts[0]:
case "d": # Department
case "d": # Department
payload["filters"]["location"]["locations"].append(
{
"locationType": "department",
"department_id": location_id
"department_id": location_id,
}
)
case "r": # Region
case "r": # Region
payload["filters"]["location"]["locations"].append(
{
"locationType": "region",
"region_id": location_id
}
)
case "p": # Place
area_values = location_parts[1].split("_") # lat, lng, default_radius, radius
{"locationType": "region", "region_id": location_id}
)
case "p": # Place
area_values = location_parts[1].split(
"_"
) # lat, lng, default_radius, radius
payload["filters"]["location"]["locations"].append(
{
"locationType": "place",
"place": location_id,
"label": location_id,
"area": build_area(area_values)
"area": build_area(area_values),
}
)
)
case _:
raise InvalidValue(f"Unknown location type: {prefix_parts[0]}")
else: # City
area_values = location_parts[1].split("_") # lat, lng, default_radius, radius
raise InvalidValue(
f"Unknown location type: {prefix_parts[0]}"
)
else: # City
area_values = location_parts[1].split(
"_"
) # lat, lng, default_radius, radius
payload["filters"]["location"]["locations"].append(
{
"locationType": "city",
#"city": location_parts[0],
"area": build_area(area_values)
# "city": location_parts[0],
"area": build_area(area_values),
}
)
@@ -108,12 +106,12 @@ def build_search_payload_with_url(
if value == "1":
payload["filters"]["location"]["shippable"] = True
case _:
if value in ["page"]: # Pass
case _:
if value in ["page"]: # Pass
continue
# Range or Enum
elif len(value.split("-")) == 2: # Range
elif len(value.split("-")) == 2: # Range
range_values = value.split("-", 1)
if len(range_values) == 2:
min_val, max_val = range_values
@@ -143,7 +141,7 @@ def build_search_payload_with_url(
if ranges:
payload["filters"]["ranges"][key] = ranges
else: # Enum
else: # Enum
if not payload["filters"].get("enums"):
payload["filters"]["enums"] = {}
@@ -151,48 +149,43 @@ def build_search_payload_with_url(
return payload
def build_search_payload_with_args(
text: Optional[str] = None,
text: str | None = None,
category: Category = Category.TOUTES_CATEGORIES,
sort: Sort = Sort.RELEVANCE,
locations: Optional[Union[List[Union[Region, Department, City]], Union[Region, Department, City]]] = None,
limit: int = 35,
limit_alu: int = 3,
page: int = 1,
locations: list[Region | Department | City]
| Region
| Department
| City
| None = None,
limit: int = 35,
limit_alu: int = 3,
page: int = 1,
ad_type: AdType = AdType.OFFER,
owner_type: Optional[OwnerType] = None,
shippable: Optional[bool] = False,
owner_type: OwnerType | None = None,
shippable: bool | None = False,
search_in_title_only: bool = False,
**kwargs
**kwargs,
) -> dict:
payload = {
"filters": {
"category": {
"id": category.value
},
"enums": {
"ad_type": [
ad_type.value
]
},
"keywords": {
"text": text
},
"location": {}
"category": {"id": category.value},
"enums": {"ad_type": [ad_type.value]},
"keywords": {"text": text},
"location": {},
},
"limit": limit,
"limit_alu": limit_alu,
"offset": limit * (page - 1),
"disable_total": True,
"extend": True,
"listing_source": "direct-search" if page == 1 else "pagination"
}
"listing_source": "direct-search" if page == 1 else "pagination",
}
# Text
if text:
payload["filters"]["keywords"] = {
"text": text
}
payload["filters"]["keywords"] = {"text": text}
# Owner Type
if owner_type:
@@ -203,30 +196,25 @@ def build_search_payload_with_args(
payload["sort_by"] = sort_by
if sort_order:
payload["sort_order"] = sort_order
# Location
if locations and not isinstance(locations, list):
locations = [locations]
if locations:
payload["filters"]["location"] = {
"locations": []
}
payload["filters"]["location"] = {"locations": []}
for location in locations:
match location:
case Region():
payload["filters"]["location"]["locations"].append(
{
"locationType": "region",
"region_id": location.value[0]
}
{"locationType": "region", "region_id": location.value[0]}
)
case Department():
payload["filters"]["location"]["locations"].append(
{
"locationType": "department",
"region_id": location.value[0],
"department_id": location.value[2]
"department_id": location.value[2],
}
)
case City():
@@ -235,15 +223,19 @@ def build_search_payload_with_args(
"area": {
"lat": location.lat,
"lng": location.lng,
"radius": location.radius
"radius": location.radius,
},
"city": location.city,
"label": f"{location.city} (toute la ville)" if location.city else None,
"locationType": "city"
"label": f"{location.city} (toute la ville)"
if location.city
else None,
"locationType": "city",
}
)
case _:
raise InvalidValue("The provided location is invalid. It must be an instance of Region, Department, or City.")
raise InvalidValue(
"The provided location is invalid. It must be an instance of Region, Department, or City."
)
# Search in title only
if text:
@@ -256,23 +248,24 @@ def build_search_payload_with_args(
if kwargs:
for key, value in kwargs.items():
if not isinstance(value, (list, tuple)):
raise InvalidValue(f"The value of '{key}' must be a list or a tuple.")
raise InvalidValue(f"The value of '{key}' must be a list or a tuple.")
# Range
if all(isinstance(x, int) for x in value):
if len(value) <= 1:
raise InvalidValue(f"The value of '{key}' must be a list or tuple with at least two elements.")
raise InvalidValue(
f"The value of '{key}' must be a list or tuple with at least two elements."
)
if not "ranges" in payload["filters"]:
if "ranges" not in payload["filters"]:
payload["filters"]["ranges"] = {}
payload["filters"]["ranges"][key] = {
"min": value[0],
"max": value[1]
}
payload["filters"]["ranges"][key] = {"min": value[0], "max": value[1]}
# Enum
elif all(isinstance(x, str) for x in value):
payload["filters"]["enums"][key] = value
else:
raise InvalidValue(f"The value of '{key}' must be a list or tuple containing only integers or only strings.")
return payload
raise InvalidValue(
f"The value of '{key}' must be a list or tuple containing only integers or only strings."
)
return payload