Skip to content

Commit

Permalink
fix(audioplayer): close writer buffer, catch exceptions in threads (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxime1907 authored Jan 29, 2025
1 parent c77028a commit 734e55d
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 85 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.8.0
1.8.1
8 changes: 6 additions & 2 deletions config/triggers.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
[
{"names": ["!tuturu"], "sound": "Tutturuu_v1.wav"},
{"names": ["!tuturulist"], "sound": ["Tutturuu_v1.wav", "Tutturuu_v1.wav"]}
{"names": ["!tuturu"], "sound": "Tutturuu_v1.wav"},
{"names": ["!tuturulist"], "sound": ["Tutturuu_v1.wav", "Tutturuu_v1.wav"]},
{
"names": ["!china"],
"sound": ["china1.mp3", "china2.mp3"]
}
]
Binary file added sounds/china1.mp3
Binary file not shown.
Binary file added sounds/china2.mp3
Binary file not shown.
7 changes: 3 additions & 4 deletions src/torchlight/Commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,14 +762,13 @@ async def _func(self, message: list[str], player: Player) -> int:

real_time = get_url_real_time(url=input_url)

if "parameters" in command_config and "proxy" in command_config["parameters"]:
proxy = command_config["parameters"]["proxy"]
proxy = command_config.get("parameters", {}).get("proxy", "")

try:
info = get_url_youtube_info(url=input_url, proxy=proxy)
except Exception as e:
except Exception as exc:
self.logger.error(f"Failed to extract youtube info from: {input_url}")
self.logger.error(e)
self.logger.error(exc)
self.torchlight.SayPrivate(
player,
"An error as occured while trying to retrieve youtube metadata.",
Expand Down
205 changes: 128 additions & 77 deletions src/torchlight/FFmpegAudioPlayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ def __init__(self, torchlight: Torchlight) -> None:
self.torchlight = torchlight
self.config = self.torchlight.config["VoiceServer"]
self.playing = False
self.uri = ""
self.position: int = 0

self.host = self.config["Host"]
self.port = self.config["Port"]
self.sample_rate = float(self.config["SampleRate"])
self.volume = float(self.config["Volume"])
self.proxy = self.config["Proxy"]
self.proxy = self.config.get("Proxy", "")

self.started_playing: float | None = None
self.stopped_playing: float | None = None
Expand All @@ -47,7 +48,21 @@ def __del__(self) -> None:

# @profile
def PlayURI(self, uri: str, position: int | None, *args: Any) -> bool:
curl_command = ["/usr/bin/curl", "-L", uri]
curl_command = [
"/usr/bin/curl",
"--silent",
"--show-error",
"--connect-timeout",
"1",
"--retry",
"2",
"--retry-delay",
"1",
"--output",
"-",
"-L",
uri,
]
if self.proxy:
curl_command.extend(
[
Expand Down Expand Up @@ -88,6 +103,10 @@ def PlayURI(self, uri: str, position: int | None, *args: Any) -> bool:
self.logger.debug(ffmpeg_command)

self.playing = True
self.uri = uri

self.logger.info("Playing %s", self.uri)

asyncio.ensure_future(self._stream_subprocess(curl_command, ffmpeg_command))
return True

Expand All @@ -100,29 +119,32 @@ def Stop(self, force: bool = True) -> bool:
try:
self.ffmpeg_process.terminate()
self.ffmpeg_process.kill()
self.ffmpeg_process = None
except ProcessLookupError as exc:
self.logger.debug(exc)
pass
self.ffmpeg_process = None

if self.curl_process:
try:
self.curl_process.terminate()
self.curl_process.kill()
self.curl_process = None
except ProcessLookupError as exc:
self.logger.debug(exc)
pass
self.curl_process = None

if self.writer:
if force:
writer_socket = self.writer.transport.get_extra_info("socket")
if writer_socket:
writer_socket.setsockopt(
socket.SOL_SOCKET,
socket.SO_LINGER,
struct.pack("ii", 1, 0),
)
try:
writer_socket.setsockopt(
socket.SOL_SOCKET,
socket.SO_LINGER,
struct.pack("ii", 1, 0),
)
except OSError as exc:
# Errno 9: Bad file descriptor
if exc.errno == 9:
self.logger.error("Unable to setsockopt: %s", exc)

self.writer.transport.abort()

Expand All @@ -135,9 +157,13 @@ def Stop(self, force: bool = True) -> bool:
loop.run_until_complete(self.writer.wait_closed())
except Exception as exc:
self.logger.warning(exc)
pass

self.writer = None

self.logger.info("Stopped %s", self.uri)

self.playing = False
self.uri = ""

self.Callback("Stop")
del self.callbacks
Expand All @@ -164,105 +190,130 @@ def Callback(self, cbtype: str, *args: Any, **kwargs: Any) -> None:

# @profile
async def _updater(self) -> None:
last_seconds_elapsed = 0.0
try:
last_seconds_elapsed = 0.0

while self.playing:
seconds_elapsed = 0.0
while self.playing:
seconds_elapsed = 0.0

if self.started_playing:
seconds_elapsed = time.time() - self.started_playing
if self.started_playing:
seconds_elapsed = time.time() - self.started_playing

if seconds_elapsed > self.seconds:
seconds_elapsed = self.seconds
if seconds_elapsed > self.seconds:
seconds_elapsed = self.seconds

self.Callback("Update", last_seconds_elapsed, seconds_elapsed)
self.Callback("Update", last_seconds_elapsed, seconds_elapsed)

if seconds_elapsed >= self.seconds:
if not self.stopped_playing:
self.logger.debug("BUFFER UNDERRUN!")
self.Stop(False)
return
if seconds_elapsed >= self.seconds:
if not self.stopped_playing:
self.logger.debug("BUFFER UNDERRUN!")
self.Stop(False)
return

last_seconds_elapsed = seconds_elapsed
last_seconds_elapsed = seconds_elapsed

await asyncio.sleep(0.1)
await asyncio.sleep(0.1)
except Exception as exc:
self.Stop()
self.torchlight.SayChat(f"Error: {str(exc)}")
raise exc

# @profile
async def _read_stream(self, stream: StreamReader | None, writer: StreamWriter) -> None:
started = False
try:
started = False

while stream and self.playing:
data = await stream.read(65536)
if not data:
break
while stream and self.playing:
data = await stream.read(65536)
if not data:
break

writer.write(data)
await writer.drain()
if writer is not None:
writer.write(data)
await writer.drain()

bytes_len = len(data)
samples = bytes_len / SAMPLEBYTES
seconds = samples / self.sample_rate
bytes_len = len(data)
samples = bytes_len / SAMPLEBYTES
seconds = samples / self.sample_rate

self.seconds += seconds
self.seconds += seconds

if not started:
started = True
self.Callback("Play")
self.started_playing = time.time()
asyncio.ensure_future(self._updater())
if not started:
self.logger.info("Streaming %s", self.uri)
started = True
self.Callback("Play")
self.started_playing = time.time()
asyncio.ensure_future(self._updater())

self.stopped_playing = time.time()
self.stopped_playing = time.time()
except Exception as exc:
self.Stop()
self.torchlight.SayChat(f"Error: {str(exc)}")
raise exc

# @profile
async def _stream_subprocess(self, curl_command: list[str], ffmpeg_command: list[str]) -> None:
if not self.playing:
return

_, self.writer = await asyncio.open_connection(self.host, self.port)
try:
_, self.writer = await asyncio.open_connection(self.host, self.port)

self.ffmpeg_process = await asyncio.create_subprocess_exec(
*ffmpeg_command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self.curl_process = await asyncio.create_subprocess_exec(
*curl_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)

self.curl_process = await asyncio.create_subprocess_exec(
*curl_command,
stdout=asyncio.subprocess.PIPE,
)
self.ffmpeg_process = await asyncio.create_subprocess_exec(
*ffmpeg_command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)

asyncio.create_task(self._wait_for_process_exit(self.curl_process))
asyncio.create_task(self._wait_for_process_exit(self.curl_process))

asyncio.create_task(self._write_stream(self.curl_process.stdout, self.ffmpeg_process.stdin))
asyncio.create_task(self._write_stream(self.curl_process.stdout, self.ffmpeg_process.stdin))

await self._read_stream(self.ffmpeg_process.stdout, self.writer)
asyncio.create_task(self._read_stream(self.ffmpeg_process.stdout, self.writer))

if self.ffmpeg_process is not None:
if self.ffmpeg_process.stdin:
self.ffmpeg_process.stdin.close()
await self.ffmpeg_process.wait()
if self.ffmpeg_process is not None:
await self.ffmpeg_process.wait()

self.writer.close()
await self.writer.wait_closed()
if self.seconds == 0.0:
self.Stop()

if self.seconds == 0.0:
except Exception as exc:
self.Stop()
self.torchlight.SayChat(f"Error: {str(exc)}")
raise exc

async def _write_stream(self, stream: StreamReader | None, writer: StreamWriter | None) -> None:
while True:
if not stream:
break
chunk = await stream.read(65536)
if not chunk:
break

try:
while True:
if not stream:
break
chunk = await stream.read(65536)
if not chunk:
break

if writer:
writer.write(chunk)
await writer.drain()
if writer:
writer.write(chunk)
await writer.drain()
writer.close()
except Exception as exc:
self.Stop()
self.torchlight.SayChat(f"Error: {str(exc)}")
raise exc

async def _wait_for_process_exit(self, curl_process: Process) -> None:
await curl_process.wait()
if curl_process.returncode != 0:
self.logger.error(f"Curl process exited with error code {curl_process.returncode}")
try:
await curl_process.wait()
if curl_process.returncode != 0 and curl_process.returncode != -15:
raise Exception(f"Curl process exited with error code {curl_process.returncode}")
except Exception as exc:
self.Stop()
self.torchlight.SayChat(f"Error: {str(exc)}")
raise exc
3 changes: 2 additions & 1 deletion src/torchlight/URLInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ def get_url_youtube_info(url: str, proxy: str = "") -> dict:
"format": "m4a/bestaudio/best",
"simulate": True,
"keepvideo": False,
"proxy": proxy,
}
if proxy:
ydl_opts["proxy"] = proxy
ydl = yt_dlp.YoutubeDL(ydl_opts)
ydl.add_default_info_extractors()
return ydl.extract_info(url, download=False)
Expand Down

0 comments on commit 734e55d

Please sign in to comment.