"""Contact-form API.

Exposes ``POST /api/contact``, which validates a submission, applies a
per-IP in-memory rate limit, and forwards the message via SMTP (preferred)
or, failing that, a webhook. ``GET /health`` is a liveness probe.
"""

import json
import logging
import os
import time
from typing import Dict, List, Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr, Field

from app.emailer import send_smtp_email, send_webhook

load_dotenv()

logger = logging.getLogger(__name__)

# ------- Configuration (read once at import time from the environment)

# {ip: [timestamps]} -- per-process only; not shared between workers.
RATE_LIMIT_BUCKETS: Dict[str, List[float]] = {}
RL_WINDOW_SEC = int(os.getenv("RL_WINDOW_SEC", "600"))  # 10 min
RL_MAX_REQ = int(os.getenv("RL_MAX_REQ", "5"))
# Once the table holds more than this many IPs, stale entries are swept so
# one-off visitors do not accumulate for the lifetime of the process.
_RL_SWEEP_THRESHOLD = 10_000

# Comma-separated origin list; blank entries (e.g. a trailing comma) are dropped.
ALLOW_ORIGINS = [
    origin.strip()
    for origin in os.getenv("ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
]

SMTP_HOST = os.getenv("SMTP_HOST")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
MAIL_TO = os.getenv("MAIL_TO")  # where to send
MAIL_FROM = os.getenv("MAIL_FROM", SMTP_USER or "noreply@example.com")
WEBHOOK_URL = os.getenv("WEBHOOK_URL")  # alternative to SMTP


# ------- Models
class ContactIn(BaseModel):
    """Request body of ``POST /api/contact``."""

    name: str = Field(min_length=2, max_length=120)
    email: EmailStr
    subject: str = Field(min_length=2, max_length=200)
    message: str = Field(min_length=5, max_length=5000)
    # Honeypot: a genuine submission leaves this empty.
    company: Optional[str] = None
    # Sent by the client as "_href" / "_ua". Pydantic does not treat
    # underscore-prefixed names as input fields, so they are declared under
    # public names and mapped to the wire names through aliases.
    href: Optional[str] = Field(default=None, alias="_href")
    ua: Optional[str] = Field(default=None, alias="_ua")


class ContactOut(BaseModel):
    """Response body of ``POST /api/contact``."""

    ok: bool
    message: str


# ------- App
app = FastAPI(title="Contact API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOW_ORIGINS,
    allow_credentials=False,
    allow_methods=["POST", "GET", "OPTIONS"],
    allow_headers=["*"],
)


# ------- Helpers
def client_ip(req: Request) -> str:
    """Return the caller's IP address, preferring ``X-Forwarded-For``.

    The first element of ``X-Forwarded-For`` is used when the header is
    present and non-empty; otherwise the socket peer address is used, or
    ``"unknown"`` when the request carries no client information.

    NOTE(review): the header is client-controlled unless a reverse proxy
    overwrites it -- confirm the deployment always sits behind one, otherwise
    the rate limit can be evaded by varying the header.
    """
    xff = req.headers.get("x-forwarded-for")
    if xff:
        first_hop = xff.split(",")[0].strip()
        if first_hop:
            return first_hop
    return req.client.host if req.client else "unknown"


def _sweep_stale_buckets(window_start: float) -> None:
    """Delete buckets whose newest timestamp is older than *window_start*."""
    stale = [
        ip
        for ip, stamps in RATE_LIMIT_BUCKETS.items()
        if not stamps or max(stamps) < window_start
    ]
    for ip in stale:
        del RATE_LIMIT_BUCKETS[ip]


def rate_limit_check(ip: str) -> None:
    """Record a request from *ip* and enforce the sliding-window limit.

    Raises:
        HTTPException: status 429 when *ip* already made ``RL_MAX_REQ``
            requests within the last ``RL_WINDOW_SEC`` seconds. A rejected
            request is not itself recorded.
    """
    now = time.time()
    window_start = now - RL_WINDOW_SEC

    if len(RATE_LIMIT_BUCKETS) > _RL_SWEEP_THRESHOLD:
        _sweep_stale_buckets(window_start)

    recent = [t for t in RATE_LIMIT_BUCKETS.get(ip, []) if t >= window_start]
    if len(recent) >= RL_MAX_REQ:
        RATE_LIMIT_BUCKETS[ip] = recent  # keep the pruned list
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Try again later.",
        )
    recent.append(now)
    RATE_LIMIT_BUCKETS[ip] = recent


