"""事件聚合器 (MVP-1 / P5 简化版) 时间窗口去重: 对同一 ``(source_id, event_type, track_id_or_bbox_hash)`` 在配置窗口内只保留一条预警事件,重复命中时更新 ``last_seen`` 与 ``occurrence_count``,避免预警刷屏。 后续 MVP-2 会扩展空间邻近合并、置信度加权融合等能力。 """ from __future__ import annotations import logging import time from collections import OrderedDict from typing import Dict, List, Optional, Tuple from models.event_schemas import AlertEvent, UnifiedDetection logger = logging.getLogger(__name__) _AggKey = Tuple[Optional[str], str, str] class EventAggregator: """基于时间窗口的预警去重 / 聚合器。 Args: dedup_window_seconds: 去重窗口 (秒),同 key 在窗口内不会重复产出 max_active_events: 内存中最大活跃事件数,超过时按 LRU 淘汰 """ def __init__( self, dedup_window_seconds: float = 30.0, max_active_events: int = 1000, ) -> None: self.dedup_window_seconds = max(0.0, dedup_window_seconds) self.max_active_events = max(1, max_active_events) # 按插入顺序保存以便 LRU 淘汰 self._active: "OrderedDict[_AggKey, AlertEvent]" = OrderedDict() # ------------------------------------------------------------------ # 主入口 # ------------------------------------------------------------------ def aggregate(self, alerts: List[AlertEvent]) -> List[AlertEvent]: """聚合一批预警事件,返回去重后真正应当对外发出的事件。""" now = time.time() self._evict_expired(now) emitted: List[AlertEvent] = [] for alert in alerts: key = self._make_key(alert) existing = self._active.get(key) if existing is None: self._active[key] = alert self._active.move_to_end(key) emitted.append(alert) if len(self._active) > self.max_active_events: dropped_key, _ = self._active.popitem(last=False) logger.debug("聚合器 LRU 淘汰事件: %s", dropped_key) else: # 窗口内重复:仅更新统计 existing.last_seen = now existing.occurrence_count += 1 if alert.confidence > existing.confidence: existing.confidence = alert.confidence self._active.move_to_end(key) return emitted # ------------------------------------------------------------------ # 内部 # ------------------------------------------------------------------ @staticmethod def _make_key(alert: AlertEvent) -> _AggKey: if alert.detections: target_id = EventAggregator._target_identity(alert.detections[0]) else: target_id = "no_target" return (alert.source_id, alert.event_type.value, target_id) @staticmethod def _target_identity(det: UnifiedDetection) -> str: """构造目标稳定标识:优先 track_id,否则用 bbox 网格哈希。""" if det.track_id is not None: return f"t{det.track_id}" # 按 50px 栅格归一,避免微小抖动导致漏聚合 cx, cy = det.bbox.center return f"g{int(cx) // 50}_{int(cy) // 50}_{det.class_name}" def _evict_expired(self, now: float) -> None: if self.dedup_window_seconds <= 0: self._active.clear() return # 基于 first_seen 判断窗口过期:保证 "同一目标 30 秒内只产生 1 条事件", # 同时窗口到期后允许再次触发 (而不会因为不断收到同类检测而无限延期)。 expired_keys = [ key for key, alert in self._active.items() if now - alert.first_seen > self.dedup_window_seconds ] for key in expired_keys: self._active.pop(key, None) # ------------------------------------------------------------------ # 自省 # ------------------------------------------------------------------ @property def active_count(self) -> int: return len(self._active) def snapshot(self) -> Dict[str, AlertEvent]: """返回当前活跃事件的快照 (用于调试 / 监控)。""" return {f"{k[0]}|{k[1]}|{k[2]}": v for k, v in self._active.items()} __all__ = ["EventAggregator"]