""" 基于位置的静止人员检测算法 不依赖跟踪 ID,而是根据位置来关联人员 适用于跟踪不稳定但人员相对静止的场景 """ import time from typing import Dict, List, Tuple, Optional from dataclasses import dataclass, field from collections import defaultdict @dataclass class PositionRecord: """位置记录""" first_seen: float last_seen: float center: Tuple[int, int] box: Tuple[int, int, int, int] duration: float = 0.0 class PositionBasedStationaryDetector: """ 基于位置的静止检测器 特点: - 不依赖跟踪 ID,直接用位置关联人员 - 适用于 SORT 等跟踪器不稳定的场景 - 使用网格化位置 + 距离容差进行匹配 """ def __init__( self, stationary_threshold: float = 10.0, # 静止阈值(秒) position_tolerance: int = 50, # 位置容差(像素) cleanup_interval: float = 5.0 # 清理间隔(秒) ): self.stationary_threshold = stationary_threshold self.position_tolerance = position_tolerance self.cleanup_interval = cleanup_interval # 位置历史记录: {position_key: PositionRecord} self._position_history: Dict[Tuple[int, int], PositionRecord] = {} self._last_cleanup = time.time() def _get_position_key(self, center: Tuple[int, int]) -> Tuple[int, int]: """ 将连续坐标转换为离散的位置键 用于将相近位置归为一类 """ x, y = center grid_x = int(x / self.position_tolerance) grid_y = int(y / self.position_tolerance) return (grid_x, grid_y) def _find_matching_position( self, center: Tuple[int, int] ) -> Optional[Tuple[int, int]]: """ 查找与当前位置匹配的历史位置 返回匹配的位置键,如果没有则返回 None """ current_key = self._get_position_key(center) # 首先检查精确匹配 if current_key in self._position_history: hist_center = self._position_history[current_key].center distance = ((center[0] - hist_center[0]) ** 2 + (center[1] - hist_center[1]) ** 2) ** 0.5 if distance < self.position_tolerance: return current_key # 检查相邻网格 for dx in [-1, 0, 1]: for dy in [-1, 0, 1]: if dx == 0 and dy == 0: continue neighbor_key = (current_key[0] + dx, current_key[1] + dy) if neighbor_key in self._position_history: hist_center = self._position_history[neighbor_key].center distance = ((center[0] - hist_center[0]) ** 2 + (center[1] - hist_center[1]) ** 2) ** 0.5 if distance < self.position_tolerance: return neighbor_key return None def update( self, center: Tuple[int, int], box: Tuple[int, int, int, int] ) -> Tuple[str, float, bool]: """ 更新位置信息 Args: center: (x, y) 中心点坐标 box: (x1, y1, x2, y2) 边界框 Returns: position_id: 位置 ID(用于关联) stationary_duration: 静止时长(秒) is_stationary: 是否静止超过阈值 """ current_time = time.time() # 定期清理旧记录 if current_time - self._last_cleanup > self.cleanup_interval: self.cleanup_old_positions() self._last_cleanup = current_time # 查找匹配的历史位置 matching_key = self._find_matching_position(center) if matching_key is not None: # 更新已有位置 record = self._position_history[matching_key] record.last_seen = current_time # 平滑更新中心位置(使用移动平均) old_center = record.center record.center = ( int(0.7 * old_center[0] + 0.3 * center[0]), int(0.7 * old_center[1] + 0.3 * center[1]) ) record.box = box duration = current_time - record.first_seen record.duration = duration is_stationary = duration > self.stationary_threshold position_id = f"pos_{matching_key[0]}_{matching_key[1]}" return position_id, duration, is_stationary else: # 创建新位置记录 new_key = self._get_position_key(center) self._position_history[new_key] = PositionRecord( first_seen=current_time, last_seen=current_time, center=center, box=box, duration=0.0 ) new_id = f"pos_{new_key[0]}_{new_key[1]}" return new_id, 0.0, False def cleanup_old_positions(self, max_age: float = 5.0) -> int: """ 清理长时间未更新的位置记录 Args: max_age: 最大保留时间(秒) Returns: 清理的记录数量 """ current_time = time.time() to_remove = [ key for key, data in self._position_history.items() if current_time - data.last_seen > max_age ] for key in to_remove: del self._position_history[key] return len(to_remove) def get_all_stationary( self, threshold: Optional[float] = None ) -> List[Dict]: """ 获取所有静止超过阈值的位置 Args: threshold: 静止阈值(秒),默认使用初始化时的阈值 Returns: list: [{position_id, duration, center, box}, ...] """ threshold = threshold or self.stationary_threshold result = [] for key, data in self._position_history.items(): if data.duration > threshold: result.append({ 'position_id': f"pos_{key[0]}_{key[1]}", 'duration': data.duration, 'center': data.center, 'box': data.box }) # 按时长排序 result.sort(key=lambda x: x['duration'], reverse=True) return result def reset(self): """重置所有跟踪数据""" self._position_history.clear() self._last_cleanup = time.time() def detect( self, detections: List[Dict] ) -> List[Dict]: """ 批量检测静止状态 Args: detections: 检测结果列表,每项包含 'bbox': [x1, y1, x2, y2] Returns: 添加 'stationary_info' 字段的检测结果 """ results = [] for det in detections: x1, y1, x2, y2 = det['bbox'] center = ((x1 + x2) // 2, (y1 + y2) // 2) box = (x1, y1, x2, y2) position_id, duration, is_stationary = self.update(center, box) det_copy = det.copy() det_copy['stationary_info'] = { 'position_id': position_id, 'duration': round(duration, 2), 'is_stationary': is_stationary, 'threshold': self.stationary_threshold } results.append(det_copy) return results