Files
jc-video-recognize/docs/event-judgment-algorithm-architecture.md

74 KiB
Raw Permalink Blame History

事件判断算法架构设计文档

文档信息

  • 项目名称: jc-video-recognize 视频模型检测平台
  • 模块名称: 事件判断算法引擎 (Event Judgment Algorithm Engine)
  • 制定日期: 2026-06-02
  • 文档版本: v1.0

目录

  1. 架构概述
  2. 核心模块设计
  3. 数据流设计
  4. 算法详细设计
  5. 配置管理
  6. 接口定义
  7. 性能考虑

1. 架构概述

1.1 设计目标

事件判断算法引擎的核心目标是:

  • 准确性: 减少误报,确保真实事件被捕获
  • 实时性: 低延迟的事件判断和预警
  • 可配置性: 支持灵活的规则配置和场景适配
  • 可扩展性: 易于添加新的事件类型和判断逻辑

1.2 整体架构图

┌─────────────────────────────────────────────────────────────────────────────┐
│                           事件判断算法引擎架构                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                        输入层 (Input Layer)                          │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │   │
│  │  │ YOLO检测    │  │ 行为检测    │  │ RTSP流      │  │ 外部系统    │ │   │
│  │  │ 结果        │  │ 结果        │  │ 帧数据      │  │ 输入        │ │   │
│  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘ │   │
│  │         └─────────────────┴─────────────────┴─────────────────┘      │   │
│  └────────────────────────────────────┬────────────────────────────────┘   │
│                                       │                                      │
│                                       ▼                                      │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                      决策层 (Decision Layer)                         │   │
│  │                                                                      │   │
│  │   ┌─────────────────┐      ┌─────────────────┐      ┌─────────────┐ │   │
│  │   │  事件决策引擎    │─────▶│  预警规则引擎    │─────▶│ 事件聚合器  │ │   │
│  │   │                 │      │                 │      │             │ │   │
│  │   │ • 置信度评估    │      │ • 规则匹配      │      │ • 去重合并  │ │   │
│  │   │ • 场景识别      │      │ • 时间窗口      │      │ • 时间窗口  │ │   │
│  │   │ • 初步筛选      │      │ • 区域规则      │      │ • 事件关联  │ │   │
│  │   └─────────────────┘      └─────────────────┘      └──────┬──────┘ │   │
│  │                                                            │        │   │
│  └────────────────────────────────────────────────────────────┼────────┘   │
│                                                               │              │
│                                                               ▼              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                      AI增强层 (AI Enhancement)                       │   │
│  │                                                                      │   │
│  │   ┌─────────────────┐      ┌─────────────────┐      ┌─────────────┐ │   │
│  │   │  大模型触发器    │─────▶│  LLM视觉分析    │─────▶│ 结果融合    │ │   │
│  │   │                 │      │                 │      │             │ │   │
│  │   │ • 智能决策      │      │ • 图像理解      │      │ • 置信融合  │ │   │
│  │   │ • 优先级排序    │      │ • 场景分析      │      │ • 决策输出  │ │   │
│  │   │ • 并发控制      │      │ • 推理验证      │      │             │ │   │
│  │   └─────────────────┘      └─────────────────┘      └──────┬──────┘ │   │
│  │                                                            │        │   │
│  └────────────────────────────────────────────────────────────┼────────┘   │
│                                                               │              │
│                                                               ▼              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                      输出层 (Output Layer)                           │   │
│  │                                                                      │   │
│  │   ┌─────────────────┐      ┌─────────────────┐      ┌─────────────┐ │   │
│  │   │  严重性评估器    │─────▶│  事件格式化     │─────▶│ MQTT发布    │ │   │
│  │   │                 │      │                 │      │             │ │   │
│  │   │ • 风险等级      │      │ • 标准格式      │      │ • 消息发布  │ │   │
│  │   │ • 优先级计算    │      │ • 元数据填充    │      │ • QoS管理   │ │   │
│  │   │ • 响应建议      │      │ • 快照关联      │      │             │ │   │
│  │   └─────────────────┘      └─────────────────┘      └─────────────┘ │   │
│  │                                                                      │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

1.3 模块职责划分

模块名称 核心职责 输入 输出
事件决策引擎 初步判断检测结果是否构成事件 检测结果 候选事件列表
预警规则引擎 根据配置规则过滤事件 候选事件 符合规则的事件
事件聚合器 合并重复事件,去重 规则事件 聚合后的事件
大模型触发器 决策是否需要LLM二次判断 聚合事件 LLM任务队列
LLM视觉分析 大模型图像理解和验证 图像ROI 分析结果
结果融合 融合YOLO和LLM结果 多源结果 最终决策
严重性评估器 评估事件严重等级 最终事件 分级事件

2. 核心模块设计

2.1 事件决策引擎 (EventDecisionEngine)

2.1.1 功能说明

事件决策引擎是事件判断的第一道关卡,负责:

  • 评估检测结果的置信度
  • 识别检测场景(室内/室外/禁区等)
  • 进行初步的事件筛选

2.1.2 类设计

class EventDecisionEngine:
    """
    事件决策引擎
    
    职责:
    1. 评估单个检测结果是否构成候选事件
    2. 根据场景和置信度进行初步筛选
    3. 为后续规则引擎提供标准化的事件描述
    """
    
    def __init__(self, config: DecisionEngineConfig):
        self.config = config
        self.scene_classifier = SceneClassifier()
        self.confidence_calibrator = ConfidenceCalibrator()
    
    def evaluate(
        self, 
        detection_result: DetectionResult,
        context: DetectionContext
    ) -> List[CandidateEvent]:
        """
        评估检测结果
        
        Args:
            detection_result: YOLO/行为检测的结果
            context: 检测上下文(时间、位置、摄像头信息等)
        
        Returns:
            候选事件列表(可能为空)
        """
        pass
    
    def _assess_confidence(
        self, 
        detection: Detection,
        model_type: str
    ) -> ConfidenceAssessment:
        """评估检测置信度的可靠性"""
        pass
    
    def _classify_scene(
        self, 
        frame: np.ndarray,
        camera_config: CameraConfig
    ) -> SceneType:
        """识别检测场景类型"""
        pass
    
    def _create_candidate_event(
        self,
        detection: Detection,
        scene: SceneType,
        confidence: ConfidenceAssessment
    ) -> CandidateEvent:
        """创建候选事件"""
        pass

2.1.3 决策流程

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  输入检测   │────▶│ 置信度评估  │────▶│ 场景识别    │────▶│ 事件创建    │
│  结果       │     │             │     │             │     │             │
└─────────────┘     └──────┬──────┘     └──────┬──────┘     └──────┬──────┘
                           │                   │                   │
                           ▼                   ▼                   ▼
                    ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
                    │ • 原始置信度 │     │ • 室内/室外  │     │ • 事件ID    │
                    │ • 模型类型  │     │ • 区域类型   │     │ • 事件类型  │
                    │ • 历史校准  │     │ • 时间特征   │     │ • 置信度    │
                    │ • 可信度    │     │ • 光照条件   │     │ • 场景信息  │
                    └─────────────┘     └─────────────┘     └─────────────┘

2.1.4 置信度评估策略

置信度区间 可信度等级 处理策略
0.0 - 0.5 直接丢弃
0.5 - 0.7 标记为待验证优先走LLM
0.7 - 0.9 直接通过可选LLM验证
0.9 - 1.0 极高 直接通过跳过LLM

2.2 预警规则引擎 (AlertRuleEngine)

2.2.1 功能说明

预警规则引擎负责根据配置的规则判断事件是否需要触发预警:

  • 时间规则(工作时间/非工作时间)
  • 区域规则(禁区/普通区域)
  • 频率规则(冷却时间、最大频次)
  • 组合规则(多条件组合)

2.2.2 类设计

