refactor: project structure, move all source into /lbc-finder

This commit is contained in:
etienne-hd
2026-03-06 16:07:44 +01:00
parent 6764ebf631
commit fee4101455
9 changed files with 0 additions and 0 deletions

31
lbc-finder/config.py Normal file
View File

@@ -0,0 +1,31 @@
from model import Search, Parameters
import lbc
def handle(ad: lbc.Ad, search_name: str):
    """Default notification handler: print a short summary of a new ad.

    Called by Searcher as handler(ad, search_name) for every unseen ad.
    """
    lines = [
        f"[{search_name}] New ads!",
        f"Title : {ad.subject}",
        f"Price : {ad.price}",
        f"URL : {ad.url}",
        "-" * 40,
    ]
    for line in lines:
        print(line)
# Search area: the coordinates point at central Paris, with a 10 km radius.
location = lbc.City(
    lat=48.85994982004764,
    lng=2.33801967847424,
    radius=10_000, # 10 km
    city="Paris"
)
# Search rules consumed by Searcher (one polling thread per entry).
CONFIG = [
    Search(
        name="Location Paris",
        parameters=Parameters(
            text="maison",
            locations=[location],
            category=lbc.Category.IMMOBILIER,
            # [min, max] ranges forwarded verbatim to the lbc client.
            square=[200, 400],
            price=[300_000, 700_000]
        ),
        delay=60 * 5, # Check every 5 minutes
        handler=handle
    ),
]

9
lbc-finder/main.py Normal file
View File

@@ -0,0 +1,9 @@
from searcher import Searcher
from config import CONFIG
def main() -> None:
    """Build a Searcher from the configured search rules and start polling."""
    Searcher(searches=CONFIG).start()
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,2 @@
from .search import Search
from .parameters import Parameters

View File

@@ -0,0 +1,24 @@
from typing import Optional, Union, List
from lbc import Category, Region, Department, City, OwnerType
from typing import overload
class Parameters:
    """Captures the keyword arguments for lbc.Client.search(...).

    The @overload stub exists only so IDEs/type-checkers can autocomplete the
    supported keywords; at runtime every keyword is stored verbatim and later
    forwarded to the client (see searcher/searcher.py).
    """

    @overload
    def __init__(
        self,
        url: Optional[str] = None,
        text: Optional[str] = None,
        category: Category = Category.TOUTES_CATEGORIES,
        locations: Optional[Union[List[Union[Region, Department, City]], Union[Region, Department, City]]] = None,
        limit: int = 35,
        limit_alu: int = 3,
        page: int = 1,
        owner_type: Optional[OwnerType] = None,
        shippable: Optional[bool] = None,
        search_in_title_only: bool = False,
        **kwargs
    ): ...
    def __init__(self, **kwargs):
        # Store the keywords untouched; validation is delegated to the client.
        self._kwargs = kwargs

    @property
    def kwargs(self) -> dict:
        """Public read access to the captured keywords, so other modules do
        not have to reach into the private ``_kwargs`` attribute."""
        return self._kwargs

View File

@@ -0,0 +1,12 @@
from lbc import Proxy, Ad
from .parameters import Parameters
from dataclasses import dataclass
from typing import Callable, Optional
@dataclass
class Search:
    """One polling rule: what to search for, how often, and what to do with
    each newly-found ad (see searcher/searcher.py for how fields are used)."""
    name: str  # label used in log messages and passed to handler
    parameters: Parameters  # keyword set forwarded to lbc.Client.search
    delay: float  # seconds between two polls of this search
    handler: Callable[[Ad, str], None]  # called as handler(ad, name) per new ad
    proxy: Optional[Proxy] = None  # optional proxy for this search's client

View File

@@ -0,0 +1,2 @@
from .searcher import Searcher
from .logger import logger

39
lbc-finder/searcher/id.py Normal file
View File

@@ -0,0 +1,39 @@
from .logger import logger
from typing import List, Final
import os
import json
MAX_ID: Final[int] = 10_000  # cap on how many ad ids are kept (memory and disk)

class ID:
    """Persists the ids of already-seen ads in ``id.json`` so a restart does
    not re-notify old ads."""

    def __init__(self):
        # Load previously-seen ids from disk (empty on first run or bad file).
        self._ids: List[str] = self._get_ids()

    @property
    def ids(self) -> List[str]:
        """The currently known ids, oldest first."""
        return self._ids

    def _get_ids(self) -> List[str]:
        """Read id.json; return [] when it is missing, corrupt, or unreadable.

        A corrupt file is deleted so the next run starts clean.
        """
        ids: List[str] = []
        corrupt = False
        if os.path.exists("id.json"):
            try:
                with open("id.json", "r") as f:
                    ids = json.load(f)
            except json.JSONDecodeError:
                # Defer deletion until the handle is closed (required on Windows,
                # where removing an open file raises PermissionError).
                corrupt = True
            except Exception:
                # Was a bare `except:`; narrowed so KeyboardInterrupt/SystemExit
                # still propagate instead of being swallowed here.
                logger.exception("An error occurred while attempting to open the id.json file.")
        if corrupt:
            os.remove("id.json")
        if not isinstance(ids, list):
            # Valid JSON of the wrong shape (e.g. a dict) — ignore it.
            ids = []
        return ids

    def contains(self, id_: str) -> bool:
        """Return True if ``id_`` has already been recorded."""
        return id_ in self._ids

    def add(self, id_: str) -> bool:
        """Record ``id_`` and persist the list, trimmed to the newest MAX_ID
        entries. Return True if the id was new, False if already known."""
        if id_ not in self._ids:
            self._ids.append(id_)
            # Trim in memory first so memory and disk stay consistent.
            self._ids = self._ids[-MAX_ID:]
            with open("id.json", "w") as f:
                json.dump(self._ids, f, indent=3)
            return True
        return False

