feat:新增统一事件数据契约,为所有检测服务统一输出格式

This commit is contained in:
2026-06-11 17:22:32 +08:00
parent cb2a7dcca3
commit 05d4a5edf6

View File

@@ -0,0 +1,226 @@
"""统一事件数据契约 (MVP-1 / P0)
为后续事件决策引擎、规则引擎、聚合器、LLM 复审等模块提供一致的数据模型。
所有检测服务 (YOLO / PaddleDetection / ActionDetection 等) 的输出,
都应通过 ``services/adapters/detection_adapter.py`` 适配为本文件定义的
``UnifiedDetection`` / ``DetectionResult``。
字段命名遵循 Google Python Style Guide所有字段使用 snake_case。
"""
from __future__ import annotations
import time
import uuid
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from pydantic import BaseModel, ConfigDict, Field, field_validator
# ---------------------------------------------------------------------------
# 枚举
# ---------------------------------------------------------------------------
class EventType(str, Enum):
"""统一事件类型枚举。
与各服务底层 class_name 解耦,事件引擎只面向本枚举决策。
"""
FIRE = "fire"
SMOKE = "smoke"
SMOKING = "smoking"
FIGHT = "fight"
LOITERING = "loitering"
STATIONARY = "stationary"
INTRUSION = "intrusion"
ILLEGAL_PARKING = "illegal_parking"
VEHICLE = "vehicle"
PERSON = "person"
UNKNOWN = "unknown"
class SeverityLevel(str, Enum):
"""事件严重性级别 (供规则引擎 / 聚合器使用)。"""
INFO = "info"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class DetectionSource(str, Enum):
"""检测来源 (用于适配器溯源 & 调度策略)。"""
YOLO = "yolo"
PADDLE = "paddle"
ACTION_DOCKER = "action_docker"
BEHAVIOR = "behavior" # 由徘徊/静止等行为分析模块产出
COMPOSITE = "composite" # 复合检测 (例如火灾双模型)
# ---------------------------------------------------------------------------
# 基础结构
# ---------------------------------------------------------------------------
class BBox(BaseModel):
"""边界框 (xyxy)。"""
model_config = ConfigDict(frozen=True)
x1: int
y1: int
x2: int
y2: int
@field_validator("x2")
@classmethod
def _check_x2(cls, v: int, info) -> int:
x1 = info.data.get("x1", 0)
if v < x1:
raise ValueError(f"bbox.x2({v}) 必须 >= x1({x1})")
return v
@field_validator("y2")
@classmethod
def _check_y2(cls, v: int, info) -> int:
y1 = info.data.get("y1", 0)
if v < y1:
raise ValueError(f"bbox.y2({v}) 必须 >= y1({y1})")
return v
@property
def width(self) -> int:
return self.x2 - self.x1
@property
def height(self) -> int:
return self.y2 - self.y1
@property
def area(self) -> int:
return self.width * self.height
@property
def center(self) -> Tuple[float, float]:
return (self.x1 + self.x2) / 2.0, (self.y1 + self.y2) / 2.0
def to_list(self) -> List[int]:
return [self.x1, self.y1, self.x2, self.y2]
# ---------------------------------------------------------------------------
# 统一检测结果
# ---------------------------------------------------------------------------
class UnifiedDetection(BaseModel):
"""统一检测对象 (单个目标)。
适配器将各引擎原始输出转换为本结构,作为事件决策引擎的唯一输入。
"""
detection_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
track_id: Optional[int] = None
class_name: str
label: str
confidence: float = Field(ge=0.0, le=1.0)
bbox: BBox
source: DetectionSource = DetectionSource.YOLO
model_id: Optional[str] = None
timestamp: float = Field(default_factory=time.time)
extra: Dict[str, Any] = Field(default_factory=dict)
class DetectionStats(BaseModel):
"""检测统计信息 (与现有 API 字段保持兼容)。"""
total_detections: int = 0
avg_confidence: float = 0.0
processing_time: float = 0.0
model_used: Optional[str] = None
fps: Optional[float] = None
class DetectionResult(BaseModel):
"""统一检测结果 (单帧 / 单图)。"""
success: bool = True
message: str = ""
source: DetectionSource = DetectionSource.YOLO
model_id: Optional[str] = None
frame_id: Optional[str] = Field(default_factory=lambda: uuid.uuid4().hex)
timestamp: float = Field(default_factory=time.time)
detections: List[UnifiedDetection] = Field(default_factory=list)
stats: DetectionStats = Field(default_factory=DetectionStats)
def to_legacy_dict(self) -> Dict[str, Any]:
"""转换为旧接口字段格式 (供 API 层向后兼容输出)。"""
return {
"success": self.success,
"message": self.message,
"detections": [
{
"class": d.class_name,
"label": d.label,
"confidence": round(d.confidence, 3),
"bbox": d.bbox.to_list(),
**({"track_id": d.track_id} if d.track_id is not None else {}),
}
for d in self.detections
],
"stats": self.stats.model_dump(exclude_none=True),
}
# ---------------------------------------------------------------------------
# 候选事件 (决策引擎产出)
# ---------------------------------------------------------------------------
class CandidateEvent(BaseModel):
"""候选事件 (供规则引擎与聚合器进一步处理)。"""
event_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
event_type: EventType
severity: SeverityLevel = SeverityLevel.INFO
confidence: float = Field(ge=0.0, le=1.0)
detection: UnifiedDetection
source_id: Optional[str] = None # 摄像头/视频流标识
timestamp: float = Field(default_factory=time.time)
triggered_rules: List[str] = Field(default_factory=list)
metadata: Dict[str, Any] = Field(default_factory=dict)
class AlertEvent(BaseModel):
"""最终预警事件 (聚合 + 规则筛选后产出)。"""
alert_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
event_type: EventType
severity: SeverityLevel
confidence: float = Field(ge=0.0, le=1.0)
source_id: Optional[str] = None
detections: List[UnifiedDetection] = Field(default_factory=list)
rule_name: Optional[str] = None
first_seen: float = Field(default_factory=time.time)
last_seen: float = Field(default_factory=time.time)
occurrence_count: int = 1
metadata: Dict[str, Any] = Field(default_factory=dict)
__all__ = [
"EventType",
"SeverityLevel",
"DetectionSource",
"BBox",
"UnifiedDetection",
"DetectionStats",
"DetectionResult",
"CandidateEvent",
"AlertEvent",
]