From 05d4a5edf6ada276a703e0dc53200604f0c6c74b Mon Sep 17 00:00:00 2001 From: wuzhuorong <973204353@qq.com> Date: Thu, 11 Jun 2026 17:22:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=96=B0=E5=A2=9E=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E6=95=B0=E6=8D=AE=E5=A5=91=E7=BA=A6,?= =?UTF-8?q?=E4=B8=BA=E6=89=80=E6=9C=89=E6=A3=80=E6=B5=8B=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=BB=9F=E4=B8=80=E8=BE=93=E5=87=BA=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/server/models/event_schemas.py | 226 ++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 apps/server/models/event_schemas.py diff --git a/apps/server/models/event_schemas.py b/apps/server/models/event_schemas.py new file mode 100644 index 0000000..f04188e --- /dev/null +++ b/apps/server/models/event_schemas.py @@ -0,0 +1,226 @@ +"""统一事件数据契约 (MVP-1 / P0) + +为后续事件决策引擎、规则引擎、聚合器、LLM 复审等模块提供一致的数据模型。 +所有检测服务 (YOLO / PaddleDetection / ActionDetection 等) 的输出, +都应通过 ``services/adapters/detection_adapter.py`` 适配为本文件定义的 +``UnifiedDetection`` / ``DetectionResult``。 + +字段命名遵循 Google Python Style Guide,所有字段使用 snake_case。 +""" + +from __future__ import annotations + +import time +import uuid +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple + +from pydantic import BaseModel, ConfigDict, Field, field_validator + + +# --------------------------------------------------------------------------- +# 枚举 +# --------------------------------------------------------------------------- + + +class EventType(str, Enum): + """统一事件类型枚举。 + + 与各服务底层 class_name 解耦,事件引擎只面向本枚举决策。 + """ + + FIRE = "fire" + SMOKE = "smoke" + SMOKING = "smoking" + FIGHT = "fight" + LOITERING = "loitering" + STATIONARY = "stationary" + INTRUSION = "intrusion" + ILLEGAL_PARKING = "illegal_parking" + VEHICLE = "vehicle" + PERSON = "person" + UNKNOWN = "unknown" + + +class SeverityLevel(str, Enum): + """事件严重性级别 (供规则引擎 / 聚合器使用)。""" + + INFO = "info" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +class DetectionSource(str, Enum): + """检测来源 (用于适配器溯源 & 调度策略)。""" + + YOLO = "yolo" + PADDLE = "paddle" + ACTION_DOCKER = "action_docker" + BEHAVIOR = "behavior" # 由徘徊/静止等行为分析模块产出 + COMPOSITE = "composite" # 复合检测 (例如火灾双模型) + + +# --------------------------------------------------------------------------- +# 基础结构 +# --------------------------------------------------------------------------- + + +class BBox(BaseModel): + """边界框 (xyxy)。""" + + model_config = ConfigDict(frozen=True) + + x1: int + y1: int + x2: int + y2: int + + @field_validator("x2") + @classmethod + def _check_x2(cls, v: int, info) -> int: + x1 = info.data.get("x1", 0) + if v < x1: + raise ValueError(f"bbox.x2({v}) 必须 >= x1({x1})") + return v + + @field_validator("y2") + @classmethod + def _check_y2(cls, v: int, info) -> int: + y1 = info.data.get("y1", 0) + if v < y1: + raise ValueError(f"bbox.y2({v}) 必须 >= y1({y1})") + return v + + @property + def width(self) -> int: + return self.x2 - self.x1 + + @property + def height(self) -> int: + return self.y2 - self.y1 + + @property + def area(self) -> int: + return self.width * self.height + + @property + def center(self) -> Tuple[float, float]: + return (self.x1 + self.x2) / 2.0, (self.y1 + self.y2) / 2.0 + + def to_list(self) -> List[int]: + return [self.x1, self.y1, self.x2, self.y2] + + +# --------------------------------------------------------------------------- +# 统一检测结果 +# --------------------------------------------------------------------------- + + +class UnifiedDetection(BaseModel): + """统一检测对象 (单个目标)。 + + 适配器将各引擎原始输出转换为本结构,作为事件决策引擎的唯一输入。 + """ + + detection_id: str = Field(default_factory=lambda: uuid.uuid4().hex) + track_id: Optional[int] = None + class_name: str + label: str + confidence: float = Field(ge=0.0, le=1.0) + bbox: BBox + source: DetectionSource = DetectionSource.YOLO + model_id: Optional[str] = None + timestamp: float = Field(default_factory=time.time) + extra: Dict[str, Any] = Field(default_factory=dict) + + +class DetectionStats(BaseModel): + """检测统计信息 (与现有 API 字段保持兼容)。""" + + total_detections: int = 0 + avg_confidence: float = 0.0 + processing_time: float = 0.0 + model_used: Optional[str] = None + fps: Optional[float] = None + + +class DetectionResult(BaseModel): + """统一检测结果 (单帧 / 单图)。""" + + success: bool = True + message: str = "" + source: DetectionSource = DetectionSource.YOLO + model_id: Optional[str] = None + frame_id: Optional[str] = Field(default_factory=lambda: uuid.uuid4().hex) + timestamp: float = Field(default_factory=time.time) + detections: List[UnifiedDetection] = Field(default_factory=list) + stats: DetectionStats = Field(default_factory=DetectionStats) + + def to_legacy_dict(self) -> Dict[str, Any]: + """转换为旧接口字段格式 (供 API 层向后兼容输出)。""" + + return { + "success": self.success, + "message": self.message, + "detections": [ + { + "class": d.class_name, + "label": d.label, + "confidence": round(d.confidence, 3), + "bbox": d.bbox.to_list(), + **({"track_id": d.track_id} if d.track_id is not None else {}), + } + for d in self.detections + ], + "stats": self.stats.model_dump(exclude_none=True), + } + + +# --------------------------------------------------------------------------- +# 候选事件 (决策引擎产出) +# --------------------------------------------------------------------------- + + +class CandidateEvent(BaseModel): + """候选事件 (供规则引擎与聚合器进一步处理)。""" + + event_id: str = Field(default_factory=lambda: uuid.uuid4().hex) + event_type: EventType + severity: SeverityLevel = SeverityLevel.INFO + confidence: float = Field(ge=0.0, le=1.0) + detection: UnifiedDetection + source_id: Optional[str] = None # 摄像头/视频流标识 + timestamp: float = Field(default_factory=time.time) + triggered_rules: List[str] = Field(default_factory=list) + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class AlertEvent(BaseModel): + """最终预警事件 (聚合 + 规则筛选后产出)。""" + + alert_id: str = Field(default_factory=lambda: uuid.uuid4().hex) + event_type: EventType + severity: SeverityLevel + confidence: float = Field(ge=0.0, le=1.0) + source_id: Optional[str] = None + detections: List[UnifiedDetection] = Field(default_factory=list) + rule_name: Optional[str] = None + first_seen: float = Field(default_factory=time.time) + last_seen: float = Field(default_factory=time.time) + occurrence_count: int = 1 + metadata: Dict[str, Any] = Field(default_factory=dict) + + +__all__ = [ + "EventType", + "SeverityLevel", + "DetectionSource", + "BBox", + "UnifiedDetection", + "DetectionStats", + "DetectionResult", + "CandidateEvent", + "AlertEvent", +]