"""API key service: a thin proxy in front of sub2api.

- ``KeyService.list_keys`` uses the admin API key to call
  ``/admin/users/:id/api-keys``.
- Every other operation (create/update/delete, plus ``get_key`` and
  ``available_groups``) requires the user's own sub2api JWT; we fetch one via
  ``ensure_access_token`` using the stored refresh token.
"""

from __future__ import annotations

from typing import Any

from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.exceptions import BadRequestError
from app.integrations.sub2api import admin as sub2api_admin
from app.integrations.sub2api import user as sub2api_user
from app.integrations.sub2api.client import (
    Sub2APIError,
    Sub2APIReauthRequired,
    Sub2APITransportError,
)
from app.models import User
from app.services.sub2api_session import ensure_access_token


def _require_sub2api_binding(user: User) -> int:
    """Return ``user.sub2api_user_id``, or reject the request if it is falsy.

    Raises:
        BadRequestError: when the user has no sub2api user id. The message is
            user-facing (roughly: "account has not finished sub2api binding,
            please log in again").
    """
    bound_id = user.sub2api_user_id
    if bound_id:
        return bound_id
    raise BadRequestError("账号未完成 sub2api 绑定,请重新登录")


def _translate_upstream(exc: Exception) -> HTTPException:
    """Build the ``HTTPException`` that represents an upstream failure.

    The exception is returned, not raised, so the caller can chain it with
    ``raise ... from exc``.

    Mapping:
        * ``Sub2APIReauthRequired`` -> 401, ``"sub2api_reauth_required"``.
        * ``Sub2APIError`` -> the upstream ``http_status`` when it lies in
          ``[400, 600)``, otherwise 502; the detail is the first truthy value
          of ``exc.message``, ``exc.reason``, ``"upstream_error"``.
        * anything else -> 504, ``"upstream_timeout"``.
    """
    # The re-auth check must come first so it wins over the generic
    # ``Sub2APIError`` branch if both isinstance checks would match.
    if isinstance(exc, Sub2APIReauthRequired):
        code, detail = 401, "sub2api_reauth_required"
    elif isinstance(exc, Sub2APIError):
        upstream_status = exc.http_status or 502
        # Only forward genuine 4xx/5xx codes; anything else becomes 502.
        code = upstream_status if 400 <= upstream_status < 600 else 502
        detail = exc.message or exc.reason or "upstream_error"
    else:
        code, detail = 504, "upstream_timeout"
    return HTTPException(status_code=code, detail=detail)


async def _call_with_user_token(db: AsyncSession, user: User, call: Any) -> Any:
    """Run ``call(token)`` with the user's sub2api access token.

    Args:
        db: session handed to ``ensure_access_token``.
        user: the acting user; must pass ``_require_sub2api_binding``.
        call: one-argument callable that receives the access token and
            returns an awaitable.

    Returns:
        Whatever awaiting ``call(token)`` produces.

    Raises:
        BadRequestError: from the binding check, raised before any upstream
            work and never translated.
        HTTPException: when obtaining the token or the call itself raises
            ``Sub2APIError`` or ``Sub2APITransportError``.
    """
    _require_sub2api_binding(user)
    try:
        access_token = await ensure_access_token(db, user)
        return await call(access_token)
    except (Sub2APIError, Sub2APITransportError) as err:
        raise _translate_upstream(err) from err


class KeyService:
    """Static entry points for API-key operations proxied to sub2api.

    Every method first requires the user to have a sub2api user id and
    converts ``Sub2APIError`` / ``Sub2APITransportError`` into an
    ``HTTPException`` via ``_translate_upstream``.
    """

    @staticmethod
    async def list_keys(
        db: AsyncSession,
        user: User,
        *,
        page: int = 1,
        page_size: int = 20,
        sort_by: str = "created_at",
        sort_order: str = "desc",
    ) -> dict[str, Any]:
        """List the user's API keys via ``sub2api_admin.list_user_api_keys``.

        ``db`` is accepted for signature parity with the other methods but is
        not used here. Paging and sorting arguments are forwarded unchanged.
        """
        sub2api_uid = _require_sub2api_binding(user)
        query = {
            "page": page,
            "page_size": page_size,
            "sort_by": sort_by,
            "sort_order": sort_order,
        }
        try:
            return await sub2api_admin.list_user_api_keys(sub2api_uid, **query)
        except (Sub2APIError, Sub2APITransportError) as err:
            raise _translate_upstream(err) from err

    @staticmethod
    async def create_key(db: AsyncSession, user: User, payload: dict[str, Any]) -> dict[str, Any]:
        """Create a key through ``sub2api_user.create_key`` with the user's token."""
        # The lambda defers the upstream call until the token is available,
        # so it runs inside the helper's try block.
        return await _call_with_user_token(
            db, user, lambda token: sub2api_user.create_key(token, payload)
        )

    @staticmethod
    async def update_key(
        db: AsyncSession, user: User, key_id: int, payload: dict[str, Any]
    ) -> dict[str, Any]:
        """Update key ``key_id`` through ``sub2api_user.update_key``."""
        return await _call_with_user_token(
            db, user, lambda token: sub2api_user.update_key(token, key_id, payload)
        )

    @staticmethod
    async def delete_key(db: AsyncSession, user: User, key_id: int) -> dict[str, Any]:
        """Delete key ``key_id`` through ``sub2api_user.delete_key``."""
        return await _call_with_user_token(
            db, user, lambda token: sub2api_user.delete_key(token, key_id)
        )

    @staticmethod
    async def get_key(db: AsyncSession, user: User, key_id: int) -> dict[str, Any]:
        """Fetch key ``key_id`` through ``sub2api_user.get_key``."""
        return await _call_with_user_token(
            db, user, lambda token: sub2api_user.get_key(token, key_id)
        )

    @staticmethod
    async def available_groups(db: AsyncSession, user: User) -> list[dict[str, Any]]:
        """Return the result of ``sub2api_user.list_available_groups`` for the user."""
        return await _call_with_user_token(
            db, user, lambda token: sub2api_user.list_available_groups(token)
        )