Files
wwh 7aa71c5f83 feat: 新增人员徘徊/静止行为分析功能
本次提交实现了完整的人员行为分析系统,包括:
1. 新增基于位置和跟踪ID的两种行为检测算法
2. 新增徘徊检测服务与行为处理器模块
3. 前后端集成算法配置界面与告警展示
4. 支持图片和视频流场景下的行为分析
5. 新增算法配置接口与文档说明

具体改动:
- 新增loitering_detection模型目录与算法实现
- 新增AlgorithmConfig组件实现可视化配置
- 扩展图片/视频检测接口支持算法参数传递
- 新增行为告警推送与前端展示页面
- 优化检测服务,集成行为分析逻辑
- 移除冗余日志输出,完善代码注释
2026-05-19 09:17:09 +08:00

237 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
基于位置的静止人员检测算法
不依赖跟踪 ID而是根据位置来关联人员
适用于跟踪不稳定但人员相对静止的场景
"""
import time
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class PositionRecord:
"""位置记录"""
first_seen: float
last_seen: float
center: Tuple[int, int]
box: Tuple[int, int, int, int]
duration: float = 0.0
class PositionBasedStationaryDetector:
"""
基于位置的静止检测器
特点:
- 不依赖跟踪 ID直接用位置关联人员
- 适用于 SORT 等跟踪器不稳定的场景
- 使用网格化位置 + 距离容差进行匹配
"""
def __init__(
self,
stationary_threshold: float = 10.0, # 静止阈值(秒)
position_tolerance: int = 50, # 位置容差(像素)
cleanup_interval: float = 5.0 # 清理间隔(秒)
):
self.stationary_threshold = stationary_threshold
self.position_tolerance = position_tolerance
self.cleanup_interval = cleanup_interval
# 位置历史记录: {position_key: PositionRecord}
self._position_history: Dict[Tuple[int, int], PositionRecord] = {}
self._last_cleanup = time.time()
def _get_position_key(self, center: Tuple[int, int]) -> Tuple[int, int]:
"""
将连续坐标转换为离散的位置键
用于将相近位置归为一类
"""
x, y = center
grid_x = int(x / self.position_tolerance)
grid_y = int(y / self.position_tolerance)
return (grid_x, grid_y)
def _find_matching_position(
self,
center: Tuple[int, int]
) -> Optional[Tuple[int, int]]:
"""
查找与当前位置匹配的历史位置
返回匹配的位置键,如果没有则返回 None
"""
current_key = self._get_position_key(center)
# 首先检查精确匹配
if current_key in self._position_history:
hist_center = self._position_history[current_key].center
distance = ((center[0] - hist_center[0]) ** 2 +
(center[1] - hist_center[1]) ** 2) ** 0.5
if distance < self.position_tolerance:
return current_key
# 检查相邻网格
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
if dx == 0 and dy == 0:
continue
neighbor_key = (current_key[0] + dx, current_key[1] + dy)
if neighbor_key in self._position_history:
hist_center = self._position_history[neighbor_key].center
distance = ((center[0] - hist_center[0]) ** 2 +
(center[1] - hist_center[1]) ** 2) ** 0.5
if distance < self.position_tolerance:
return neighbor_key
return None
def update(
self,
center: Tuple[int, int],
box: Tuple[int, int, int, int]
) -> Tuple[str, float, bool]:
"""
更新位置信息
Args:
center: (x, y) 中心点坐标
box: (x1, y1, x2, y2) 边界框
Returns:
position_id: 位置 ID用于关联
stationary_duration: 静止时长(秒)
is_stationary: 是否静止超过阈值
"""
current_time = time.time()
# 定期清理旧记录
if current_time - self._last_cleanup > self.cleanup_interval:
self.cleanup_old_positions()
self._last_cleanup = current_time
# 查找匹配的历史位置
matching_key = self._find_matching_position(center)
if matching_key is not None:
# 更新已有位置
record = self._position_history[matching_key]
record.last_seen = current_time
# 平滑更新中心位置(使用移动平均)
old_center = record.center
record.center = (
int(0.7 * old_center[0] + 0.3 * center[0]),
int(0.7 * old_center[1] + 0.3 * center[1])
)
record.box = box
duration = current_time - record.first_seen
record.duration = duration
is_stationary = duration > self.stationary_threshold
position_id = f"pos_{matching_key[0]}_{matching_key[1]}"
return position_id, duration, is_stationary
else:
# 创建新位置记录
new_key = self._get_position_key(center)
self._position_history[new_key] = PositionRecord(
first_seen=current_time,
last_seen=current_time,
center=center,
box=box,
duration=0.0
)
new_id = f"pos_{new_key[0]}_{new_key[1]}"
return new_id, 0.0, False
def cleanup_old_positions(self, max_age: float = 5.0) -> int:
"""
清理长时间未更新的位置记录
Args:
max_age: 最大保留时间(秒)
Returns:
清理的记录数量
"""
current_time = time.time()
to_remove = [
key for key, data in self._position_history.items()
if current_time - data.last_seen > max_age
]
for key in to_remove:
del self._position_history[key]
return len(to_remove)
def get_all_stationary(
self,
threshold: Optional[float] = None
) -> List[Dict]:
"""
获取所有静止超过阈值的位置
Args:
threshold: 静止阈值(秒),默认使用初始化时的阈值
Returns:
list: [{position_id, duration, center, box}, ...]
"""
threshold = threshold or self.stationary_threshold
result = []
for key, data in self._position_history.items():
if data.duration > threshold:
result.append({
'position_id': f"pos_{key[0]}_{key[1]}",
'duration': data.duration,
'center': data.center,
'box': data.box
})
# 按时长排序
result.sort(key=lambda x: x['duration'], reverse=True)
return result
def reset(self):
"""重置所有跟踪数据"""
self._position_history.clear()
self._last_cleanup = time.time()
def detect(
self,
detections: List[Dict]
) -> List[Dict]:
"""
批量检测静止状态
Args:
detections: 检测结果列表,每项包含 'bbox': [x1, y1, x2, y2]
Returns:
添加 'stationary_info' 字段的检测结果
"""
results = []
for det in detections:
x1, y1, x2, y2 = det['bbox']
center = ((x1 + x2) // 2, (y1 + y2) // 2)
box = (x1, y1, x2, y2)
position_id, duration, is_stationary = self.update(center, box)
det_copy = det.copy()
det_copy['stationary_info'] = {
'position_id': position_id,
'duration': round(duration, 2),
'is_stationary': is_stationary,
'threshold': self.stationary_threshold
}
results.append(det_copy)
return results