import hashlib import secrets import uuid from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from app.models import ApiKey from app.core.exceptions import BadRequestError, NotFoundError MAX_KEYS_PER_USER = 5 KEY_PREFIX = "sk-sd-" class KeyService: @staticmethod def _generate_key() -> str: return KEY_PREFIX + secrets.token_urlsafe(36) @staticmethod def _hash_key(raw_key: str) -> str: return hashlib.sha256(raw_key.encode()).hexdigest() @staticmethod async def list_keys(db: AsyncSession, user_id: str) -> list: result = await db.execute( select(ApiKey) .where(ApiKey.user_id == user_id, ApiKey.status == "active") .order_by(ApiKey.created_at.desc()) ) return result.scalars().all() @staticmethod async def create_key(db: AsyncSession, user_id: str, name: str = "") -> dict: # Check limit count_result = await db.execute( select(func.count()).select_from(ApiKey) .where(ApiKey.user_id == user_id, ApiKey.status == "active") ) count = count_result.scalar() if count >= MAX_KEYS_PER_USER: raise BadRequestError(f"最多创建 {MAX_KEYS_PER_USER} 个 Key") raw_key = KeyService._generate_key() key_hash = KeyService._hash_key(raw_key) # prefix/suffix for masked display (after "sk-sd-") body = raw_key[len(KEY_PREFIX):] key_prefix = body[:4] key_suffix = body[-4:] api_key = ApiKey( id=str(uuid.uuid4()), user_id=user_id, name=name, key_hash=key_hash, key_prefix=key_prefix, key_suffix=key_suffix, ) db.add(api_key) await db.commit() await db.refresh(api_key) return { "id": api_key.id, "name": api_key.name, "key": raw_key, # only returned once "key_prefix": key_prefix, "key_suffix": key_suffix, "created_at": api_key.created_at, } @staticmethod async def delete_key(db: AsyncSession, user_id: str, key_id: str) -> None: result = await db.execute( select(ApiKey).where(ApiKey.id == key_id, ApiKey.user_id == user_id) ) api_key = result.scalar_one_or_none() if not api_key: raise NotFoundError("Key not found") api_key.status = "revoked" await db.commit()