class AlertRuleEngine:
    """
    预警规则引擎
    
    职责:
    1. 管理预警规则集合
    2. 根据规则过滤候选事件
    3. 支持动态规则更新
    """
    
    def __init__(self, rule_repository: RuleRepository):
        self.rule_repository = rule_repository
        self.rule_cache = {}
    
    def check(
        self, 
        candidate_event: CandidateEvent
    ) -> RuleCheckResult:
        """
        检查候选事件是否满足预警规则
        
        Args:
            candidate_event: 候选事件
        
        Returns:
            规则检查结果
        """
        pass
    
    def get_applicable_rules(
        self, 
        event_type: str,
        scene: SceneType
    ) -> List[AlertRule]:
        """获取适用于当前事件的所有规则"""
        pass
    
    def evaluate_rule(
        self, 
        rule: AlertRule,
        event: CandidateEvent
    ) -> bool:
        """评估单个规则"""
        pass


@dataclass
class AlertRule:
    """预警规则定义"""
    rule_id: str
    rule_name: str
    event_type: str
    
    # 置信度规则
    min_confidence: float = 0.5
    max_confidence: float = 1.0
    
    # 时间规则
    time_windows: List[TimeWindow] = field(default_factory=list)
    timezone: str = "Asia/Shanghai"
    
    # 区域规则
    applicable_areas: List[str] = field(default_factory=list)
    excluded_areas: List[str] = field(default_factory=list)
    
    # 频率规则
    cooldown_seconds: int = 30
    max_alerts_per_hour: int = 10
    
    # 启用状态
    enabled: bool = True
    priority: int = 1  # 数字越小优先级越高


@dataclass
class TimeWindow:
    """时间窗口定义"""
    days_of_week: List[int]  # 0=周一, 6=周日
    start_time: str  # "HH:MM" 格式
    end_time: str    # "HH:MM" 格式

2.2.3 规则类型

class RuleType(Enum):
    """规则类型枚举"""
    CONFIDENCE = "confidence"      # 置信度规则
    TIME = "time"                  # 时间规则
    AREA = "area"                  # 区域规则
    FREQUENCY = "frequency"        # 频率规则
    COMBINATION = "combination"    # 组合规则
    SCENE = "scene"                # 场景规则

2.2.4 规则配置示例

# rules/fire_detection.yaml
rules:
  - rule_id: "fire_critical"
    rule_name: "火灾紧急预警"
    event_type: "fire"
    min_confidence: 0.8
    time_windows:
      - days_of_week: [0, 1, 2, 3, 4, 5, 6]
        start_time: "00:00"
        end_time: "23:59"
    applicable_areas: ["warehouse", "lab", "office"]
    cooldown_seconds: 10
    max_alerts_per_hour: 100
    enabled: true
    priority: 1

  - rule_id: "fire_normal"
    rule_name: "火灾普通预警"
    event_type: "fire"
    min_confidence: 0.5
    max_confidence: 0.8
    time_windows:
      - days_of_week: [0, 1, 2, 3, 4, 5, 6]
        start_time: "00:00"
        end_time: "23:59"
    applicable_areas: ["warehouse", "lab", "office"]
    cooldown_seconds: 30
    max_alerts_per_hour: 20
    enabled: true
    priority: 2

# rules/smoking_detection.yaml
rules:
  - rule_id: "smoking_indoor"
    rule_name: "室内吸烟预警"
    event_type: "smoking"
    min_confidence: 0.6
    scene_filter:
      location_type: "indoor"
      no_smoking_zone: true
    time_windows:
      - days_of_week: [0, 1, 2, 3, 4]
        start_time: "09:00"
        end_time: "18:00"
    cooldown_seconds: 60
    enabled: true
    priority: 1

2.3 事件聚合器 (EventAggregator)

2.3.1 功能说明

事件聚合器负责:

  • 合并同一目标的连续检测事件
  • 防止短时间内重复预警
  • 关联相似事件

2.3.2 类设计

class EventAggregator:
    """
    事件聚合器
    
    职责:
    1. 去重:合并同一目标的连续事件
    2. 关联:识别相关事件
    3. 窗口:基于时间窗口管理事件
    """
    
    def __init__(self, config: AggregatorConfig):
        self.config = config
        self.active_events: Dict[str, AggregatedEvent] = {}
        self.event_history: deque = deque(maxlen=1000)
    
    def aggregate(
        self, 
        new_event: CandidateEvent
    ) -> Optional[AggregatedEvent]:
        """
        聚合新事件
        
        Args:
            new_event: 新检测到的事件
        
        Returns:
            聚合后的事件如果是新事件或None如果是重复事件
        """
        pass
    
    def _find_matching_event(
        self, 
        new_event: CandidateEvent
    ) -> Optional[AggregatedEvent]:
        """查找匹配的历史事件"""
        pass
    
    def _calculate_similarity(
        self, 
        event1: CandidateEvent,
        event2: AggregatedEvent
    ) -> float:
        """计算两个事件的相似度"""
        pass
    
    def _merge_events(
        self, 
        existing: AggregatedEvent,
        new_event: CandidateEvent
    ) -> AggregatedEvent:
        """合并两个事件"""
        pass
    
    def cleanup_expired_events(self):
        """清理过期的事件"""
        pass


@dataclass
class AggregatedEvent:
    """聚合事件"""
    event_id: str
    event_type: str
    
    # 时间信息
    first_seen: datetime
    last_seen: datetime
    
    # 位置信息
    camera_id: str
    location: Location
    bbox_history: List[BBox]
    
    # 置信度信息
    confidence_max: float
    confidence_min: float
    confidence_avg: float
    
    # 检测次数
    detection_count: int
    
    # 关联的检测ID
    detection_ids: List[str]
    
    # 状态
    status: EventStatus  # active, closed, confirmed


class EventStatus(Enum):
    """事件状态"""
    ACTIVE = "active"        # 活跃中(持续检测)
    PENDING = "pending"      # 待确认等待LLM
    CONFIRMED = "confirmed"  # 已确认
    CLOSED = "closed"        # 已关闭
    FALSE_POSITIVE = "false_positive"  # 误报

2.3.3 聚合策略

策略 说明 适用场景
时间窗口 同一目标在N秒内的事件合并 持续检测场景
空间邻近 位置距离小于阈值的事件合并 移动目标跟踪
类型相同 同一类型的事件合并 同类事件去重
置信度加权 保留置信度最高的检测结果 质量优先

2.3.4 相似度计算

def calculate_event_similarity(
    event1: CandidateEvent,
    event2: AggregatedEvent
) -> float:
    """
    计算事件相似度0-1
    
    考虑因素:
    1. 时间 proximity
    2. 空间 proximity
    3. 类型匹配
    4. 置信度差异
    """
    
    # 时间相似度
    time_diff = abs(
        (event1.timestamp - event2.last_seen).total_seconds()
    )
    time_sim = max(0, 1 - time_diff / TIME_WINDOW_SECONDS)
    
    # 空间相似度IoU或中心点距离
    spatial_sim = calculate_spatial_similarity(
        event1.bbox, 
        event2.bbox_history[-1]
    )
    
    # 类型匹配
    type_sim = 1.0 if event1.event_type == event2.event_type else 0.0
    
    # 加权综合
    similarity = (
        0.3 * time_sim +
        0.4 * spatial_sim +
        0.3 * type_sim
    )
    
    return similarity

2.4 大模型触发器 (LLMTrigger)

2.4.1 功能说明

大模型触发器负责智能决策是否需要调用大模型进行二次判断:

  • 多帧累积判断: 不是单帧触发,而是基于多帧检测结果的综合判断
  • 置信度趋势分析: 分析置信度变化趋势,避免瞬时波动误触发
  • 基于场景的强制触发: 特定场景下强制触发LLM验证
  • 并发控制和队列管理: 控制LLM调用频率和成本
  • 成本优化策略: 缓存、去重、优先级队列