View File

@@ -0,0 +1,23 @@
import logging
import os
from datetime import datetime
# Prepare a timestamped log file under ./logs
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
os.makedirs("logs", exist_ok=True)
file_path: str = os.path.join("logs", f"log_{timestamp}.log")

# Shared application logger: everything at INFO+ goes to the console,
# WARNING+ is additionally written to the log file.
logger = logging.getLogger("lbc-finder")
logger.setLevel(logging.INFO)

formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

file_handler = logging.FileHandler(file_path, mode='w', encoding='utf-8')
file_handler.setLevel(logging.WARNING)
file_handler.setFormatter(formatter)

for handler in (stream_handler, file_handler):
    logger.addHandler(handler)

View File

@@ -0,0 +1,71 @@
from model import Search
from lbc import Client, Sort
from .id import ID
from .logger import logger
import time
import threading
from typing import List, Union
class Searcher:
    """Polls leboncoin for each configured Search in a dedicated thread and
    dispatches every previously-unseen ad to that search's handler."""

    def __init__(self, searches: Union[List[Search], Search], request_verify: bool = True,
                 handler_max_attempts: int = 3, handler_initial_backoff: float = 2.0):
        """
        :param searches: one Search or a list of Search rules to poll.
        :param request_verify: forwarded to lbc.Client (TLS verification).
        :param handler_max_attempts: how many times a failing handler is tried.
        :param handler_initial_backoff: first retry delay in seconds; doubles
            on each subsequent attempt.
        """
        self._searches: List[Search] = searches if isinstance(searches, list) else [searches]
        self._request_verify = request_verify
        self._handler_max_attempts = handler_max_attempts
        self._handler_initial_backoff = handler_initial_backoff
        self._id = ID()  # persisted set of already-notified ad ids

    def _handle_with_retry(self, search: Search, ad) -> bool:
        """Call search.handler(ad, search.name), retrying with exponential
        backoff. Return True on success, False once all attempts failed."""
        for attempt in range(1, self._handler_max_attempts + 1):
            try:
                search.handler(ad, search.name)
                return True
            except Exception:
                if attempt == self._handler_max_attempts:
                    logger.exception(
                        f"[{search.name}] Handler failed for ad {ad.id} after {attempt} attempts."
                    )
                    return False
                delay = self._handler_initial_backoff * (2 ** (attempt - 1))
                logger.warning(
                    f"[{search.name}] Handler failed for ad {ad.id}. "
                    f"Retrying in {delay:.0f}s ({attempt}/{self._handler_max_attempts})."
                )
                time.sleep(delay)
        return False

    def _search(self, search: Search) -> None:
        """Poll loop for one Search: fetch the newest ads, hand unseen ones to
        the handler, then sleep out the remainder of search.delay."""
        client = Client(proxy=search.proxy, request_verify=self._request_verify)
        while True:
            before = time.time()
            try:
                response = client.search(**search.parameters._kwargs, sort=Sort.NEWEST)
                logger.debug(f"Successfully found {response.total} ad{'s' if response.total > 1 else ''}.")
                # Keep only ads whose id has not been persisted yet.
                ads = [ad for ad in response.ads if not self._id.contains(ad.id)]
                if ads:
                    logger.info(f"Successfully found {len(ads)} new ad{'s' if len(ads) > 1 else ''}!")
                notified = 0
                for ad in ads:
                    # Mark an ad as seen only after its handler succeeded, so
                    # failed ads are retried on the next poll.
                    if self._handle_with_retry(search, ad) and self._id.add(ad.id):
                        notified += 1
                if ads and notified != len(ads):
                    logger.warning(
                        f"[{search.name}] {len(ads) - notified} ad{'s were' if len(ads) - notified > 1 else ' was'} not marked as seen and will be retried."
                    )
            except Exception:
                # was: logger.exception(f"An error occured.") — spelling fix,
                # and no f-string needed for a literal message.
                logger.exception("An error occurred.")
            # Sleep whatever is left of the polling interval (never negative);
            # time.time() is now read once instead of twice.
            remaining = search.delay - (time.time() - before)
            time.sleep(max(0.0, remaining))

    def start(self) -> bool:
        """Spawn one polling thread per search. Return False when no search
        rules are configured, True otherwise."""
        if not self._searches:
            logger.warning("No search rules have been set. Please create search rules in config.py (see example in README.md).")
            return False
        for search in self._searches:
            threading.Thread(target=self._search, args=(search,), name=search.name).start()
            time.sleep(5)  # Add latency between each thread to prevent spam
        return True