"""RTSP 流管理 API (MVP-2 / D13-D14) 提供 RTSP 流的增删改查、启停控制、状态监控等 RESTful 接口。 """ from __future__ import annotations import logging from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field from core.settings import get_settings from services.stream_manager import StreamManager logger = logging.getLogger(__name__) router = APIRouter(prefix="/rtsp", tags=["RTSP 流管理"]) # 全局 StreamManager 实例 (由 main.py 初始化) _stream_manager: Optional[StreamManager] = None def init_stream_manager(model_service: Any = None) -> StreamManager: """初始化全局 StreamManager 实例。""" global _stream_manager settings = get_settings() _stream_manager = StreamManager( model_service=model_service, buffer_capacity=settings.rtsp.buffer_capacity, max_streams=settings.rtsp.max_streams, detect_interval=settings.rtsp.detect_interval, ) return _stream_manager def get_stream_manager() -> StreamManager: """获取全局 StreamManager 实例。""" if _stream_manager is None: raise RuntimeError("StreamManager 未初始化,请先调用 init_stream_manager()") return _stream_manager # --------------------------------------------------------------------------- # 请求/响应模型 # --------------------------------------------------------------------------- class AddStreamRequest(BaseModel): """添加 RTSP 流请求。""" stream_id: str = Field(..., min_length=1, max_length=64, description="流标识") rtsp_url: str = Field(..., description="RTSP 流地址") model_id: str = Field(default="fire_detection", description="检测模型ID") confidence: float = Field(default=0.5, ge=0.0, le=1.0, description="置信度阈值") iou: float = Field(default=0.45, ge=0.0, le=1.0, description="IOU阈值") frame_skip: int = Field(default=0, ge=0, description="帧采样间隔") class UpdateStreamConfigRequest(BaseModel): """更新流配置请求。""" model_id: Optional[str] = None confidence: Optional[float] = Field(default=None, ge=0.0, le=1.0) iou: Optional[float] = Field(default=None, ge=0.0, le=1.0) frame_skip: Optional[int] = Field(default=None, ge=0) class StreamOperationResponse(BaseModel): """通用操作响应。""" success: bool message: str # --------------------------------------------------------------------------- # API 路由 # --------------------------------------------------------------------------- @router.post("/streams", response_model=StreamOperationResponse) async def add_stream(req: AddStreamRequest): """添加一路 RTSP 流。""" from services.rtsp_service import StreamConfig config = StreamConfig( stream_id=req.stream_id, rtsp_url=req.rtsp_url, model_id=req.model_id, confidence=req.confidence, iou=req.iou, frame_skip=req.frame_skip, ) manager = get_stream_manager() result = await manager.add_stream(req.stream_id, req.rtsp_url, config=config) if not result["success"]: raise HTTPException(status_code=400, detail=result["message"]) return StreamOperationResponse(**result) @router.delete("/streams/{stream_id}", response_model=StreamOperationResponse) async def remove_stream(stream_id: str): """移除一路 RTSP 流。""" manager = get_stream_manager() result = await manager.remove_stream(stream_id) if not result["success"]: raise HTTPException(status_code=404, detail=result["message"]) return StreamOperationResponse(**result) @router.post("/streams/{stream_id}/start", response_model=StreamOperationResponse) async def start_stream(stream_id: str): """启动一路 RTSP 流。""" manager = get_stream_manager() result = await manager.start_stream(stream_id) if not result["success"]: raise HTTPException(status_code=400, detail=result["message"]) return StreamOperationResponse(**result) @router.post("/streams/{stream_id}/stop", response_model=StreamOperationResponse) async def stop_stream(stream_id: str): """停止单路 RTSP 流。""" manager = get_stream_manager() result = await manager.stop_stream(stream_id) if not result["success"]: raise HTTPException(status_code=404, detail=result["message"]) return StreamOperationResponse(**result) @router.put("/streams/{stream_id}/config", response_model=StreamOperationResponse) async def update_stream_config(stream_id: str, req: UpdateStreamConfigRequest): """更新流的检测配置。""" manager = get_stream_manager() result = await manager.update_stream_config( stream_id, model_id=req.model_id, confidence=req.confidence, iou=req.iou, frame_skip=req.frame_skip, ) if not result["success"]: raise HTTPException(status_code=404, detail=result["message"]) return StreamOperationResponse(**result) @router.get("/streams/{stream_id}") async def get_stream_info(stream_id: str): """获取单路流状态信息。""" manager = get_stream_manager() info = manager.get_stream_info(stream_id) if info is None: raise HTTPException(status_code=404, detail=f"流 {stream_id} 不存在") return info @router.get("/streams") async def list_streams(): """获取所有流状态信息。""" manager = get_stream_manager() return {"streams": manager.get_all_streams_info()} @router.get("/health") async def rtsp_health(): """RTSP 管理器健康检查。""" manager = get_stream_manager() return manager.get_health() @router.post("/start-all", response_model=StreamOperationResponse) async def start_all_streams(): """启动所有已添加的流。""" manager = get_stream_manager() await manager.start_all() return StreamOperationResponse(success=True, message="所有流已启动") @router.post("/stop-all", response_model=StreamOperationResponse) async def stop_all_streams(): """停止所有流。""" manager = get_stream_manager() await manager.stop_all() return StreamOperationResponse(success=True, message="所有流已停止") __all__ = ["router", "init_stream_manager", "get_stream_manager"]