2.4.2 多帧累积判断机制

┌─────────────────────────────────────────────────────────────────────────────┐
│                         多帧累积判断流程                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   帧N-3        帧N-2        帧N-1        帧N(当前)                           │
│     │            │            │            │                                 │
│     ▼            ▼            ▼            ▼                                 │
│  ┌──────┐    ┌──────┐    ┌──────┐    ┌──────┐                              │
│  │检测  │    │检测  │    │检测  │    │检测  │                              │
│  │conf: │    │conf: │    │conf: │    │conf: │                              │
│  │ 0.55 │    │ 0.62 │    │ 0.58 │    │ 0.65 │                              │
│  └──┬───┘    └──┬───┘    └──┬───┘    └──┬───┘                              │
│     │            │            │            │                                 │
│     └────────────┴────────────┴────────────┘                                 │
│                    │                                                         │
│                    ▼                                                         │
│         ┌─────────────────────┐                                              │
│         │   多帧累积分析器     │                                              │
│         │                     │                                              │
│         │ 分析维度:           │                                              │
│         │ • 检测频次: 4/4帧   │  ← 连续检测次数                              │
│         │ • 平均置信度: 0.60  │  ← 累积置信度                                │
│         │ • 置信度趋势: 上升  │  ← 趋势判断                                  │
│         │ • 位置稳定性: 稳定  │  ← 空间一致性                                │
│         │ • 持续时间: 1.2秒   │  ← 时间累积                                  │
│         └──────────┬──────────┘                                              │
│                    │                                                         │
│         ┌──────────┴──────────┐                                              │
│         │   触发决策判断       │                                              │
│         │                     │                                              │
│         │ 触发条件检查:       │                                              │
│         │ ✓ 连续帧数 >= 3     │  ← 最小帧数阈值                              │
│         │ ✓ 平均置信度 >= 0.55│  ← 累积置信度阈值                            │
│         │ ✓ 位置变化 < 50px   │  ← 空间稳定性阈值                            │
│         │ ✓ 持续时间 >= 1s    │  ← 时间阈值                                  │
│         │                     │                                              │
│         │ 决策结果: 触发LLM   │                                              │
│         └─────────────────────┘                                              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

2.4.3 类设计

class LLMTrigger:
    """
    大模型触发器
    
    职责:
    1. 基于多帧累积的触发决策
    2. 管理帧历史缓冲区
    3. 分析检测趋势和稳定性
    4. 管理LLM调用队列和成本
    """
    
    def __init__(self, config: LLMTriggerConfig):
        self.config = config
        
        # 多帧累积分析器
        self.frame_accumulator = MultiFrameAccumulator(
            buffer_size=config.frame_buffer_size,
            time_window=config.frame_time_window
        )
        
        # 队列和缓存
        self.task_queue = PriorityQueue()
        self.result_cache = LRUCache(maxsize=1000)
        
        # 成本控制
        self.concurrent_calls = 0
        self.cost_tracker = CostTracker()
        
        # 触发状态追踪
        self.trigger_state: Dict[str, TriggerState] = {}
    
    def should_trigger(
        self, 
        event: AggregatedEvent,
        frame_context: FrameContext
    ) -> TriggerDecision:
        """
        决策是否触发LLM分析
        
        基于多帧累积判断,不是单帧决策
        
        Args:
            event: 当前帧的聚合事件
            frame_context: 帧上下文信息
        
        Returns:
            触发决策
        """
        # 1. 将当前帧加入累积器
        self.frame_accumulator.add_frame(event, frame_context)
        
        # 2. 获取多帧分析结果
        frame_analysis = self.frame_accumulator.analyze(event.event_id)
        
        # 3. 基于多帧结果做触发决策
        return self._make_trigger_decision(event, frame_analysis)
    
    def _make_trigger_decision(
        self,
        event: AggregatedEvent,
        frame_analysis: FrameAnalysisResult
    ) -> TriggerDecision:
        """基于多帧分析结果做触发决策"""
        
        # 检查多帧触发条件
        trigger_conditions = self._evaluate_trigger_conditions(frame_analysis)
        
        if not trigger_conditions.all_met:
            return TriggerDecision(
                should_trigger=False,
                priority=0.0,
                reason=trigger_conditions.reason,
                strategy="multi_frame_accumulation",
                frame_stats=frame_analysis
            )
        
        # 计算优先级
        priority = self._calculate_priority_with_frames(event, frame_analysis)
        
        return TriggerDecision(
            should_trigger=True,
            priority=priority,
            reason=f"多帧累积满足条件: {trigger_conditions.summary}",
            strategy="multi_frame_accumulation",
            frame_stats=frame_analysis
        )
    
    def _evaluate_trigger_conditions(
        self,
        frame_analysis: FrameAnalysisResult
    ) -> TriggerConditions:
        """评估多帧触发条件"""
        conditions = TriggerConditions()
        
        # 条件1: 最小连续帧数
        if frame_analysis.consecutive_frames < self.config.min_consecutive_frames:
            conditions.add_failure(
                "consecutive_frames",
                f"连续帧数不足: {frame_analysis.consecutive_frames} < {self.config.min_consecutive_frames}"
            )
        
        # 条件2: 累积置信度阈值
        if frame_analysis.avg_confidence < self.config.min_accumulated_confidence:
            conditions.add_failure(
                "avg_confidence",
                f"平均置信度不足: {frame_analysis.avg_confidence:.2f} < {self.config.min_accumulated_confidence}"
            )
        
        # 条件3: 空间稳定性
        if frame_analysis.position_variance > self.config.max_position_variance:
            conditions.add_failure(
                "position_stability",
                f"位置变化过大: {frame_analysis.position_variance:.1f}px"
            )
        
        # 条件4: 最小持续时间
        if frame_analysis.duration_seconds < self.config.min_duration_seconds:
            conditions.add_failure(
                "duration",
                f"持续时间不足: {frame_analysis.duration_seconds:.1f}s < {self.config.min_duration_seconds}s"
            )
        
        # 条件5: 置信度趋势(可选)
        if self.config.require_confidence_trend:
            if frame_analysis.confidence_trend < 0:  # 下降趋势
                conditions.add_failure(
                    "trend",
                    "置信度呈下降趋势"
                )
        
        return conditions
    
    def _calculate_priority_with_frames(
        self,
        event: AggregatedEvent,
        frame_analysis: FrameAnalysisResult
    ) -> float:
        """基于多帧信息计算优先级"""
        base_priority = self.get_priority_score(event)
        
        # 多帧因子调整
        frame_factors = {
            # 连续帧数越多,优先级越高
            "consecutive_bonus": min(0.2, frame_analysis.consecutive_frames * 0.05),
            
            # 置信度趋势向上,增加优先级
            "trend_bonus": 0.1 if frame_analysis.confidence_trend > 0 else 0,
            
            # 位置越稳定,优先级越高
            "stability_bonus": 0.1 * (1 - min(1, frame_analysis.position_variance / 100))
        }
        
        adjusted_priority = base_priority + sum(frame_factors.values())
        return min(1.0, adjusted_priority)
    
    def get_priority_score(self, event: AggregatedEvent) -> float:
        """计算基础优先级分数"""
        pass
    
    def enqueue_analysis(self, event: AggregatedEvent) -> str:
        """将事件加入LLM分析队列"""
        pass
    
    def get_analysis_result(
        self, 
        task_id: str,
        timeout: float = 30.0
    ) -> Optional[LLMResult]:
        """获取LLM分析结果"""
        pass


