Files
jc-video-recognize/apps/server/services/frame_buffer.py
wuzhuorong 40fd3089a7 feat(server): 新增RTSP多路视频流接入服务。- 实现基于Ring Buffer的帧缓冲区(frame_buffer),支持线程安全读写
- 实现RTSP流接入服务(rtsp_service),支持单路流连接/解码/帧采集
- 实现多路流调度管理器(stream_manager),统一管理多路RTSP流启停与状态监控
2026-06-12 13:56:35 +08:00

255 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""帧缓冲区 (MVP-2 / D15)
基于 Ring Buffer 的帧缓冲,配合丢帧策略,避免多路 RTSP 流场景下
内存无限增长。
核心设计:
1. 固定容量的环形缓冲区,写满后自动覆盖最旧帧
2. 支持按策略丢帧: 最新帧优先 (实时性) / 均匀采样 (覆盖率)
3. 线程安全: 使用 asyncio.Lock 保护并发读写
4. 帧元数据: 每帧附带 stream_id / timestamp / frame_index
"""
from __future__ import annotations
import asyncio
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)
class DropPolicy(str, Enum):
"""丢帧策略。"""
LATEST = "latest" # 保留最新帧,覆盖最旧帧 (默认,适合实时检测)
SAMPLE = "sample" # 均匀采样保留,丢弃中间帧 (适合回溯分析)
@dataclass
class FrameMeta:
"""帧元数据。"""
stream_id: str
frame_index: int
timestamp: float
width: int = 0
height: int = 0
@dataclass
class FrameItem:
"""缓冲区中的帧条目。"""
frame: np.ndarray
meta: FrameMeta
class FrameBuffer:
"""环形帧缓冲区。
Args:
capacity: 缓冲区最大帧数
drop_policy: 丢帧策略
max_memory_mb: 内存上限 (MB)超过时强制丢帧0 表示不限制
"""
def __init__(
self,
capacity: int = 300,
drop_policy: DropPolicy = DropPolicy.LATEST,
max_memory_mb: float = 0,
) -> None:
self.capacity = max(1, capacity)
self.drop_policy = drop_policy
self.max_memory_mb = max(0.0, max_memory_mb)
self._buffer: deque[FrameItem] = deque(maxlen=self.capacity)
self._lock = asyncio.Lock()
self._total_written: int = 0
self._total_dropped: int = 0
# ------------------------------------------------------------------
# 写入
# ------------------------------------------------------------------
async def write(
self,
frame: np.ndarray,
stream_id: str,
frame_index: int,
timestamp: Optional[float] = None,
) -> None:
"""写入一帧到缓冲区。
当缓冲区已满时,根据 ``drop_policy`` 决定丢弃策略。
"""
meta = FrameMeta(
stream_id=stream_id,
frame_index=frame_index,
timestamp=timestamp or time.time(),
width=frame.shape[1] if frame.ndim >= 2 else 0,
height=frame.shape[0] if frame.ndim >= 2 else 0,
)
item = FrameItem(frame=frame, meta=meta)
async with self._lock:
self._total_written += 1
if len(self._buffer) >= self.capacity:
self._apply_drop_policy(item)
else:
self._buffer.append(item)
# 内存上限检查
if self.max_memory_mb > 0:
self._enforce_memory_limit()
# ------------------------------------------------------------------
# 读取
# ------------------------------------------------------------------
async def read_latest(self) -> Optional[FrameItem]:
"""读取最新一帧 (不消费)。"""
async with self._lock:
if not self._buffer:
return None
return self._buffer[-1]
async def read_oldest(self) -> Optional[FrameItem]:
"""读取最旧一帧 (不消费)。"""
async with self._lock:
if not self._buffer:
return None
return self._buffer[0]
async def read_all(self) -> List[FrameItem]:
"""读取缓冲区所有帧 (快照,不消费)。"""
async with self._lock:
return list(self._buffer)
async def read_range(
self,
start_index: int = 0,
count: Optional[int] = None,
) -> List[FrameItem]:
"""读取指定范围的帧 (快照)。
Args:
start_index: 从缓冲区开头的偏移量
count: 读取帧数None 表示到末尾
"""
async with self._lock:
items = list(self._buffer)
if start_index >= len(items):
return []
end = len(items) if count is None else start_index + count
return items[start_index:end]
async def pop_latest(self) -> Optional[FrameItem]:
"""弹出最新一帧 (消费)。"""
async with self._lock:
if not self._buffer:
return None
return self._buffer.pop()
async def pop_oldest(self) -> Optional[FrameItem]:
"""弹出最旧一帧 (消费)。"""
async with self._lock:
if not self._buffer:
return None
return self._buffer.popleft()
# ------------------------------------------------------------------
# 状态
# ------------------------------------------------------------------
async def clear(self) -> None:
"""清空缓冲区。"""
async with self._lock:
self._buffer.clear()
@property
def size(self) -> int:
"""当前缓冲区帧数。"""
return len(self._buffer)
@property
def stats(self) -> Dict[str, Any]:
"""缓冲区统计信息。"""
return {
"size": len(self._buffer),
"capacity": self.capacity,
"total_written": self._total_written,
"total_dropped": self._total_dropped,
"drop_policy": self.drop_policy.value,
"usage_percent": round(len(self._buffer) / self.capacity * 100, 1),
}
def estimate_memory_mb(self) -> float:
"""估算当前缓冲区占用内存 (MB)。"""
if not self._buffer:
return 0.0
# 取第一帧估算单帧大小
sample = self._buffer[0].frame
frame_bytes = sample.nbytes if isinstance(sample, np.ndarray) else 0
return len(self._buffer) * frame_bytes / (1024 * 1024)
# ------------------------------------------------------------------
# 内部
# ------------------------------------------------------------------
def _apply_drop_policy(self, new_item: FrameItem) -> None:
"""缓冲区满时应用丢帧策略。"""
if self.drop_policy == DropPolicy.LATEST:
# 覆盖最旧帧 (deque maxlen 自动处理)
self._total_dropped += 1
self._buffer.append(new_item)
elif self.drop_policy == DropPolicy.SAMPLE:
# 均匀采样: 丢弃偶数位置的帧,腾出空间
sampled = deque(maxlen=self.capacity)
step = 2
for i, item in enumerate(self._buffer):
if i % step != 0:
self._total_dropped += 1
else:
sampled.append(item)
sampled.append(new_item)
self._buffer = sampled
def _enforce_memory_limit(self) -> None:
"""强制执行内存上限,超出时丢弃最旧帧。"""
while self.max_memory_mb > 0 and self._buffer:
current_mb = self.estimate_memory_mb()
if current_mb <= self.max_memory_mb:
break
self._buffer.popleft()
self._total_dropped += 1
logger.debug(
"FrameBuffer 内存超限 (%.1f > %.1f MB),丢弃最旧帧",
current_mb,
self.max_memory_mb,
)
__all__ = ["FrameBuffer", "FrameItem", "FrameMeta", "DropPolicy"]