134 lines
4.0 KiB
Python
134 lines
4.0 KiB
Python
from fastapi import FastAPI, Request, HTTPException
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel, EmailStr, Field
|
||
from typing import Optional, List
|
||
import os, time, json
|
||
from app.emailer import send_smtp_email, send_webhook
|
||
from dotenv import load_dotenv
|
||
load_dotenv()
|
||
|
||
# In-memory request history used by the rate limiter: {ip: [timestamps]}.
RATE_LIMIT_BUCKETS = dict()
# Length of the rate-limit window in seconds (defaults to 600, i.e. 10 min).
RL_WINDOW_SEC = int(os.environ.get("RL_WINDOW_SEC", "600"))
# Number of requests one address may make inside a single window.
RL_MAX_REQ = int(os.environ.get("RL_MAX_REQ", "5"))

# Comma-separated list of origins allowed by CORS; "*" when unset.
ALLOW_ORIGINS = list(map(str.strip, os.environ.get("ALLOW_ORIGINS", "*").split(",")))

# SMTP connection settings; the port falls back to 587 when unset.
SMTP_HOST = os.environ.get("SMTP_HOST")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER")
SMTP_PASS = os.environ.get("SMTP_PASS")
# Recipient of the contact mails.
MAIL_TO = os.environ.get("MAIL_TO")
# Sender address: explicit setting first, then the SMTP user, then a placeholder.
MAIL_FROM = os.environ.get("MAIL_FROM", SMTP_USER or "noreply@example.com")

# Webhook target, used as the alternative to SMTP.
WEBHOOK_URL = os.environ.get("WEBHOOK_URL")
|
||
|
||
# ------- Models
|
||
class ContactIn(BaseModel):
    # Request body accepted by the contact endpoint.

    # Sender's display name, 2-120 characters.
    name: str = Field(..., min_length=2, max_length=120)
    # Sender's address, validated as an e-mail address.
    email: EmailStr
    # Subject line, 2-200 characters.
    subject: str = Field(..., min_length=2, max_length=200)
    # Message text, 5-5000 characters.
    message: str = Field(..., min_length=5, max_length=5000)
    # Honeypot field; optional and empty by default.
    company: Optional[str] = Field(default=None)
    # Optional client-side metadata (page URL and user agent, judging by the names).
    # NOTE(review): pydantic does not treat underscore-prefixed names as regular
    # fields, so these are presumably never filled from the request body and
    # stay None -- confirm, and consider public names with an alias instead.
    _href: Optional[str] = None
    _ua: Optional[str] = None
|
||
|
||
class ContactOut(BaseModel):
    # Response envelope returned by the contact endpoint.

    # True when the submission was handed to a delivery channel.
    ok: bool = Field(...)
    # Human-readable status text shown to the sender.
    message: str = Field(...)
|
||
|
||
# ------- App
|
||
# ASGI application object that the routes in this module attach to.
app = FastAPI(version="1.0.0", title="Contact API")

# CORS: credentials are never allowed; origins come from ALLOW_ORIGINS,
# with the lone-wildcard configuration passed through as ["*"].
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["POST", "GET", "OPTIONS"],
    allow_credentials=False,
    allow_origins=ALLOW_ORIGINS if ALLOW_ORIGINS != ["*"] else ["*"],
)
|
||
|
||
# ------- Helpers
|
||
def client_ip(req: Request) -> str:
    """Return the address used to identify the caller of *req*.

    The first non-empty entry of the ``X-Forwarded-For`` header is preferred
    (reverse proxies such as Traefik/Nginx put the original client there).
    If the header is missing or holds only empty entries, the socket peer
    address is used, and ``"unknown"`` when that is unavailable too.

    Args:
        req: Incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The caller's address as a string, never empty.
    """
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted proxy
    # overwrites it. When this service is reachable directly, callers can pick
    # their own rate-limit key -- confirm the deployment always sits behind a proxy.
    forwarded = req.headers.get("x-forwarded-for")
    if forwarded:
        for hop in forwarded.split(","):
            candidate = hop.strip()
            # Skip blank entries (e.g. ", 1.2.3.4") so "" is never used as a key.
            if candidate:
                return candidate
    return req.client.host if req.client else "unknown"
|
||
|
||
# Time of the last stale-bucket sweep performed by rate_limit_check().
_rl_last_sweep = 0.0


