| import asyncio |
| import aiohttp |
| import os, uuid |
| from collections import deque |
| import wave |
| import uuid |
| from pydub import AudioSegment |
|
|
|
|
| import wave |
| import struct |
|
|
|
|
| def concatenate_wave_files(input_file_paths, output_file_path): |
| """ |
| Concatenates multiple wave files and saves the result to a new file. |
| |
| :param input_file_paths: A list of paths to the input wave files. |
| :param output_file_path: The path to the output wave file. |
| """ |
| |
| if not input_file_paths: |
| raise ValueError("No input file paths provided.") |
|
|
| |
| if not output_file_path: |
| raise ValueError("Output file path is empty.") |
|
|
| |
| for input_file_path in input_file_paths: |
| if not input_file_path: |
| raise ValueError("Empty input file path found.") |
|
|
| |
| with wave.open(input_file_paths[0], "rb") as input_file: |
| n_channels = input_file.getnchannels() |
| sampwidth = input_file.getsampwidth() |
| framerate = input_file.getframerate() |
| comptype = input_file.getcomptype() |
| compname = input_file.getcompname() |
|
|
| |
| output_file = wave.open(output_file_path, "wb") |
| output_file.setnchannels(n_channels) |
| output_file.setsampwidth(sampwidth) |
| output_file.setframerate(framerate) |
| output_file.setcomptype(comptype, compname) |
|
|
| |
| for input_file_path in input_file_paths: |
| with wave.open(input_file_path, "rb") as input_file: |
| output_file.writeframes(input_file.readframes(input_file.getnframes())) |
|
|
| |
| output_file.close() |
|
|
| print( |
| f"Successfully concatenated {len(input_file_paths)} files into {output_file_path}" |
| ) |
|
|
|
|
| |
| |
| |
| |
|
|
|
|
| def concatenate_wav_files(input_files, file_directory): |
| print(input_files) |
| output_file = file_directory + str(uuid.uuid4()) + "final.wav" |
| |
| output = None |
| output_params = None |
|
|
| try: |
| |
| output = wave.open(output_file, "wb") |
|
|
| |
| for input_file in input_files: |
| with wave.open(input_file, "rb") as input_wav: |
| |
| if output_params is None: |
| output_params = input_wav.getparams() |
| output.setparams(output_params) |
| |
| else: |
| pass |
| |
| |
| |
| |
|
|
| |
| output.writeframes(input_wav.readframes(input_wav.getnframes())) |
| finally: |
| |
| if output is not None: |
| output.close() |
| return (output_file,) |
|
|
|
|
| class Speak: |
| def __init__(self, api_url="https://yakova-embedding.hf.space", dir="./tmp"): |
| self.api_url = api_url |
| self.dir = dir |
|
|
| async def _make_transcript(self, links, text): |
| data = {"audio_url": links, "text": text} |
| tries = 0 |
| max_retries = 10 |
| while tries < max_retries: |
| try: |
| response_data = await self._make_request( |
| "post", "descript_transcript", json=data |
| ) |
| if isinstance(response_data, dict): |
| return response_data |
| except Exception as e: |
| print(f"Attempt {tries + 1} failed: {e}") |
|
|
| tries += 1 |
| await asyncio.sleep(1) |
|
|
| raise Exception("Max retries reached. Unable to get a valid response.") |
|
|
| async def _make_request(self, method, endpoint, json=None): |
| async with aiohttp.ClientSession() as session: |
| async with getattr(session, method)( |
| f"{self.api_url}/{endpoint}", json=json |
| ) as response: |
| return await response.json() |
|
|
| async def say(self, text, speaker="Gabi"): |
| data = {"text": text, "speaker": speaker} |
|
|
| response_data = await self._make_request("post", "descript_tts", json=data) |
| tts_id = response_data["id"] |
|
|
| |
| while True: |
| status_data = await self._make_request( |
| "post", "descript_status", json={"id": tts_id} |
| ) |
| print(status_data) |
| if "status" in status_data: |
| if status_data["status"] == "done": |
| audio_url = status_data["url"] |
| temp = await self.download_file(audio_url) |
| return audio_url, temp |
| else: |
| pass |
|
|
| await asyncio.sleep(1) |
|
|
| async def download_file(self, url): |
| filename = str(uuid.uuid4()) + ".wav" |
| os.makedirs(self.dir, exist_ok=True) |
| save_path = os.path.join(self.dir, filename) |
| async with aiohttp.ClientSession() as session: |
| async with session.get(url) as response: |
| if response.status == 200: |
| with open(save_path, "wb") as file: |
| while True: |
| chunk = await response.content.read(1024) |
| if not chunk: |
| break |
| file.write(chunk) |
|
|
| return save_path |
|
|
| async def download_file_with_aria2c(self, url): |
| filename = str(uuid.uuid4()) + ".wav" |
| os.makedirs(self.dir, exist_ok=True) |
| save_path = os.path.join(self.dir, filename) |
| print(url) |
| command = f"aria2c {url} -o {save_path}" |
| process = await asyncio.create_subprocess_shell( |
| command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE |
| ) |
|
|
| stdout, stderr = await process.communicate() |
|
|
| if process.returncode == 0: |
| print(f"File downloaded successfully to {save_path}") |
| else: |
| print(f"Failed to download file. Error: {stderr.decode()}") |
| return save_path |
|
|
|
|
| async def process_narrations(narrations): |
| speak = Speak() |
| tasks = deque() |
| results = [] |
| files = [] |
|
|
| async def process_task(): |
| while tasks: |
| text = tasks.popleft() |
| result = await speak.say(text) |
| _, temp = result |
| results.append(result) |
| files.append(temp) |
|
|
| for narration in narrations: |
| tasks.append(narration) |
| if len(tasks) >= 2: |
| await asyncio.gather(*[process_task() for _ in range(2)]) |
|
|
| |
| await asyncio.gather(*[process_task() for _ in range(len(tasks))]) |
| concatinated_file = concatenate_wav_files(files, speak.dir) |
|
|
| audio_file = AudioSegment.from_file(concatinated_file, format="wav") |
| duration_in_seconds = int(len(audio_file) / 1000) |
|
|
| return results, [concatinated_file, duration_in_seconds] |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
| |
|
|
|
|
| |
|
|