Skip to content

Commit

Permalink
Add CLI, fix bandwidth logic error, ... + README needs an update
Browse files Browse the repository at this point in the history
  • Loading branch information
egeakman committed Feb 7, 2024
1 parent 6ecb103 commit a9a11c3
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 34 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ A class that represents a single video stream. A stream consists of a sequence o
***Constructor:***

```python
Stream(name: str, size: Optional[Tuple[int, int]] = None, quality: int = 50, fps: int = 24)
Stream(name: str, size: Optional[Tuple[int, int]] = None, quality: int = 50, fps: int = 30)
```

Creates a new Stream instance with the given unique name, image size, JPEG quality (1-100), and FPS.
Expand Down
2 changes: 1 addition & 1 deletion examples/show_bandwidth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
stream.set_frame(frame)

# Print the bandwidth in KB/s
print(round(stream.get_bandwidth() / 1024, 2), "KB/s")
print(round(stream.get_bandwidth() / 1024, 2), "KB/s", end="\r")

server.stop()
cap.release()
Expand Down
2 changes: 1 addition & 1 deletion mjpeg_streamer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .mjpeg_streamer import MjpegServer, Stream

__all__ = ["MjpegServer", "Stream"]
__version__ = "2023.12.25.post0"
__version__ = "2024.2.7"
119 changes: 119 additions & 0 deletions mjpeg_streamer/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from __future__ import annotations

import argparse
import re
import threading
from typing import Union

import cv2

from mjpeg_streamer import MjpegServer, Stream


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default="localhost")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--prefix", type=str, default="source", help="Name prefix for the streams"
)
parser.add_argument(
"--source",
"-s",
action="append",
nargs="+",
required=False,
help="Source(s) to stream (repeatable)",
)
parser.add_argument("--width", type=int, default=640)
parser.add_argument("--height", type=int, default=480)
parser.add_argument("--quality", "-q", type=int, default=50)
parser.add_argument("--fps", type=int, default=30)
parser.add_argument(
"--show-bandwidth",
action="store_true",
help="Shows the bandwidth used by each stream in kilobytes per second",
)
args: argparse.Namespace = parser.parse_args()
args.prefix = re.sub("[^0-9a-zA-Z]+", "_", args.prefix)
args.source: list[Union[int, str],] = [[0]] if args.source is None else args.source
args.source = [item for sublist in args.source for item in sublist]
return args


def run(
cap: cv2.VideoCapture,
stream: Stream,
stop_event: threading.Event,
show_bandwidth: bool,
) -> None:
while not stop_event.is_set():
ret, frame = cap.read()
if not ret:
stop_event.set()
break
stream.set_frame(frame)
if show_bandwidth:
global bandwidth
bandwidth[stream.name] = stream.get_bandwidth()
cap.release()


def main() -> None:
args = parse_args()
size: tuple[int, int] = (args.width, args.height)
server = MjpegServer(args.host, args.port)
threads: list[threading.Thread,] = []
stop_events: list[threading.Event,] = []

if args.show_bandwidth:
global bandwidth
bandwidth = {} # dict[str, int]

for source in args.source:
source: Union[int, str] = int(source) if str(source).isdigit() else source
cap = cv2.VideoCapture(source)
source_display = (
re.sub("[^0-9a-zA-Z]+", "_", source) if isinstance(source, str) else source
)
stream = Stream(
f"{args.prefix}_{source_display!s}",
size=size,
quality=args.quality,
fps=args.fps,
)
server.add_stream(stream)
stop_event = threading.Event()
stop_events.append(stop_event)
thread = threading.Thread(
target=run, args=(cap, stream, stop_event, args.show_bandwidth)
)
threads.append(thread)

try:
for thread in threads:
thread.start()
server.start()
while True:
if args.show_bandwidth:
print(
f"{' | '.join([f'{k}: {round(v / 1024, 2)} KB/s' for k, v in bandwidth.items()])}",
end="\r",
)
except KeyboardInterrupt:
for stop_event in stop_events:
stop_event.set()
server.stop()
for thread in threads:
thread.join()
except Exception as e:
print(e)
for stop_event in stop_events:
stop_event.set()
server.stop()
for thread in threads:
thread.join()


if __name__ == "__main__":
main()
57 changes: 27 additions & 30 deletions mjpeg_streamer/mjpeg_streamer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations

import asyncio
import sys
import threading
from collections import deque
from typing import Optional, Union

