diff --git a/audiobook_generator/tts_providers/edge_tts_provider.py b/audiobook_generator/tts_providers/edge_tts_provider.py index ab67ad2..9211f36 100644 --- a/audiobook_generator/tts_providers/edge_tts_provider.py +++ b/audiobook_generator/tts_providers/edge_tts_provider.py @@ -3,8 +3,9 @@ import math import io -from edge_tts import Communicate, list_voices -from typing import Union, Optional +import edge_tts +from edge_tts import list_voices +from typing import Union from pydub import AudioSegment from audiobook_generator.config.general_config import GeneralConfig @@ -34,7 +35,7 @@ async def get_supported_voices(): # Credit: https://gist.github.com/moha-abdi/8ddbcb206c38f592c65ada1e5479f2bf # @phuchoang2603 contributed pause support in https://github.com/p0n1/epub_to_audiobook/pull/45 -class CommWithPauses(Communicate): +class CommWithPauses: # This class uses edge_tts to generate text # but with pauses for example:- text: 'Hello # this is simple text. [pause: 1000] Paused 1000ms' @@ -46,27 +47,30 @@ def __init__( break_duration: int = 1250, **kwargs, ) -> None: - super().__init__(text, voice_name, **kwargs) + self.full_text = text + self.voice_name = voice_name self.break_string = break_string self.break_duration = int(break_duration) + self.parsed = self.parse_text() self.file = io.BytesIO() def parse_text(self): logger.debug( - f"Parsing the text, looking for break/pauses in text: <{self.text}>" + f"Parsing the text, looking for break/pauses in text: <{self.full_text}>" ) - if self.break_string not in self.text: + if self.break_string not in self.full_text: logger.debug(f"No break/pauses found in the text") - return [self.text] + return [self.full_text] - parts = self.text.split(self.break_string) + parts = self.full_text.split(self.break_string) logger.debug(f"split into <{len(parts)}> parts: {parts}") return parts async def chunkify(self): logger.debug(f"Chunkifying the text") for content in self.parsed: + logger.debug(f"content from parsed: <{content}>") audio_bytes = await self.generate_audio(content) self.file.write(audio_bytes) if content != self.parsed[-1] and self.break_duration > 0: @@ -85,8 +89,8 @@ async def generate_audio(self, text: str) -> bytes: logger.debug(f"Generating audio for: <{text}>") # this genertes the real TTS using edge_tts for this part. temp_chunk = io.BytesIO() - self.text = text - async for chunk in self.stream(): + communicate = edge_tts.Communicate(text, self.voice_name) + async for chunk in communicate.stream(): if chunk["type"] == "audio": temp_chunk.write(chunk["data"]) @@ -106,9 +110,7 @@ async def generate_audio(self, text: str) -> bytes: async def save( self, audio_fname: Union[str, bytes], - metadata_fname: Optional[Union[str, bytes]] = None, ) -> None: - # Save the audio and metadata to the specified files. await self.chunkify() self.file.seek(0)