Skip to content

Commit

Permalink
Merge pull request #21 from mhdzumair/develop
Browse files Browse the repository at this point in the history
Add asynchronous URL extraction and speed test features to mediaflow_proxy
  • Loading branch information
mhdzumair authored Nov 12, 2024
2 parents 3ca55c5 + 81ebf36 commit 0487d4a
Show file tree
Hide file tree
Showing 23 changed files with 1,344 additions and 20 deletions.
4 changes: 4 additions & 0 deletions mediaflow_proxy/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ class Settings(BaseSettings):
proxy_url: str | None = None # The URL of the proxy server to route requests through.
enable_streaming_progress: bool = False # Whether to enable streaming progress tracking.

user_agent: str = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" # The user agent to use for HTTP requests.
)

class Config:
env_file = ".env"
extra = "ignore"
Expand Down
Empty file.
43 changes: 43 additions & 0 deletions mediaflow_proxy/extractors/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from abc import ABC, abstractmethod
from typing import Dict, Tuple, Optional

import httpx

from mediaflow_proxy.configs import settings


class BaseExtractor(ABC):
    """Base class for all URL extractors."""

    def __init__(self, proxy_enabled: bool, request_headers: dict):
        """
        Args:
            proxy_enabled: Route requests through the configured proxy when True.
            request_headers: Extra headers merged over the defaults; caller
                headers win on key collision.
        """
        # Only honor the configured proxy URL when the caller opted in.
        self.proxy_url = settings.proxy_url if proxy_enabled else None
        self.base_headers = {
            "User-Agent": settings.user_agent,
            "Accept-Language": "en-US,en;q=0.5",
            **request_headers,
        }

    async def _make_request(
        self, url: str, headers: Optional[Dict] = None, follow_redirects: bool = True, **kwargs
    ) -> httpx.Response:
        """Make an HTTP GET request with error handling.

        Args:
            url: Target URL.
            headers: Per-request headers merged over ``self.base_headers``.
            follow_redirects: Whether httpx should follow redirects.
            **kwargs: Forwarded to ``httpx.AsyncClient.get``.

        Returns:
            The successful (2xx) response.

        Raises:
            ValueError: On any transport or HTTP-status failure; the original
                exception is attached as the cause for debugging.
        """
        try:
            async with httpx.AsyncClient(proxy=self.proxy_url) as client:
                response = await client.get(
                    url,
                    headers={**self.base_headers, **(headers or {})},
                    follow_redirects=follow_redirects,
                    timeout=30,
                    **kwargs,
                )
                response.raise_for_status()
                return response
        except httpx.HTTPError as e:
            # Chain the cause so the real httpx error survives the wrap.
            raise ValueError(f"HTTP request failed: {str(e)}") from e
        except Exception as e:
            raise ValueError(f"Request failed: {str(e)}") from e

    @abstractmethod
    async def extract(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Extract final URL and required headers."""
        pass
34 changes: 34 additions & 0 deletions mediaflow_proxy/extractors/doodstream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import re
import time
from typing import Tuple, Dict

from mediaflow_proxy.extractors.base import BaseExtractor


class DoodStreamExtractor(BaseExtractor):
    """DoodStream URL extractor."""

    def __init__(self, proxy_enabled: bool, request_headers: dict):
        super().__init__(proxy_enabled, request_headers)
        self.base_url = "https://d000d.com"

    async def extract(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Extract DoodStream URL."""
        page = await self._make_request(url)

        # Locate the /pass_md5/ path and the token/expiry query fragment
        # embedded in the player page script.
        found = re.search(r"(\/pass_md5\/.*?)'.*(\?token=.*?expiry=)", page.text, re.DOTALL)
        if found is None:
            raise ValueError("Failed to extract URL pattern")

        referer = f"{self.base_url}/"
        token_response = await self._make_request(
            f"{self.base_url}{found[1]}",
            headers={"Range": "bytes=0-", "Referer": referer},
        )

        # Final URL = token endpoint body + filler + token query + timestamp.
        now = str(int(time.time()))
        return f"{token_response.text}123456789{found[2]}{now}", {"Referer": referer}
24 changes: 24 additions & 0 deletions mediaflow_proxy/extractors/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Dict, Type

from mediaflow_proxy.extractors.base import BaseExtractor
from mediaflow_proxy.extractors.doodstream import DoodStreamExtractor
from mediaflow_proxy.extractors.mixdrop import MixdropExtractor
from mediaflow_proxy.extractors.uqload import UqloadExtractor


class ExtractorFactory:
    """Factory for creating URL extractors."""

    _extractors: Dict[str, Type[BaseExtractor]] = {
        "Doodstream": DoodStreamExtractor,
        "Uqload": UqloadExtractor,
        "Mixdrop": MixdropExtractor,
    }

    @classmethod
    def get_extractor(cls, host: str, proxy_enabled: bool, request_headers: dict) -> BaseExtractor:
        """Get appropriate extractor instance for the given host."""
        try:
            extractor_cls = cls._extractors[host]
        except KeyError:
            raise ValueError(f"Unsupported host: {host}") from None
        return extractor_cls(proxy_enabled, request_headers)
31 changes: 31 additions & 0 deletions mediaflow_proxy/extractors/mixdrop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import re
import string
from typing import Dict, Tuple

from mediaflow_proxy.extractors.base import BaseExtractor


class MixdropExtractor(BaseExtractor):
    """Mixdrop URL extractor."""

    async def extract(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Extract Mixdrop URL."""
        page = await self._make_request(url)

        # The page embeds a packed script of the form:
        #   }('<encoded source>', ..., '<terms>'.split
        packed = re.search(r"\}\('(.+)',.+,'(.+)'\.split", page.text)
        if packed is None:
            raise ValueError("Failed to extract URL components")

        encoded, dictionary = packed.group(1), packed.group(2)
        schema = encoded.split(";")[2][5:-1]
        terms = dictionary.split("|")

        # Map each symbol (0-9, a-z, A-Z) to its replacement term;
        # an empty term means the symbol stands for itself.
        alphabet = string.digits + string.ascii_letters
        substitutions = {}
        for index, term in enumerate(terms):
            substitutions[alphabet[index]] = term or alphabet[index]

        decoded = "".join(substitutions.get(symbol, symbol) for symbol in schema)
        return "https:" + decoded, {"User-Agent": self.base_headers["User-Agent"]}
18 changes: 18 additions & 0 deletions mediaflow_proxy/extractors/uqload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import re
from typing import Dict, Tuple

from mediaflow_proxy.extractors.base import BaseExtractor


class UqloadExtractor(BaseExtractor):
    """Uqload URL extractor."""

    async def extract(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Extract Uqload URL."""
        page = await self._make_request(url)

        # The player page declares: sources: ["<direct video url>"]
        source = re.search(r'sources: \["(.*?)"\]', page.text)
        if source is None:
            raise ValueError("Failed to extract video URL")

        return source.group(1), {"Referer": "https://uqload.to/"}
9 changes: 8 additions & 1 deletion mediaflow_proxy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from starlette.staticfiles import StaticFiles

from mediaflow_proxy.configs import settings
from mediaflow_proxy.routes import proxy_router
from mediaflow_proxy.routes import proxy_router, extractor_router, speedtest_router
from mediaflow_proxy.schemas import GenerateUrlRequest
from mediaflow_proxy.utils.crypto_utils import EncryptionHandler, EncryptionMiddleware
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url
Expand Down Expand Up @@ -54,6 +54,11 @@ async def get_favicon():
return RedirectResponse(url="/logo.png")


@app.get("/speedtest")
async def show_speedtest_page():
    """Redirect to the static speed test page served from /speedtest.html."""
    return RedirectResponse(url="/speedtest.html")


@app.post("/generate_encrypted_or_encoded_url")
async def generate_encrypted_or_encoded_url(request: GenerateUrlRequest):
if "api_password" not in request.query_params:
Expand All @@ -74,6 +79,8 @@ async def generate_encrypted_or_encoded_url(request: GenerateUrlRequest):


app.include_router(proxy_router, prefix="/proxy", tags=["proxy"], dependencies=[Depends(verify_api_key)])
app.include_router(extractor_router, prefix="/extractor", tags=["extractors"], dependencies=[Depends(verify_api_key)])
app.include_router(speedtest_router, prefix="/speedtest", tags=["speedtest"], dependencies=[Depends(verify_api_key)])

static_path = resources.files("mediaflow_proxy").joinpath("static")
app.mount("/", StaticFiles(directory=str(static_path), html=True), name="static")
Expand Down
5 changes: 5 additions & 0 deletions mediaflow_proxy/routes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .proxy import proxy_router
from .extractor import extractor_router
from .speedtest import speedtest_router

__all__ = ["proxy_router", "extractor_router", "speedtest_router"]
48 changes: 48 additions & 0 deletions mediaflow_proxy/routes/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Annotated

from fastapi import APIRouter, Query, HTTPException, Request, Depends
from fastapi.responses import RedirectResponse

from mediaflow_proxy.configs import settings
from mediaflow_proxy.extractors.factory import ExtractorFactory
from mediaflow_proxy.schemas import ExtractorURLParams
from mediaflow_proxy.utils.http_utils import (
encode_mediaflow_proxy_url,
get_original_scheme,
ProxyRequestHeaders,
get_proxy_headers,
)

extractor_router = APIRouter()


@extractor_router.get("/video")
async def extract_url(
    extractor_params: Annotated[ExtractorURLParams, Query()],
    request: Request,
    proxy_headers: Annotated[ProxyRequestHeaders, Depends(get_proxy_headers)],
):
    """Extract clean links from various video hosting services.

    Returns a redirect to the proxy stream endpoint when
    ``redirect_stream`` is set, otherwise a JSON payload with the
    extracted URL and the headers needed to fetch it.

    Raises:
        HTTPException: 400 for known extraction errors (unsupported host,
            parse failure), 500 for anything unexpected.
    """
    try:
        extractor = ExtractorFactory.get_extractor(
            extractor_params.host, extractor_params.use_request_proxy, proxy_headers.request
        )
        final_url, headers = await extractor.extract(extractor_params.destination)

        if extractor_params.redirect_stream:
            # Client-supplied headers take precedence over extractor defaults.
            headers.update(proxy_headers.request)
            stream_url = encode_mediaflow_proxy_url(
                str(request.url_for("proxy_stream_endpoint").replace(scheme=get_original_scheme(request))),
                destination_url=final_url,
                query_params={"api_password": settings.api_password},
                request_headers=headers,
                response_headers=proxy_headers.response,
            )
            return RedirectResponse(url=stream_url)

        return {"url": final_url, "headers": headers}

    except ValueError as e:
        # Chain the cause so logs show where the extraction actually failed.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Extraction failed: {str(e)}") from e
19 changes: 16 additions & 3 deletions mediaflow_proxy/routes.py → mediaflow_proxy/routes/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,22 @@

from fastapi import Request, Depends, APIRouter, Query, HTTPException

from .handlers import handle_hls_stream_proxy, proxy_stream, get_manifest, get_playlist, get_segment, get_public_ip
from .schemas import MPDSegmentParams, MPDPlaylistParams, HLSManifestParams, ProxyStreamParams, MPDManifestParams
from .utils.http_utils import get_proxy_headers, ProxyRequestHeaders
from mediaflow_proxy.handlers import (
handle_hls_stream_proxy,
proxy_stream,
get_manifest,
get_playlist,
get_segment,
get_public_ip,
)
from mediaflow_proxy.schemas import (
MPDSegmentParams,
MPDPlaylistParams,
HLSManifestParams,
ProxyStreamParams,
MPDManifestParams,
)
from mediaflow_proxy.utils.http_utils import get_proxy_headers, ProxyRequestHeaders

proxy_router = APIRouter()

Expand Down
43 changes: 43 additions & 0 deletions mediaflow_proxy/routes/speedtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import uuid

from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
from fastapi.responses import RedirectResponse

from mediaflow_proxy.speedtest.service import SpeedTestService, SpeedTestProvider

speedtest_router = APIRouter()

# Initialize service
speedtest_service = SpeedTestService()


@speedtest_router.get("/", summary="Show speed test interface")
async def show_speedtest_page():
    """Redirect the browser to the static speed test HTML interface."""
    target = "/speedtest.html"
    return RedirectResponse(url=target)


@speedtest_router.post("/start", summary="Start a new speed test", response_model=dict)
async def start_speedtest(background_tasks: BackgroundTasks, provider: SpeedTestProvider, request: Request):
    """Start a new speed test for the specified provider."""
    api_key = request.headers.get("api_key")
    task_id = str(uuid.uuid4())

    # Register the task first so clients can poll for it immediately,
    # then run the actual measurement in the background.
    await speedtest_service.create_test(task_id, provider, api_key)
    background_tasks.add_task(speedtest_service.run_speedtest, task_id, provider, api_key)

    return {"task_id": task_id}


@speedtest_router.get("/results/{task_id}", summary="Get speed test results")
async def get_speedtest_results(task_id: str):
    """Get the results or current status of a speed test.

    Raises:
        HTTPException: 404 if the task does not exist or has expired.
    """
    task = await speedtest_service.get_test_results(task_id)

    if not task:
        raise HTTPException(status_code=404, detail="Speed test task not found or expired")

    # model_dump() replaces the deprecated pydantic-v1 .dict(); the project
    # is on pydantic v2 (schemas.py imports the v2-only ConfigDict).
    return task.model_dump()
8 changes: 8 additions & 0 deletions mediaflow_proxy/schemas.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Literal

from pydantic import BaseModel, Field, IPvAnyAddress, ConfigDict


Expand Down Expand Up @@ -55,3 +57,9 @@ class MPDSegmentParams(GenericParams):
mime_type: str = Field(..., description="The MIME type of the segment.")
key_id: str | None = Field(None, description="The DRM key ID (optional).")
key: str | None = Field(None, description="The DRM key (optional).")


class ExtractorURLParams(GenericParams):
    """Query parameters accepted by the /extractor/video endpoint."""

    # Must match a key registered in ExtractorFactory._extractors.
    host: Literal["Doodstream", "Mixdrop", "Uqload"] = Field(..., description="The host to extract the URL from.")
    destination: str = Field(..., description="The URL of the stream.", alias="d")
    redirect_stream: bool = Field(False, description="Whether to redirect to the stream endpoint automatically.")
Empty file.
46 changes: 46 additions & 0 deletions mediaflow_proxy/speedtest/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from datetime import datetime
from enum import Enum
from typing import Dict, Optional

from pydantic import BaseModel, Field


class SpeedTestProvider(str, Enum):
    """Supported speed test providers (str-valued so it serializes cleanly)."""

    REAL_DEBRID = "real_debrid"
    ALL_DEBRID = "all_debrid"


class ServerInfo(BaseModel):
    """Identifies a single speed test server by URL and display name."""

    url: str
    name: str


class UserInfo(BaseModel):
    """Optional network details for the requesting user."""

    ip: Optional[str] = None
    isp: Optional[str] = None
    country: Optional[str] = None


class SpeedTestResult(BaseModel):
    """A single completed speed measurement."""

    speed_mbps: float = Field(..., description="Speed in Mbps")
    duration: float = Field(..., description="Test duration in seconds")
    data_transferred: int = Field(..., description="Data transferred in bytes")
    # NOTE(review): datetime.utcnow is naive and deprecated since Python 3.12;
    # datetime.now(timezone.utc) would be aware — confirm serialization impact
    # on API consumers before switching.
    timestamp: datetime = Field(default_factory=datetime.utcnow)


class LocationResult(BaseModel):
    """Outcome for one server location: a result on success, or an error message."""

    result: Optional[SpeedTestResult] = None
    error: Optional[str] = None
    server_name: str
    server_url: str


class SpeedTestTask(BaseModel):
    """State of a running or finished speed test across all locations."""

    task_id: str
    provider: SpeedTestProvider
    # Per-location results. A mutable {} default is safe here because
    # pydantic copies field defaults per instance (unlike plain classes).
    results: Dict[str, LocationResult] = {}
    started_at: datetime
    completed_at: Optional[datetime] = None
    status: str = "running"
    user_info: Optional[UserInfo] = None
    current_location: Optional[str] = None
Loading

0 comments on commit 0487d4a

Please sign in to comment.