From 18cfc9b16a6f5f4c39b22ec1f0443e2b675917c8 Mon Sep 17 00:00:00 2001 From: wuzhuorong <973204353@qq.com> Date: Fri, 12 Jun 2026 15:00:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(server):=E8=A1=A5=E5=85=85RESTful=20?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/server/api/rtsp.py | 200 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 apps/server/api/rtsp.py diff --git a/apps/server/api/rtsp.py b/apps/server/api/rtsp.py new file mode 100644 index 0000000..c5bd5c2 --- /dev/null +++ b/apps/server/api/rtsp.py @@ -0,0 +1,200 @@ +"""RTSP 流管理 API (MVP-2 / D13-D14) + +提供 RTSP 流的增删改查、启停控制、状态监控等 RESTful 接口。 +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +from core.settings import get_settings +from services.stream_manager import StreamManager + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/rtsp", tags=["RTSP 流管理"]) + +# 全局 StreamManager 实例 (由 main.py 初始化) +_stream_manager: Optional[StreamManager] = None + + +def init_stream_manager(model_service: Any = None) -> StreamManager: + """初始化全局 StreamManager 实例。""" + + global _stream_manager + settings = get_settings() + _stream_manager = StreamManager( + model_service=model_service, + buffer_capacity=settings.rtsp.buffer_capacity, + max_streams=settings.rtsp.max_streams, + detect_interval=settings.rtsp.detect_interval, + ) + return _stream_manager + + +def get_stream_manager() -> StreamManager: + """获取全局 StreamManager 实例。""" + + if _stream_manager is None: + raise RuntimeError("StreamManager 未初始化,请先调用 init_stream_manager()") + return _stream_manager + + +# --------------------------------------------------------------------------- +# 请求/响应模型 +# --------------------------------------------------------------------------- + + +class AddStreamRequest(BaseModel): + """添加 RTSP 流请求。""" + + stream_id: str = Field(..., min_length=1, max_length=64, description="流标识") + rtsp_url: str = Field(..., description="RTSP 流地址") + model_id: str = Field(default="fire_detection", description="检测模型ID") + confidence: float = Field(default=0.5, ge=0.0, le=1.0, description="置信度阈值") + iou: float = Field(default=0.45, ge=0.0, le=1.0, description="IOU阈值") + frame_skip: int = Field(default=0, ge=0, description="帧采样间隔") + + +class UpdateStreamConfigRequest(BaseModel): + """更新流配置请求。""" + + model_id: Optional[str] = None + confidence: Optional[float] = Field(default=None, ge=0.0, le=1.0) + iou: Optional[float] = Field(default=None, ge=0.0, le=1.0) + frame_skip: Optional[int] = Field(default=None, ge=0) + + +class StreamOperationResponse(BaseModel): + """通用操作响应。""" + + success: bool + message: str + + +# --------------------------------------------------------------------------- +# API 路由 +# --------------------------------------------------------------------------- + + +@router.post("/streams", response_model=StreamOperationResponse) +async def add_stream(req: AddStreamRequest): + """添加一路 RTSP 流。""" + + from services.rtsp_service import StreamConfig + + config = StreamConfig( + stream_id=req.stream_id, + rtsp_url=req.rtsp_url, + model_id=req.model_id, + confidence=req.confidence, + iou=req.iou, + frame_skip=req.frame_skip, + ) + manager = get_stream_manager() + result = await manager.add_stream(req.stream_id, req.rtsp_url, config=config) + if not result["success"]: + raise HTTPException(status_code=400, detail=result["message"]) + return StreamOperationResponse(**result) + + +@router.delete("/streams/{stream_id}", response_model=StreamOperationResponse) +async def remove_stream(stream_id: str): + """移除一路 RTSP 流。""" + + manager = get_stream_manager() + result = await manager.remove_stream(stream_id) + if not result["success"]: + raise HTTPException(status_code=404, detail=result["message"]) + return StreamOperationResponse(**result) + + +@router.post("/streams/{stream_id}/start", response_model=StreamOperationResponse) +async def start_stream(stream_id: str): + """启动一路 RTSP 流。""" + + manager = get_stream_manager() + result = await manager.start_stream(stream_id) + if not result["success"]: + raise HTTPException(status_code=400, detail=result["message"]) + return StreamOperationResponse(**result) + + +@router.post("/streams/{stream_id}/stop", response_model=StreamOperationResponse) +async def stop_stream(stream_id: str): + """停止单路 RTSP 流。""" + + manager = get_stream_manager() + result = await manager.stop_stream(stream_id) + if not result["success"]: + raise HTTPException(status_code=404, detail=result["message"]) + return StreamOperationResponse(**result) + + +@router.put("/streams/{stream_id}/config", response_model=StreamOperationResponse) +async def update_stream_config(stream_id: str, req: UpdateStreamConfigRequest): + """更新流的检测配置。""" + + manager = get_stream_manager() + result = await manager.update_stream_config( + stream_id, + model_id=req.model_id, + confidence=req.confidence, + iou=req.iou, + frame_skip=req.frame_skip, + ) + if not result["success"]: + raise HTTPException(status_code=404, detail=result["message"]) + return StreamOperationResponse(**result) + + +@router.get("/streams/{stream_id}") +async def get_stream_info(stream_id: str): + """获取单路流状态信息。""" + + manager = get_stream_manager() + info = manager.get_stream_info(stream_id) + if info is None: + raise HTTPException(status_code=404, detail=f"流 {stream_id} 不存在") + return info + + +@router.get("/streams") +async def list_streams(): + """获取所有流状态信息。""" + + manager = get_stream_manager() + return {"streams": manager.get_all_streams_info()} + + +@router.get("/health") +async def rtsp_health(): + """RTSP 管理器健康检查。""" + + manager = get_stream_manager() + return manager.get_health() + + +@router.post("/start-all", response_model=StreamOperationResponse) +async def start_all_streams(): + """启动所有已添加的流。""" + + manager = get_stream_manager() + await manager.start_all() + return StreamOperationResponse(success=True, message="所有流已启动") + + +@router.post("/stop-all", response_model=StreamOperationResponse) +async def stop_all_streams(): + """停止所有流。""" + + manager = get_stream_manager() + await manager.stop_all() + return StreamOperationResponse(success=True, message="所有流已停止") + + +__all__ = ["router", "init_stream_manager", "get_stream_manager"]