diff --git a/README.md b/README.md index fd29cf1..df925a8 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ A class that represents a single video stream. A stream consists of a sequence o ***Constructor:*** ```python -Stream(name: str, size: Optional[Tuple[int, int]] = None, quality: int = 50, fps: int = 24) +Stream(name: str, size: Optional[Tuple[int, int]] = None, quality: int = 50, fps: int = 30) ``` Creates a new Stream instance with the given unique name, image size, JPEG quality (1-100), and FPS. diff --git a/examples/show_bandwidth.py b/examples/show_bandwidth.py index ff05103..c376bb8 100644 --- a/examples/show_bandwidth.py +++ b/examples/show_bandwidth.py @@ -18,7 +18,7 @@ stream.set_frame(frame) # Print the bandwidth in KB/s - print(round(stream.get_bandwidth() / 1024, 2), "KB/s") + print(round(stream.get_bandwidth() / 1024, 2), "KB/s", end="\r") server.stop() cap.release() diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py index 4bb79cf..2035664 100644 --- a/mjpeg_streamer/__init__.py +++ b/mjpeg_streamer/__init__.py @@ -1,4 +1,4 @@ from .mjpeg_streamer import MjpegServer, Stream __all__ = ["MjpegServer", "Stream"] -__version__ = "2023.12.25.post0" +__version__ = "2024.2.7" diff --git a/mjpeg_streamer/cli.py b/mjpeg_streamer/cli.py new file mode 100644 index 0000000..f5cb846 --- /dev/null +++ b/mjpeg_streamer/cli.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import argparse +import re +import threading +from typing import Union + +import cv2 + +from mjpeg_streamer import MjpegServer, Stream + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--host", type=str, default="localhost") + parser.add_argument("--port", type=int, default=8080) + parser.add_argument( + "--prefix", type=str, default="source", help="Name prefix for the streams" + ) + parser.add_argument( + "--source", + "-s", + action="append", + nargs="+", + required=False, + help="Source(s) to stream (repeatable)", + ) + parser.add_argument("--width", type=int, default=640) + parser.add_argument("--height", type=int, default=480) + parser.add_argument("--quality", "-q", type=int, default=50) + parser.add_argument("--fps", type=int, default=30) + parser.add_argument( + "--show-bandwidth", + action="store_true", + help="Shows the bandwidth used by each stream in kilobytes per second", + ) + args: argparse.Namespace = parser.parse_args() + args.prefix = re.sub("[^0-9a-zA-Z]+", "_", args.prefix) + args.source: list[Union[int, str],] = [[0]] if args.source is None else args.source + args.source = [item for sublist in args.source for item in sublist] + return args + + +def run( + cap: cv2.VideoCapture, + stream: Stream, + stop_event: threading.Event, + show_bandwidth: bool, +) -> None: + while not stop_event.is_set(): + ret, frame = cap.read() + if not ret: + stop_event.set() + break + stream.set_frame(frame) + if show_bandwidth: + global bandwidth + bandwidth[stream.name] = stream.get_bandwidth() + cap.release() + + +def main() -> None: + args = parse_args() + size: tuple[int, int] = (args.width, args.height) + server = MjpegServer(args.host, args.port) + threads: list[threading.Thread,] = [] + stop_events: list[threading.Event,] = [] + + if args.show_bandwidth: + global bandwidth + bandwidth = {} # dict[str, int] + + for source in args.source: + source: Union[int, str] = int(source) if str(source).isdigit() else source + cap = cv2.VideoCapture(source) + source_display = ( + re.sub("[^0-9a-zA-Z]+", "_", source) if isinstance(source, str) else source + ) + stream = Stream( + f"{args.prefix}_{source_display!s}", + size=size, + quality=args.quality, + fps=args.fps, + ) + server.add_stream(stream) + stop_event = threading.Event() + stop_events.append(stop_event) + thread = threading.Thread( + target=run, args=(cap, stream, stop_event, args.show_bandwidth) + ) + threads.append(thread) + + try: + for thread in threads: + thread.start() + server.start() + while True: + if args.show_bandwidth: + print( + f"{' | '.join([f'{k}: {round(v / 1024, 2)} KB/s' for k, v in bandwidth.items()])}", + end="\r", + ) + except KeyboardInterrupt: + for stop_event in stop_events: + stop_event.set() + server.stop() + for thread in threads: + thread.join() + except Exception as e: + print(e) + for stop_event in stop_events: + stop_event.set() + server.stop() + for thread in threads: + thread.join() + + +if __name__ == "__main__": + main() diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py index a3c8b9c..e4211e1 100644 --- a/mjpeg_streamer/mjpeg_streamer.py +++ b/mjpeg_streamer/mjpeg_streamer.py @@ -1,8 +1,8 @@ from __future__ import annotations import asyncio -import sys import threading +from collections import deque from typing import Optional, Union import aiohttp @@ -19,7 +19,7 @@ def __init__( name: str, size: Optional[tuple[int, int]] = None, quality: int = 50, - fps: int = 24, + fps: int = 30, ) -> None: self.name = name.lower().casefold().replace(" ", "_") self.size = size @@ -27,30 +27,40 @@ def __init__( self.fps = fps self._frame = np.zeros((320, 240, 1), dtype=np.uint8) self._lock = asyncio.Lock() + self._byte_frame_window = deque(maxlen=30) def set_frame(self, frame: np.ndarray) -> None: self._frame = frame def get_bandwidth(self) -> float: - global byte_frame_size - if len(byte_frame_size) > self.fps: - for _ in range(len(byte_frame_size) - self.fps): - byte_frame_size.pop(0) - return sum(byte_frame_size) + return sum(self._byte_frame_window) + + def __process_current_frame(self) -> np.ndarray: + frame = cv2.resize( + self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) + ) + val, frame = cv2.imencode( + ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] + ) + if not val: + raise ValueError("Error encoding frame") + self._byte_frame_window.append(len(frame.tobytes())) + return frame async def get_frame(self) -> np.ndarray: async with self._lock: return self._frame + async def get_frame_processed(self) -> np.ndarray: + async with self._lock: + return self.__process_current_frame() + class _StreamHandler: def __init__(self, stream: Stream) -> None: - global byte_frame_size self._stream = stream - byte_frame_size = [] async def __call__(self, request: web.Request) -> web.StreamResponse: - global byte_frame_size response = web.StreamResponse( status=200, reason="OK", @@ -62,20 +72,7 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: while True: await asyncio.sleep(1 / self._stream.fps) - frame = await self._stream.get_frame() - frame = cv2.resize( - frame, self._stream.size or (frame.shape[1], frame.shape[0]) - ) - val, frame = cv2.imencode( - ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self._stream.quality] - ) - - if not val: - print("Error while encoding frame") - break - - byte_frame_size.append(sys.getsizeof(frame.tobytes())) - + frame = await self._stream.get_frame_processed() with MultipartWriter("image/jpeg", boundary="image-boundary") as mpwriter: mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"}) try: @@ -113,11 +110,9 @@ def is_running(self) -> bool: return self._app.is_running async def __root_handler(self, _) -> web.Response: - text = "