diff --git a/apps/server/services/event/__init__.py b/apps/server/services/event/__init__.py new file mode 100644 index 0000000..c3f15b4 --- /dev/null +++ b/apps/server/services/event/__init__.py @@ -0,0 +1,14 @@ +"""事件引擎子包 (MVP-1 / P1-P2 / P5)。 + +模块结构:: + + decision_engine.py 决策引擎 (置信度评估 + 事件类型映射) + rule_engine.py 规则引擎 (YAML 驱动) + aggregator.py 事件聚合器 (时间窗口去重) +""" + +from .decision_engine import EventDecisionEngine +from .rule_engine import AlertRuleEngine +from .aggregator import EventAggregator + +__all__ = ["EventDecisionEngine", "AlertRuleEngine", "EventAggregator"] diff --git a/apps/server/services/event/aggregator.py b/apps/server/services/event/aggregator.py new file mode 100644 index 0000000..14830e4 --- /dev/null +++ b/apps/server/services/event/aggregator.py @@ -0,0 +1,123 @@ +"""事件聚合器 (MVP-1 / P5 简化版) + +时间窗口去重: 对同一 ``(source_id, event_type, track_id_or_bbox_hash)`` +在配置窗口内只保留一条预警事件,重复命中时更新 ``last_seen`` +与 ``occurrence_count``,避免预警刷屏。 + +后续 MVP-2 会扩展空间邻近合并、置信度加权融合等能力。 +""" + +from __future__ import annotations + +import logging +import time +from collections import OrderedDict +from typing import Dict, List, Optional, Tuple + +from models.event_schemas import AlertEvent, UnifiedDetection + +logger = logging.getLogger(__name__) + + +_AggKey = Tuple[Optional[str], str, str] + + +class EventAggregator: + """基于时间窗口的预警去重 / 聚合器。 + + Args: + dedup_window_seconds: 去重窗口 (秒),同 key 在窗口内不会重复产出 + max_active_events: 内存中最大活跃事件数,超过时按 LRU 淘汰 + """ + + def __init__( + self, + dedup_window_seconds: float = 30.0, + max_active_events: int = 1000, + ) -> None: + self.dedup_window_seconds = max(0.0, dedup_window_seconds) + self.max_active_events = max(1, max_active_events) + # 按插入顺序保存以便 LRU 淘汰 + self._active: "OrderedDict[_AggKey, AlertEvent]" = OrderedDict() + + # ------------------------------------------------------------------ + # 主入口 + # ------------------------------------------------------------------ + + def aggregate(self, alerts: List[AlertEvent]) -> List[AlertEvent]: + """聚合一批预警事件,返回去重后真正应当对外发出的事件。""" + + now = time.time() + self._evict_expired(now) + + emitted: List[AlertEvent] = [] + for alert in alerts: + key = self._make_key(alert) + existing = self._active.get(key) + if existing is None: + self._active[key] = alert + self._active.move_to_end(key) + emitted.append(alert) + if len(self._active) > self.max_active_events: + dropped_key, _ = self._active.popitem(last=False) + logger.debug("聚合器 LRU 淘汰事件: %s", dropped_key) + else: + # 窗口内重复:仅更新统计 + existing.last_seen = now + existing.occurrence_count += 1 + if alert.confidence > existing.confidence: + existing.confidence = alert.confidence + self._active.move_to_end(key) + return emitted + + # ------------------------------------------------------------------ + # 内部 + # ------------------------------------------------------------------ + + @staticmethod + def _make_key(alert: AlertEvent) -> _AggKey: + if alert.detections: + target_id = EventAggregator._target_identity(alert.detections[0]) + else: + target_id = "no_target" + return (alert.source_id, alert.event_type.value, target_id) + + @staticmethod + def _target_identity(det: UnifiedDetection) -> str: + """构造目标稳定标识:优先 track_id,否则用 bbox 网格哈希。""" + + if det.track_id is not None: + return f"t{det.track_id}" + # 按 50px 栅格归一,避免微小抖动导致漏聚合 + cx, cy = det.bbox.center + return f"g{int(cx) // 50}_{int(cy) // 50}_{det.class_name}" + + def _evict_expired(self, now: float) -> None: + if self.dedup_window_seconds <= 0: + self._active.clear() + return + # 基于 first_seen 判断窗口过期:保证 "同一目标 30 秒内只产生 1 条事件", + # 同时窗口到期后允许再次触发 (而不会因为不断收到同类检测而无限延期)。 + expired_keys = [ + key + for key, alert in self._active.items() + if now - alert.first_seen > self.dedup_window_seconds + ] + for key in expired_keys: + self._active.pop(key, None) + + # ------------------------------------------------------------------ + # 自省 + # ------------------------------------------------------------------ + + @property + def active_count(self) -> int: + return len(self._active) + + def snapshot(self) -> Dict[str, AlertEvent]: + """返回当前活跃事件的快照 (用于调试 / 监控)。""" + + return {f"{k[0]}|{k[1]}|{k[2]}": v for k, v in self._active.items()} + + +__all__ = ["EventAggregator"] diff --git a/apps/server/services/event/decision_engine.py b/apps/server/services/event/decision_engine.py new file mode 100644 index 0000000..4923631 --- /dev/null +++ b/apps/server/services/event/decision_engine.py @@ -0,0 +1,163 @@ +"""事件决策引擎 (MVP-1 / P1) + +职责: + +1. 对统一 ``DetectionResult`` 中的每个检测应用置信度过滤 +2. 将底层 ``class_name`` 映射到统一 ``EventType`` +3. 产出 ``CandidateEvent`` 列表,供规则引擎与聚合器继续处理 + +设计原则 (MVP 简化版): + +- 不做温度缩放 / 校准 (后续 MVP-3 再迭代) +- 不做场景分类 (后续按需引入) +- 类型映射规则可外部覆盖,避免硬编码 +""" + +from __future__ import annotations + +import logging +from typing import Dict, List, Optional + +from models.event_schemas import ( + CandidateEvent, + DetectionResult, + EventType, + SeverityLevel, + UnifiedDetection, +) + +logger = logging.getLogger(__name__) + + +# 默认的 class_name -> EventType 映射 (覆盖当前已有模型) +DEFAULT_CLASS_TO_EVENT: Dict[str, EventType] = { + # 火灾 + "fire": EventType.FIRE, + "flame": EventType.FIRE, + "smoke": EventType.SMOKE, + # 抽烟 + "smoking": EventType.SMOKING, + "cigarette": EventType.SMOKING, + # 打架 + "fight": EventType.FIGHT, + "fighting": EventType.FIGHT, + # 行为 + "loitering": EventType.LOITERING, + "stationary": EventType.STATIONARY, + "intrusion": EventType.INTRUSION, + # 车辆 + "vehicle": EventType.VEHICLE, + "car": EventType.VEHICLE, + "truck": EventType.VEHICLE, + "bus": EventType.VEHICLE, + "illegal_parking": EventType.ILLEGAL_PARKING, + # 人员 + "person": EventType.PERSON, +} + +# 事件类型 -> 默认严重性 (规则引擎可覆盖) +DEFAULT_SEVERITY: Dict[EventType, SeverityLevel] = { + EventType.FIRE: SeverityLevel.CRITICAL, + EventType.SMOKE: SeverityLevel.HIGH, + EventType.SMOKING: SeverityLevel.MEDIUM, + EventType.FIGHT: SeverityLevel.HIGH, + EventType.LOITERING: SeverityLevel.MEDIUM, + EventType.STATIONARY: SeverityLevel.LOW, + EventType.INTRUSION: SeverityLevel.HIGH, + EventType.ILLEGAL_PARKING: SeverityLevel.MEDIUM, + EventType.VEHICLE: SeverityLevel.INFO, + EventType.PERSON: SeverityLevel.INFO, + EventType.UNKNOWN: SeverityLevel.INFO, +} + + +class EventDecisionEngine: + """事件决策引擎 (简化版)。 + + Args: + min_confidence: 全局最低置信度阈值,低于此值的检测会被丢弃 + class_to_event: 自定义类别映射,会与默认映射合并 (覆盖) + ignore_event_types: 不希望产出的事件类型集合 (例如 PERSON 太频繁) + """ + + def __init__( + self, + min_confidence: float = 0.5, + class_to_event: Optional[Dict[str, EventType]] = None, + ignore_event_types: Optional[List[EventType]] = None, + ) -> None: + self.min_confidence = max(0.0, min(1.0, min_confidence)) + self.class_to_event: Dict[str, EventType] = dict(DEFAULT_CLASS_TO_EVENT) + if class_to_event: + self.class_to_event.update(class_to_event) + self.ignore_event_types = set(ignore_event_types or []) + + # ------------------------------------------------------------------ + # 主入口 + # ------------------------------------------------------------------ + + def decide( + self, + result: DetectionResult, + source_id: Optional[str] = None, + ) -> List[CandidateEvent]: + """根据检测结果产出候选事件列表。 + + Args: + result: 统一检测结果 + source_id: 摄像头/视频流标识,用于后续聚合 + """ + + if not result.success or not result.detections: + return [] + + events: List[CandidateEvent] = [] + for det in result.detections: + event = self._build_candidate(det, source_id) + if event is not None: + events.append(event) + + if events: + logger.debug( + "DecisionEngine 产出 %d 条候选事件 (source_id=%s, model=%s)", + len(events), + source_id, + result.model_id, + ) + return events + + # ------------------------------------------------------------------ + # 内部 + # ------------------------------------------------------------------ + + def _build_candidate( + self, + det: UnifiedDetection, + source_id: Optional[str], + ) -> Optional[CandidateEvent]: + if det.confidence < self.min_confidence: + return None + + event_type = self.map_event_type(det.class_name) + if event_type in self.ignore_event_types: + return None + + severity = DEFAULT_SEVERITY.get(event_type, SeverityLevel.INFO) + + return CandidateEvent( + event_type=event_type, + severity=severity, + confidence=det.confidence, + detection=det, + source_id=source_id, + ) + + def map_event_type(self, class_name: str) -> EventType: + """将 class_name 映射为 EventType (大小写不敏感)。""" + + if not class_name: + return EventType.UNKNOWN + return self.class_to_event.get(class_name.lower().strip(), EventType.UNKNOWN) + + +__all__ = ["EventDecisionEngine", "DEFAULT_CLASS_TO_EVENT", "DEFAULT_SEVERITY"] diff --git a/apps/server/services/event/rule_engine.py b/apps/server/services/event/rule_engine.py new file mode 100644 index 0000000..73c24dc --- /dev/null +++ b/apps/server/services/event/rule_engine.py @@ -0,0 +1,206 @@ +"""预警规则引擎 (MVP-1 / P2) + +YAML 驱动的预警规则引擎,对 ``CandidateEvent`` 列表应用规则, +满足条件的事件会被升级为 ``AlertEvent``。 + +规则 YAML 示例 (config/rules/fire.yaml):: + + name: fire_critical + event_type: fire + enabled: true + min_confidence: 0.6 + severity: critical # 覆盖默认严重性 (可选) + description: 检测到火焰,立即触发预警 + +规则条件支持: + +- ``min_confidence``: 置信度阈值 +- ``allowed_sources``: 允许的来源 (source_id 白名单,None 表示不限制) +- ``required_labels``: 检测项 label 必须包含其中之一 +- ``min_bbox_area``: 边界框最小面积 +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +import yaml + +from models.event_schemas import ( + AlertEvent, + CandidateEvent, + EventType, + SeverityLevel, +) + +logger = logging.getLogger(__name__) + + +@dataclass +class AlertRule: + """单条预警规则。""" + + name: str + event_type: EventType + enabled: bool = True + min_confidence: float = 0.0 + severity: Optional[SeverityLevel] = None + allowed_sources: Optional[List[str]] = None + required_labels: Optional[List[str]] = None + min_bbox_area: int = 0 + description: str = "" + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "AlertRule": + return cls( + name=str(data["name"]), + event_type=EventType(data["event_type"]), + enabled=bool(data.get("enabled", True)), + min_confidence=float(data.get("min_confidence", 0.0)), + severity=SeverityLevel(data["severity"]) if data.get("severity") else None, + allowed_sources=data.get("allowed_sources"), + required_labels=data.get("required_labels"), + min_bbox_area=int(data.get("min_bbox_area", 0)), + description=str(data.get("description", "")), + ) + + def matches(self, event: CandidateEvent) -> bool: + """判断候选事件是否命中规则。""" + + if not self.enabled: + return False + if event.event_type != self.event_type: + return False + if event.confidence < self.min_confidence: + return False + if self.allowed_sources and event.source_id not in self.allowed_sources: + return False + if self.required_labels: + labels = {event.detection.class_name, event.detection.label} + if not any(lbl in labels for lbl in self.required_labels): + return False + if self.min_bbox_area > 0 and event.detection.bbox.area < self.min_bbox_area: + return False + return True + + +@dataclass +class _RuleStats: + loaded: int = 0 + enabled: int = 0 + files: List[str] = field(default_factory=list) + + +class AlertRuleEngine: + """预警规则引擎。 + + 支持从单个 YAML 文件或目录批量加载规则。 + """ + + def __init__(self, rules: Optional[List[AlertRule]] = None) -> None: + self.rules: List[AlertRule] = list(rules or []) + self._stats = _RuleStats( + loaded=len(self.rules), + enabled=sum(1 for r in self.rules if r.enabled), + ) + + # ------------------------------------------------------------------ + # 加载 + # ------------------------------------------------------------------ + + @classmethod + def from_directory(cls, rules_dir: str | Path) -> "AlertRuleEngine": + """从目录加载所有 ``*.yaml`` / ``*.yml`` 规则文件。""" + + engine = cls() + engine.load_directory(rules_dir) + return engine + + def load_directory(self, rules_dir: str | Path) -> int: + path = Path(rules_dir) + if not path.exists(): + logger.warning("规则目录不存在: %s", path) + return 0 + + count = 0 + for file in sorted(path.glob("*.y*ml")): + try: + count += self.load_file(file) + except Exception as exc: # noqa: BLE001 + logger.error("加载规则文件失败 %s: %s", file, exc) + logger.info("规则引擎加载完成: 共 %d 条规则 (来自 %s)", count, path) + return count + + def load_file(self, rule_file: str | Path) -> int: + path = Path(rule_file) + with path.open("r", encoding="utf-8") as fp: + data = yaml.safe_load(fp) + + if data is None: + return 0 + + # 单条规则 dict 或 列表 或 {"rules": [...]} 三种格式都支持 + if isinstance(data, dict) and "rules" in data: + rule_items = data["rules"] + elif isinstance(data, list): + rule_items = data + else: + rule_items = [data] + + added = 0 + for item in rule_items: + rule = AlertRule.from_dict(item) + self.rules.append(rule) + self._stats.loaded += 1 + if rule.enabled: + self._stats.enabled += 1 + added += 1 + self._stats.files.append(str(path)) + return added + + # ------------------------------------------------------------------ + # 评估 + # ------------------------------------------------------------------ + + def evaluate(self, events: List[CandidateEvent]) -> List[AlertEvent]: + """对一批候选事件执行规则评估,返回触发的预警事件。""" + + alerts: List[AlertEvent] = [] + for event in events: + for rule in self.rules: + if rule.matches(event): + alerts.append(self._build_alert(event, rule)) + event.triggered_rules.append(rule.name) + break # 命中一条即可,避免重复 + return alerts + + @staticmethod + def _build_alert(event: CandidateEvent, rule: AlertRule) -> AlertEvent: + severity = rule.severity or event.severity + return AlertEvent( + event_type=event.event_type, + severity=severity, + confidence=event.confidence, + source_id=event.source_id, + detections=[event.detection], + rule_name=rule.name, + metadata={"description": rule.description}, + ) + + # ------------------------------------------------------------------ + # 自省 + # ------------------------------------------------------------------ + + @property + def stats(self) -> Dict[str, Any]: + return { + "loaded": self._stats.loaded, + "enabled": self._stats.enabled, + "files": list(self._stats.files), + } + + +__all__ = ["AlertRule", "AlertRuleEngine"]