class MultiFrameAccumulator:
    """
    多帧累积分析器
    
    管理多帧检测历史,分析检测趋势和稳定性
    """
    
    def __init__(self, buffer_size: int = 30, time_window: float = 5.0):
        self.buffer_size = buffer_size
        self.time_window = time_window
        
        # 帧历史缓冲区: {event_id: deque[FrameRecord]}
        self.frame_buffers: Dict[str, deque] = {}
    
    def add_frame(
        self, 
        event: AggregatedEvent,
        frame_context: FrameContext
    ):
        """添加一帧检测记录"""
        event_id = event.event_id
        
        if event_id not in self.frame_buffers:
            self.frame_buffers[event_id] = deque(maxlen=self.buffer_size)
        
        record = FrameRecord(
            timestamp=frame_context.timestamp,
            confidence=event.confidence_avg,
            bbox=event.bbox,
            frame_id=frame_context.frame_id
        )
        
        self.frame_buffers[event_id].append(record)
        
        # 清理过期缓冲区
        self._cleanup_expired_buffers()
    
    def analyze(self, event_id: str) -> FrameAnalysisResult:
        """分析多帧检测数据"""
        buffer = self.frame_buffers.get(event_id, deque())
        
        if len(buffer) < 2:
            return FrameAnalysisResult(
                consecutive_frames=len(buffer),
                avg_confidence=buffer[0].confidence if buffer else 0,
                position_variance=0,
                duration_seconds=0,
                confidence_trend=0
            )
        
        # 计算各项指标
        confidences = [r.confidence for r in buffer]
        positions = [self._bbox_center(r.bbox) for r in buffer]
        timestamps = [r.timestamp for r in buffer]
        
        return FrameAnalysisResult(
            consecutive_frames=len(buffer),
            avg_confidence=np.mean(confidences),
            max_confidence=np.max(confidences),
            min_confidence=np.min(confidences),
            position_variance=self._calculate_position_variance(positions),
            duration_seconds=(timestamps[-1] - timestamps[0]).total_seconds(),
            confidence_trend=self._calculate_trend(confidences),
            frames_per_second=len(buffer) / max(0.001, (timestamps[-1] - timestamps[0]).total_seconds())
        )
    
    def _calculate_position_variance(
        self, 
        positions: List[Tuple[int, int]]
    ) -> float:
        """计算位置方差(稳定性指标)"""
        if len(positions) < 2:
            return 0.0
        
        x_coords = [p[0] for p in positions]
        y_coords = [p[1] for p in positions]
        
        x_var = np.var(x_coords)
        y_var = np.var(y_coords)
        
        return np.sqrt(x_var + y_var)
    
    def _calculate_trend(self, values: List[float]) -> float:
        """计算趋势(线性回归斜率)"""
        if len(values) < 2:
            return 0.0
        
        x = np.arange(len(values))
        slope, _, _, _, _ = linregress(x, values)
        return slope
    
    def _bbox_center(self, bbox: Tuple[int, int, int, int]) -> Tuple[int, int]:
        """计算边界框中心点"""
        x1, y1, x2, y2 = bbox
        return ((x1 + x2) // 2, (y1 + y2) // 2)
    
    def _cleanup_expired_buffers(self):
        """清理过期的帧缓冲区"""
        current_time = datetime.now()
        expired_events = []
        
        for event_id, buffer in self.frame_buffers.items():
            if buffer:
                last_time = buffer[-1].timestamp
                if (current_time - last_time).total_seconds() > self.time_window * 2:
                    expired_events.append(event_id)
        
        for event_id in expired_events:
            del self.frame_buffers[event_id]


@dataclass
class FrameRecord:
    """单帧检测记录"""
    timestamp: datetime
    confidence: float
    bbox: Tuple[int, int, int, int]
    frame_id: int


@dataclass
class FrameAnalysisResult:
    """多帧分析结果"""
    consecutive_frames: int          # 连续检测帧数
    avg_confidence: float            # 平均置信度
    max_confidence: float            # 最大置信度
    min_confidence: float            # 最小置信度
    position_variance: float         # 位置方差(像素)
    duration_seconds: float          # 持续时间(秒)
    confidence_trend: float          # 置信度趋势(斜率)
    frames_per_second: float = 0.0   # 检测帧率


@dataclass
class TriggerDecision:
    """触发决策结果"""
    should_trigger: bool
    priority: float                  # 优先级分数
    reason: str                      # 决策原因
    strategy: str                    # 使用的策略
    frame_stats: Optional[FrameAnalysisResult] = None  # 多帧统计


@dataclass
class TriggerConditions:
    """触发条件评估结果"""
    failures: List[Dict] = field(default_factory=list)
    
    def add_failure(self, condition: str, reason: str):
        self.failures.append({"condition": condition, "reason": reason})
    
    @property
    def all_met(self) -> bool:
        return len(self.failures) == 0
    
    @property
    def reason(self) -> str:
        if self.all_met:
            return "所有条件满足"
        return "; ".join([f["reason"] for f in self.failures])
    
    @property
    def summary(self) -> str:
        return f"通过{len(self.failures)}项, 失败{len(self.failures)}项"


@dataclass
class LLMTriggerConfig:
    """LLM触发器配置"""
    
    # ========== 多帧累积配置 ==========
    # 帧缓冲区大小
    frame_buffer_size: int = 30
    
    # 帧时间窗口(秒)
    frame_time_window: float = 5.0
    
    # 最小连续帧数触发LLM的最小帧数
    min_consecutive_frames: int = 3
    
    # 最小累积置信度
    min_accumulated_confidence: float = 0.55
    
    # 最大位置方差(像素平方)
    max_position_variance: float = 2500  # 50px * 50px
    
    # 最小持续时间(秒)
    min_duration_seconds: float = 1.0
    
    # 是否要求置信度趋势向上
    require_confidence_trend: bool = False
    
    # ========== 置信度触发区间 ==========
    confidence_trigger_range: Tuple[float, float] = (0.5, 0.85)
    
    # ========== 强制触发场景 ==========
    force_trigger_scenes: List[str] = field(
        default_factory=lambda: ["indoor_no_smoking_area", "chemical_lab"]
    )
    
    # ========== 并发控制 ==========
    max_concurrent_calls: int = 5
    max_queue_size: int = 100
    
    # ========== 成本限制 ==========
    max_calls_per_minute: int = 30
    max_cost_per_hour: float = 10.0
    
    # ========== 缓存配置 ==========
    cache_ttl_seconds: int = 300
    similarity_threshold: float = 0.9

2.4.4 触发策略(含多帧判断)

class TriggerStrategy:
    """触发策略集合"""
    
    @staticmethod
    def multi_frame_strategy(
        event: AggregatedEvent,
        frame_analysis: FrameAnalysisResult,
        config: LLMTriggerConfig
    ) -> TriggerDecision:
        """
        多帧累积触发策略(默认策略)
        
        核心逻辑:
        1. 不是单帧触发,而是基于多帧的综合判断
        2. 要求检测在时间和空间上都是稳定的
        3. 避免瞬时误检导致的误触发
        """
        # 快速拒绝:如果帧数太少
        if frame_analysis.consecutive_frames < config.min_consecutive_frames:
            return TriggerDecision(
                should_trigger=False,
                priority=0.0,
                reason=f"连续帧数不足: {frame_analysis.consecutive_frames}/{config.min_consecutive_frames}",
                strategy="multi_frame",
                frame_stats=frame_analysis
            )
        
        # 快速拒绝:如果平均置信度太低
        if frame_analysis.avg_confidence < config.min_accumulated_confidence:
            return TriggerDecision(
                should_trigger=False,
                priority=0.0,
                reason=f"平均置信度不足: {frame_analysis.avg_confidence:.2f}/{config.min_accumulated_confidence}",
                strategy="multi_frame",
                frame_stats=frame_analysis
            )
        
        # 检查空间稳定性
        if frame_analysis.position_variance > config.max_position_variance:
            return TriggerDecision(
                should_trigger=False,
                priority=0.0,
                reason=f"位置不稳定: 方差={frame_analysis.position_variance:.1f}px²",
                strategy="multi_frame",
                frame_stats=frame_analysis
            )
        
        # 检查持续时间
        if frame_analysis.duration_seconds < config.min_duration_seconds:
            return TriggerDecision(
                should_trigger=False,
                priority=0.0,
                reason=f"持续时间不足: {frame_analysis.duration_seconds:.1f}s/{config.min_duration_seconds}s",
                strategy="multi_frame",
                frame_stats=frame_analysis
            )
        
        # 所有条件满足,计算优先级
        priority = TriggerStrategy._calculate_frame_based_priority(
            frame_analysis, config
        )
        
        return TriggerDecision(
            should_trigger=True,
            priority=priority,
            reason=f"多帧累积满足条件: {frame_analysis.consecutive_frames}帧, "
                   f"平均置信度{frame_analysis.avg_confidence:.2f}, "
                   f"持续{frame_analysis.duration_seconds:.1f}s",
            strategy="multi_frame",
            frame_stats=frame_analysis
        )
    
    @staticmethod
    def _calculate_frame_based_priority(
        frame_analysis: FrameAnalysisResult,
        config: LLMTriggerConfig
    ) -> float:
        """基于多帧信息计算优先级"""
        
        # 基础分:平均置信度
        base_score = frame_analysis.avg_confidence
        
        # 连续帧数奖励(帧数越多越可靠)
        frame_bonus = min(0.15, (frame_analysis.consecutive_frames - config.min_consecutive_frames) * 0.03)
        
        # 稳定性奖励(方差越小越稳定)
        stability_ratio = 1 - min(1, frame_analysis.position_variance / config.max_position_variance)
        stability_bonus = 0.1 * stability_ratio
        
        # 趋势奖励(置信度上升)
        trend_bonus = 0.05 if frame_analysis.confidence_trend > 0 else 0
        
        total_score = base_score + frame_bonus + stability_bonus + trend_bonus
        return min(1.0, total_score)
    
    @staticmethod
    def scene_based_strategy(
        event: AggregatedEvent,
        config: LLMTriggerConfig
    ) -> TriggerDecision:
        """
        基于场景的策略
        
        某些场景强制触发LLM验证不依赖多帧累积
        """
        scene = event.location.scene_type
        
        if scene in config.force_trigger_scenes:
            return TriggerDecision(
                should_trigger=True,
                priority=1.0,
                reason=f"强制触发场景: {scene}",
                strategy="scene_based"
            )
        
        return TriggerDecision(
            should_trigger=False,
            priority=0.0,
            reason="非强制触发场景",
            strategy="scene_based"
        )
    
    @staticmethod
    def single_frame_high_confidence_strategy(
        event: AggregatedEvent,
        config: LLMTriggerConfig
    ) -> TriggerDecision:
        """
        单帧高置信度策略
        
        对于极高置信度的检测,可以跳过多帧累积直接触发
        (用于紧急情况)
        """
        if event.confidence_avg > 0.95:
            return TriggerDecision(
                should_trigger=True,
                priority=0.9,
                reason=f"单帧极高置信度: {event.confidence_avg:.2f}",
                strategy="single_frame_high_conf"
            )
        
        return TriggerDecision(
            should_trigger=False,
            priority=0.0,
            reason="置信度未达到单帧触发阈值",
            strategy="single_frame_high_conf"
        )

2.4.5 多帧累积配置示例

# config/llm_trigger.yaml

llm_trigger:
  # ========== 多帧累积配置 ==========
  multi_frame:
    # 帧缓冲区大小保存最近N帧
    buffer_size: 30
    
    # 帧时间窗口(秒)- 只分析最近5秒内的帧
    time_window: 5.0
    
    # 最小连续帧数 - 至少检测到3帧才考虑触发
    min_consecutive_frames: 3
    
    # 最小累积置信度 - 多帧平均置信度阈值
    min_accumulated_confidence: 0.55
    
    # 最大位置方差(像素平方)- 位置变化不能超过50px
    max_position_variance: 2500
    
    # 最小持续时间(秒)- 检测至少持续1秒
    min_duration_seconds: 1.0
    
    # 是否要求置信度趋势向上
    require_confidence_trend: false
    
    # 帧率要求fps- 检测帧率不能太低
    min_detection_fps: 2.0
  
  # ========== 触发策略优先级 ==========
  strategy_priority:
    - "scene_based"           # 场景强制触发(最高优先级)
    - "single_frame_high_conf" # 单帧极高置信度
    - "multi_frame"           # 多帧累积(默认)
  
  # ========== 不同事件类型的多帧配置 ==========
  event_type_overrides:
    fire:
      # 火灾检测更敏感,减少帧数要求
      min_consecutive_frames: 2
      min_duration_seconds: 0.5
      min_accumulated_confidence: 0.50
    
    smoking:
      # 抽烟检测需要更稳定
      min_consecutive_frames: 5
      min_duration_seconds: 2.0
      min_accumulated_confidence: 0.60
      require_confidence_trend: true
    
    loitering:
      # 徘徊检测本来就需要时间
      min_consecutive_frames: 10
      min_duration_seconds: 5.0
      min_accumulated_confidence: 0.55

2.5 结果融合 (ResultFusion)

2.5.1 功能说明

结果融合模块负责将YOLO检测结果和LLM分析结果进行融合得出最终决策

  • 置信度加权融合
  • 冲突解决策略
  • 决策解释生成

2.5.2 类设计

class ResultFusion:
    """
    结果融合器
    
    职责:
    1. 融合多源检测结果
    2. 解决结果冲突
    3. 生成决策解释
    """
    
    def __init__(self, config: FusionConfig):
        self.config = config
        self.fusion_strategies = {
            "weighted": WeightedFusionStrategy(),
            "conservative": ConservativeFusionStrategy(),
            "optimistic": OptimisticFusionStrategy()
        }
    
    def fuse(
        self,
        yolo_result: DetectionResult,
        llm_result: Optional[LLMResult],
        strategy: str = "weighted"
    ) -> FusionResult:
        """
        融合检测结果
        
        Args:
            yolo_result: YOLO检测结果
            llm_result: LLM分析结果可能为None
            strategy: 融合策略
        
        Returns:
            融合后的结果
        """
        pass
    
    def _calculate_fused_confidence(
        self,
        yolo_conf: float,
        llm_conf: float,
        weights: Tuple[float, float]
    ) -> float:
        """计算融合后的置信度"""
        pass
    
    def _generate_explanation(
        self,
        yolo_result: DetectionResult,
        llm_result: Optional[LLMResult],
        fused_result: FusionResult
    ) -> str:
        """生成决策解释"""
        pass


@dataclass
class FusionResult:
    """融合结果"""
    is_positive: bool           # 是否为真实事件
    confidence: float           # 融合后的置信度
    yolo_contribution: float    # YOLO贡献度
    llm_contribution: float     # LLM贡献度
    explanation: str            # 决策解释
    metadata: Dict              # 元数据

2.5.3 融合策略

策略 说明 适用场景
加权融合 按权重合并YOLO和LLM置信度 通用场景
保守策略 两者都高才算高置信度 高风险场景
乐观策略 任一高就算高置信度 宁可误报不可漏报
LLM优先 LLM结果覆盖YOLO LLM可靠性高时

2.6 严重性评估器 (SeverityAssessor)

2.6.1 功能说明

严重性评估器负责评估事件的严重程度:

  • 基于事件类型的基础等级
  • 基于场景的调整
  • 基于持续时间的升级
  • 生成响应建议

2.6.2 类设计

class SeverityAssessor:
    """
    严重性评估器
    
    职责:
    1. 评估事件严重程度
    2. 计算响应优先级
    3. 生成响应建议
    """
    
    def __init__(self, config: SeverityConfig):
        self.config = config
        self.severity_matrix = self._load_severity_matrix()
    
    def assess(
        self,
        event: FusionResult,
        context: EventContext
    ) -> SeverityAssessment:
        """
        评估事件严重性
        
        Args:
            event: 融合后的事件结果
            context: 事件上下文
        
        Returns:
            严重性评估结果
        """
        pass
    
    def _calculate_base_severity(
        self,
        event_type: str
    ) -> SeverityLevel:
        """计算基础严重等级"""
        pass
    
    def _apply_scene_modifier(
        self,
        base_severity: SeverityLevel,
        scene: SceneType
    ) -> SeverityLevel:
        """应用场景调整"""
        pass
    
    def _apply_duration_modifier(
        self,
        severity: SeverityLevel,
        duration_seconds: float
    ) -> SeverityLevel:
        """应用持续时间调整"""
        pass
    
    def generate_response_recommendation(
        self,
        assessment: SeverityAssessment
    ) -> ResponseRecommendation:
        """生成响应建议"""
        pass


class SeverityLevel(Enum):
    """严重等级枚举"""
    LOW = "low"              # 低
    MEDIUM = "medium"        # 中
    HIGH = "high"            # 高
    CRITICAL = "critical"    # 紧急


@dataclass
class SeverityAssessment:
    """严重性评估结果"""
    level: SeverityLevel
    score: float  # 0-100
    factors: List[str]  # 影响因素
    escalation_recommended: bool  # 建议升级
    response_time_target: int  # 目标响应时间(秒)


@dataclass
class ResponseRecommendation:
    """响应建议"""
    immediate_actions: List[str]
    notification_targets: List[str]
    escalation_path: Optional[str]
    documentation_required: bool

2.6.3 严重性矩阵

事件类型 基础等级 室内场景 夜间场景 持续>5分钟
火灾 CRITICAL +0 +0 +0
抽烟 MEDIUM HIGH MEDIUM HIGH
徘徊 LOW MEDIUM HIGH MEDIUM
违停 LOW +0 LOW MEDIUM

3. 数据流设计

3.1 完整数据流图

┌─────────────────────────────────────────────────────────────────────────────┐
│                              完整事件处理数据流                               │
└─────────────────────────────────────────────────────────────────────────────┘

[视频帧输入]
     │
     ▼
┌─────────────────┐
│  YOLO检测       │
│  • 目标检测     │
│  • 行为分析     │
└────────┬────────┘
         │ DetectionResult
         ▼
┌─────────────────┐     ┌─────────────────┐
│ 事件决策引擎    │────▶│ 置信度评估      │
│                 │     │ • 原始置信度    │
│ • 初步筛选      │     │ • 模型可信度    │
│ • 场景识别      │     │ • 历史校准      │
└────────┬────────┘     └─────────────────┘
         │ CandidateEvent
         ▼
┌─────────────────┐     ┌─────────────────┐
│ 预警规则引擎    │────▶│ 规则匹配        │
│                 │     │ • 时间规则      │
│ • 规则过滤      │     │ • 区域规则      │
│ • 频次检查      │     │ • 置信度规则    │
└────────┬────────┘     └─────────────────┘
         │ RuleCheckResult
         ▼
┌─────────────────┐     ┌─────────────────┐
│ 事件聚合器      │────▶│ 去重策略        │
│                 │     │ • 时间窗口      │
│ • 事件合并      │     │ • 空间邻近      │
│ • 关联分析      │     │ • 相似度计算    │
└────────┬────────┘     └─────────────────┘
         │ AggregatedEvent
         ▼
┌─────────────────┐     ┌─────────────────┐
│ 大模型触发器    │────▶│ 触发决策        │
│                 │     │ • 置信度区间    │
│ • 智能决策      │     │ • 场景匹配      │
│ • 队列管理      │     │ • 并发控制      │
└────────┬────────┘     └─────────────────┘
         │ (可选分支)
    ┌────┴────┐
    │         │
    ▼         ▼
┌──────┐  ┌─────────────────┐
│ 跳过 │  │ LLM视觉分析     │
│ LLM  │  │                 │
│      │  │ • 图像理解      │
│      │  │ • 场景分析      │
│      │  │ • 推理验证      │
└──┬───┘  └────────┬────────┘
   │               │ LLMResult
   └───────┬───────┘
           ▼
┌─────────────────┐     ┌─────────────────┐
│ 结果融合        │────▶│ 融合策略        │
│                 │     │ • 加权融合      │
│ • 置信度合并    │     │ • 冲突解决      │
│ • 决策生成      │     │ • 解释生成      │
└────────┬────────┘     └─────────────────┘
         │ FusionResult
         ▼
┌─────────────────┐     ┌─────────────────┐
│ 严重性评估器    │────▶│ 等级计算        │
│                 │     │ • 基础等级      │
│ • 风险评估      │     │ • 场景调整      │
│ • 响应建议      │     │ • 持续升级      │
└────────┬────────┘     └─────────────────┘
         │ SeverityAssessment
         ▼
┌─────────────────┐     ┌─────────────────┐
│ 事件格式化      │────▶│ 标准格式        │
│                 │     │ • 事件ID        │
│ • 元数据填充    │     │ • 时间戳        │
│ • 快照关联      │     │ • 位置信息      │
└────────┬────────┘     └─────────────────┘
         │ FinalEvent
         ▼
┌─────────────────┐
│ MQTT发布        │
│                 │
│ • 消息序列化    │
│ • QoS管理       │
│ • 发布确认      │
└─────────────────┘

3.2 状态流转

                    ┌─────────────┐
                    │   DETECTED  │
                    │   (检测到)   │
                    └──────┬──────┘
                           │
              ┌────────────┼────────────┐
              │            │            │
              ▼            ▼            ▼
       ┌──────────┐ ┌──────────┐ ┌──────────┐
       │ REJECTED │ │ PENDING  │ │ CONFIRMED│
       │ (已拒绝)  │ │ (待确认) │ │ (已确认) │
       └──────────┘ └────┬─────┘ └────┬─────┘
                         │            │
                         ▼            │
                  ┌──────────┐        │
                  │ LLM_ANAL │        │
                  │ (LLM分析)│        │
                  └────┬─────┘        │
                       │              │
                       ▼              │
                  ┌──────────┐        │
                  │ FUSION   │◀───────┘
                  │ (融合)   │
                  └────┬─────┘
                       │
                       ▼
                  ┌──────────┐
                  │ SEVERITY │
                  │ (严重性) │
                  └────┬─────┘
                       │
                       ▼
                  ┌──────────┐
                  │ PUBLISHED│
                  │ (已发布) │
                  └──────────┘

4. 算法详细设计

4.1 置信度校准算法

class ConfidenceCalibrator:
    """
    置信度校准器
    
    使用温度缩放Temperature Scaling校准模型置信度
    """
    
    def __init__(self, temperature: float = 1.0):
        self.temperature = temperature
        self.calibration_history = []
    
    def calibrate(
        self, 
        raw_confidence: float,
        model_type: str,
        scene: SceneType
    ) -> float:
        """
        校准置信度
        
        公式: calibrated = sigmoid(logit(raw) / temperature)
        """
        # 防止除零
        eps = 1e-10
        
        # 转换为logit
        raw_clipped = np.clip(raw_confidence, eps, 1 - eps)
        logit = np.log(raw_clipped / (1 - raw_clipped))
        
        # 温度缩放
        scaled_logit = logit / self.temperature
        
        # 转回概率
        calibrated = 1 / (1 + np.exp(-scaled_logit))
        
        # 场景调整
        scene_factor = self._get_scene_factor(scene)
        
        return min(1.0, calibrated * scene_factor)
    
    def _get_scene_factor(self, scene: SceneType) -> float:
        """获取场景调整因子"""
        factors = {
            SceneType.INDOOR: 0.95,
            SceneType.OUTDOOR: 1.0,
            SceneType.LOW_LIGHT: 0.85,
            SceneType.CROWDED: 0.9
        }
        return factors.get(scene, 1.0)

4.2 事件相似度算法

class EventSimilarityCalculator:
    """事件相似度计算器"""
    
    @staticmethod
    def calculate_iou(
        bbox1: Tuple[int, int, int, int],
        bbox2: Tuple[int, int, int, int]
    ) -> float:
        """计算边界框IoU"""
        x1_1, y1_1, x2_1, y2_1 = bbox1
        x1_2, y1_2, x2_2, y2_2 = bbox2
        
        # 计算交集
        xi1 = max(x1_1, x1_2)
        yi1 = max(y1_1, y1_2)
        xi2 = min(x2_1, x2_2)
        yi2 = min(y2_1, y2_2)
        
        inter_width = max(0, xi2 - xi1)
        inter_height = max(0, yi2 - yi1)
        inter_area = inter_width * inter_height
        
        # 计算并集
        box1_area = (x2_1 - x1_1) * (y2_1 - y1_1)
        box2_area = (x2_2 - x1_2) * (y2_2 - y1_2)
        union_area = box1_area + box2_area - inter_area
        
        return inter_area / union_area if union_area > 0 else 0.0
    
    @staticmethod
    def calculate_temporal_proximity(
        time1: datetime,
        time2: datetime,
        max_window: int = 30
    ) -> float:
        """计算时间邻近度"""
        diff_seconds = abs((time1 - time2).total_seconds())
        return max(0, 1 - diff_seconds / max_window)
    
    @staticmethod
    def calculate_feature_similarity(
        event1: CandidateEvent,
        event2: CandidateEvent
    ) -> float:
        """计算特征相似度"""
        similarities = []
        
        # 类型相似度
        type_sim = 1.0 if event1.event_type == event2.event_type else 0.0
        similarities.append(type_sim)
        
        # 置信度相似度
        conf_sim = 1.0 - abs(
            event1.confidence - event2.confidence
        )
        similarities.append(conf_sim)
        
        # 空间相似度
        spatial_sim = EventSimilarityCalculator.calculate_iou(
            event1.bbox, event2.bbox
        )
        similarities.append(spatial_sim)
        
        return np.mean(similarities)

4.3 优先级队列算法

class PriorityQueueManager:
    """优先级队列管理器"""
    
    def __init__(self):
        self.queue = []
        self.counter = 0
    
    def enqueue(
        self, 
        event: AggregatedEvent,
        priority_score: float
    ) -> str:
        """
        将事件加入优先级队列
        
        优先级计算考虑:
        1. 事件严重性40%
        2. 置信度不确定性30%
        3. 时间紧迫性20%
        4. 历史误报率10%
        """
        task_id = f"llm_task_{self.counter}"
        self.counter += 1
        
        # 使用堆实现优先级队列
        # Python heapq是最小堆所以用负数优先级
        heapq.heappush(
            self.queue, 
            (-priority_score, self.counter, task_id, event)
        )
        
        return task_id
    
    def dequeue(self) -> Optional[Tuple[str, AggregatedEvent]]:
        """取出最高优先级的事件"""
        if not self.queue:
            return None
        
        _, _, task_id, event = heapq.heappop(self.queue)
        return task_id, event
    
    def calculate_priority(
        self,
        event: AggregatedEvent,
        severity: SeverityLevel,
        fp_rate: float
    ) -> float:
        """计算优先级分数"""
        # 严重性权重 (0-1)
        severity_weights = {
            SeverityLevel.CRITICAL: 1.0,
            SeverityLevel.HIGH: 0.8,
            SeverityLevel.MEDIUM: 0.5,
            SeverityLevel.LOW: 0.2
        }
        severity_score = severity_weights.get(severity, 0.5)
        
        # 置信度不确定性 (0-1, 越高越不确定)
        uncertainty = 1.0 - event.confidence_avg
        
        # 时间紧迫性 (0-1)
        time_urgency = self._calculate_time_urgency(event)
        
        # 综合计算
        priority = (
            0.4 * severity_score +
            0.3 * uncertainty +
            0.2 * time_urgency +
            0.1 * min(1.0, fp_rate * 2)  # 误报率越高优先级越高(需要验证)
        )
        
        return priority

5. 配置管理

5.1 配置文件结构

# config/event_engine.yaml

event_decision_engine:
  confidence_thresholds:
    discard: 0.5
    llm_required: 0.7
    direct_pass: 0.85
  
  scene_classification:
    enabled: true
    model_path: "models/scene_classifier.onnx"
  
  confidence_calibration:
    enabled: true
    temperature: 1.2
    history_window: 1000

alert_rule_engine:
  rule_files:
    - "rules/fire_detection.yaml"
    - "rules/smoking_detection.yaml"
    - "rules/loitering_detection.yaml"
  
  default_rules:
    cooldown_seconds: 30
    max_alerts_per_hour: 50
  
  dynamic_update:
    enabled: true
    check_interval: 60

event_aggregator:
  dedup_window_seconds: 30
  spatial_tolerance_pixels: 50
  iou_threshold: 0.3
  max_events_in_memory: 1000
  cleanup_interval: 300

llm_trigger:
  confidence_trigger_range: [0.5, 0.85]
  
  force_trigger_scenes:
    - "indoor_no_smoking_area"
    - "chemical_lab"
    - "server_room"
  
  concurrency_control:
    max_concurrent_calls: 5
    max_queue_size: 100
    timeout_seconds: 30
  
  cost_control:
    max_calls_per_minute: 30
    max_cost_per_hour: 10.0
    cache_enabled: true
    cache_ttl_seconds: 300
  
  priority_weights:
    severity: 0.4
    uncertainty: 0.3
    urgency: 0.2
    false_positive_rate: 0.1

result_fusion:
  default_strategy: "weighted"
  weights:
    yolo: 0.6
    llm: 0.4
  
  strategies:
    weighted:
      yolo_weight: 0.6
      llm_weight: 0.4
    conservative:
      mode: "both_high"
    optimistic:
      mode: "either_high"

severity_assessor:
  base_severity:
    fire: "critical"
    smoking: "medium"
    loitering: "low"
    illegal_parking: "low"
  
  scene_modifiers:
    indoor:
      smoking: +1  # 升级一级
    night_time:
      loitering: +1
    crowded:
      fire: +0  # 已经是最高
  
  duration_escalation:
    enabled: true
    thresholds:
      - duration: 300  # 5分钟
        escalation: +1
      - duration: 600  # 10分钟
        escalation: +2
  
  response_time_targets:
    critical: 10  # 秒
    high: 30
    medium: 120
    low: 300

5.2 规则文件示例

# rules/fire_detection.yaml
rules:
  - rule_id: "fire_critical"
    rule_name: "火灾紧急预警"
    event_type: "fire"
    description: "高置信度火灾检测,立即预警"
    
    conditions:
      confidence:
        min: 0.8
        max: 1.0
      time:
        - days: [0, 1, 2, 3, 4, 5, 6]  # 每天
          start: "00:00"
          end: "23:59"
      areas:
        include: ["warehouse", "lab", "office", "server_room"]
        exclude: []
    
    actions:
      alert_level: "critical"
      cooldown_seconds: 10
      max_frequency:
        per_hour: 100
        per_day: 500
      notifications:
        - type: "mqtt"
          topic: "jc-video/alerts/fire/critical"
          qos: 2
        - type: "webhook"
          url: "https://api.company.com/alerts"
      
    enabled: true
    priority: 1

  - rule_id: "fire_normal"
    rule_name: "火灾普通预警"
    event_type: "fire"
    description: "中等置信度火灾检测需要LLM验证"
    
    conditions:
      confidence:
        min: 0.5
        max: 0.8
      time:
        - days: [0, 1, 2, 3, 4, 5, 6]
          start: "00:00"
          end: "23:59"
    
    actions:
      alert_level: "high"
      llm_verification: true
      cooldown_seconds: 30
      max_frequency:
        per_hour: 20
      notifications:
        - type: "mqtt"
          topic: "jc-video/alerts/fire/high"
          qos: 2
    
    enabled: true
    priority: 2

6. 接口定义

6.1 内部接口

# 事件决策引擎接口
class IEventDecisionEngine(ABC):
    @abstractmethod
    def evaluate(
        self, 
        detection_result: DetectionResult,
        context: DetectionContext
    ) -> List[CandidateEvent]:
        pass

# 预警规则引擎接口
class IAlertRuleEngine(ABC):
    @abstractmethod
    def check(
        self, 
        candidate_event: CandidateEvent
    ) -> RuleCheckResult:
        pass
    
    @abstractmethod
    def reload_rules(self) -> bool:
        """热加载规则"""
        pass

# 事件聚合器接口
class IEventAggregator(ABC):
    @abstractmethod
    def aggregate(
        self, 
        new_event: CandidateEvent
    ) -> Optional[AggregatedEvent]:
        pass
    
    @abstractmethod
    def get_active_events(self) -> List[AggregatedEvent]:
        pass

# LLM触发器接口
class ILLMTrigger(ABC):
    @abstractmethod
    def should_trigger(
        self, 
        event: AggregatedEvent
    ) -> TriggerDecision:
        pass
    
    @abstractmethod
    def submit_analysis(
        self, 
        event: AggregatedEvent
    ) -> str:
        """提交分析任务返回任务ID"""
        pass
    
    @abstractmethod
    def get_result(
        self, 
        task_id: str
    ) -> Optional[LLMResult]:
        pass

# 严重性评估器接口
class ISeverityAssessor(ABC):
    @abstractmethod
    def assess(
        self,
        event: FusionResult,
        context: EventContext
    ) -> SeverityAssessment:
        pass

6.2 外部接口

# 主入口接口
class IEventJudgmentEngine(ABC):
    """
    事件判断引擎主接口
    
    这是外部系统调用的统一入口
    """
    
    @abstractmethod
    async def process_detection(
        self,
        detection_result: DetectionResult,
        context: DetectionContext
    ) -> Optional[FinalEvent]:
        """
        处理检测结果
        
        Args:
            detection_result: YOLO/行为检测结果
            context: 检测上下文
        
        Returns:
            最终事件如果需要预警或None
        """
        pass
    
    @abstractmethod
    async def process_frame(
        self,
        frame: np.ndarray,
        camera_id: str,
        timestamp: datetime
    ) -> List[FinalEvent]:
        """
        处理视频帧(完整流程)
        
        Args:
            frame: 视频帧
            camera_id: 摄像头ID
            timestamp: 时间戳
        
        Returns:
            检测到的事件列表
        """
        pass
    
    @abstractmethod
    def get_statistics(self) -> EngineStatistics:
        """获取引擎统计信息"""
        pass
    
    @abstractmethod
    def reload_configuration(self) -> bool:
        """重新加载配置"""
        pass

7. 性能考虑

7.1 性能指标目标

指标 目标值 说明
事件决策延迟 < 10ms 单事件决策时间
规则匹配延迟 < 5ms 单规则匹配时间
事件聚合延迟 < 20ms 包括相似度计算
LLM触发决策 < 5ms 决策时间不含LLM调用
严重性评估 < 5ms 评估时间
总处理延迟 < 50ms 不含LLM的完整流程
并发处理能力 > 100事件/秒 单实例
内存占用 < 500MB 事件缓存和队列

7.2 优化策略

7.2.1 计算优化

优化点 策略 预期收益
规则匹配 使用倒排索引 减少90%的比较次数
相似度计算 空间哈希网格 O(n) -> O(1)
置信度校准 预计算查找表 避免重复计算
场景分类 轻量级模型 < 5ms推理时间

7.2.2 缓存策略

class EventEngineCache:
    """事件引擎缓存管理"""
    
    def __init__(self):
        # 规则缓存
        self.rule_cache = LRUCache(maxsize=1000)
        
        # 场景分类缓存(帧级别)
        self.scene_cache = LRUCache(maxsize=100)
        
        # LLM结果缓存图像特征级别
        self.llm_cache = LRUCache(maxsize=1000)
        
        # 置信度校准缓存
        self.calibration_cache = LRUCache(maxsize=5000)
    
    def get_cache_key(
        self, 
        frame_hash: str,
        detection_bbox: Tuple[int, int, int, int]
    ) -> str:
        """生成缓存键"""
        return f"{frame_hash}_{bbox_hash(detection_bbox)}"

7.2.3 异步处理

class AsyncEventProcessor:
    """异步事件处理器"""
    
    def __init__(self):
        self.processing_queue = asyncio.Queue()
        self.llm_semaphore = asyncio.Semaphore(5)
        self.worker_tasks = []
    
    async def start(self, num_workers: int = 4):
        """启动处理工作线程"""
        self.worker_tasks = [
            asyncio.create_task(self._worker())
            for _ in range(num_workers)
        ]
    
    async def _worker(self):
        """工作线程"""
        while True:
            event = await self.processing_queue.get()
            try:
                await self._process_event(event)
            finally:
                self.processing_queue.task_done()
    
    async def _process_event(self, event: AggregatedEvent):
        """处理单个事件"""
        # 决策是否需要LLM
        trigger_decision = await self.llm_trigger.should_trigger(event)
        
        if trigger_decision.should_trigger:
            async with self.llm_semaphore:
                llm_result = await self.llm_client.analyze(event)
        else:
            llm_result = None
        
        # 融合结果
        fused_result = self.result_fusion.fuse(
            event.yolo_result,
            llm_result
        )
        
        # 评估严重性
        severity = self.severity_assessor.assess(fused_result)
        
        # 发布事件
        await self._publish_event(fused_result, severity)

7.3 监控指标

@dataclass
class EngineMetrics:
    """引擎监控指标"""
    
    # 处理指标
    events_processed: int = 0
    events_accepted: int = 0
    events_rejected: int = 0
    events_aggregated: int = 0
    
    # 延迟指标(毫秒)
    decision_latency_ms: float = 0.0
    rule_match_latency_ms: float = 0.0
    aggregation_latency_ms: float = 0.0
    llm_decision_latency_ms: float = 0.0
    severity_latency_ms: float = 0.0
    total_latency_ms: float = 0.0
    
    # LLM指标
    llm_calls_total: int = 0
    llm_calls_cached: int = 0
    llm_queue_size: int = 0
    llm_cost_usd: float = 0.0
    
    # 质量指标
    false_positive_rate: float = 0.0
    precision: float = 0.0
    recall: float = 0.0
    
    # 资源指标
    memory_usage_mb: float = 0.0
    cpu_usage_percent: float = 0.0

附录

A. 术语表

术语 英文 说明
候选事件 Candidate Event 通过初步筛选的潜在事件
聚合事件 Aggregated Event 合并去重后的事件
最终事件 Final Event 经过完整流程处理后的事件
触发决策 Trigger Decision 是否触发LLM分析的决策
融合结果 Fusion Result YOLO和LLM结果融合后的结果
严重等级 Severity Level 事件的严重程度分级
冷却时间 Cooldown 同一类型事件的重复预警间隔

B. 数据模型关系图

DetectionResult
    │
    ├──> CandidateEvent (1:N)
    │       │
    │       └──> RuleCheckResult
    │               │
    │               └──> AggregatedEvent (N:1)
    │                       │
    │                       ├──> TriggerDecision
    │                       │       │
    │                       │       └──> LLMResult (Optional)
    │                       │
    │                       └──> FusionResult
    │                               │
    │                               └──> SeverityAssessment
    │                                       │
    │                                       └──> FinalEvent
    │
    └──> BehaviorAlert (if behavior detected)

文档结束