import aiohttp
Expand All @@ -19,38 +19,48 @@ def __init__(
name: str,
size: Optional[tuple[int, int]] = None,
quality: int = 50,
fps: int = 24,
fps: int = 30,
) -> None:
self.name = name.lower().casefold().replace(" ", "_")
self.size = size
self.quality = max(1, min(quality, 100))
self.fps = fps
self._frame = np.zeros((320, 240, 1), dtype=np.uint8)
self._lock = asyncio.Lock()
self._byte_frame_window = deque(maxlen=30)

def set_frame(self, frame: np.ndarray) -> None:
self._frame = frame

def get_bandwidth(self) -> float:
global byte_frame_size
if len(byte_frame_size) > self.fps:
for _ in range(len(byte_frame_size) - self.fps):
byte_frame_size.pop(0)
return sum(byte_frame_size)
return sum(self._byte_frame_window)

def __process_current_frame(self) -> np.ndarray:
frame = cv2.resize(
self._frame, self.size or (self._frame.shape[1], self._frame.shape[0])
)
val, frame = cv2.imencode(
".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality]
)
if not val:
raise ValueError("Error encoding frame")
self._byte_frame_window.append(len(frame.tobytes()))
return frame

async def get_frame(self) -> np.ndarray:
async with self._lock:
return self._frame

async def get_frame_processed(self) -> np.ndarray:
async with self._lock:
return self.__process_current_frame()


class _StreamHandler:
def __init__(self, stream: Stream) -> None:
global byte_frame_size
self._stream = stream
byte_frame_size = []

async def __call__(self, request: web.Request) -> web.StreamResponse:
global byte_frame_size
response = web.StreamResponse(
status=200,
reason="OK",
Expand All @@ -62,20 +72,7 @@ async def __call__(self, request: web.Request) -> web.StreamResponse:

while True:
await asyncio.sleep(1 / self._stream.fps)
frame = await self._stream.get_frame()
frame = cv2.resize(
frame, self._stream.size or (frame.shape[1], frame.shape[0])
)
val, frame = cv2.imencode(
".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self._stream.quality]
)

if not val:
print("Error while encoding frame")
break

byte_frame_size.append(sys.getsizeof(frame.tobytes()))

frame = await self._stream.get_frame_processed()
with MultipartWriter("image/jpeg", boundary="image-boundary") as mpwriter:
mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"})
try:
Expand Down Expand Up @@ -113,11 +110,9 @@ def is_running(self) -> bool:
return self._app.is_running

async def __root_handler(self, _) -> web.Response:
text = "<h2>Available streams:</h2>\n\n"
text = "<h2>Available streams:</h2>"
for route in self._cap_routes:
text += (
f"<a href='http://{self._host[0]}:{self._port}{route}'>{route}</a>\n\n"
)
text += f"<a href='http://{self._host[0]}:{self._port}{route}'>{route}</a>\n<br>\n"
return aiohttp.web.Response(text=text, content_type="text/html")

def add_stream(self, stream: Stream) -> None:
Expand Down Expand Up @@ -147,15 +142,17 @@ def start(self) -> None:
else:
print("\nServer is already running\n")

print("\nAvailable streams:\n")
for addr in self._host:
print(f"\nStreams index: http://{addr}:{self._port!s}")
print("Available streams:\n")
for route in self._cap_routes: # route has a leading slash
print(f"http://{addr}:{self._port!s}{route}")
print("--------------------------------")
print("--------------------------------\n")
print("\nPress Ctrl+C to stop the server\n")

def stop(self) -> None:
if self.is_running():
self._app.is_running = False
print("\nStopping...\n")
raise GracefulExit
print("\nServer is not running\n")
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ dependencies = [
Homepage = "https://github.com/egeakman/mjpeg-streamer"
Issues = "https://github.com/egeakman/mjpeg-streamer/issues"
Releases = "https://github.com/egeakman/mjpeg-streamer/releases"
[project.scripts]
mjpeg-streamer = "mjpeg_streamer.cli:main"

[tool.hatch.build]
packages = ["mjpeg_streamer"]
Expand All @@ -81,13 +83,13 @@ exclude = ["examples"]
fix = false
ignore = [
"TID252", # Relative imports are banned | __init__.py
"INP001", # ... is part of an implicit namespace package
"T201", # `print` found | TODO: Migrate to logging
"S104", # Possible binding to all interfaces | False positive
"EM101", # Exception must not use a string literal
"EM102", # Exception must not use an f-string literal
"TRY003", # Avoid specifying long messages outside the exception class
"UP007", # Use `X | Y` for type annotations | Have to use `typing.Union` for Python 3.6 compatibility
"FBT001", # Boolean-typed positional argument in function definition
]

[tool.isort]
Expand Down

0 comments on commit a9a11c3

Please sign in to comment.