""" 行为检测处理器 集成基于位置和基于跟踪ID的检测算法 """ import sys import os import logging from typing import Dict, List, Optional, Tuple from dataclasses import dataclass # 添加算法模块路径 sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from algorithms import PositionBasedStationaryDetector, LoiteringDetector logger = logging.getLogger(__name__) @dataclass class BehaviorAlert: """行为告警""" alert_type: str # 'stationary', 'loitering' level: str # 'low', 'medium', 'high' message: str person_id: Optional[str] = None position_id: Optional[str] = None duration: float = 0.0 bbox: Optional[Tuple[int, int, int, int]] = None class BehaviorProcessor: """ 行为检测处理器 整合两种检测方式: 1. 基于位置的静止检测(无需跟踪ID) 2. 基于跟踪ID的徘徊检测(需要跟踪ID) """ def __init__( self, # 静止检测参数 stationary_threshold: float = 10.0, position_tolerance: int = 50, # 徘徊检测参数 loitering_threshold: float = 300.0, movement_threshold: float = 5.0, # 告警参数 enable_stationary_alert: bool = True, enable_loitering_alert: bool = True, stationary_alert_threshold: float = 10.0, # 超过此时间产生告警 loitering_alert_threshold: float = 300.0 # 超过此时间产生告警 ): # 初始化检测器 self.stationary_detector = PositionBasedStationaryDetector( stationary_threshold=stationary_threshold, position_tolerance=position_tolerance ) self.loitering_detector = LoiteringDetector( loitering_threshold=loitering_threshold, stationary_threshold=stationary_threshold, movement_threshold=movement_threshold ) # 配置 self.enable_stationary_alert = enable_stationary_alert self.enable_loitering_alert = enable_loitering_alert self.stationary_alert_threshold = stationary_alert_threshold self.loitering_alert_threshold = loitering_alert_threshold def process( self, detections: List[Dict], use_tracking: bool = False, track_id_key: str = 'track_id' ) -> Dict: """ 处理检测结果,检测行为 Args: detections: 检测结果列表 use_tracking: 是否使用跟踪ID(如果有的话) track_id_key: 跟踪ID字段名 Returns: { 'detections': 添加行为信息的检测结果, 'alerts': 触发的告警列表, 'stats': 统计信息 } """ logger.info(f"[BehaviorProcessor] 开始处理 {len(detections)} 个检测结果") logger.info(f"[BehaviorProcessor] 配置: stationary={self.enable_stationary_alert}, loitering={self.enable_loitering_alert}") alerts = [] # 1. 始终进行基于位置的静止检测 logger.info(f"[BehaviorProcessor] 调用静止检测器...") detections = self.stationary_detector.detect(detections) logger.info(f"[BehaviorProcessor] 静止检测完成,检测到 {len(detections)} 个结果") # 检查静止告警 stationary_alerts = 0 if self.enable_stationary_alert: for det in detections: info = det.get('stationary_info', {}) if info.get('is_stationary') and info.get('duration', 0) >= self.stationary_alert_threshold: alert = BehaviorAlert( alert_type='stationary', level='medium' if info['duration'] < 30 else 'high', message=f"人员静止停留 {int(info['duration'])} 秒", position_id=info.get('position_id'), duration=info['duration'], bbox=tuple(det['bbox']) ) alerts.append(alert) stationary_alerts += 1 logger.info(f"[BehaviorProcessor] 静止告警: {stationary_alerts} 个") # 2. 如果有跟踪ID,进行徘徊检测 logger.info(f"[BehaviorProcessor] use_tracking={use_tracking}") if use_tracking: detections = self.loitering_detector.detect(detections, id_key=track_id_key) # 检查徘徊告警 if self.enable_loitering_alert: for det in detections: info = det.get('loitering_info', {}) if info.get('is_loitering') and info.get('loitering_duration', 0) >= self.loitering_alert_threshold: alert = BehaviorAlert( alert_type='loitering', level='high', message=f"人员徘徊 {int(info['loitering_duration'] // 60)} 分钟", person_id=str(info.get('person_id')), duration=info['loitering_duration'], bbox=tuple(det['bbox']) ) alerts.append(alert) # 统计信息 stats = { 'total_detections': len(detections), 'stationary_count': len(self.stationary_detector.get_all_stationary()), 'alert_count': len(alerts) } if use_tracking: stats.update({ 'loitering_count': len(self.loitering_detector.get_all_loitering()), 'tracking_count': self.loitering_detector.get_stats()['total_tracks'] }) logger.info(f"[BehaviorProcessor] 处理完成: {stats}") return { 'detections': detections, 'alerts': [self._alert_to_dict(a) for a in alerts], 'stats': stats } def _alert_to_dict(self, alert: BehaviorAlert) -> Dict: """将告警对象转换为字典""" return { 'type': alert.alert_type, 'level': alert.level, 'message': alert.message, 'person_id': alert.person_id, 'position_id': alert.position_id, 'duration': round(alert.duration, 2), 'bbox': alert.bbox } def get_stationary_persons(self) -> List[Dict]: """获取所有静止人员""" return self.stationary_detector.get_all_stationary() def get_loitering_persons(self) -> List[Dict]: """获取所有徘徊人员""" return self.loitering_detector.get_all_loitering() def reset(self): """重置所有检测器""" self.stationary_detector.reset() self.loitering_detector.reset() def get_config(self) -> Dict: """获取当前配置""" return { 'stationary_threshold': self.stationary_detector.stationary_threshold, 'position_tolerance': self.stationary_detector.position_tolerance, 'loitering_threshold': self.loitering_detector.loitering_threshold, 'movement_threshold': self.loitering_detector.movement_threshold, 'enable_stationary_alert': self.enable_stationary_alert, 'enable_loitering_alert': self.enable_loitering_alert, 'stationary_alert_threshold': self.stationary_alert_threshold, 'loitering_alert_threshold': self.loitering_alert_threshold }