Files
jc-video-recognize/apps/server/services/rtsp_service.py
wuzhuorong 40fd3089a7 feat(server): 新增RTSP多路视频流接入服务。- 实现基于Ring Buffer的帧缓冲区(frame_buffer),支持线程安全读写
- 实现RTSP流接入服务(rtsp_service),支持单路流连接/解码/帧采集
- 实现多路流调度管理器(stream_manager),统一管理多路RTSP流启停与状态监控
2026-06-12 13:56:35 +08:00

397 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""RTSP 流接入服务 (MVP-2 / D11-D12)
负责单路 RTSP 流的连接、解码、自动重连和帧产出。
核心设计:
1. 基于 OpenCV VideoCapture 的 RTSP 接入,兼容主流 IP 摄像头
2. 后台线程解码帧,避免阻塞事件循环
3. 自动重连: 断线后按指数退避策略重试
4. 帧回调: 每解码一帧触发回调,由 StreamManager 分发到检测管道
5. 优雅关闭: stop() 等待解码线程退出,释放资源
使用方式::
service = RTSPService(
stream_id="cam-01",
rtsp_url="rtsp://admin:pass@192.168.1.100:554/stream",
on_frame=handle_frame,
)
await service.start()
...
await service.stop()
"""
from __future__ import annotations
import asyncio
import enum
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine, Dict, Optional
import cv2
import numpy as np
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 数据模型
# ---------------------------------------------------------------------------
class StreamStatus(str, enum.Enum):
"""流状态。"""
IDLE = "idle" # 未启动
CONNECTING = "connecting" # 连接中
CONNECTED = "connected" # 已连接,正在解码
RECONNECTING = "reconnecting" # 断线重连中
STOPPED = "stopped" # 已停止
ERROR = "error" # 不可恢复错误
@dataclass
class StreamConfig:
"""单路 RTSP 流配置。"""
stream_id: str
rtsp_url: str
# 解码参数
reconnect_attempts: int = 10 # 最大重连次数0 = 无限
reconnect_interval_base: float = 2.0 # 首次重连间隔 (秒)
reconnect_interval_max: float = 60.0 # 最大重连间隔 (秒)
reconnect_backoff_factor: float = 2.0 # 退避因子
# 帧采样
frame_skip: int = 0 # 每隔 N 帧取 1 帧0 = 每帧都取
# OpenCV 参数
buffer_size: int = 1 # FFmpeg 缓冲区大小 (越小延迟越低)
# 超时
read_timeout: float = 5.0 # 单帧读取超时 (秒)
# 检测配置
model_id: str = "fire_detection"
confidence: float = 0.5
iou: float = 0.45
@dataclass
class StreamInfo:
"""流运行时信息。"""
stream_id: str
status: StreamStatus = StreamStatus.IDLE
rtsp_url: str = ""
# 统计
frames_decoded: int = 0
frames_dropped: int = 0
reconnect_count: int = 0
last_frame_time: float = 0.0
fps: float = 0.0
# 时间
connected_at: float = 0.0
error_message: str = ""
def to_dict(self) -> Dict[str, Any]:
return {
"stream_id": self.stream_id,
"status": self.status.value,
"rtsp_url": self._mask_url(self.rtsp_url),
"frames_decoded": self.frames_decoded,
"frames_dropped": self.frames_dropped,
"reconnect_count": self.reconnect_count,
"fps": round(self.fps, 2),
"connected_at": self.connected_at,
"error_message": self.error_message,
}
@staticmethod
def _mask_url(url: str) -> str:
"""遮蔽 RTSP URL 中的密码。"""
if "@" not in url:
return url
try:
prefix, rest = url.split("://", 1)
creds_host = rest.split("@", 1)
if len(creds_host) == 2:
creds, host_path = creds_host
if ":" in creds:
user, _ = creds.split(":", 1)
return f"{prefix}://{user}:****@{host_path}"
except Exception:
pass
return url
# ---------------------------------------------------------------------------
# 帧回调类型
# ---------------------------------------------------------------------------
# on_frame(stream_id, frame, frame_index, timestamp) -> None
FrameCallback = Callable[[str, np.ndarray, int, float], Coroutine[Any, Any, None]]
# ---------------------------------------------------------------------------
# RTSPService
# ---------------------------------------------------------------------------
class RTSPService:
"""单路 RTSP 流接入服务。
在后台线程中执行 OpenCV 解码循环,通过 asyncio 事件循环
将帧投递到异步回调,不阻塞主事件循环。
"""
def __init__(
self,
stream_id: str,
rtsp_url: str,
on_frame: Optional[FrameCallback] = None,
config: Optional[StreamConfig] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
self.config = config or StreamConfig(
stream_id=stream_id, rtsp_url=rtsp_url
)
self.config.stream_id = stream_id
self.config.rtsp_url = rtsp_url
self._on_frame = on_frame
self._loop = loop
self._info = StreamInfo(
stream_id=stream_id,
rtsp_url=rtsp_url,
)
self._cap: Optional[cv2.VideoCapture] = None
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._frame_index: int = 0
# ------------------------------------------------------------------
# 生命周期
# ------------------------------------------------------------------
async def start(self) -> None:
"""启动 RTSP 流解码。"""
if self._info.status in (StreamStatus.CONNECTED, StreamStatus.CONNECTING):
logger.warning("RTSP 流 %s 已在运行中", self.config.stream_id)
return
if self._loop is None:
self._loop = asyncio.get_running_loop()
self._stop_event.clear()
self._frame_index = 0
self._info.status = StreamStatus.CONNECTING
self._thread = threading.Thread(
target=self._decode_loop,
name=f"rtsp-{self.config.stream_id}",
daemon=True,
)
self._thread.start()
logger.info("RTSP 流 %s 解码线程已启动", self.config.stream_id)
async def stop(self) -> None:
"""停止 RTSP 流解码,释放资源。"""
if self._info.status == StreamStatus.STOPPED:
return
self._stop_event.set()
self._info.status = StreamStatus.STOPPED
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5.0)
self._release_capture()
logger.info(
"RTSP 流 %s 已停止, 共解码 %d",
self.config.stream_id,
self._info.frames_decoded,
)
# ------------------------------------------------------------------
# 状态
# ------------------------------------------------------------------
@property
def info(self) -> StreamInfo:
return self._info
@property
def status(self) -> StreamStatus:
return self._info.status
@property
def is_running(self) -> bool:
return self._info.status in (
StreamStatus.CONNECTED,
StreamStatus.CONNECTING,
StreamStatus.RECONNECTING,
)
# ------------------------------------------------------------------
# 解码循环 (后台线程)
# ------------------------------------------------------------------
def _decode_loop(self) -> None:
"""后台线程: RTSP 解码 + 自动重连。"""
attempt = 0
while not self._stop_event.is_set():
# 尝试连接
connected = self._connect()
if not connected:
if self._stop_event.is_set():
break
attempt += 1
if (
self.config.reconnect_attempts > 0
and attempt > self.config.reconnect_attempts
):
self._info.status = StreamStatus.ERROR
self._info.error_message = (
f"超过最大重连次数 ({self.config.reconnect_attempts})"
)
logger.error(
"RTSP 流 %s %s",
self.config.stream_id,
self._info.error_message,
)
break
# 指数退避
interval = min(
self.config.reconnect_interval_base
* (self.config.reconnect_backoff_factor ** (attempt - 1)),
self.config.reconnect_interval_max,
)
self._info.status = StreamStatus.RECONNECTING
self._info.reconnect_count += 1
logger.warning(
"RTSP 流 %s 连接失败,第 %d 次重连,等待 %.1fs",
self.config.stream_id,
attempt,
interval,
)
self._stop_event.wait(timeout=interval)
continue
# 连接成功,重置计数
attempt = 0
self._info.status = StreamStatus.CONNECTED
self._info.connected_at = time.time()
logger.info("RTSP 流 %s 已连接: %s", self.config.stream_id, self.config.rtsp_url)
# 解码帧
self._read_frames()
# 如果 read_frames 退出且未被停止,说明断线了
if not self._stop_event.is_set():
self._release_capture()
self._info.status = StreamStatus.RECONNECTING
logger.warning("RTSP 流 %s 断线,准备重连", self.config.stream_id)
def _connect(self) -> bool:
"""尝试连接 RTSP 流。"""
try:
cap = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_FFMPEG)
# 降低缓冲以减少延迟
cap.set(cv2.CAP_PROP_BUFFERSIZE, self.config.buffer_size)
if not cap.isOpened():
return False
# 验证: 尝试读取一帧
ret, _ = cap.read()
if not ret:
cap.release()
return False
self._cap = cap
return True
except Exception as e:
logger.debug("RTSP 流 %s 连接异常: %s", self.config.stream_id, e)
return False
def _read_frames(self) -> None:
"""持续读取帧直到断线或停止信号。"""
if self._cap is None:
return
fps_counter_start = time.time()
fps_frame_count = 0
while not self._stop_event.is_set():
try:
ret, frame = self._cap.read()
except Exception as e:
logger.warning("RTSP 流 %s 读取异常: %s", self.config.stream_id, e)
break
if not ret or frame is None:
logger.warning("RTSP 流 %s 读取帧失败,可能断线", self.config.stream_id)
break
self._frame_index += 1
self._info.frames_decoded += 1
self._info.last_frame_time = time.time()
# 帧采样
if self.config.frame_skip > 0 and self._frame_index % (self.config.frame_skip + 1) != 1:
self._info.frames_dropped += 1
continue
# FPS 统计
fps_frame_count += 1
elapsed = time.time() - fps_counter_start
if elapsed >= 1.0:
self._info.fps = fps_frame_count / elapsed
fps_frame_count = 0
fps_counter_start = time.time()
# 通过事件循环投递帧到异步回调
if self._on_frame and self._loop and not self._loop.is_closed():
try:
asyncio.run_coroutine_threadsafe(
self._on_frame(
self.config.stream_id,
frame,
self._frame_index,
self._info.last_frame_time,
),
self._loop,
)
except RuntimeError as e:
logger.debug("投递帧回调失败 (事件循环可能已关闭): %s", e)
break
def _release_capture(self) -> None:
"""释放 VideoCapture 资源。"""
if self._cap is not None:
try:
self._cap.release()
except Exception:
pass
self._cap = None
__all__ = [
"RTSPService",
"StreamConfig",
"StreamInfo",
"StreamStatus",
"FrameCallback",
]