"""Low-level HTTP client for sub2api. Responsibilities: - Singleton httpx.AsyncClient with configured base URL and timeout. - Uniform envelope parsing: {code, message, reason, metadata, data}. - Error translation to domain exceptions. """ from __future__ import annotations import logging from typing import Any, Mapping import httpx from app.config.settings import settings logger = logging.getLogger(__name__) class Sub2APIError(Exception): """sub2api returned a non-zero envelope code or HTTP error.""" def __init__( self, code: int, message: str, reason: str = "", metadata: Mapping[str, str] | None = None, http_status: int | None = None, ) -> None: super().__init__(f"[{code}] {message}" + (f" ({reason})" if reason else "")) self.code = code self.message = message self.reason = reason self.metadata = dict(metadata) if metadata else {} self.http_status = http_status class Sub2APIReauthRequired(Sub2APIError): """User-level token invalid/expired; caller must re-authenticate with password.""" class Sub2APITransportError(Exception): """Network / timeout / invalid JSON etc.""" _client: httpx.AsyncClient | None = None def get_client() -> httpx.AsyncClient: global _client if _client is None or _client.is_closed: _client = httpx.AsyncClient( base_url=settings.sub2api_api_prefix, timeout=settings.sub2api_request_timeout, ) return _client async def close_client() -> None: global _client if _client is not None and not _client.is_closed: await _client.aclose() _client = None def _parse_envelope(resp: httpx.Response) -> Any: try: body = resp.json() except ValueError as exc: raise Sub2APITransportError( f"sub2api returned non-JSON body (status={resp.status_code}): {resp.text[:200]}" ) from exc code = body.get("code", resp.status_code) message = body.get("message", "") reason = body.get("reason", "") or "" metadata = body.get("metadata") or {} if code == 0: return body.get("data") # Token invalidation signals from sub2api admin middleware if reason in {"TOKEN_EXPIRED", "INVALID_TOKEN", "TOKEN_REVOKED", "USER_INACTIVE"}: raise Sub2APIReauthRequired(code, message, reason, metadata, resp.status_code) raise Sub2APIError(code, message, reason, metadata, resp.status_code) async def request( method: str, path: str, *, json: Any = None, params: Mapping[str, Any] | None = None, headers: Mapping[str, str] | None = None, ) -> Any: """Execute a sub2api call and return the unwrapped ``data`` field.""" client = get_client() try: resp = await client.request( method, path, json=json, params={k: v for k, v in (params or {}).items() if v is not None}, headers=headers, ) except httpx.TimeoutException as exc: raise Sub2APITransportError(f"sub2api timeout on {method} {path}") from exc except httpx.HTTPError as exc: raise Sub2APITransportError(f"sub2api transport error on {method} {path}: {exc}") from exc return _parse_envelope(resp) async def admin_request(method: str, path: str, **kwargs: Any) -> Any: """Issue a request authenticated with the admin API key.""" if not settings.sub2api_admin_token: raise RuntimeError("SD_SUB2API_ADMIN_TOKEN is not configured") headers = dict(kwargs.pop("headers", None) or {}) headers["x-api-key"] = settings.sub2api_admin_token return await request(method, path, headers=headers, **kwargs) async def user_request(access_token: str, method: str, path: str, **kwargs: Any) -> Any: """Issue a request authenticated with a user's JWT access token.""" if not access_token: raise Sub2APIReauthRequired(401, "missing user access token", "MISSING_TOKEN") headers = dict(kwargs.pop("headers", None) or {}) headers["Authorization"] = f"Bearer {access_token}" return await request(method, path, headers=headers, **kwargs)