85 lines
2.5 KiB
Python
85 lines
2.5 KiB
Python
import hashlib
|
|
import secrets
|
|
import uuid
|
|
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models import ApiKey
|
|
from app.core.exceptions import BadRequestError, NotFoundError
|
|
|
|
MAX_KEYS_PER_USER = 5
|
|
KEY_PREFIX = "sk-sd-"
|
|
|
|
|
|
class KeyService:
|
|
|
|
@staticmethod
|
|
def _generate_key() -> str:
|
|
return KEY_PREFIX + secrets.token_urlsafe(36)
|
|
|
|
@staticmethod
|
|
def _hash_key(raw_key: str) -> str:
|
|
return hashlib.sha256(raw_key.encode()).hexdigest()
|
|
|
|
@staticmethod
|
|
async def list_keys(db: AsyncSession, user_id: str) -> list:
|
|
result = await db.execute(
|
|
select(ApiKey)
|
|
.where(ApiKey.user_id == user_id, ApiKey.status == "active")
|
|
.order_by(ApiKey.created_at.desc())
|
|
)
|
|
return result.scalars().all()
|
|
|
|
@staticmethod
|
|
async def create_key(db: AsyncSession, user_id: str, name: str = "") -> dict:
|
|
# Check limit
|
|
count_result = await db.execute(
|
|
select(func.count()).select_from(ApiKey)
|
|
.where(ApiKey.user_id == user_id, ApiKey.status == "active")
|
|
)
|
|
count = count_result.scalar()
|
|
if count >= MAX_KEYS_PER_USER:
|
|
raise BadRequestError(f"最多创建 {MAX_KEYS_PER_USER} 个 Key")
|
|
|
|
raw_key = KeyService._generate_key()
|
|
key_hash = KeyService._hash_key(raw_key)
|
|
|
|
# prefix/suffix for masked display (after "sk-sd-")
|
|
body = raw_key[len(KEY_PREFIX):]
|
|
key_prefix = body[:4]
|
|
key_suffix = body[-4:]
|
|
|
|
api_key = ApiKey(
|
|
id=str(uuid.uuid4()),
|
|
user_id=user_id,
|
|
name=name,
|
|
key_hash=key_hash,
|
|
key_prefix=key_prefix,
|
|
key_suffix=key_suffix,
|
|
)
|
|
db.add(api_key)
|
|
await db.commit()
|
|
await db.refresh(api_key)
|
|
|
|
return {
|
|
"id": api_key.id,
|
|
"name": api_key.name,
|
|
"key": raw_key, # only returned once
|
|
"key_prefix": key_prefix,
|
|
"key_suffix": key_suffix,
|
|
"created_at": api_key.created_at,
|
|
}
|
|
|
|
@staticmethod
|
|
async def delete_key(db: AsyncSession, user_id: str, key_id: str) -> None:
|
|
result = await db.execute(
|
|
select(ApiKey).where(ApiKey.id == key_id, ApiKey.user_id == user_id)
|
|
)
|
|
api_key = result.scalar_one_or_none()
|
|
if not api_key:
|
|
raise NotFoundError("Key not found")
|
|
|
|
api_key.status = "revoked"
|
|
await db.commit()
|