feat:新增事件决策/规则/聚合三段管道引擎
This commit is contained in:
123
apps/server/services/event/aggregator.py
Normal file
123
apps/server/services/event/aggregator.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""事件聚合器 (MVP-1 / P5 简化版)
|
||||
|
||||
时间窗口去重: 对同一 ``(source_id, event_type, track_id_or_bbox_hash)``
|
||||
在配置窗口内只保留一条预警事件,重复命中时更新 ``last_seen``
|
||||
与 ``occurrence_count``,避免预警刷屏。
|
||||
|
||||
后续 MVP-2 会扩展空间邻近合并、置信度加权融合等能力。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from models.event_schemas import AlertEvent, UnifiedDetection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_AggKey = Tuple[Optional[str], str, str]
|
||||
|
||||
|
||||
class EventAggregator:
|
||||
"""基于时间窗口的预警去重 / 聚合器。
|
||||
|
||||
Args:
|
||||
dedup_window_seconds: 去重窗口 (秒),同 key 在窗口内不会重复产出
|
||||
max_active_events: 内存中最大活跃事件数,超过时按 LRU 淘汰
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
dedup_window_seconds: float = 30.0,
|
||||
max_active_events: int = 1000,
|
||||
) -> None:
|
||||
self.dedup_window_seconds = max(0.0, dedup_window_seconds)
|
||||
self.max_active_events = max(1, max_active_events)
|
||||
# 按插入顺序保存以便 LRU 淘汰
|
||||
self._active: "OrderedDict[_AggKey, AlertEvent]" = OrderedDict()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 主入口
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def aggregate(self, alerts: List[AlertEvent]) -> List[AlertEvent]:
|
||||
"""聚合一批预警事件,返回去重后真正应当对外发出的事件。"""
|
||||
|
||||
now = time.time()
|
||||
self._evict_expired(now)
|
||||
|
||||
emitted: List[AlertEvent] = []
|
||||
for alert in alerts:
|
||||
key = self._make_key(alert)
|
||||
existing = self._active.get(key)
|
||||
if existing is None:
|
||||
self._active[key] = alert
|
||||
self._active.move_to_end(key)
|
||||
emitted.append(alert)
|
||||
if len(self._active) > self.max_active_events:
|
||||
dropped_key, _ = self._active.popitem(last=False)
|
||||
logger.debug("聚合器 LRU 淘汰事件: %s", dropped_key)
|
||||
else:
|
||||
# 窗口内重复:仅更新统计
|
||||
existing.last_seen = now
|
||||
existing.occurrence_count += 1
|
||||
if alert.confidence > existing.confidence:
|
||||
existing.confidence = alert.confidence
|
||||
self._active.move_to_end(key)
|
||||
return emitted
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 内部
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _make_key(alert: AlertEvent) -> _AggKey:
|
||||
if alert.detections:
|
||||
target_id = EventAggregator._target_identity(alert.detections[0])
|
||||
else:
|
||||
target_id = "no_target"
|
||||
return (alert.source_id, alert.event_type.value, target_id)
|
||||
|
||||
@staticmethod
|
||||
def _target_identity(det: UnifiedDetection) -> str:
|
||||
"""构造目标稳定标识:优先 track_id,否则用 bbox 网格哈希。"""
|
||||
|
||||
if det.track_id is not None:
|
||||
return f"t{det.track_id}"
|
||||
# 按 50px 栅格归一,避免微小抖动导致漏聚合
|
||||
cx, cy = det.bbox.center
|
||||
return f"g{int(cx) // 50}_{int(cy) // 50}_{det.class_name}"
|
||||
|
||||
def _evict_expired(self, now: float) -> None:
|
||||
if self.dedup_window_seconds <= 0:
|
||||
self._active.clear()
|
||||
return
|
||||
# 基于 first_seen 判断窗口过期:保证 "同一目标 30 秒内只产生 1 条事件",
|
||||
# 同时窗口到期后允许再次触发 (而不会因为不断收到同类检测而无限延期)。
|
||||
expired_keys = [
|
||||
key
|
||||
for key, alert in self._active.items()
|
||||
if now - alert.first_seen > self.dedup_window_seconds
|
||||
]
|
||||
for key in expired_keys:
|
||||
self._active.pop(key, None)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 自省
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@property
|
||||
def active_count(self) -> int:
|
||||
return len(self._active)
|
||||
|
||||
def snapshot(self) -> Dict[str, AlertEvent]:
|
||||
"""返回当前活跃事件的快照 (用于调试 / 监控)。"""
|
||||
|
||||
return {f"{k[0]}|{k[1]}|{k[2]}": v for k, v in self._active.items()}
|
||||
|
||||
|
||||
__all__ = ["EventAggregator"]
|
||||
Reference in New Issue
Block a user