Files
superDreamFront/app/api/v1/keys.py
xuyong 35c0b7de16 integrate sub2api as upstream for auth/keys/usage via FastAPI BFF
Preserve local user table for superDream-specific features while syncing
user lifecycle, API key CRUD and usage queries through sub2api. Admin token
handles reads and user lifecycle; per-user tokens (Fernet-encrypted in DB)
handle key writes that admin endpoints do not expose.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 21:23:08 +08:00

74 lines
2.2 KiB
Python

from typing import Any, Dict, List
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.dependencies import get_current_user
from app.datamodels.schemas import CreateKeyRequest, MessageResponse, UpdateKeyRequest
from app.models import User
from app.services.key_service import KeyService
# Router for this module's endpoints: paths registered on it are prefixed with
# "/keys" and carry the "keys" tag.
router = APIRouter(prefix="/keys", tags=["keys"])
@router.get("")
async def list_keys(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    sort_by: str = Query("created_at"),
    sort_order: str = Query("desc"),
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
    """Handle ``GET /keys`` by delegating to ``KeyService.list_keys``.

    Args:
        page: Page number; defaults to 1 and must be >= 1.
        page_size: Items per page; defaults to 20 and must be within 1..100.
        sort_by: Sort field name; defaults to ``"created_at"``. Forwarded to
            the service as-is (no validation happens in this handler).
        sort_order: Sort direction; defaults to ``"desc"``. Forwarded to the
            service as-is (no validation happens in this handler).
        user: User resolved by the ``get_current_user`` dependency.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        Whatever ``KeyService.list_keys`` returns, unchanged.
    """
    listing_options = {
        "page": page,
        "page_size": page_size,
        "sort_by": sort_by,
        "sort_order": sort_order,
    }
    return await KeyService.list_keys(db, user, **listing_options)
@router.get("/meta/available-groups")
async def available_groups(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> List[Dict[str, Any]]:
    """Handle ``GET /keys/meta/available-groups``.

    Args:
        user: User resolved by the ``get_current_user`` dependency.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        Whatever ``KeyService.available_groups`` returns, unchanged.
    """
    groups = await KeyService.available_groups(db, user)
    return groups
@router.get("/{key_id}")
async def get_key(
    key_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
    """Handle ``GET /keys/{key_id}`` by delegating to ``KeyService.get_key``.

    Args:
        key_id: Integer key identifier taken from the URL path.
        user: User resolved by the ``get_current_user`` dependency.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        Whatever ``KeyService.get_key`` returns, unchanged.
    """
    key_details = await KeyService.get_key(db, user, key_id)
    return key_details
@router.post("")
async def create_key(
    body: CreateKeyRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
    """Handle ``POST /keys`` by delegating to ``KeyService.create_key``.

    Args:
        body: Parsed request body.
        user: User resolved by the ``get_current_user`` dependency.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        Whatever ``KeyService.create_key`` returns, unchanged.
    """
    # Fields whose value is None are left out of the dict handed to the service.
    return await KeyService.create_key(
        db,
        user,
        body.model_dump(exclude_none=True),
    )
@router.put("/{key_id}")
async def update_key(
    key_id: int,
    body: UpdateKeyRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
    """Handle ``PUT /keys/{key_id}`` by delegating to ``KeyService.update_key``.

    Args:
        key_id: Integer key identifier taken from the URL path.
        body: Parsed request body.
        user: User resolved by the ``get_current_user`` dependency.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        Whatever ``KeyService.update_key`` returns, unchanged.
    """
    # Only fields with a non-None value are forwarded to the service.
    # NOTE(review): because None values are dropped here, an explicit null in
    # the request body never reaches the service - confirm that callers do not
    # need to clear a field through this route.
    changes = body.model_dump(exclude_none=True)
    updated_key = await KeyService.update_key(db, user, key_id, changes)
    return updated_key
@router.delete("/{key_id}", response_model=MessageResponse)
async def delete_key(
    key_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Handle ``DELETE /keys/{key_id}`` via ``KeyService.delete_key``.

    Args:
        key_id: Integer key identifier taken from the URL path.
        user: User resolved by the ``get_current_user`` dependency.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        A fixed confirmation mapping with a single ``"message"`` entry; the
        route declares ``MessageResponse`` as its response model.
    """
    # The service's return value is not used; the reply is a constant message.
    await KeyService.delete_key(db, user, key_id)
    confirmation = {"message": "API key deleted successfully"}
    return confirmation