def _single_line(value: str) -> str:
    """Collapse all whitespace runs (including CR/LF) to single spaces.

    Applied to user-supplied text that ends up in header-like positions
    (mail subject, ``from`` display name) so embedded line breaks cannot
    inject additional headers.
    """
    return " ".join(value.split())


# ------- Routes
@app.get("/health")
async def health():
    """Liveness probe; always returns ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.post("/api/contact", response_model=ContactOut)
async def contact(req: Request, data: ContactIn):
    """Validate, rate-limit and deliver one contact-form submission.

    Delivery is attempted via SMTP when ``SMTP_HOST`` and ``MAIL_TO`` are
    set; if that is unavailable or fails, the webhook is tried when
    ``WEBHOOK_URL`` is set. A delivery failure is reported in the response
    body (``ok=False``) rather than through an HTTP error status.

    Raises:
        HTTPException: status 429 when the caller exceeds the rate limit.

    NOTE(review): ``send_smtp_email`` / ``send_webhook`` are called without
    ``await``, so they presumably block the event loop while sending --
    consider offloading them to a thread pool.
    """
    ip = client_ip(req)
    rate_limit_check(ip)

    # Honeypot tripped: answer exactly like a success so the bot learns
    # nothing, but deliver nothing.
    if data.company and data.company.strip():
        logger.info("Contact honeypot triggered; submission dropped (ip=%s)", ip)
        return ContactOut(ok=True, message="Thanks! I’ll reply soon.")

    # Prepare content
    meta = {
        "ip": ip,
        "href": data.href,
        "ua": data.ua,
    }
    sender_name = _single_line(data.name)
    topic = _single_line(data.subject)
    subject = f"[Contact] {topic} — {sender_name}"
    text_body = (
        f"Name: {sender_name}\n"
        f"Email: {data.email}\n"
        f"Subject: {topic}\n\n"
        f"{data.message}\n\n"
        f"--\nmeta: {json.dumps(meta, ensure_ascii=False)}"
    )

    # Dispatch: SMTP preferred, else webhook
    delivered = False
    error_messages: List[str] = []

    if SMTP_HOST and MAIL_TO:
        try:
            send_smtp_email(
                host=SMTP_HOST,
                port=SMTP_PORT,
                user=SMTP_USER,
                password=SMTP_PASS,
                mail_from=MAIL_FROM,
                mail_to=MAIL_TO,
                subject=subject,
                body=text_body,
            )
            delivered = True
        except Exception as e:  # boundary: any transport failure falls through to the webhook
            logger.warning("SMTP delivery failed: %s", e)
            error_messages.append(f"smtp: {e}")

    if not delivered and WEBHOOK_URL:
        try:
            send_webhook(WEBHOOK_URL, {
                "subject": subject,
                "from": f"{sender_name} <{data.email}>",
                "message": data.message,
                "meta": meta,
            })
            delivered = True
        except Exception as e:  # boundary: reported to the caller as ok=False below
            logger.warning("Webhook delivery failed: %s", e)
            error_messages.append(f"webhook: {e}")

    if not delivered:
        logger.warning("Contact not delivered; errors=%s", error_messages)
        return ContactOut(ok=False, message="Sorry, something went wrong. Please try again later.")

    return ContactOut(ok=True, message="Thanks! I’ll reply soon.")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)