Skip to content

Commit

Permalink
Refactor streaming (#8438)
Browse files Browse the repository at this point in the history
  • Loading branch information
egbertbouman authored Feb 13, 2025
2 parents aab459b + f4743fd commit b2788eb
Show file tree
Hide file tree
Showing 19 changed files with 2,852 additions and 875 deletions.
2 changes: 1 addition & 1 deletion src/tribler/core/libtorrent/download_manager/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ def stop(self, user_stopped: bool | None = None) -> Awaitable[None]:
"""
self._logger.debug("Stopping %s", self.tdef.get_name())
if self.stream is not None:
self.stream.disable()
self.stream.close()
if user_stopped is not None:
self.config.set_user_stopped(user_stopped)
if self.handle and self.handle.is_valid():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ async def remove_download(self, download: Download, remove_content: bool = False
if handle:
if handle.is_valid():
if download.stream is not None:
download.stream.disable()
download.stream.close()
logger.debug("Removing handle %s", hexlify(infohash))
(await self.get_session(download.config.get_hops())).remove_torrent(handle, int(remove_content))
else:
Expand Down
706 changes: 201 additions & 505 deletions src/tribler/core/libtorrent/download_manager/stream.py

Large diffs are not rendered by default.

26 changes: 7 additions & 19 deletions src/tribler/core/libtorrent/restapi/downloads_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import mimetypes
from asyncio import get_event_loop, shield, wait_for
from asyncio import get_event_loop, shield
from binascii import hexlify, unhexlify
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, Optional, TypedDict, cast
Expand All @@ -19,7 +19,7 @@
from tribler.core.libtorrent.download_manager.download_config import DownloadConfig
from tribler.core.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.libtorrent.download_manager.download_state import DOWNLOAD, UPLOAD, DownloadStatus
from tribler.core.libtorrent.download_manager.stream import Stream, StreamChunk
from tribler.core.libtorrent.download_manager.stream import Stream, StreamReader
from tribler.core.libtorrent.torrentdef import TorrentDef
from tribler.core.restapi.rest_endpoint import (
HTTP_BAD_REQUEST,
Expand Down Expand Up @@ -249,8 +249,6 @@ def get_files_info_json_paged(download: Download, view_start: Path, view_size: i
"availability": Float,
"peers": String,
"total_pieces": Integer,
"vod_prebuffering_progress": Float,
"vod_prebuffering_progress_consec": Float,
"error": String,
"time_added": Integer
}),
Expand Down Expand Up @@ -336,16 +334,9 @@ async def get_downloads(self, request: Request) -> RESTResponse: # noqa: C901
"completed_dir": download.config.get_completed_dir(),
"total_pieces": tdef.get_nr_pieces(),
"error": repr(state.get_error()) if state.get_error() else "",
"time_added": download.config.get_time_added()
"time_added": download.config.get_time_added(),
"streamable": bool(tdef and tdef.get_files_with_length({'mp4', 'm4v', 'mov', 'mkv'}))
}
if download.stream:
info.update({
"vod_prebuffering_progress": download.stream.prebuffprogress,
"vod_prebuffering_progress_consec": download.stream.prebuffprogress_consec,
"vod_header_progress": download.stream.headerprogress,
"vod_footer_progress": download.stream.footerprogress,

})

if unfiltered or params.get("infohash") == info["infohash"]:
# Add peers information if requested
Expand Down Expand Up @@ -1137,11 +1128,8 @@ async def prepare(self, request: BaseRequest) -> AbstractStreamWriter | None:
stream = self._download.stream

start = start or 0
if not stream.enabled or stream.fileindex != self._file_index:
await wait_for(stream.enable(self._file_index, start), 10)
await stream.updateprios()

reader = StreamChunk(self._download.stream, start)
await stream.enable(self._file_index)
reader = StreamReader(stream, start)
await reader.open()
try:
writer = await super().prepare(request)
Expand All @@ -1153,7 +1141,7 @@ async def prepare(self, request: BaseRequest) -> AbstractStreamWriter | None:
while data:
await writer.write(data[:todo])
todo -= len(data)
if todo <= 0:
if todo <= 0 or len(data) == 0:
break
data = await reader.read()

Expand Down
19 changes: 0 additions & 19 deletions src/tribler/core/libtorrent/torrents.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from pathlib import Path

from tribler.core.libtorrent.download_manager.download import Download
from tribler.core.libtorrent.download_manager.stream import Stream
from tribler.core.libtorrent.torrentdef import InfoDict

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -84,24 +83,6 @@ def done_cb(fut: Future[lt.torrent_handle]) -> None:
return invoke_func


def check_vod(default: WrappedReturn) -> Wrapped:
"""
Check if torrent is vod mode, else return default.
"""

def wrap(f: Wrapped) -> Wrapped:
def invoke_func(self: Stream,
*args: WrappedParams.args, **kwargs: WrappedParams.kwargs # type: ignore[valid-type]
) -> WrappedReturn:
if self.enabled:
return f(self, *args, **kwargs)
return default

return invoke_func

return wrap


def common_prefix(paths_list: list[Path]) -> Path:
"""
Get the path prefixes component-wise.
Expand Down
Loading

0 comments on commit b2788eb

Please sign in to comment.