Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor streaming #8438

Merged
merged 2 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tribler/core/libtorrent/download_manager/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ def stop(self, user_stopped: bool | None = None) -> Awaitable[None]:
"""
self._logger.debug("Stopping %s", self.tdef.get_name())
if self.stream is not None:
self.stream.disable()
self.stream.close()
if user_stopped is not None:
self.config.set_user_stopped(user_stopped)
if self.handle and self.handle.is_valid():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ async def remove_download(self, download: Download, remove_content: bool = False
if handle:
if handle.is_valid():
if download.stream is not None:
download.stream.disable()
download.stream.close()
logger.debug("Removing handle %s", hexlify(infohash))
(await self.get_session(download.config.get_hops())).remove_torrent(handle, int(remove_content))
else:
Expand Down
706 changes: 201 additions & 505 deletions src/tribler/core/libtorrent/download_manager/stream.py

Large diffs are not rendered by default.

26 changes: 7 additions & 19 deletions src/tribler/core/libtorrent/restapi/downloads_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import mimetypes
from asyncio import get_event_loop, shield, wait_for
from asyncio import get_event_loop, shield
from binascii import hexlify, unhexlify
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, Optional, TypedDict, cast
Expand All @@ -19,7 +19,7 @@
from tribler.core.libtorrent.download_manager.download_config import DownloadConfig
from tribler.core.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.libtorrent.download_manager.download_state import DOWNLOAD, UPLOAD, DownloadStatus
from tribler.core.libtorrent.download_manager.stream import Stream, StreamChunk
from tribler.core.libtorrent.download_manager.stream import Stream, StreamReader
from tribler.core.libtorrent.torrentdef import TorrentDef
from tribler.core.restapi.rest_endpoint import (
HTTP_BAD_REQUEST,
Expand Down Expand Up @@ -249,8 +249,6 @@ def get_files_info_json_paged(download: Download, view_start: Path, view_size: i
"availability": Float,
"peers": String,
"total_pieces": Integer,
"vod_prebuffering_progress": Float,
"vod_prebuffering_progress_consec": Float,
"error": String,
"time_added": Integer
}),
Expand Down Expand Up @@ -336,16 +334,9 @@ async def get_downloads(self, request: Request) -> RESTResponse: # noqa: C901
"completed_dir": download.config.get_completed_dir(),
"total_pieces": tdef.get_nr_pieces(),
"error": repr(state.get_error()) if state.get_error() else "",
"time_added": download.config.get_time_added()
"time_added": download.config.get_time_added(),
"streamable": bool(tdef and tdef.get_files_with_length({'mp4', 'm4v', 'mov', 'mkv'}))
}
if download.stream:
info.update({
"vod_prebuffering_progress": download.stream.prebuffprogress,
"vod_prebuffering_progress_consec": download.stream.prebuffprogress_consec,
"vod_header_progress": download.stream.headerprogress,
"vod_footer_progress": download.stream.footerprogress,

})

if unfiltered or params.get("infohash") == info["infohash"]:
# Add peers information if requested
Expand Down Expand Up @@ -1137,11 +1128,8 @@ async def prepare(self, request: BaseRequest) -> AbstractStreamWriter | None:
stream = self._download.stream

start = start or 0
if not stream.enabled or stream.fileindex != self._file_index:
await wait_for(stream.enable(self._file_index, start), 10)
await stream.updateprios()

reader = StreamChunk(self._download.stream, start)
await stream.enable(self._file_index)
reader = StreamReader(stream, start)
await reader.open()
try:
writer = await super().prepare(request)
Expand All @@ -1153,7 +1141,7 @@ async def prepare(self, request: BaseRequest) -> AbstractStreamWriter | None:
while data:
await writer.write(data[:todo])
todo -= len(data)
if todo <= 0:
if todo <= 0 or len(data) == 0:
break
data = await reader.read()

Expand Down
19 changes: 0 additions & 19 deletions src/tribler/core/libtorrent/torrents.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from pathlib import Path

from tribler.core.libtorrent.download_manager.download import Download
from tribler.core.libtorrent.download_manager.stream import Stream
from tribler.core.libtorrent.torrentdef import InfoDict

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -84,24 +83,6 @@ def done_cb(fut: Future[lt.torrent_handle]) -> None:
return invoke_func


def check_vod(default: WrappedReturn) -> Wrapped:
"""
Check if torrent is vod mode, else return default.
"""

def wrap(f: Wrapped) -> Wrapped:
def invoke_func(self: Stream,
*args: WrappedParams.args, **kwargs: WrappedParams.kwargs # type: ignore[valid-type]
) -> WrappedReturn:
if self.enabled:
return f(self, *args, **kwargs)
return default

return invoke_func

return wrap


def common_prefix(paths_list: list[Path]) -> Path:
"""
Get the path prefixes component-wise.
Expand Down
Loading