- 实现RTSP流接入服务(rtsp_service),支持单路流连接/解码/帧采集 - 实现多路流调度管理器(stream_manager),统一管理多路RTSP流启停与状态监控
397 lines
12 KiB
Python
397 lines
12 KiB
Python
"""RTSP 流接入服务 (MVP-2 / D11-D12)
|
||
|
||
负责单路 RTSP 流的连接、解码、自动重连和帧产出。
|
||
|
||
核心设计:
|
||
|
||
1. 基于 OpenCV VideoCapture 的 RTSP 接入,兼容主流 IP 摄像头
|
||
2. 后台线程解码帧,避免阻塞事件循环
|
||
3. 自动重连: 断线后按指数退避策略重试
|
||
4. 帧回调: 每解码一帧触发回调,由 StreamManager 分发到检测管道
|
||
5. 优雅关闭: stop() 等待解码线程退出,释放资源
|
||
|
||
使用方式::
|
||
|
||
service = RTSPService(
|
||
stream_id="cam-01",
|
||
rtsp_url="rtsp://admin:pass@192.168.1.100:554/stream",
|
||
on_frame=handle_frame,
|
||
)
|
||
await service.start()
|
||
...
|
||
await service.stop()
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import enum
|
||
import logging
|
||
import threading
|
||
import time
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, Callable, Coroutine, Dict, Optional
|
||
|
||
import cv2
|
||
import numpy as np
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 数据模型
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class StreamStatus(str, enum.Enum):
|
||
"""流状态。"""
|
||
|
||
IDLE = "idle" # 未启动
|
||
CONNECTING = "connecting" # 连接中
|
||
CONNECTED = "connected" # 已连接,正在解码
|
||
RECONNECTING = "reconnecting" # 断线重连中
|
||
STOPPED = "stopped" # 已停止
|
||
ERROR = "error" # 不可恢复错误
|
||
|
||
|
||
@dataclass
|
||
class StreamConfig:
|
||
"""单路 RTSP 流配置。"""
|
||
|
||
stream_id: str
|
||
rtsp_url: str
|
||
# 解码参数
|
||
reconnect_attempts: int = 10 # 最大重连次数,0 = 无限
|
||
reconnect_interval_base: float = 2.0 # 首次重连间隔 (秒)
|
||
reconnect_interval_max: float = 60.0 # 最大重连间隔 (秒)
|
||
reconnect_backoff_factor: float = 2.0 # 退避因子
|
||
# 帧采样
|
||
frame_skip: int = 0 # 每隔 N 帧取 1 帧,0 = 每帧都取
|
||
# OpenCV 参数
|
||
buffer_size: int = 1 # FFmpeg 缓冲区大小 (越小延迟越低)
|
||
# 超时
|
||
read_timeout: float = 5.0 # 单帧读取超时 (秒)
|
||
# 检测配置
|
||
model_id: str = "fire_detection"
|
||
confidence: float = 0.5
|
||
iou: float = 0.45
|
||
|
||
|
||
@dataclass
|
||
class StreamInfo:
|
||
"""流运行时信息。"""
|
||
|
||
stream_id: str
|
||
status: StreamStatus = StreamStatus.IDLE
|
||
rtsp_url: str = ""
|
||
# 统计
|
||
frames_decoded: int = 0
|
||
frames_dropped: int = 0
|
||
reconnect_count: int = 0
|
||
last_frame_time: float = 0.0
|
||
fps: float = 0.0
|
||
# 时间
|
||
connected_at: float = 0.0
|
||
error_message: str = ""
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
return {
|
||
"stream_id": self.stream_id,
|
||
"status": self.status.value,
|
||
"rtsp_url": self._mask_url(self.rtsp_url),
|
||
"frames_decoded": self.frames_decoded,
|
||
"frames_dropped": self.frames_dropped,
|
||
"reconnect_count": self.reconnect_count,
|
||
"fps": round(self.fps, 2),
|
||
"connected_at": self.connected_at,
|
||
"error_message": self.error_message,
|
||
}
|
||
|
||
@staticmethod
|
||
def _mask_url(url: str) -> str:
|
||
"""遮蔽 RTSP URL 中的密码。"""
|
||
if "@" not in url:
|
||
return url
|
||
try:
|
||
prefix, rest = url.split("://", 1)
|
||
creds_host = rest.split("@", 1)
|
||
if len(creds_host) == 2:
|
||
creds, host_path = creds_host
|
||
if ":" in creds:
|
||
user, _ = creds.split(":", 1)
|
||
return f"{prefix}://{user}:****@{host_path}"
|
||
except Exception:
|
||
pass
|
||
return url
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 帧回调类型
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# on_frame(stream_id, frame, frame_index, timestamp) -> None
|
||
FrameCallback = Callable[[str, np.ndarray, int, float], Coroutine[Any, Any, None]]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# RTSPService
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class RTSPService:
|
||
"""单路 RTSP 流接入服务。
|
||
|
||
在后台线程中执行 OpenCV 解码循环,通过 asyncio 事件循环
|
||
将帧投递到异步回调,不阻塞主事件循环。
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
stream_id: str,
|
||
rtsp_url: str,
|
||
on_frame: Optional[FrameCallback] = None,
|
||
config: Optional[StreamConfig] = None,
|
||
loop: Optional[asyncio.AbstractEventLoop] = None,
|
||
) -> None:
|
||
self.config = config or StreamConfig(
|
||
stream_id=stream_id, rtsp_url=rtsp_url
|
||
)
|
||
self.config.stream_id = stream_id
|
||
self.config.rtsp_url = rtsp_url
|
||
|
||
self._on_frame = on_frame
|
||
self._loop = loop
|
||
|
||
self._info = StreamInfo(
|
||
stream_id=stream_id,
|
||
rtsp_url=rtsp_url,
|
||
)
|
||
self._cap: Optional[cv2.VideoCapture] = None
|
||
self._thread: Optional[threading.Thread] = None
|
||
self._stop_event = threading.Event()
|
||
self._frame_index: int = 0
|
||
|
||
# ------------------------------------------------------------------
|
||
# 生命周期
|
||
# ------------------------------------------------------------------
|
||
|
||
async def start(self) -> None:
|
||
"""启动 RTSP 流解码。"""
|
||
|
||
if self._info.status in (StreamStatus.CONNECTED, StreamStatus.CONNECTING):
|
||
logger.warning("RTSP 流 %s 已在运行中", self.config.stream_id)
|
||
return
|
||
|
||
if self._loop is None:
|
||
self._loop = asyncio.get_running_loop()
|
||
|
||
self._stop_event.clear()
|
||
self._frame_index = 0
|
||
self._info.status = StreamStatus.CONNECTING
|
||
|
||
self._thread = threading.Thread(
|
||
target=self._decode_loop,
|
||
name=f"rtsp-{self.config.stream_id}",
|
||
daemon=True,
|
||
)
|
||
self._thread.start()
|
||
logger.info("RTSP 流 %s 解码线程已启动", self.config.stream_id)
|
||
|
||
async def stop(self) -> None:
|
||
"""停止 RTSP 流解码,释放资源。"""
|
||
|
||
if self._info.status == StreamStatus.STOPPED:
|
||
return
|
||
|
||
self._stop_event.set()
|
||
self._info.status = StreamStatus.STOPPED
|
||
|
||
if self._thread and self._thread.is_alive():
|
||
self._thread.join(timeout=5.0)
|
||
|
||
self._release_capture()
|
||
logger.info(
|
||
"RTSP 流 %s 已停止, 共解码 %d 帧",
|
||
self.config.stream_id,
|
||
self._info.frames_decoded,
|
||
)
|
||
|
||
# ------------------------------------------------------------------
|
||
# 状态
|
||
# ------------------------------------------------------------------
|
||
|
||
@property
|
||
def info(self) -> StreamInfo:
|
||
return self._info
|
||
|
||
@property
|
||
def status(self) -> StreamStatus:
|
||
return self._info.status
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
return self._info.status in (
|
||
StreamStatus.CONNECTED,
|
||
StreamStatus.CONNECTING,
|
||
StreamStatus.RECONNECTING,
|
||
)
|
||
|
||
# ------------------------------------------------------------------
|
||
# 解码循环 (后台线程)
|
||
# ------------------------------------------------------------------
|
||
|
||
def _decode_loop(self) -> None:
|
||
"""后台线程: RTSP 解码 + 自动重连。"""
|
||
|
||
attempt = 0
|
||
|
||
while not self._stop_event.is_set():
|
||
# 尝试连接
|
||
connected = self._connect()
|
||
if not connected:
|
||
if self._stop_event.is_set():
|
||
break
|
||
attempt += 1
|
||
if (
|
||
self.config.reconnect_attempts > 0
|
||
and attempt > self.config.reconnect_attempts
|
||
):
|
||
self._info.status = StreamStatus.ERROR
|
||
self._info.error_message = (
|
||
f"超过最大重连次数 ({self.config.reconnect_attempts})"
|
||
)
|
||
logger.error(
|
||
"RTSP 流 %s %s",
|
||
self.config.stream_id,
|
||
self._info.error_message,
|
||
)
|
||
break
|
||
|
||
# 指数退避
|
||
interval = min(
|
||
self.config.reconnect_interval_base
|
||
* (self.config.reconnect_backoff_factor ** (attempt - 1)),
|
||
self.config.reconnect_interval_max,
|
||
)
|
||
self._info.status = StreamStatus.RECONNECTING
|
||
self._info.reconnect_count += 1
|
||
logger.warning(
|
||
"RTSP 流 %s 连接失败,第 %d 次重连,等待 %.1fs",
|
||
self.config.stream_id,
|
||
attempt,
|
||
interval,
|
||
)
|
||
self._stop_event.wait(timeout=interval)
|
||
continue
|
||
|
||
# 连接成功,重置计数
|
||
attempt = 0
|
||
self._info.status = StreamStatus.CONNECTED
|
||
self._info.connected_at = time.time()
|
||
logger.info("RTSP 流 %s 已连接: %s", self.config.stream_id, self.config.rtsp_url)
|
||
|
||
# 解码帧
|
||
self._read_frames()
|
||
|
||
# 如果 read_frames 退出且未被停止,说明断线了
|
||
if not self._stop_event.is_set():
|
||
self._release_capture()
|
||
self._info.status = StreamStatus.RECONNECTING
|
||
logger.warning("RTSP 流 %s 断线,准备重连", self.config.stream_id)
|
||
|
||
def _connect(self) -> bool:
|
||
"""尝试连接 RTSP 流。"""
|
||
|
||
try:
|
||
cap = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_FFMPEG)
|
||
# 降低缓冲以减少延迟
|
||
cap.set(cv2.CAP_PROP_BUFFERSIZE, self.config.buffer_size)
|
||
|
||
if not cap.isOpened():
|
||
return False
|
||
|
||
# 验证: 尝试读取一帧
|
||
ret, _ = cap.read()
|
||
if not ret:
|
||
cap.release()
|
||
return False
|
||
|
||
self._cap = cap
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.debug("RTSP 流 %s 连接异常: %s", self.config.stream_id, e)
|
||
return False
|
||
|
||
def _read_frames(self) -> None:
|
||
"""持续读取帧直到断线或停止信号。"""
|
||
|
||
if self._cap is None:
|
||
return
|
||
|
||
fps_counter_start = time.time()
|
||
fps_frame_count = 0
|
||
|
||
while not self._stop_event.is_set():
|
||
try:
|
||
ret, frame = self._cap.read()
|
||
except Exception as e:
|
||
logger.warning("RTSP 流 %s 读取异常: %s", self.config.stream_id, e)
|
||
break
|
||
|
||
if not ret or frame is None:
|
||
logger.warning("RTSP 流 %s 读取帧失败,可能断线", self.config.stream_id)
|
||
break
|
||
|
||
self._frame_index += 1
|
||
self._info.frames_decoded += 1
|
||
self._info.last_frame_time = time.time()
|
||
|
||
# 帧采样
|
||
if self.config.frame_skip > 0 and self._frame_index % (self.config.frame_skip + 1) != 1:
|
||
self._info.frames_dropped += 1
|
||
continue
|
||
|
||
# FPS 统计
|
||
fps_frame_count += 1
|
||
elapsed = time.time() - fps_counter_start
|
||
if elapsed >= 1.0:
|
||
self._info.fps = fps_frame_count / elapsed
|
||
fps_frame_count = 0
|
||
fps_counter_start = time.time()
|
||
|
||
# 通过事件循环投递帧到异步回调
|
||
if self._on_frame and self._loop and not self._loop.is_closed():
|
||
try:
|
||
asyncio.run_coroutine_threadsafe(
|
||
self._on_frame(
|
||
self.config.stream_id,
|
||
frame,
|
||
self._frame_index,
|
||
self._info.last_frame_time,
|
||
),
|
||
self._loop,
|
||
)
|
||
except RuntimeError as e:
|
||
logger.debug("投递帧回调失败 (事件循环可能已关闭): %s", e)
|
||
break
|
||
|
||
def _release_capture(self) -> None:
|
||
"""释放 VideoCapture 资源。"""
|
||
|
||
if self._cap is not None:
|
||
try:
|
||
self._cap.release()
|
||
except Exception:
|
||
pass
|
||
self._cap = None
|
||
|
||
|
||
__all__ = [
|
||
"RTSPService",
|
||
"StreamConfig",
|
||
"StreamInfo",
|
||
"StreamStatus",
|
||
"FrameCallback",
|
||
]
|