| import asyncio
|
| from datetime import datetime
|
| import aiohttp
|
| import pickle
|
|
|
| import pandas as pd
|
|
|
| from utils import normalize_audio_loudness
|
|
|
| import os
|
| from dotenv import load_dotenv
|
| from pymongo import MongoClient
|
| from bson import Binary, ObjectId
|
| import zlib
|
|
|
|
|
# Service endpoints and MongoDB settings are read from the process
# environment first.
BASE_URL = os.getenv("BASE_URL")
AUDIO_URL = os.getenv("AUDIO_URL")
MONGO_URI = os.getenv("MONGO_URI")
DATABASE_NAME = os.getenv("DATABASE_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
CREATE_COLLECTION = os.getenv("CREATE_COLLECTION")

# If any setting is absent, load a .env file and re-read every setting so
# that all six values come from the same, refreshed environment.
if any(
    setting is None
    for setting in (
        BASE_URL,
        AUDIO_URL,
        MONGO_URI,
        DATABASE_NAME,
        COLLECTION_NAME,
        CREATE_COLLECTION,
    )
):
    print("从.env文件加载环境变量")
    load_dotenv()
    BASE_URL = os.environ.get("BASE_URL")
    AUDIO_URL = os.environ.get("AUDIO_URL")
    MONGO_URI = os.environ.get("MONGO_URI")
    DATABASE_NAME = os.environ.get("DATABASE_NAME")
    COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
    CREATE_COLLECTION = os.environ.get("CREATE_COLLECTION")

# Module-wide MongoDB handles: `collection` is queried by
# load_characters_csv and `create_collection` receives the documents
# inserted by generate_voice.
client = MongoClient(MONGO_URI)
db = client[DATABASE_NAME]
collection = db[COLLECTION_NAME]
create_collection = db[CREATE_COLLECTION]
|
|
|
async def generate_api(voice_ids, text):
    """Request synthesized speech for ``text`` from the TTS endpoint.

    POSTs ``{"ids": voice_ids, "text": text}`` as JSON to ``BASE_URL + "tts"``
    using a session with a 10-second total timeout.

    Args:
        voice_ids: Value sent as the ``ids`` field of the request body.
        text: Value sent as the ``text`` field of the request body.

    Returns:
        On HTTP 200, the response body after being passed through
        ``normalize_audio_loudness``. Otherwise an error message string
        (non-200 status, timeout, or aiohttp client error); errors are
        reported through the return value rather than raised.
    """
    request_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            payload = {"ids": voice_ids, "text": text}
            async with session.post(BASE_URL + "tts", json=payload) as response:
                if response.status != 200:
                    # Dump the response object to stdout to aid debugging.
                    print(response)
                    return f"合成失败: {response.status}"

                raw_audio = await response.read()
                return normalize_audio_loudness(raw_audio)
    except asyncio.TimeoutError:
        return "请求超时,请稍后重试"
    except aiohttp.ClientError as e:
        return f"网络错误: {e!s}"
|
|
|
async def get_audio(voice_id):
    """Download the ``.ogg`` file for ``voice_id`` from ``AUDIO_URL``.

    The request URL is ``AUDIO_URL + voice_id + ".ogg"``. No explicit timeout
    is configured here, so the session's own default applies.

    Args:
        voice_id: String appended to ``AUDIO_URL`` to form the file name.

    Returns:
        The raw response body on HTTP 200. Otherwise an error message string
        (non-200 status, timeout, or aiohttp client error); errors are
        reported through the return value rather than raised.
    """
    audio_url = AUDIO_URL + voice_id + ".ogg"
    try:
        async with aiohttp.ClientSession() as session, session.get(audio_url) as response:
            if response.status != 200:
                return f"获取音频失败: {response.status}"
            return await response.read()
    except asyncio.TimeoutError:
        return "请求超时,请稍后重试"
    except aiohttp.ClientError as e:
        return f"网络错误: {e!s}"
|
|
|
def load_characters_csv(lang):
    """Load the public characters for ``lang`` plus the category display order.

    Despite the name, the rows come from the module-level MongoDB
    ``collection`` (documents with ``language == lang`` and
    ``is_public == True``), not from a CSV file.

    Args:
        lang: Language code matched against each document's ``language``
            field. It also selects the preferred category order; codes
            without an entry fall back to the ``"en"`` order.

    Returns:
        A ``(df, unique_categories)`` tuple. ``df`` is a DataFrame with the
        columns ``类别``, ``id``, ``名称``, ``情绪``, ``头像`` and ``voice_id``
        (one row per matching document, ``id`` converted to ``str``).
        ``unique_categories`` is the preferred order for ``lang`` followed by
        any other categories present in ``df``, sorted.

    Raises:
        KeyError: If a matching document lacks one of the fields read here.
    """
    columns = ["类别", "id", "名称", "情绪", "头像", "voice_id"]
    cursor = collection.find({"language": lang, "is_public": True})

    # Collect every row first and build the frame in one call. Growing a
    # DataFrame with pd.concat inside the loop recopies all previous rows on
    # each iteration (quadratic) and concatenates onto an empty frame.
    rows = [
        {
            "类别": item["category"],
            "id": str(item["id"]),
            "名称": item["name"],
            "情绪": item["emotion"],
            "头像": item["avatar"],
            "voice_id": item["voice_id"],
        }
        for item in cursor
    ]
    # dtype=object keeps the columns as generic object columns, as they were
    # when rows were appended to an initially empty frame.
    df = pd.DataFrame(rows, columns=columns, dtype=object)

    # Categories that should always be listed first, per language.
    preferred_order_by_lang = {
        "zh": ["原神", "崩坏星穹铁道", "绝区零", "鸣潮"],
        "en": ["Genshin Impact", "Honkai: Star Rail", "Zenless Zone Zero", "Wuthering Waves"],
        # NOTE(review): the "[" in the first entry looks like a typo; confirm
        # against the category values actually stored before changing it.
        "ja": ["原神[げんしん", "崩壊:スターレイル", "ゼンレスゾーンゼロ", "Wuthering Waves"],
        "ko": ["원신", "붕괴: 스타레일", "젠레스 존 제로", "Wuthering Waves"],
    }
    preferred_order = preferred_order_by_lang.get(lang, preferred_order_by_lang["en"])

    # Preferred categories are always included (even when absent from df);
    # everything else found in the data follows in sorted order.
    other_categories = sorted(set(df["类别"].unique()) - set(preferred_order))
    unique_categories = preferred_order + other_categories

    return df, unique_categories
|
|
|
|
|
|
|
async def generate_voice(avatar, name, emotion, tags, gender, audio_data, language):
    """Insert a new, not-yet-reviewed voice document into ``create_collection``.

    ``avatar`` and ``audio_data`` are each pickled, zlib-compressed and
    wrapped in a BSON ``Binary``; the remaining arguments are stored as given.
    The document is created with ``is_public`` and ``is_reviewed`` set to
    ``False`` and ``create_at`` set to ``datetime.now().isoformat()``.

    Note that the insert is a synchronous call made from within this
    coroutine.

    Returns:
        The ``inserted_id`` reported by ``insert_one``.
    """
    packed_avatar = Binary(zlib.compress(pickle.dumps(avatar)))
    packed_audio = Binary(zlib.compress(pickle.dumps(audio_data)))

    # Keyword order matches the field order of the stored document.
    document = dict(
        avatar=packed_avatar,
        name=name,
        emotion=emotion,
        tags=tags,
        gender=gender,
        audio_data=packed_audio,
        language=language,
        create_at=datetime.now().isoformat(),
        is_public=False,
        is_reviewed=False,
    )
    return create_collection.insert_one(document).inserted_id
|
|
|