From 6819e57d79ded197648b88369620f421a684376d Mon Sep 17 00:00:00 2001 From: wwh <496479012@qq.com> Date: Tue, 2 Jun 2026 20:28:17 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E5=8A=A0=E4=BA=86=E4=B8=A4=E4=B8=AA?= =?UTF-8?q?=E8=AE=A1=E5=88=92=E4=B9=A6=EF=BC=8C=E5=90=8E=E7=BB=AD=E5=81=9A?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/event-judgment-algorithm-architecture.md | 2232 +++++++++++++++++ docs/project-integration-plan.md | 522 ++++ 2 files changed, 2754 insertions(+) create mode 100644 docs/event-judgment-algorithm-architecture.md create mode 100644 docs/project-integration-plan.md diff --git a/docs/event-judgment-algorithm-architecture.md b/docs/event-judgment-algorithm-architecture.md new file mode 100644 index 0000000..bc0af7c --- /dev/null +++ b/docs/event-judgment-algorithm-architecture.md @@ -0,0 +1,2232 @@ +# 事件判断算法架构设计文档 + +## 文档信息 + +- **项目名称**: jc-video-recognize 视频模型检测平台 +- **模块名称**: 事件判断算法引擎 (Event Judgment Algorithm Engine) +- **制定日期**: 2026-06-02 +- **文档版本**: v1.0 + +--- + +## 目录 + +1. [架构概述](#1-架构概述) +2. [核心模块设计](#2-核心模块设计) +3. [数据流设计](#3-数据流设计) +4. [算法详细设计](#4-算法详细设计) +5. [配置管理](#5-配置管理) +6. [接口定义](#6-接口定义) +7. [性能考虑](#7-性能考虑) + +--- + +## 1. 架构概述 + +### 1.1 设计目标 + +事件判断算法引擎的核心目标是: +- **准确性**: 减少误报,确保真实事件被捕获 +- **实时性**: 低延迟的事件判断和预警 +- **可配置性**: 支持灵活的规则配置和场景适配 +- **可扩展性**: 易于添加新的事件类型和判断逻辑 + +### 1.2 整体架构图 + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 事件判断算法引擎架构 │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ 输入层 (Input Layer) │ │ +│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ +│ │ │ YOLO检测 │ │ 行为检测 │ │ RTSP流 │ │ 外部系统 │ │ │ +│ │ │ 结果 │ │ 结果 │ │ 帧数据 │ │ 输入 │ │ │ +│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ +│ │ └─────────────────┴─────────────────┴─────────────────┘ │ │ +│ └────────────────────────────────────┬────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ 决策层 (Decision Layer) │ │ +│ │ │ │ +│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │ │ +│ │ │ 事件决策引擎 │─────▶│ 预警规则引擎 │─────▶│ 事件聚合器 │ │ │ +│ │ │ │ │ │ │ │ │ │ +│ │ │ • 置信度评估 │ │ • 规则匹配 │ │ • 去重合并 │ │ │ +│ │ │ • 场景识别 │ │ • 时间窗口 │ │ • 时间窗口 │ │ │ +│ │ │ • 初步筛选 │ │ • 区域规则 │ │ • 事件关联 │ │ │ +│ │ └─────────────────┘ └─────────────────┘ └──────┬──────┘ │ │ +│ │ │ │ │ +│ └────────────────────────────────────────────────────────────┼────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ AI增强层 (AI Enhancement) │ │ +│ │ │ │ +│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │ │ +│ │ │ 大模型触发器 │─────▶│ LLM视觉分析 │─────▶│ 结果融合 │ │ │ +│ │ │ │ │ │ │ │ │ │ +│ │ │ • 智能决策 │ │ • 图像理解 │ │ • 置信融合 │ │ │ +│ │ │ • 优先级排序 │ │ • 场景分析 │ │ • 决策输出 │ │ │ +│ │ │ • 并发控制 │ │ • 推理验证 │ │ │ │ │ +│ │ └─────────────────┘ └─────────────────┘ └──────┬──────┘ │ │ +│ │ │ │ │ +│ └────────────────────────────────────────────────────────────┼────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ 输出层 (Output Layer) │ │ +│ │ │ │ +│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │ │ +│ │ │ 严重性评估器 │─────▶│ 事件格式化 │─────▶│ MQTT发布 │ │ │ +│ │ │ │ │ │ │ │ │ │ +│ │ │ • 风险等级 │ │ • 标准格式 │ │ • 消息发布 │ │ │ +│ │ │ • 优先级计算 │ │ • 元数据填充 │ │ • QoS管理 │ │ │ +│ │ │ • 响应建议 │ │ • 快照关联 │ │ │ │ │ +│ │ └─────────────────┘ └─────────────────┘ └─────────────┘ │ │ +│ │ │ │ +│ └─────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +### 1.3 模块职责划分 + +| 模块名称 | 核心职责 | 输入 | 输出 | +|----------|----------|------|------| +| **事件决策引擎** | 初步判断检测结果是否构成事件 | 检测结果 | 候选事件列表 | +| **预警规则引擎** | 根据配置规则过滤事件 | 候选事件 | 符合规则的事件 | +| **事件聚合器** | 合并重复事件,去重 | 规则事件 | 聚合后的事件 | +| **大模型触发器** | 决策是否需要LLM二次判断 | 聚合事件 | LLM任务队列 | +| **LLM视觉分析** | 大模型图像理解和验证 | 图像ROI | 分析结果 | +| **结果融合** | 融合YOLO和LLM结果 | 多源结果 | 最终决策 | +| **严重性评估器** | 评估事件严重等级 | 最终事件 | 分级事件 | + +--- + +## 2. 核心模块设计 + +### 2.1 事件决策引擎 (EventDecisionEngine) + +#### 2.1.1 功能说明 + +事件决策引擎是事件判断的第一道关卡,负责: +- 评估检测结果的置信度 +- 识别检测场景(室内/室外/禁区等) +- 进行初步的事件筛选 + +#### 2.1.2 类设计 + +```python +class EventDecisionEngine: + """ + 事件决策引擎 + + 职责: + 1. 评估单个检测结果是否构成候选事件 + 2. 根据场景和置信度进行初步筛选 + 3. 为后续规则引擎提供标准化的事件描述 + """ + + def __init__(self, config: DecisionEngineConfig): + self.config = config + self.scene_classifier = SceneClassifier() + self.confidence_calibrator = ConfidenceCalibrator() + + def evaluate( + self, + detection_result: DetectionResult, + context: DetectionContext + ) -> List[CandidateEvent]: + """ + 评估检测结果 + + Args: + detection_result: YOLO/行为检测的结果 + context: 检测上下文(时间、位置、摄像头信息等) + + Returns: + 候选事件列表(可能为空) + """ + pass + + def _assess_confidence( + self, + detection: Detection, + model_type: str + ) -> ConfidenceAssessment: + """评估检测置信度的可靠性""" + pass + + def _classify_scene( + self, + frame: np.ndarray, + camera_config: CameraConfig + ) -> SceneType: + """识别检测场景类型""" + pass + + def _create_candidate_event( + self, + detection: Detection, + scene: SceneType, + confidence: ConfidenceAssessment + ) -> CandidateEvent: + """创建候选事件""" + pass +``` + +#### 2.1.3 决策流程 + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ 输入检测 │────▶│ 置信度评估 │────▶│ 场景识别 │────▶│ 事件创建 │ +│ 结果 │ │ │ │ │ │ │ +└─────────────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ + │ │ │ + ▼ ▼ ▼ + ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ + │ • 原始置信度 │ │ • 室内/室外 │ │ • 事件ID │ + │ • 模型类型 │ │ • 区域类型 │ │ • 事件类型 │ + │ • 历史校准 │ │ • 时间特征 │ │ • 置信度 │ + │ • 可信度 │ │ • 光照条件 │ │ • 场景信息 │ + └─────────────┘ └─────────────┘ └─────────────┘ +``` + +#### 2.1.4 置信度评估策略 + +| 置信度区间 | 可信度等级 | 处理策略 | +|------------|------------|----------| +| 0.0 - 0.5 | 低 | 直接丢弃 | +| 0.5 - 0.7 | 中 | 标记为待验证,优先走LLM | +| 0.7 - 0.9 | 高 | 直接通过,可选LLM验证 | +| 0.9 - 1.0 | 极高 | 直接通过,跳过LLM | + +### 2.2 预警规则引擎 (AlertRuleEngine) + +#### 2.2.1 功能说明 + +预警规则引擎负责根据配置的规则判断事件是否需要触发预警: +- 时间规则(工作时间/非工作时间) +- 区域规则(禁区/普通区域) +- 频率规则(冷却时间、最大频次) +- 组合规则(多条件组合) + +#### 2.2.2 类设计 + +```python +class AlertRuleEngine: + """ + 预警规则引擎 + + 职责: + 1. 管理预警规则集合 + 2. 根据规则过滤候选事件 + 3. 支持动态规则更新 + """ + + def __init__(self, rule_repository: RuleRepository): + self.rule_repository = rule_repository + self.rule_cache = {} + + def check( + self, + candidate_event: CandidateEvent + ) -> RuleCheckResult: + """ + 检查候选事件是否满足预警规则 + + Args: + candidate_event: 候选事件 + + Returns: + 规则检查结果 + """ + pass + + def get_applicable_rules( + self, + event_type: str, + scene: SceneType + ) -> List[AlertRule]: + """获取适用于当前事件的所有规则""" + pass + + def evaluate_rule( + self, + rule: AlertRule, + event: CandidateEvent + ) -> bool: + """评估单个规则""" + pass + + +@dataclass +class AlertRule: + """预警规则定义""" + rule_id: str + rule_name: str + event_type: str + + # 置信度规则 + min_confidence: float = 0.5 + max_confidence: float = 1.0 + + # 时间规则 + time_windows: List[TimeWindow] = field(default_factory=list) + timezone: str = "Asia/Shanghai" + + # 区域规则 + applicable_areas: List[str] = field(default_factory=list) + excluded_areas: List[str] = field(default_factory=list) + + # 频率规则 + cooldown_seconds: int = 30 + max_alerts_per_hour: int = 10 + + # 启用状态 + enabled: bool = True + priority: int = 1 # 数字越小优先级越高 + + +@dataclass +class TimeWindow: + """时间窗口定义""" + days_of_week: List[int] # 0=周一, 6=周日 + start_time: str # "HH:MM" 格式 + end_time: str # "HH:MM" 格式 +``` + +#### 2.2.3 规则类型 + +```python +class RuleType(Enum): + """规则类型枚举""" + CONFIDENCE = "confidence" # 置信度规则 + TIME = "time" # 时间规则 + AREA = "area" # 区域规则 + FREQUENCY = "frequency" # 频率规则 + COMBINATION = "combination" # 组合规则 + SCENE = "scene" # 场景规则 +``` + +#### 2.2.4 规则配置示例 + +```yaml +# rules/fire_detection.yaml +rules: + - rule_id: "fire_critical" + rule_name: "火灾紧急预警" + event_type: "fire" + min_confidence: 0.8 + time_windows: + - days_of_week: [0, 1, 2, 3, 4, 5, 6] + start_time: "00:00" + end_time: "23:59" + applicable_areas: ["warehouse", "lab", "office"] + cooldown_seconds: 10 + max_alerts_per_hour: 100 + enabled: true + priority: 1 + + - rule_id: "fire_normal" + rule_name: "火灾普通预警" + event_type: "fire" + min_confidence: 0.5 + max_confidence: 0.8 + time_windows: + - days_of_week: [0, 1, 2, 3, 4, 5, 6] + start_time: "00:00" + end_time: "23:59" + applicable_areas: ["warehouse", "lab", "office"] + cooldown_seconds: 30 + max_alerts_per_hour: 20 + enabled: true + priority: 2 + +# rules/smoking_detection.yaml +rules: + - rule_id: "smoking_indoor" + rule_name: "室内吸烟预警" + event_type: "smoking" + min_confidence: 0.6 + scene_filter: + location_type: "indoor" + no_smoking_zone: true + time_windows: + - days_of_week: [0, 1, 2, 3, 4] + start_time: "09:00" + end_time: "18:00" + cooldown_seconds: 60 + enabled: true + priority: 1 +``` + +### 2.3 事件聚合器 (EventAggregator) + +#### 2.3.1 功能说明 + +事件聚合器负责: +- 合并同一目标的连续检测事件 +- 防止短时间内重复预警 +- 关联相似事件 + +#### 2.3.2 类设计 + +```python +class EventAggregator: + """ + 事件聚合器 + + 职责: + 1. 去重:合并同一目标的连续事件 + 2. 关联:识别相关事件 + 3. 窗口:基于时间窗口管理事件 + """ + + def __init__(self, config: AggregatorConfig): + self.config = config + self.active_events: Dict[str, AggregatedEvent] = {} + self.event_history: deque = deque(maxlen=1000) + + def aggregate( + self, + new_event: CandidateEvent + ) -> Optional[AggregatedEvent]: + """ + 聚合新事件 + + Args: + new_event: 新检测到的事件 + + Returns: + 聚合后的事件(如果是新事件)或None(如果是重复事件) + """ + pass + + def _find_matching_event( + self, + new_event: CandidateEvent + ) -> Optional[AggregatedEvent]: + """查找匹配的历史事件""" + pass + + def _calculate_similarity( + self, + event1: CandidateEvent, + event2: AggregatedEvent + ) -> float: + """计算两个事件的相似度""" + pass + + def _merge_events( + self, + existing: AggregatedEvent, + new_event: CandidateEvent + ) -> AggregatedEvent: + """合并两个事件""" + pass + + def cleanup_expired_events(self): + """清理过期的事件""" + pass + + +@dataclass +class AggregatedEvent: + """聚合事件""" + event_id: str + event_type: str + + # 时间信息 + first_seen: datetime + last_seen: datetime + + # 位置信息 + camera_id: str + location: Location + bbox_history: List[BBox] + + # 置信度信息 + confidence_max: float + confidence_min: float + confidence_avg: float + + # 检测次数 + detection_count: int + + # 关联的检测ID + detection_ids: List[str] + + # 状态 + status: EventStatus # active, closed, confirmed + + +class EventStatus(Enum): + """事件状态""" + ACTIVE = "active" # 活跃中(持续检测) + PENDING = "pending" # 待确认(等待LLM) + CONFIRMED = "confirmed" # 已确认 + CLOSED = "closed" # 已关闭 + FALSE_POSITIVE = "false_positive" # 误报 +``` + +#### 2.3.3 聚合策略 + +| 策略 | 说明 | 适用场景 | +|------|------|----------| +| **时间窗口** | 同一目标在N秒内的事件合并 | 持续检测场景 | +| **空间邻近** | 位置距离小于阈值的事件合并 | 移动目标跟踪 | +| **类型相同** | 同一类型的事件合并 | 同类事件去重 | +| **置信度加权** | 保留置信度最高的检测结果 | 质量优先 | + +#### 2.3.4 相似度计算 + +```python +def calculate_event_similarity( + event1: CandidateEvent, + event2: AggregatedEvent +) -> float: + """ + 计算事件相似度(0-1) + + 考虑因素: + 1. 时间 proximity + 2. 空间 proximity + 3. 类型匹配 + 4. 置信度差异 + """ + + # 时间相似度 + time_diff = abs( + (event1.timestamp - event2.last_seen).total_seconds() + ) + time_sim = max(0, 1 - time_diff / TIME_WINDOW_SECONDS) + + # 空间相似度(IoU或中心点距离) + spatial_sim = calculate_spatial_similarity( + event1.bbox, + event2.bbox_history[-1] + ) + + # 类型匹配 + type_sim = 1.0 if event1.event_type == event2.event_type else 0.0 + + # 加权综合 + similarity = ( + 0.3 * time_sim + + 0.4 * spatial_sim + + 0.3 * type_sim + ) + + return similarity +``` + +### 2.4 大模型触发器 (LLMTrigger) + +#### 2.4.1 功能说明 + +大模型触发器负责智能决策是否需要调用大模型进行二次判断: +- **多帧累积判断**: 不是单帧触发,而是基于多帧检测结果的综合判断 +- **置信度趋势分析**: 分析置信度变化趋势,避免瞬时波动误触发 +- **基于场景的强制触发**: 特定场景下强制触发LLM验证 +- **并发控制和队列管理**: 控制LLM调用频率和成本 +- **成本优化策略**: 缓存、去重、优先级队列 + +#### 2.4.2 多帧累积判断机制 + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 多帧累积判断流程 │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ 帧N-3 帧N-2 帧N-1 帧N(当前) │ +│ │ │ │ │ │ +│ ▼ ▼ ▼ ▼ │ +│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ +│ │检测 │ │检测 │ │检测 │ │检测 │ │ +│ │conf: │ │conf: │ │conf: │ │conf: │ │ +│ │ 0.55 │ │ 0.62 │ │ 0.58 │ │ 0.65 │ │ +│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │ +│ │ │ │ │ │ +│ └────────────┴────────────┴────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────┐ │ +│ │ 多帧累积分析器 │ │ +│ │ │ │ +│ │ 分析维度: │ │ +│ │ • 检测频次: 4/4帧 │ ← 连续检测次数 │ +│ │ • 平均置信度: 0.60 │ ← 累积置信度 │ +│ │ • 置信度趋势: 上升 │ ← 趋势判断 │ +│ │ • 位置稳定性: 稳定 │ ← 空间一致性 │ +│ │ • 持续时间: 1.2秒 │ ← 时间累积 │ +│ └──────────┬──────────┘ │ +│ │ │ +│ ┌──────────┴──────────┐ │ +│ │ 触发决策判断 │ │ +│ │ │ │ +│ │ 触发条件检查: │ │ +│ │ ✓ 连续帧数 >= 3 │ ← 最小帧数阈值 │ +│ │ ✓ 平均置信度 >= 0.55│ ← 累积置信度阈值 │ +│ │ ✓ 位置变化 < 50px │ ← 空间稳定性阈值 │ +│ │ ✓ 持续时间 >= 1s │ ← 时间阈值 │ +│ │ │ │ +│ │ 决策结果: 触发LLM │ │ +│ └─────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +#### 2.4.3 类设计 + +```python +class LLMTrigger: + """ + 大模型触发器 + + 职责: + 1. 基于多帧累积的触发决策 + 2. 管理帧历史缓冲区 + 3. 分析检测趋势和稳定性 + 4. 管理LLM调用队列和成本 + """ + + def __init__(self, config: LLMTriggerConfig): + self.config = config + + # 多帧累积分析器 + self.frame_accumulator = MultiFrameAccumulator( + buffer_size=config.frame_buffer_size, + time_window=config.frame_time_window + ) + + # 队列和缓存 + self.task_queue = PriorityQueue() + self.result_cache = LRUCache(maxsize=1000) + + # 成本控制 + self.concurrent_calls = 0 + self.cost_tracker = CostTracker() + + # 触发状态追踪 + self.trigger_state: Dict[str, TriggerState] = {} + + def should_trigger( + self, + event: AggregatedEvent, + frame_context: FrameContext + ) -> TriggerDecision: + """ + 决策是否触发LLM分析 + + 基于多帧累积判断,不是单帧决策 + + Args: + event: 当前帧的聚合事件 + frame_context: 帧上下文信息 + + Returns: + 触发决策 + """ + # 1. 将当前帧加入累积器 + self.frame_accumulator.add_frame(event, frame_context) + + # 2. 获取多帧分析结果 + frame_analysis = self.frame_accumulator.analyze(event.event_id) + + # 3. 基于多帧结果做触发决策 + return self._make_trigger_decision(event, frame_analysis) + + def _make_trigger_decision( + self, + event: AggregatedEvent, + frame_analysis: FrameAnalysisResult + ) -> TriggerDecision: + """基于多帧分析结果做触发决策""" + + # 检查多帧触发条件 + trigger_conditions = self._evaluate_trigger_conditions(frame_analysis) + + if not trigger_conditions.all_met: + return TriggerDecision( + should_trigger=False, + priority=0.0, + reason=trigger_conditions.reason, + strategy="multi_frame_accumulation", + frame_stats=frame_analysis + ) + + # 计算优先级 + priority = self._calculate_priority_with_frames(event, frame_analysis) + + return TriggerDecision( + should_trigger=True, + priority=priority, + reason=f"多帧累积满足条件: {trigger_conditions.summary}", + strategy="multi_frame_accumulation", + frame_stats=frame_analysis + ) + + def _evaluate_trigger_conditions( + self, + frame_analysis: FrameAnalysisResult + ) -> TriggerConditions: + """评估多帧触发条件""" + conditions = TriggerConditions() + + # 条件1: 最小连续帧数 + if frame_analysis.consecutive_frames < self.config.min_consecutive_frames: + conditions.add_failure( + "consecutive_frames", + f"连续帧数不足: {frame_analysis.consecutive_frames} < {self.config.min_consecutive_frames}" + ) + + # 条件2: 累积置信度阈值 + if frame_analysis.avg_confidence < self.config.min_accumulated_confidence: + conditions.add_failure( + "avg_confidence", + f"平均置信度不足: {frame_analysis.avg_confidence:.2f} < {self.config.min_accumulated_confidence}" + ) + + # 条件3: 空间稳定性 + if frame_analysis.position_variance > self.config.max_position_variance: + conditions.add_failure( + "position_stability", + f"位置变化过大: {frame_analysis.position_variance:.1f}px" + ) + + # 条件4: 最小持续时间 + if frame_analysis.duration_seconds < self.config.min_duration_seconds: + conditions.add_failure( + "duration", + f"持续时间不足: {frame_analysis.duration_seconds:.1f}s < {self.config.min_duration_seconds}s" + ) + + # 条件5: 置信度趋势(可选) + if self.config.require_confidence_trend: + if frame_analysis.confidence_trend < 0: # 下降趋势 + conditions.add_failure( + "trend", + "置信度呈下降趋势" + ) + + return conditions + + def _calculate_priority_with_frames( + self, + event: AggregatedEvent, + frame_analysis: FrameAnalysisResult + ) -> float: + """基于多帧信息计算优先级""" + base_priority = self.get_priority_score(event) + + # 多帧因子调整 + frame_factors = { + # 连续帧数越多,优先级越高 + "consecutive_bonus": min(0.2, frame_analysis.consecutive_frames * 0.05), + + # 置信度趋势向上,增加优先级 + "trend_bonus": 0.1 if frame_analysis.confidence_trend > 0 else 0, + + # 位置越稳定,优先级越高 + "stability_bonus": 0.1 * (1 - min(1, frame_analysis.position_variance / 100)) + } + + adjusted_priority = base_priority + sum(frame_factors.values()) + return min(1.0, adjusted_priority) + + def get_priority_score(self, event: AggregatedEvent) -> float: + """计算基础优先级分数""" + pass + + def enqueue_analysis(self, event: AggregatedEvent) -> str: + """将事件加入LLM分析队列""" + pass + + def get_analysis_result( + self, + task_id: str, + timeout: float = 30.0 + ) -> Optional[LLMResult]: + """获取LLM分析结果""" + pass + + +class MultiFrameAccumulator: + """ + 多帧累积分析器 + + 管理多帧检测历史,分析检测趋势和稳定性 + """ + + def __init__(self, buffer_size: int = 30, time_window: float = 5.0): + self.buffer_size = buffer_size + self.time_window = time_window + + # 帧历史缓冲区: {event_id: deque[FrameRecord]} + self.frame_buffers: Dict[str, deque] = {} + + def add_frame( + self, + event: AggregatedEvent, + frame_context: FrameContext + ): + """添加一帧检测记录""" + event_id = event.event_id + + if event_id not in self.frame_buffers: + self.frame_buffers[event_id] = deque(maxlen=self.buffer_size) + + record = FrameRecord( + timestamp=frame_context.timestamp, + confidence=event.confidence_avg, + bbox=event.bbox, + frame_id=frame_context.frame_id + ) + + self.frame_buffers[event_id].append(record) + + # 清理过期缓冲区 + self._cleanup_expired_buffers() + + def analyze(self, event_id: str) -> FrameAnalysisResult: + """分析多帧检测数据""" + buffer = self.frame_buffers.get(event_id, deque()) + + if len(buffer) < 2: + return FrameAnalysisResult( + consecutive_frames=len(buffer), + avg_confidence=buffer[0].confidence if buffer else 0, + position_variance=0, + duration_seconds=0, + confidence_trend=0 + ) + + # 计算各项指标 + confidences = [r.confidence for r in buffer] + positions = [self._bbox_center(r.bbox) for r in buffer] + timestamps = [r.timestamp for r in buffer] + + return FrameAnalysisResult( + consecutive_frames=len(buffer), + avg_confidence=np.mean(confidences), + max_confidence=np.max(confidences), + min_confidence=np.min(confidences), + position_variance=self._calculate_position_variance(positions), + duration_seconds=(timestamps[-1] - timestamps[0]).total_seconds(), + confidence_trend=self._calculate_trend(confidences), + frames_per_second=len(buffer) / max(0.001, (timestamps[-1] - timestamps[0]).total_seconds()) + ) + + def _calculate_position_variance( + self, + positions: List[Tuple[int, int]] + ) -> float: + """计算位置方差(稳定性指标)""" + if len(positions) < 2: + return 0.0 + + x_coords = [p[0] for p in positions] + y_coords = [p[1] for p in positions] + + x_var = np.var(x_coords) + y_var = np.var(y_coords) + + return np.sqrt(x_var + y_var) + + def _calculate_trend(self, values: List[float]) -> float: + """计算趋势(线性回归斜率)""" + if len(values) < 2: + return 0.0 + + x = np.arange(len(values)) + slope, _, _, _, _ = linregress(x, values) + return slope + + def _bbox_center(self, bbox: Tuple[int, int, int, int]) -> Tuple[int, int]: + """计算边界框中心点""" + x1, y1, x2, y2 = bbox + return ((x1 + x2) // 2, (y1 + y2) // 2) + + def _cleanup_expired_buffers(self): + """清理过期的帧缓冲区""" + current_time = datetime.now() + expired_events = [] + + for event_id, buffer in self.frame_buffers.items(): + if buffer: + last_time = buffer[-1].timestamp + if (current_time - last_time).total_seconds() > self.time_window * 2: + expired_events.append(event_id) + + for event_id in expired_events: + del self.frame_buffers[event_id] + + +@dataclass +class FrameRecord: + """单帧检测记录""" + timestamp: datetime + confidence: float + bbox: Tuple[int, int, int, int] + frame_id: int + + +@dataclass +class FrameAnalysisResult: + """多帧分析结果""" + consecutive_frames: int # 连续检测帧数 + avg_confidence: float # 平均置信度 + max_confidence: float # 最大置信度 + min_confidence: float # 最小置信度 + position_variance: float # 位置方差(像素) + duration_seconds: float # 持续时间(秒) + confidence_trend: float # 置信度趋势(斜率) + frames_per_second: float = 0.0 # 检测帧率 + + +@dataclass +class TriggerDecision: + """触发决策结果""" + should_trigger: bool + priority: float # 优先级分数 + reason: str # 决策原因 + strategy: str # 使用的策略 + frame_stats: Optional[FrameAnalysisResult] = None # 多帧统计 + + +@dataclass +class TriggerConditions: + """触发条件评估结果""" + failures: List[Dict] = field(default_factory=list) + + def add_failure(self, condition: str, reason: str): + self.failures.append({"condition": condition, "reason": reason}) + + @property + def all_met(self) -> bool: + return len(self.failures) == 0 + + @property + def reason(self) -> str: + if self.all_met: + return "所有条件满足" + return "; ".join([f["reason"] for f in self.failures]) + + @property + def summary(self) -> str: + return f"通过{len(self.failures)}项, 失败{len(self.failures)}项" + + +@dataclass +class LLMTriggerConfig: + """LLM触发器配置""" + + # ========== 多帧累积配置 ========== + # 帧缓冲区大小 + frame_buffer_size: int = 30 + + # 帧时间窗口(秒) + frame_time_window: float = 5.0 + + # 最小连续帧数(触发LLM的最小帧数) + min_consecutive_frames: int = 3 + + # 最小累积置信度 + min_accumulated_confidence: float = 0.55 + + # 最大位置方差(像素平方) + max_position_variance: float = 2500 # 50px * 50px + + # 最小持续时间(秒) + min_duration_seconds: float = 1.0 + + # 是否要求置信度趋势向上 + require_confidence_trend: bool = False + + # ========== 置信度触发区间 ========== + confidence_trigger_range: Tuple[float, float] = (0.5, 0.85) + + # ========== 强制触发场景 ========== + force_trigger_scenes: List[str] = field( + default_factory=lambda: ["indoor_no_smoking_area", "chemical_lab"] + ) + + # ========== 并发控制 ========== + max_concurrent_calls: int = 5 + max_queue_size: int = 100 + + # ========== 成本限制 ========== + max_calls_per_minute: int = 30 + max_cost_per_hour: float = 10.0 + + # ========== 缓存配置 ========== + cache_ttl_seconds: int = 300 + similarity_threshold: float = 0.9 +``` + +#### 2.4.4 触发策略(含多帧判断) + +```python +class TriggerStrategy: + """触发策略集合""" + + @staticmethod + def multi_frame_strategy( + event: AggregatedEvent, + frame_analysis: FrameAnalysisResult, + config: LLMTriggerConfig + ) -> TriggerDecision: + """ + 多帧累积触发策略(默认策略) + + 核心逻辑: + 1. 不是单帧触发,而是基于多帧的综合判断 + 2. 要求检测在时间和空间上都是稳定的 + 3. 避免瞬时误检导致的误触发 + """ + # 快速拒绝:如果帧数太少 + if frame_analysis.consecutive_frames < config.min_consecutive_frames: + return TriggerDecision( + should_trigger=False, + priority=0.0, + reason=f"连续帧数不足: {frame_analysis.consecutive_frames}/{config.min_consecutive_frames}", + strategy="multi_frame", + frame_stats=frame_analysis + ) + + # 快速拒绝:如果平均置信度太低 + if frame_analysis.avg_confidence < config.min_accumulated_confidence: + return TriggerDecision( + should_trigger=False, + priority=0.0, + reason=f"平均置信度不足: {frame_analysis.avg_confidence:.2f}/{config.min_accumulated_confidence}", + strategy="multi_frame", + frame_stats=frame_analysis + ) + + # 检查空间稳定性 + if frame_analysis.position_variance > config.max_position_variance: + return TriggerDecision( + should_trigger=False, + priority=0.0, + reason=f"位置不稳定: 方差={frame_analysis.position_variance:.1f}px²", + strategy="multi_frame", + frame_stats=frame_analysis + ) + + # 检查持续时间 + if frame_analysis.duration_seconds < config.min_duration_seconds: + return TriggerDecision( + should_trigger=False, + priority=0.0, + reason=f"持续时间不足: {frame_analysis.duration_seconds:.1f}s/{config.min_duration_seconds}s", + strategy="multi_frame", + frame_stats=frame_analysis + ) + + # 所有条件满足,计算优先级 + priority = TriggerStrategy._calculate_frame_based_priority( + frame_analysis, config + ) + + return TriggerDecision( + should_trigger=True, + priority=priority, + reason=f"多帧累积满足条件: {frame_analysis.consecutive_frames}帧, " + f"平均置信度{frame_analysis.avg_confidence:.2f}, " + f"持续{frame_analysis.duration_seconds:.1f}s", + strategy="multi_frame", + frame_stats=frame_analysis + ) + + @staticmethod + def _calculate_frame_based_priority( + frame_analysis: FrameAnalysisResult, + config: LLMTriggerConfig + ) -> float: + """基于多帧信息计算优先级""" + + # 基础分:平均置信度 + base_score = frame_analysis.avg_confidence + + # 连续帧数奖励(帧数越多越可靠) + frame_bonus = min(0.15, (frame_analysis.consecutive_frames - config.min_consecutive_frames) * 0.03) + + # 稳定性奖励(方差越小越稳定) + stability_ratio = 1 - min(1, frame_analysis.position_variance / config.max_position_variance) + stability_bonus = 0.1 * stability_ratio + + # 趋势奖励(置信度上升) + trend_bonus = 0.05 if frame_analysis.confidence_trend > 0 else 0 + + total_score = base_score + frame_bonus + stability_bonus + trend_bonus + return min(1.0, total_score) + + @staticmethod + def scene_based_strategy( + event: AggregatedEvent, + config: LLMTriggerConfig + ) -> TriggerDecision: + """ + 基于场景的策略 + + 某些场景强制触发LLM验证(不依赖多帧累积) + """ + scene = event.location.scene_type + + if scene in config.force_trigger_scenes: + return TriggerDecision( + should_trigger=True, + priority=1.0, + reason=f"强制触发场景: {scene}", + strategy="scene_based" + ) + + return TriggerDecision( + should_trigger=False, + priority=0.0, + reason="非强制触发场景", + strategy="scene_based" + ) + + @staticmethod + def single_frame_high_confidence_strategy( + event: AggregatedEvent, + config: LLMTriggerConfig + ) -> TriggerDecision: + """ + 单帧高置信度策略 + + 对于极高置信度的检测,可以跳过多帧累积直接触发 + (用于紧急情况) + """ + if event.confidence_avg > 0.95: + return TriggerDecision( + should_trigger=True, + priority=0.9, + reason=f"单帧极高置信度: {event.confidence_avg:.2f}", + strategy="single_frame_high_conf" + ) + + return TriggerDecision( + should_trigger=False, + priority=0.0, + reason="置信度未达到单帧触发阈值", + strategy="single_frame_high_conf" + ) +``` + +#### 2.4.5 多帧累积配置示例 + +```yaml +# config/llm_trigger.yaml + +llm_trigger: + # ========== 多帧累积配置 ========== + multi_frame: + # 帧缓冲区大小(保存最近N帧) + buffer_size: 30 + + # 帧时间窗口(秒)- 只分析最近5秒内的帧 + time_window: 5.0 + + # 最小连续帧数 - 至少检测到3帧才考虑触发 + min_consecutive_frames: 3 + + # 最小累积置信度 - 多帧平均置信度阈值 + min_accumulated_confidence: 0.55 + + # 最大位置方差(像素平方)- 位置变化不能超过50px + max_position_variance: 2500 + + # 最小持续时间(秒)- 检测至少持续1秒 + min_duration_seconds: 1.0 + + # 是否要求置信度趋势向上 + require_confidence_trend: false + + # 帧率要求(fps)- 检测帧率不能太低 + min_detection_fps: 2.0 + + # ========== 触发策略优先级 ========== + strategy_priority: + - "scene_based" # 场景强制触发(最高优先级) + - "single_frame_high_conf" # 单帧极高置信度 + - "multi_frame" # 多帧累积(默认) + + # ========== 不同事件类型的多帧配置 ========== + event_type_overrides: + fire: + # 火灾检测更敏感,减少帧数要求 + min_consecutive_frames: 2 + min_duration_seconds: 0.5 + min_accumulated_confidence: 0.50 + + smoking: + # 抽烟检测需要更稳定 + min_consecutive_frames: 5 + min_duration_seconds: 2.0 + min_accumulated_confidence: 0.60 + require_confidence_trend: true + + loitering: + # 徘徊检测本来就需要时间 + min_consecutive_frames: 10 + min_duration_seconds: 5.0 + min_accumulated_confidence: 0.55 +``` + +### 2.5 结果融合 (ResultFusion) + +#### 2.5.1 功能说明 + +结果融合模块负责将YOLO检测结果和LLM分析结果进行融合,得出最终决策: +- 置信度加权融合 +- 冲突解决策略 +- 决策解释生成 + +#### 2.5.2 类设计 + +```python +class ResultFusion: + """ + 结果融合器 + + 职责: + 1. 融合多源检测结果 + 2. 解决结果冲突 + 3. 生成决策解释 + """ + + def __init__(self, config: FusionConfig): + self.config = config + self.fusion_strategies = { + "weighted": WeightedFusionStrategy(), + "conservative": ConservativeFusionStrategy(), + "optimistic": OptimisticFusionStrategy() + } + + def fuse( + self, + yolo_result: DetectionResult, + llm_result: Optional[LLMResult], + strategy: str = "weighted" + ) -> FusionResult: + """ + 融合检测结果 + + Args: + yolo_result: YOLO检测结果 + llm_result: LLM分析结果(可能为None) + strategy: 融合策略 + + Returns: + 融合后的结果 + """ + pass + + def _calculate_fused_confidence( + self, + yolo_conf: float, + llm_conf: float, + weights: Tuple[float, float] + ) -> float: + """计算融合后的置信度""" + pass + + def _generate_explanation( + self, + yolo_result: DetectionResult, + llm_result: Optional[LLMResult], + fused_result: FusionResult + ) -> str: + """生成决策解释""" + pass + + +@dataclass +class FusionResult: + """融合结果""" + is_positive: bool # 是否为真实事件 + confidence: float # 融合后的置信度 + yolo_contribution: float # YOLO贡献度 + llm_contribution: float # LLM贡献度 + explanation: str # 决策解释 + metadata: Dict # 元数据 +``` + +#### 2.5.3 融合策略 + +| 策略 | 说明 | 适用场景 | +|------|------|----------| +| **加权融合** | 按权重合并YOLO和LLM置信度 | 通用场景 | +| **保守策略** | 两者都高才算高置信度 | 高风险场景 | +| **乐观策略** | 任一高就算高置信度 | 宁可误报不可漏报 | +| **LLM优先** | LLM结果覆盖YOLO | LLM可靠性高时 | + +### 2.6 严重性评估器 (SeverityAssessor) + +#### 2.6.1 功能说明 + +严重性评估器负责评估事件的严重程度: +- 基于事件类型的基础等级 +- 基于场景的调整 +- 基于持续时间的升级 +- 生成响应建议 + +#### 2.6.2 类设计 + +```python +class SeverityAssessor: + """ + 严重性评估器 + + 职责: + 1. 评估事件严重程度 + 2. 计算响应优先级 + 3. 生成响应建议 + """ + + def __init__(self, config: SeverityConfig): + self.config = config + self.severity_matrix = self._load_severity_matrix() + + def assess( + self, + event: FusionResult, + context: EventContext + ) -> SeverityAssessment: + """ + 评估事件严重性 + + Args: + event: 融合后的事件结果 + context: 事件上下文 + + Returns: + 严重性评估结果 + """ + pass + + def _calculate_base_severity( + self, + event_type: str + ) -> SeverityLevel: + """计算基础严重等级""" + pass + + def _apply_scene_modifier( + self, + base_severity: SeverityLevel, + scene: SceneType + ) -> SeverityLevel: + """应用场景调整""" + pass + + def _apply_duration_modifier( + self, + severity: SeverityLevel, + duration_seconds: float + ) -> SeverityLevel: + """应用持续时间调整""" + pass + + def generate_response_recommendation( + self, + assessment: SeverityAssessment + ) -> ResponseRecommendation: + """生成响应建议""" + pass + + +class SeverityLevel(Enum): + """严重等级枚举""" + LOW = "low" # 低 + MEDIUM = "medium" # 中 + HIGH = "high" # 高 + CRITICAL = "critical" # 紧急 + + +@dataclass +class SeverityAssessment: + """严重性评估结果""" + level: SeverityLevel + score: float # 0-100 + factors: List[str] # 影响因素 + escalation_recommended: bool # 建议升级 + response_time_target: int # 目标响应时间(秒) + + +@dataclass +class ResponseRecommendation: + """响应建议""" + immediate_actions: List[str] + notification_targets: List[str] + escalation_path: Optional[str] + documentation_required: bool +``` + +#### 2.6.3 严重性矩阵 + +| 事件类型 | 基础等级 | 室内场景 | 夜间场景 | 持续>5分钟 | +|----------|----------|----------|----------|------------| +| 火灾 | CRITICAL | +0 | +0 | +0 | +| 抽烟 | MEDIUM | HIGH | MEDIUM | HIGH | +| 徘徊 | LOW | MEDIUM | HIGH | MEDIUM | +| 违停 | LOW | +0 | LOW | MEDIUM | + +--- + +## 3. 数据流设计 + +### 3.1 完整数据流图 + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 完整事件处理数据流 │ +└─────────────────────────────────────────────────────────────────────────────┘ + +[视频帧输入] + │ + ▼ +┌─────────────────┐ +│ YOLO检测 │ +│ • 目标检测 │ +│ • 行为分析 │ +└────────┬────────┘ + │ DetectionResult + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ 事件决策引擎 │────▶│ 置信度评估 │ +│ │ │ • 原始置信度 │ +│ • 初步筛选 │ │ • 模型可信度 │ +│ • 场景识别 │ │ • 历史校准 │ +└────────┬────────┘ └─────────────────┘ + │ CandidateEvent + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ 预警规则引擎 │────▶│ 规则匹配 │ +│ │ │ • 时间规则 │ +│ • 规则过滤 │ │ • 区域规则 │ +│ • 频次检查 │ │ • 置信度规则 │ +└────────┬────────┘ └─────────────────┘ + │ RuleCheckResult + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ 事件聚合器 │────▶│ 去重策略 │ +│ │ │ • 时间窗口 │ +│ • 事件合并 │ │ • 空间邻近 │ +│ • 关联分析 │ │ • 相似度计算 │ +└────────┬────────┘ └─────────────────┘ + │ AggregatedEvent + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ 大模型触发器 │────▶│ 触发决策 │ +│ │ │ • 置信度区间 │ +│ • 智能决策 │ │ • 场景匹配 │ +│ • 队列管理 │ │ • 并发控制 │ +└────────┬────────┘ └─────────────────┘ + │ (可选分支) + ┌────┴────┐ + │ │ + ▼ ▼ +┌──────┐ ┌─────────────────┐ +│ 跳过 │ │ LLM视觉分析 │ +│ LLM │ │ │ +│ │ │ • 图像理解 │ +│ │ │ • 场景分析 │ +│ │ │ • 推理验证 │ +└──┬───┘ └────────┬────────┘ + │ │ LLMResult + └───────┬───────┘ + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ 结果融合 │────▶│ 融合策略 │ +│ │ │ • 加权融合 │ +│ • 置信度合并 │ │ • 冲突解决 │ +│ • 决策生成 │ │ • 解释生成 │ +└────────┬────────┘ └─────────────────┘ + │ FusionResult + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ 严重性评估器 │────▶│ 等级计算 │ +│ │ │ • 基础等级 │ +│ • 风险评估 │ │ • 场景调整 │ +│ • 响应建议 │ │ • 持续升级 │ +└────────┬────────┘ └─────────────────┘ + │ SeverityAssessment + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ 事件格式化 │────▶│ 标准格式 │ +│ │ │ • 事件ID │ +│ • 元数据填充 │ │ • 时间戳 │ +│ • 快照关联 │ │ • 位置信息 │ +└────────┬────────┘ └─────────────────┘ + │ FinalEvent + ▼ +┌─────────────────┐ +│ MQTT发布 │ +│ │ +│ • 消息序列化 │ +│ • QoS管理 │ +│ • 发布确认 │ +└─────────────────┘ +``` + +### 3.2 状态流转 + +``` + ┌─────────────┐ + │ DETECTED │ + │ (检测到) │ + └──────┬──────┘ + │ + ┌────────────┼────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌──────────┐ ┌──────────┐ ┌──────────┐ + │ REJECTED │ │ PENDING │ │ CONFIRMED│ + │ (已拒绝) │ │ (待确认) │ │ (已确认) │ + └──────────┘ └────┬─────┘ └────┬─────┘ + │ │ + ▼ │ + ┌──────────┐ │ + │ LLM_ANAL │ │ + │ (LLM分析)│ │ + └────┬─────┘ │ + │ │ + ▼ │ + ┌──────────┐ │ + │ FUSION │◀───────┘ + │ (融合) │ + └────┬─────┘ + │ + ▼ + ┌──────────┐ + │ SEVERITY │ + │ (严重性) │ + └────┬─────┘ + │ + ▼ + ┌──────────┐ + │ PUBLISHED│ + │ (已发布) │ + └──────────┘ +``` + +--- + +## 4. 算法详细设计 + +### 4.1 置信度校准算法 + +```python +class ConfidenceCalibrator: + """ + 置信度校准器 + + 使用温度缩放(Temperature Scaling)校准模型置信度 + """ + + def __init__(self, temperature: float = 1.0): + self.temperature = temperature + self.calibration_history = [] + + def calibrate( + self, + raw_confidence: float, + model_type: str, + scene: SceneType + ) -> float: + """ + 校准置信度 + + 公式: calibrated = sigmoid(logit(raw) / temperature) + """ + # 防止除零 + eps = 1e-10 + + # 转换为logit + raw_clipped = np.clip(raw_confidence, eps, 1 - eps) + logit = np.log(raw_clipped / (1 - raw_clipped)) + + # 温度缩放 + scaled_logit = logit / self.temperature + + # 转回概率 + calibrated = 1 / (1 + np.exp(-scaled_logit)) + + # 场景调整 + scene_factor = self._get_scene_factor(scene) + + return min(1.0, calibrated * scene_factor) + + def _get_scene_factor(self, scene: SceneType) -> float: + """获取场景调整因子""" + factors = { + SceneType.INDOOR: 0.95, + SceneType.OUTDOOR: 1.0, + SceneType.LOW_LIGHT: 0.85, + SceneType.CROWDED: 0.9 + } + return factors.get(scene, 1.0) +``` + +### 4.2 事件相似度算法 + +```python +class EventSimilarityCalculator: + """事件相似度计算器""" + + @staticmethod + def calculate_iou( + bbox1: Tuple[int, int, int, int], + bbox2: Tuple[int, int, int, int] + ) -> float: + """计算边界框IoU""" + x1_1, y1_1, x2_1, y2_1 = bbox1 + x1_2, y1_2, x2_2, y2_2 = bbox2 + + # 计算交集 + xi1 = max(x1_1, x1_2) + yi1 = max(y1_1, y1_2) + xi2 = min(x2_1, x2_2) + yi2 = min(y2_1, y2_2) + + inter_width = max(0, xi2 - xi1) + inter_height = max(0, yi2 - yi1) + inter_area = inter_width * inter_height + + # 计算并集 + box1_area = (x2_1 - x1_1) * (y2_1 - y1_1) + box2_area = (x2_2 - x1_2) * (y2_2 - y1_2) + union_area = box1_area + box2_area - inter_area + + return inter_area / union_area if union_area > 0 else 0.0 + + @staticmethod + def calculate_temporal_proximity( + time1: datetime, + time2: datetime, + max_window: int = 30 + ) -> float: + """计算时间邻近度""" + diff_seconds = abs((time1 - time2).total_seconds()) + return max(0, 1 - diff_seconds / max_window) + + @staticmethod + def calculate_feature_similarity( + event1: CandidateEvent, + event2: CandidateEvent + ) -> float: + """计算特征相似度""" + similarities = [] + + # 类型相似度 + type_sim = 1.0 if event1.event_type == event2.event_type else 0.0 + similarities.append(type_sim) + + # 置信度相似度 + conf_sim = 1.0 - abs( + event1.confidence - event2.confidence + ) + similarities.append(conf_sim) + + # 空间相似度 + spatial_sim = EventSimilarityCalculator.calculate_iou( + event1.bbox, event2.bbox + ) + similarities.append(spatial_sim) + + return np.mean(similarities) +``` + +### 4.3 优先级队列算法 + +```python +class PriorityQueueManager: + """优先级队列管理器""" + + def __init__(self): + self.queue = [] + self.counter = 0 + + def enqueue( + self, + event: AggregatedEvent, + priority_score: float + ) -> str: + """ + 将事件加入优先级队列 + + 优先级计算考虑: + 1. 事件严重性(40%) + 2. 置信度不确定性(30%) + 3. 时间紧迫性(20%) + 4. 历史误报率(10%) + """ + task_id = f"llm_task_{self.counter}" + self.counter += 1 + + # 使用堆实现优先级队列 + # Python heapq是最小堆,所以用负数优先级 + heapq.heappush( + self.queue, + (-priority_score, self.counter, task_id, event) + ) + + return task_id + + def dequeue(self) -> Optional[Tuple[str, AggregatedEvent]]: + """取出最高优先级的事件""" + if not self.queue: + return None + + _, _, task_id, event = heapq.heappop(self.queue) + return task_id, event + + def calculate_priority( + self, + event: AggregatedEvent, + severity: SeverityLevel, + fp_rate: float + ) -> float: + """计算优先级分数""" + # 严重性权重 (0-1) + severity_weights = { + SeverityLevel.CRITICAL: 1.0, + SeverityLevel.HIGH: 0.8, + SeverityLevel.MEDIUM: 0.5, + SeverityLevel.LOW: 0.2 + } + severity_score = severity_weights.get(severity, 0.5) + + # 置信度不确定性 (0-1, 越高越不确定) + uncertainty = 1.0 - event.confidence_avg + + # 时间紧迫性 (0-1) + time_urgency = self._calculate_time_urgency(event) + + # 综合计算 + priority = ( + 0.4 * severity_score + + 0.3 * uncertainty + + 0.2 * time_urgency + + 0.1 * min(1.0, fp_rate * 2) # 误报率越高优先级越高(需要验证) + ) + + return priority +``` + +--- + +## 5. 配置管理 + +### 5.1 配置文件结构 + +```yaml +# config/event_engine.yaml + +event_decision_engine: + confidence_thresholds: + discard: 0.5 + llm_required: 0.7 + direct_pass: 0.85 + + scene_classification: + enabled: true + model_path: "models/scene_classifier.onnx" + + confidence_calibration: + enabled: true + temperature: 1.2 + history_window: 1000 + +alert_rule_engine: + rule_files: + - "rules/fire_detection.yaml" + - "rules/smoking_detection.yaml" + - "rules/loitering_detection.yaml" + + default_rules: + cooldown_seconds: 30 + max_alerts_per_hour: 50 + + dynamic_update: + enabled: true + check_interval: 60 + +event_aggregator: + dedup_window_seconds: 30 + spatial_tolerance_pixels: 50 + iou_threshold: 0.3 + max_events_in_memory: 1000 + cleanup_interval: 300 + +llm_trigger: + confidence_trigger_range: [0.5, 0.85] + + force_trigger_scenes: + - "indoor_no_smoking_area" + - "chemical_lab" + - "server_room" + + concurrency_control: + max_concurrent_calls: 5 + max_queue_size: 100 + timeout_seconds: 30 + + cost_control: + max_calls_per_minute: 30 + max_cost_per_hour: 10.0 + cache_enabled: true + cache_ttl_seconds: 300 + + priority_weights: + severity: 0.4 + uncertainty: 0.3 + urgency: 0.2 + false_positive_rate: 0.1 + +result_fusion: + default_strategy: "weighted" + weights: + yolo: 0.6 + llm: 0.4 + + strategies: + weighted: + yolo_weight: 0.6 + llm_weight: 0.4 + conservative: + mode: "both_high" + optimistic: + mode: "either_high" + +severity_assessor: + base_severity: + fire: "critical" + smoking: "medium" + loitering: "low" + illegal_parking: "low" + + scene_modifiers: + indoor: + smoking: +1 # 升级一级 + night_time: + loitering: +1 + crowded: + fire: +0 # 已经是最高 + + duration_escalation: + enabled: true + thresholds: + - duration: 300 # 5分钟 + escalation: +1 + - duration: 600 # 10分钟 + escalation: +2 + + response_time_targets: + critical: 10 # 秒 + high: 30 + medium: 120 + low: 300 +``` + +### 5.2 规则文件示例 + +```yaml +# rules/fire_detection.yaml +rules: + - rule_id: "fire_critical" + rule_name: "火灾紧急预警" + event_type: "fire" + description: "高置信度火灾检测,立即预警" + + conditions: + confidence: + min: 0.8 + max: 1.0 + time: + - days: [0, 1, 2, 3, 4, 5, 6] # 每天 + start: "00:00" + end: "23:59" + areas: + include: ["warehouse", "lab", "office", "server_room"] + exclude: [] + + actions: + alert_level: "critical" + cooldown_seconds: 10 + max_frequency: + per_hour: 100 + per_day: 500 + notifications: + - type: "mqtt" + topic: "jc-video/alerts/fire/critical" + qos: 2 + - type: "webhook" + url: "https://api.company.com/alerts" + + enabled: true + priority: 1 + + - rule_id: "fire_normal" + rule_name: "火灾普通预警" + event_type: "fire" + description: "中等置信度火灾检测,需要LLM验证" + + conditions: + confidence: + min: 0.5 + max: 0.8 + time: + - days: [0, 1, 2, 3, 4, 5, 6] + start: "00:00" + end: "23:59" + + actions: + alert_level: "high" + llm_verification: true + cooldown_seconds: 30 + max_frequency: + per_hour: 20 + notifications: + - type: "mqtt" + topic: "jc-video/alerts/fire/high" + qos: 2 + + enabled: true + priority: 2 +``` + +--- + +## 6. 接口定义 + +### 6.1 内部接口 + +```python +# 事件决策引擎接口 +class IEventDecisionEngine(ABC): + @abstractmethod + def evaluate( + self, + detection_result: DetectionResult, + context: DetectionContext + ) -> List[CandidateEvent]: + pass + +# 预警规则引擎接口 +class IAlertRuleEngine(ABC): + @abstractmethod + def check( + self, + candidate_event: CandidateEvent + ) -> RuleCheckResult: + pass + + @abstractmethod + def reload_rules(self) -> bool: + """热加载规则""" + pass + +# 事件聚合器接口 +class IEventAggregator(ABC): + @abstractmethod + def aggregate( + self, + new_event: CandidateEvent + ) -> Optional[AggregatedEvent]: + pass + + @abstractmethod + def get_active_events(self) -> List[AggregatedEvent]: + pass + +# LLM触发器接口 +class ILLMTrigger(ABC): + @abstractmethod + def should_trigger( + self, + event: AggregatedEvent + ) -> TriggerDecision: + pass + + @abstractmethod + def submit_analysis( + self, + event: AggregatedEvent + ) -> str: + """提交分析任务,返回任务ID""" + pass + + @abstractmethod + def get_result( + self, + task_id: str + ) -> Optional[LLMResult]: + pass + +# 严重性评估器接口 +class ISeverityAssessor(ABC): + @abstractmethod + def assess( + self, + event: FusionResult, + context: EventContext + ) -> SeverityAssessment: + pass +``` + +### 6.2 外部接口 + +```python +# 主入口接口 +class IEventJudgmentEngine(ABC): + """ + 事件判断引擎主接口 + + 这是外部系统调用的统一入口 + """ + + @abstractmethod + async def process_detection( + self, + detection_result: DetectionResult, + context: DetectionContext + ) -> Optional[FinalEvent]: + """ + 处理检测结果 + + Args: + detection_result: YOLO/行为检测结果 + context: 检测上下文 + + Returns: + 最终事件(如果需要预警)或None + """ + pass + + @abstractmethod + async def process_frame( + self, + frame: np.ndarray, + camera_id: str, + timestamp: datetime + ) -> List[FinalEvent]: + """ + 处理视频帧(完整流程) + + Args: + frame: 视频帧 + camera_id: 摄像头ID + timestamp: 时间戳 + + Returns: + 检测到的事件列表 + """ + pass + + @abstractmethod + def get_statistics(self) -> EngineStatistics: + """获取引擎统计信息""" + pass + + @abstractmethod + def reload_configuration(self) -> bool: + """重新加载配置""" + pass +``` + +--- + +## 7. 性能考虑 + +### 7.1 性能指标目标 + +| 指标 | 目标值 | 说明 | +|------|--------|------| +| 事件决策延迟 | < 10ms | 单事件决策时间 | +| 规则匹配延迟 | < 5ms | 单规则匹配时间 | +| 事件聚合延迟 | < 20ms | 包括相似度计算 | +| LLM触发决策 | < 5ms | 决策时间(不含LLM调用) | +| 严重性评估 | < 5ms | 评估时间 | +| 总处理延迟 | < 50ms | 不含LLM的完整流程 | +| 并发处理能力 | > 100事件/秒 | 单实例 | +| 内存占用 | < 500MB | 事件缓存和队列 | + +### 7.2 优化策略 + +#### 7.2.1 计算优化 + +| 优化点 | 策略 | 预期收益 | +|--------|------|----------| +| 规则匹配 | 使用倒排索引 | 减少90%的比较次数 | +| 相似度计算 | 空间哈希网格 | O(n) -> O(1) | +| 置信度校准 | 预计算查找表 | 避免重复计算 | +| 场景分类 | 轻量级模型 | < 5ms推理时间 | + +#### 7.2.2 缓存策略 + +```python +class EventEngineCache: + """事件引擎缓存管理""" + + def __init__(self): + # 规则缓存 + self.rule_cache = LRUCache(maxsize=1000) + + # 场景分类缓存(帧级别) + self.scene_cache = LRUCache(maxsize=100) + + # LLM结果缓存(图像特征级别) + self.llm_cache = LRUCache(maxsize=1000) + + # 置信度校准缓存 + self.calibration_cache = LRUCache(maxsize=5000) + + def get_cache_key( + self, + frame_hash: str, + detection_bbox: Tuple[int, int, int, int] + ) -> str: + """生成缓存键""" + return f"{frame_hash}_{bbox_hash(detection_bbox)}" +``` + +#### 7.2.3 异步处理 + +```python +class AsyncEventProcessor: + """异步事件处理器""" + + def __init__(self): + self.processing_queue = asyncio.Queue() + self.llm_semaphore = asyncio.Semaphore(5) + self.worker_tasks = [] + + async def start(self, num_workers: int = 4): + """启动处理工作线程""" + self.worker_tasks = [ + asyncio.create_task(self._worker()) + for _ in range(num_workers) + ] + + async def _worker(self): + """工作线程""" + while True: + event = await self.processing_queue.get() + try: + await self._process_event(event) + finally: + self.processing_queue.task_done() + + async def _process_event(self, event: AggregatedEvent): + """处理单个事件""" + # 决策是否需要LLM + trigger_decision = await self.llm_trigger.should_trigger(event) + + if trigger_decision.should_trigger: + async with self.llm_semaphore: + llm_result = await self.llm_client.analyze(event) + else: + llm_result = None + + # 融合结果 + fused_result = self.result_fusion.fuse( + event.yolo_result, + llm_result + ) + + # 评估严重性 + severity = self.severity_assessor.assess(fused_result) + + # 发布事件 + await self._publish_event(fused_result, severity) +``` + +### 7.3 监控指标 + +```python +@dataclass +class EngineMetrics: + """引擎监控指标""" + + # 处理指标 + events_processed: int = 0 + events_accepted: int = 0 + events_rejected: int = 0 + events_aggregated: int = 0 + + # 延迟指标(毫秒) + decision_latency_ms: float = 0.0 + rule_match_latency_ms: float = 0.0 + aggregation_latency_ms: float = 0.0 + llm_decision_latency_ms: float = 0.0 + severity_latency_ms: float = 0.0 + total_latency_ms: float = 0.0 + + # LLM指标 + llm_calls_total: int = 0 + llm_calls_cached: int = 0 + llm_queue_size: int = 0 + llm_cost_usd: float = 0.0 + + # 质量指标 + false_positive_rate: float = 0.0 + precision: float = 0.0 + recall: float = 0.0 + + # 资源指标 + memory_usage_mb: float = 0.0 + cpu_usage_percent: float = 0.0 +``` + +--- + +## 附录 + +### A. 术语表 + +| 术语 | 英文 | 说明 | +|------|------|------| +| 候选事件 | Candidate Event | 通过初步筛选的潜在事件 | +| 聚合事件 | Aggregated Event | 合并去重后的事件 | +| 最终事件 | Final Event | 经过完整流程处理后的事件 | +| 触发决策 | Trigger Decision | 是否触发LLM分析的决策 | +| 融合结果 | Fusion Result | YOLO和LLM结果融合后的结果 | +| 严重等级 | Severity Level | 事件的严重程度分级 | +| 冷却时间 | Cooldown | 同一类型事件的重复预警间隔 | + +### B. 数据模型关系图 + +``` +DetectionResult + │ + ├──> CandidateEvent (1:N) + │ │ + │ └──> RuleCheckResult + │ │ + │ └──> AggregatedEvent (N:1) + │ │ + │ ├──> TriggerDecision + │ │ │ + │ │ └──> LLMResult (Optional) + │ │ + │ └──> FusionResult + │ │ + │ └──> SeverityAssessment + │ │ + │ └──> FinalEvent + │ + └──> BehaviorAlert (if behavior detected) +``` + +--- + +*文档结束* diff --git a/docs/project-integration-plan.md b/docs/project-integration-plan.md new file mode 100644 index 0000000..2f2760c --- /dev/null +++ b/docs/project-integration-plan.md @@ -0,0 +1,522 @@ +# 项目整合计划书:视频检测平台功能扩展 + +## 文档信息 + +- **项目名称**: jc-video-recognize 视频模型检测平台 +- **模块路径**: `/Users/wwh/project/jc-video-recognize/apps/server` +- **制定日期**: 2026-06-02 +- **文档版本**: v1.0 + +--- + +## 目录 + +1. [项目概述](#1-项目概述) +2. [视频流接入方案](#2-视频流接入方案) +3. [MQTT预警事件消息系统](#3-mqtt预警事件消息系统) +4. [AI分析辅助功能集成](#4-ai分析辅助功能集成) +5. [大模型二次判断架构](#5-大模型二次判断架构) +6. [项目管理与实施计划](#6-项目管理与实施计划) +7. [交付成果清单](#7-交付成果清单) + +--- + +## 1. 项目概述 + +### 1.1 现有系统架构 + +当前系统是一个基于 **FastAPI + YOLO/PaddlePaddle** 的实时视频检测平台: + +| 层级 | 技术栈 | 说明 | +|------|--------|------| +| **前端** | Vue 3 + Vite + Element Plus | 用户界面与可视化 | +| **后端** | FastAPI + Uvicorn | REST API + WebSocket | +| **AI推理** | YOLOv8/v10 + PaddlePaddle | 多模型检测服务 | +| **实时通信** | WebSocket | 摄像头视频流传输 | +| **检测模型** | 火灾/安全帽/人群/抽烟/徘徊检测 | 已集成的AI能力 | + +### 1.2 核心需求确认 + +| 需求 | 确认内容 | +|------|----------| +| **视频流接入** | 局域网环境,RTSP协议直接接入 | +| **消息系统** | 复用现有MQTT Broker | +| **AI扩展** | 大模型(GPT-4V/Claude等)进行二次深度判断 | + +--- + +## 2. 视频流接入方案 + +### 2.1 系统架构图 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 局域网环境 │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ 摄像头-1 │ │ 摄像头-2 │ │ 摄像头-N │ │ +│ │ RTSP流 │ │ RTSP流 │ │ RTSP流 │ │ +│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ +│ │ │ │ │ +│ └───────────────┼───────────────┘ │ +│ ▼ │ +│ ┌─────────────────────┐ │ +│ │ Stream Manager │ │ +│ │ (多路流管理调度) │ │ +│ └──────────┬──────────┘ │ +│ │ │ +│ ┌───────────────┼───────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ 解码线程-1 │ │ 解码线程-2 │ │ 解码线程-N │ │ +│ │(FFmpeg/CV2)│ │(FFmpeg/CV2)│ │(FFmpeg/CV2)│ │ +│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ +│ │ │ │ │ +│ └──────────────┼──────────────┘ │ +│ ▼ │ +│ ┌──────────────────┐ │ +│ │ AI检测引擎 │ │ +│ │ YOLO/Paddle+大模型 │ │ +│ └────────┬─────────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────────┐ │ +│ │ MQTT Publisher │ │ +│ │ 预警事件发布 │ │ +│ └──────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### 2.2 技术方案选型 + +| 组件 | 技术选型 | 说明 | +|------|----------|------| +| **RTSP客户端** | `opencv-python` + `ffmpeg` | 支持硬件加速解码 | +| **流管理** | 异步线程池 | 每路流独立解码线程 | +| **帧缓冲** | 环形缓冲区 (Ring Buffer) | 防止内存溢出,支持丢帧策略 | +| **解码优化** | FFmpeg硬解码 (可选) | NVIDIA/Intel GPU加速 | + +### 2.3 关键设计决策 + +**1. 解码方案选择** +- **方案A**: OpenCV VideoCapture (简单,已验证) +- **方案B**: FFmpeg子进程 (更稳定,支持更多格式) +- **推荐**: 方案A为主,方案B作为备选 + +**2. 多路并发策略** + +```python +# 每路摄像头配置 +{ + "camera_id": "cam_001", + "rtsp_url": "rtsp://192.168.1.100:554/stream", + "decode_fps": 5, # 解码帧率(降低AI负载) + "buffer_size": 30, # 缓冲区大小 + "reconnect_interval": 5 # 断线重连间隔(秒) +} +``` + +**3. 性能预估** + +| 指标 | 目标值 | 说明 | +|------|--------|------| +| 单路延迟 | < 300ms | 解码+AI推理 | +| 并发路数 | 16路+ | CPU推理 | +| CPU占用 | < 70% | 16路@720p | + +### 2.4 开发与集成任务 + +- [ ] 设计视频流接收、解码、存储及转发的系统架构 +- [ ] 开发RTSP协议解析模块 +- [ ] 实现视频流数据处理管道,确保低延迟与高稳定性 +- [ ] 编写视频流接入模块的单元测试与集成测试用例 + +### 2.5 质量保障与验收标准 + +| 指标 | 目标值 | +|------|--------| +| 视频流传输延迟 | < 500ms | +| 异常恢复时间 | < 5秒 | +| 并发视频流支持 | ≥ 16路 | +| 系统可用性 | 7×24小时 | + +--- + +## 3. MQTT预警事件消息系统 + +### 3.1 系统架构设计 + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ AI检测服务 │────▶│ 事件处理器 │────▶│ MQTT Broker │ +└─────────────┘ └─────────────┘ └──────┬──────┘ + │ + ┌──────────────────────┼──────────┐ + ▼ ▼ ▼ + ┌─────────┐ ┌─────────┐ ┌─────────┐ + │ Web前端 │ │ 移动端 │ │ 第三方 │ + └─────────┘ └─────────┘ └─────────┘ +``` + +### 3.2 消息主题设计 + +``` +# 主题层级结构 +jc-video/ +├── alerts/ +│ ├── fire/ # 火灾预警 +│ │ └── {camera_id} # 具体摄像头 +│ ├── smoking/ # 抽烟预警 +│ ├── loitering/ # 徘徊预警 +│ └── illegal_parking/ # 违停预警 +├── status/ +│ ├── camera/ # 摄像头状态 +│ └── system/ # 系统状态 +└── commands/ + └── camera/ # 摄像头控制指令 +``` + +### 3.3 预警事件消息格式 + +```json +{ + "event_id": "evt_550e8400-e29b-41d4-a716-446655440000", + "event_type": "fire", + "event_level": "critical", + "timestamp": "2026-06-02T10:30:00.123Z", + "camera_id": "cam_001", + "camera_name": "大厅摄像头", + "location": { + "building": "A栋", + "floor": "1F", + "area": "大厅" + }, + "detection": { + "class": "Fire", + "confidence": 0.95, + "bbox": [100, 200, 150, 180], + "snapshot_url": "/static/alerts/evt_550e8400.jpg" + }, + "ai_analysis": { + "llm_verified": true, + "llm_confidence": 0.92, + "llm_reasoning": "检测到明显火焰特征,伴随烟雾..." + }, + "metadata": { + "model_id": "fire_detection", + "processing_time_ms": 150 + } +} +``` + +### 3.4 QoS策略 + +| 消息类型 | QoS等级 | 说明 | +|----------|---------|------| +| 预警事件 | QoS 2 | 确保只送达一次 | +| 状态上报 | QoS 1 | 至少送达一次 | +| 控制指令 | QoS 1 | 至少送达一次 | + +### 3.5 开发与集成任务 + +- [ ] 设计预警事件消息的结构格式与传输协议 +- [ ] 配置MQTT服务器安全认证机制 +- [ ] 开发MQTT客户端模块,实现消息发布与订阅功能 +- [ ] 设计预警事件触发机制与消息路由策略 +- [ ] 实现消息持久化存储与消费确认机制 +- [ ] 开发预警消息管理后台,支持消息查询与统计分析 + +### 3.6 质量保障与验收标准 + +| 指标 | 目标值 | +|------|--------| +| 消息传输成功率 | 100% | +| 消息丢失率 | 0 | +| 预警消息延迟 | < 1秒 | +| 并发客户端连接 | ≥ 100个 | + +--- + +## 4. AI分析辅助功能集成 + +### 4.1 当前已支持的检测能力 + +- ✅ 火灾检测 (YOLOv10) +- ✅ 安全帽检测 (YOLOv8) +- ✅ 人群检测 (YOLOv8) +- ✅ 抽烟检测 (YOLOv8 + PaddlePaddle) +- ✅ 徘徊检测 (YOLOv8 + 行为分析) + +### 4.2 建议扩展的AI能力 + +| 功能 | 技术方案 | 优先级 | +|------|----------|--------| +| 人脸识别/属性分析 | InsightFace / PaddleFace | 中 | +| 车辆识别/车牌检测 | PP-Vehicle (已有基础) | 高 | +| 异常行为检测 | 自定义行为分析算法 | 中 | +| 越界/入侵检测 | 虚拟围栏算法 | 高 | + +### 4.3 开发与集成任务 + +- [ ] 根据业务需求评估并选择合适的AI模型 +- [ ] 搭建AI模型训练与推理环境,配置必要的GPU资源 +- [ ] 制定AI模型性能评估指标与优化策略 +- [ ] 设计AI分析接口,实现与视频流数据的对接 +- [ ] 开发AI模型推理服务,支持实时分析与批量处理两种模式 +- [ ] 实现AI分析结果与预警系统的联动机制 +- [ ] 开发AI模型效果监控与反馈优化模块 + +### 4.4 质量保障与验收标准 + +| 指标 | 目标值 | +|------|--------| +| AI分析准确率 | ≥ 90% | +| 单帧视频分析处理时间 | < 200ms | +| 模型更新支持 | 定期更新与持续优化 | + +--- + +## 5. 大模型二次判断架构 + +### 5.1 架构设计 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ AI分析流程 │ +│ │ +│ 视频帧 → YOLO初检 → 触发条件? ──否──→ 丢弃 │ +│ │ │ +│ 是 │ +│ ▼ │ +│ ┌─────────────────┐ │ +│ │ 截取ROI区域 │ │ +│ │ (目标区域放大) │ │ +│ └────────┬────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────┐ │ +│ │ 大模型视觉分析 │ │ +│ │ (GPT-4V/Claude) │ │ +│ │ │ │ +│ │ Prompt: │ │ +│ │ "分析图片中是否 │ │ +│ │ 存在火灾/抽烟 │ │ +│ │ 等危险行为" │ │ +│ └────────┬────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────┐ │ +│ │ 结果融合决策 │ │ +│ │ │ │ +│ │ YOLO置信度 × │ │ +│ │ LLM置信度 = │ │ +│ │ 最终预警等级 │ │ +│ └────────┬────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────┐ │ +│ │ 生成预警事件 │ │ +│ │ 发布MQTT │ │ +│ └─────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 5.2 大模型选型对比 + +| 模型 | 优点 | 缺点 | 成本 | 适用场景 | +|------|------|------|------|----------| +| **GPT-4o** | 视觉能力强,响应快 | 需翻墙,成本较高 | $$$ | 高精度场景 | +| **Claude 3.5 Sonnet** | 推理能力强,中文好 | 国内访问不稳定 | $$ | 复杂场景分析 | +| **Qwen-VL (通义千问)** | 国内可用,成本低 | 精度略低 | $ | 成本敏感场景 | +| **GLM-4V (智谱)** | 国内可用,中文优化 | 生态较小 | $ | 本地化部署 | + +### 5.3 推荐方案:多模型策略 + +```python +# 大模型配置 +LLM_CONFIGS = { + "primary": { + "provider": "openai", + "model": "gpt-4o", + "api_key_env": "OPENAI_API_KEY", + "max_tokens": 500, + "temperature": 0.3 + }, + "fallback": { + "provider": "dashscope", # 阿里通义千问 + "model": "qwen-vl-max", + "api_key_env": "DASHSCOPE_API_KEY", + "max_tokens": 500, + "temperature": 0.3 + } +} +``` + +### 5.4 Prompt设计模板 + +#### 火灾检测Prompt + +``` +你是一位专业的消防安全分析师。请仔细分析这张图片,判断是否存在以下情况: + +1. **真实火焰** - 明火、燃烧现象 +2. **烟雾** - 可见烟雾 +3. **误报可能** - 红色物体、灯光、反光等非火灾情况 + +请按以下格式回复: +```json +{ + "has_fire": true/false, + "has_smoke": true/false, + "confidence": 0-1, + "reasoning": "详细分析说明", + "risk_level": "low/medium/high/critical" +} +``` + +注意:必须严格区分真实火灾和视觉相似物(如红色衣服、灯光)。 + + + + +### 5.5 触发大模型的条件 + +```python +# 触发策略配置 +TRIGGER_CONFIG = { + # 条件1: YOLO置信度在模糊区间 + "confidence_range": [0.5, 0.85], + + # 条件2: 特定场景强制触发 + "force_trigger_scenes": ["indoor_no_smoking_area"], + + # 条件3: 时间窗口去重(避免重复调用) + "dedup_window_seconds": 30, + + # 条件4: 并发限制 + "max_concurrent_llm_calls": 5 +} +``` + +--- + +## 6. 项目管理与实施计划 + +### 6.1 项目时间轴 + +| 阶段 | 时间范围 | 主要任务 | 交付物 | +|------|----------|----------|--------| +| **阶段一** | 第1周 | 需求分析与技术方案确定 | 技术方案文档 | +| **阶段二** | 第2-3周 | RTSP接入开发 | RTSP服务模块 | +| **阶段三** | 第4周 | MQTT预警系统开发 | MQTT服务模块 | +| **阶段四** | 第5-6周 | 大模型集成 | AI分析服务 | +| **阶段五** | 第7周 | 系统集成与联调 | 集成测试报告 | +| **阶段六** | 第8周 | 系统测试与性能优化 | 性能测试报告 | +| **阶段七** | 第9周 | 部署上线与运维支持 | 生产版本 | + +### 6.2 资源需求 + +#### 6.2.1 人员配置 + +| 角色 | 人数 | 职责 | +|------|------|------| +| 后端开发 | 2人 | FastAPI服务、MQTT、视频流 | +| AI算法 | 1人 | 模型优化、大模型集成 | +| 测试 | 1人 | 功能测试、性能测试 | + +#### 6.2.2 硬件资源 + +| 资源 | 数量 | 用途 | +|------|------|------| +| 服务器 | 2台 | 主备部署 | +| GPU (可选) | 1块 | AI推理加速 | +| 摄像头 | 若干 | 测试验证 | + +#### 6.2.3 软件资源 + +- 开发工具:VS Code、PyCharm +- 测试环境:Docker、Postman +- 部署平台:Linux服务器 + +### 6.3 风险管理 + +| 风险点 | 影响 | 应对策略 | +|--------|------|----------| +| RTSP延迟过高 | 高 | 采用硬件解码、优化缓冲策略 | +| MQTT消息丢失 | 高 | 实现QoS等级、消息持久化 | +| AI模型性能不足 | 中 | 模型量化、批处理优化 | +| 大模型API不稳定 | 中 | 多模型降级策略、本地缓存 | +| 并发处理能力 | 中 | 异步架构、服务拆分 | + +--- + +## 7. 交付成果清单 + +### 7.1 文档交付物 + +- [ ] 系统设计文档与技术方案说明书 +- [ ] 用户操作手册与开发文档 +- [ ] 系统部署包与环境配置说明 + +### 7.2 代码交付物 + +| 文件路径 | 功能 | +|----------|------| +| `services/rtsp_service.py` | RTSP流接收与解码 | +| `services/stream_manager.py` | 多路流管理调度 | +| `services/mqtt_service.py` | MQTT客户端连接 | +| `services/alert_service.py` | 预警事件生成 | +| `services/llm_analysis_service.py` | 大模型视觉分析 | +| `models/stream_schemas.py` | 流数据模型 | +| `models/alert_schemas.py` | 预警数据模型 | +| `api/streaming.py` | 流管理API | +| `api/alerts.py` | 预警查询API | + +### 7.3 测试交付物 + +- [ ] 测试报告与性能评估报告 +- [ ] 单元测试用例与覆盖率报告 +- [ ] 集成测试报告 + +### 7.4 部署交付物 + +- [ ] 源代码与构建部署脚本 +- [ ] Docker镜像与编排文件 +- [ ] 环境变量配置模板 + +--- + +## 附录 + +### A. 技术依赖清单 + +``` +# 新增Python依赖 +paho-mqtt>=1.6.0 # MQTT客户端 +openai>=1.0.0 # OpenAI API +httpx>=0.24.0 # 异步HTTP客户端 +aioredis>=2.0.0 # Redis缓存(可选) +pydantic-settings>=2.0.0 # 配置管理 +``` + +### B. 环境变量配置 + +```bash +# MQTT配置 +MQTT_BROKER_HOST=localhost +MQTT_BROKER_PORT=1883 +MQTT_USERNAME=admin +MQTT_PASSWORD=password + +# 大模型API配置 +OPENAI_API_KEY=sk-xxx +DASHSCOPE_API_KEY=sk-xxx + +# RTSP配置 +RTSP_DEFAULT_FPS=5 +RTSP_BUFFER_SIZE=30 +RTSP_RECONNECT_INTERVAL=5 +``` + +--- + +*文档结束*