def rate_limit_check(ip: str):
    """Record a request from *ip* or reject it when its quota is used up.

    A sliding window of ``RL_WINDOW_SEC`` seconds is kept per address in the
    module-level ``RATE_LIMIT_BUCKETS`` dict; at most ``RL_MAX_REQ`` requests
    are accepted per window. Rejected requests are not recorded.

    Args:
        ip: Key identifying the caller (normally the client address).

    Raises:
        HTTPException: Status 429 with a ``Retry-After`` header (seconds until
            the oldest counted request leaves the window) when the quota is
            exhausted.
    """
    global _rl_last_sweep

    now = time.time()
    window_start = now - RL_WINDOW_SEC

    # Evict addresses with no request inside the window so the dict cannot
    # grow forever. Done at most once per window to keep the cost amortised.
    if now - _rl_last_sweep >= RL_WINDOW_SEC:
        stale = [
            key
            for key, stamps in RATE_LIMIT_BUCKETS.items()
            if not stamps or max(stamps) < window_start
        ]
        for key in stale:
            del RATE_LIMIT_BUCKETS[key]
        _rl_last_sweep = now

    recent = [t for t in RATE_LIMIT_BUCKETS.get(ip, ()) if t >= window_start]

    if len(recent) >= RL_MAX_REQ:
        # Persist the pruned history so expired timestamps are not re-filtered
        # on every rejected request; never leave an empty bucket behind.
        if recent:
            RATE_LIMIT_BUCKETS[ip] = recent
        else:
            RATE_LIMIT_BUCKETS.pop(ip, None)
        oldest = min(recent, default=now)
        retry_after = max(1, int(oldest + RL_WINDOW_SEC - now) + 1)
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Try again later.",
            headers={"Retry-After": str(retry_after)},
        )

    recent.append(now)
    RATE_LIMIT_BUCKETS[ip] = recent
|
||
|
||
# ------- Routes
|
||
@app.get("/health")
async def health():
    # Liveness probe: answers with a constant payload and touches no state.
    return dict(status="ok")
|
||
|
||
def _single_line(value: str) -> str:
    """Collapse every whitespace run in *value* (CR/LF included) to one space."""
    return " ".join(value.split())


@app.post("/api/contact", response_model=ContactOut)
async def contact(req: Request, data: ContactIn):
    """Accept a contact-form submission and forward it by SMTP or webhook.

    Steps: rate-limit the caller, drop honeypot hits, build the message, then
    try SMTP (when ``SMTP_HOST`` and ``MAIL_TO`` are set) and fall back to the
    webhook (when ``WEBHOOK_URL`` is set).

    Args:
        req: Incoming request, used for the client address and headers.
        data: Validated form payload.

    Returns:
        ``ContactOut`` with ``ok=True`` on delivery (or a honeypot hit) and
        ``ok=False`` when no channel accepted the message.

    Raises:
        HTTPException: 429 from ``rate_limit_check`` when the caller is over quota.
    """
    # Local import keeps this block self-contained; fastapi is already a dependency.
    from fastapi.concurrency import run_in_threadpool

    ip = client_ip(req)
    rate_limit_check(ip)

    # Honeypot: real users never fill the hidden "company" field. Pretend the
    # submission succeeded so bots get no signal, but deliver nothing.
    if data.company and data.company.strip():
        return ContactOut(ok=True, message="Thanks! I’ll reply soon.")

    # Prepare content. The underscore-prefixed model attributes may never be
    # populated from the request body, so fall back to the request headers.
    meta = {
        "ip": ip,
        "href": getattr(data, "_href", None) or req.headers.get("referer"),
        "ua": getattr(data, "_ua", None) or req.headers.get("user-agent"),
    }
    # Name and subject end up in a mail header: strip line breaks so user
    # input cannot inject additional headers or break header encoding.
    sender_name = _single_line(data.name)
    topic = _single_line(data.subject)
    subject = f"[Contact] {topic} — {sender_name}"
    text_body = (
        f"Name: {sender_name}\n"
        f"Email: {data.email}\n"
        f"Subject: {topic}\n\n"
        f"{data.message}\n\n"
        f"--\nmeta: {json.dumps(meta, ensure_ascii=False)}"
    )

    # Dispatch: SMTP preferred, else webhook. Both helpers are invoked as plain
    # (non-awaited) callables, so they are assumed to block on network I/O and
    # are run in the thread pool to keep the event loop responsive.
    delivered = False
    error_messages: List[str] = []

    if SMTP_HOST and MAIL_TO:
        try:
            await run_in_threadpool(
                send_smtp_email,
                host=SMTP_HOST,
                port=SMTP_PORT,
                user=SMTP_USER,
                password=SMTP_PASS,
                mail_from=MAIL_FROM,
                mail_to=MAIL_TO,
                subject=subject,
                body=text_body,
            )
            delivered = True
        except Exception as e:  # boundary: any delivery error triggers the fallback
            error_messages.append(f"smtp: {e}")

    if not delivered and WEBHOOK_URL:
        try:
            await run_in_threadpool(
                send_webhook,
                WEBHOOK_URL,
                {
                    "subject": subject,
                    "from": f"{sender_name} <{data.email}>",
                    "message": data.message,
                    "meta": meta,
                },
            )
            delivered = True
        except Exception as e:  # boundary: collected and reported below
            error_messages.append(f"webhook: {e}")

    if not delivered:
        print("[WARN] Contact not delivered", {"errors": error_messages})
        return ContactOut(ok=False, message="Sorry, something went wrong. Please try again later.")

    return ContactOut(ok=True, message="Thanks! I’ll reply soon.")
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on 0.0.0.0:8000.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")
|