Files
jc-video-recognize/apps/server/services/detection_service.py
wwh 7aa71c5f83 feat: 新增人员徘徊/静止行为分析功能
本次提交实现了完整的人员行为分析系统,包括:
1. 新增基于位置和跟踪ID的两种行为检测算法
2. 新增徘徊检测服务与行为处理器模块
3. 前后端集成算法配置界面与告警展示
4. 支持图片和视频流场景下的行为分析
5. 新增算法配置接口与文档说明

具体改动:
- 新增loitering_detection模型目录与算法实现
- 新增AlgorithmConfig组件实现可视化配置
- 扩展图片/视频检测接口支持算法参数传递
- 新增行为告警推送与前端展示页面
- 优化检测服务,集成行为分析逻辑
- 移除冗余日志输出,完善代码注释
2026-05-19 09:17:09 +08:00

333 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import cv2
import numpy as np
import time
import uuid
import logging
from typing import Dict, List, Optional
from PIL import Image, ImageDraw, ImageFont
from .loitering_service import get_loitering_service
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DetectionService:
    """Run detection models on images and video frames.

    The service delegates model loading and label lookup to ``model_service``
    and behavior (stationary / loitering) analysis to the object returned by
    ``get_loitering_service()``.  Detection results are returned as plain
    dicts; frames can optionally be annotated with boxes and labels.
    """

    # Fonts able to render the Chinese labels, tried in order.  The first
    # entry is the path the service originally hard-coded (macOS); the others
    # are common locations on Linux and Windows.
    _FONT_CANDIDATES = (
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "C:/Windows/Fonts/msyh.ttc",
    )
    _FONT_SIZE = 20

    # All colors below are RGB, because drawing happens on a PIL image that
    # was converted from BGR to RGB first.
    _CLASS_COLORS = {
        'Fire': (255, 0, 0),
        'Smoke': (128, 128, 128),
        'person': (0, 255, 0),
        'helmet': (255, 255, 0),
        'no_helmet': (255, 0, 255),
        'cigarette': (0, 165, 255),
    }
    _DEFAULT_COLOR = (0, 255, 0)
    _STATIONARY_COLOR = (255, 0, 0)  # red warning
    _LOITERING_COLOR = (0, 0, 255)   # blue warning
    _LABEL_TEXT_COLOR = (255, 255, 255)

    def __init__(self, model_service):
        """Store collaborators and make sure the output directories exist.

        Args:
            model_service: Object providing ``load_model(model_id)`` (awaited)
                and ``model_configs[model_id]['labels']``.
        """
        self.model_service = model_service
        self.base_dir = os.path.dirname(os.path.dirname(__file__))
        self.results_dir = os.path.join(self.base_dir, "static", "results")
        self.temp_dir = os.path.join(self.base_dir, "static", "temp")
        os.makedirs(self.results_dir, exist_ok=True)
        os.makedirs(self.temp_dir, exist_ok=True)
        # The loitering service is only configured (``initialize``) on first use.
        self.loitering_service = get_loitering_service()
        # Label font is loaded lazily and cached; see _get_label_font().
        self._label_font = None

    async def detect_image(
        self,
        image: np.ndarray,
        model_id: str,
        confidence: float = 0.5,
        iou: float = 0.45,
        algorithm_config: Optional[Dict] = None
    ) -> Dict:
        """Detect objects in a single image.

        Args:
            image: Image array passed straight to the model.
            model_id: Identifier understood by ``model_service``.
            confidence: Passed to the model as ``conf``.
            iou: Passed to the model as ``iou``.
            algorithm_config: Optional behavior-analysis settings; when given
                and at least one object is detected, the result is passed
                through ``_apply_behavior_analysis``.

        Returns:
            A dict with ``success``, ``message``, ``detections`` and ``stats``
            (plus behavior keys when analysis ran).  Failures are reported in
            the dict rather than raised.
        """
        start_time = time.perf_counter()
        model = await self.model_service.load_model(model_id)
        if not model:
            return {
                'success': False,
                'message': f'模型加载失败: {model_id}',
                'detections': [],
                'stats': None
            }
        try:
            # NOTE(review): inference runs synchronously inside this coroutine,
            # so the event loop is blocked for its duration.
            results = model(image, conf=confidence, iou=iou, verbose=False)
            detections = self._collect_detections(results, model_id)
            processing_time = time.perf_counter() - start_time
            result_data = {
                'success': True,
                'message': '检测完成',
                'detections': detections,
                'stats': self._build_stats(detections, processing_time, model_id),
            }
            # Run behavior analysis only when it was requested and there is
            # something to analyze.
            if algorithm_config and detections:
                result_data = self._apply_behavior_analysis(
                    result_data, algorithm_config
                )
            return result_data
        except Exception as e:
            logger.exception("图片检测失败: %s", e)
            return {
                'success': False,
                'message': f'检测失败: {str(e)}',
                'detections': [],
                'stats': None
            }

    async def detect_frame(
        self,
        frame: np.ndarray,
        model_id: str,
        confidence: float = 0.5,
        iou: float = 0.45,
        draw: bool = True
    ) -> tuple:
        """Detect objects in one video frame and optionally annotate it.

        For ``model_id == 'loitering_detection'`` the detections are also run
        through the loitering service and the annotated detections, alerts and
        behavior stats are added to the result.

        Args:
            frame: BGR frame (it is converted BGR -> RGB for drawing).
            model_id: Identifier understood by ``model_service``.
            confidence: Passed to the model as ``conf``.
            iou: Passed to the model as ``iou``.
            draw: When true, return the frame with boxes/labels drawn on it.

        Returns:
            ``(frame, result_data)``.  On failure the original frame is
            returned together with ``{'success': False, ...}``.
        """
        start_time = time.perf_counter()
        model = await self.model_service.load_model(model_id)
        if not model:
            return frame, {
                'success': False,
                'detections': [],
                'stats': None
            }
        try:
            # NOTE(review): synchronous inference inside a coroutine; see
            # detect_image.
            results = model(frame, conf=confidence, iou=iou, verbose=False)
            detections = self._collect_detections(results, model_id)
            processing_time = time.perf_counter() - start_time
            fps = 1.0 / processing_time if processing_time > 0 else 0
            result_data = {
                'success': True,
                'detections': detections,
                'stats': self._build_stats(
                    detections, processing_time, model_id, fps=fps
                ),
            }
            logger.info(
                "[DetectionService] 模型: %s, 检测目标: %d",
                model_id, len(detections)
            )
            # Person-detection model: run behavior analysis on the detections.
            if model_id == 'loitering_detection' and detections:
                logger.info("[DetectionService] 调用行为分析...")
                try:
                    self._ensure_loitering_initialized()
                    behavior_result = self.loitering_service.process_detections(
                        detections,
                        use_tracking=False  # switch to True when tracking is used
                    )
                    annotated = behavior_result['detections']
                    alerts = behavior_result['alerts']
                    behavior_stats = behavior_result['stats']
                except Exception as e:
                    # Same policy as the image path: keep the raw detections
                    # and report the analysis failure alongside them.
                    logger.exception("行为分析失败: %s", e)
                    result_data['behavior_error'] = str(e)
                else:
                    detections = annotated
                    # Publish the annotated detections too, so the returned
                    # data matches what is drawn on the frame.
                    result_data['detections'] = detections
                    result_data['alerts'] = alerts
                    result_data['behavior_stats'] = behavior_stats
                    logger.info(
                        "[DetectionService] 行为分析完成: alerts=%d, stats=%s",
                        len(alerts), behavior_stats
                    )
            if draw:
                frame = self.draw_detections(frame, detections, fps)
            return frame, result_data
        except Exception as e:
            logger.exception("帧检测失败: %s", e)
            return frame, {
                'success': False,
                'detections': [],
                'stats': None
            }

    def _collect_detections(self, results, model_id: str) -> List[Dict]:
        """Flatten model results into a list of detection dicts.

        Each box is read via ``box.xyxy[0]``, ``box.conf[0]`` and
        ``box.cls[0]`` (each followed by ``.cpu().numpy()``); the class index
        is resolved through ``result.names`` and mapped to a display label
        with the model's ``labels`` config, falling back to the class name.
        """
        pairs = [
            (result, box)
            for result in results
            if result.boxes is not None
            for box in result.boxes
        ]
        if not pairs:
            return []
        # Looked up once, and only when there is at least one box, so a model
        # without a label map still works on empty results.
        label_map = self.model_service.model_configs[model_id]['labels']
        detections = []
        for result, box in pairs:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = float(box.conf[0].cpu().numpy())
            cls = int(box.cls[0].cpu().numpy())
            class_name = result.names[cls]
            detections.append({
                'class': class_name,
                'label': label_map.get(class_name, class_name),
                'confidence': round(conf, 3),
                'bbox': [int(x1), int(y1), int(x2), int(y2)]
            })
        return detections

    @staticmethod
    def _build_stats(
        detections: List[Dict],
        processing_time: float,
        model_id: str,
        fps: Optional[float] = None
    ) -> Dict:
        """Summarize a detection run; ``fps`` is included only when given."""
        avg_confidence = (
            sum(d['confidence'] for d in detections) / len(detections)
            if detections else 0
        )
        stats = {
            'total_detections': len(detections),
            'avg_confidence': round(avg_confidence, 3),
            'processing_time': round(processing_time, 3),
        }
        if fps is not None:
            stats['fps'] = round(fps, 2)
        stats['model_used'] = model_id
        return stats

    def _ensure_loitering_initialized(self) -> None:
        """Configure the loitering service with stream defaults if needed."""
        if self.loitering_service.is_initialized:
            return
        logger.info("[DetectionService] 初始化徘徊检测服务...")
        self.loitering_service.initialize(
            # Detection thresholds (decide whether a target counts as
            # stationary / loitering).
            stationary_threshold=10.0,
            position_tolerance=50,
            loitering_threshold=300.0,
            movement_threshold=5.0,
            # Alert thresholds (trigger alerts; meant to be higher than the
            # detection thresholds).
            stationary_alert_threshold=30.0,
            loitering_alert_threshold=600.0,
            # Enable both alert kinds.
            enable_stationary_alert=True,
            enable_loitering_alert=True
        )

    def _apply_behavior_analysis(
        self,
        result_data: Dict,
        algorithm_config: Dict
    ) -> Dict:
        """Add behavior-analysis output to an image detection result.

        Args:
            result_data: Detection result; its ``detections`` are analyzed.
            algorithm_config: Settings such as::

                {
                    "enable_stationary_detection": true,
                    "enable_loitering_detection": false,
                    "stationary_threshold": 10.0,
                    "position_tolerance": 50,
                    ...
                }

        Returns:
            ``result_data`` with ``detections`` replaced and ``alerts`` /
            ``behavior_stats`` added.  It is returned unchanged when neither
            detection kind is enabled, and gets a ``behavior_error`` entry
            when the analysis raises.
        """
        detections = result_data['detections']
        enable_stationary = algorithm_config.get('enable_stationary_detection', False)
        enable_loitering = algorithm_config.get('enable_loitering_detection', False)
        if not enable_stationary and not enable_loitering:
            return result_data
        try:
            # Configure the service from the caller-supplied settings.
            # NOTE(review): this re-initializes the same service object that
            # detect_frame uses, on every call — confirm this cannot disturb
            # a video stream that is being analyzed at the same time.
            self.loitering_service.initialize(
                stationary_threshold=algorithm_config.get('stationary_threshold', 10.0),
                position_tolerance=algorithm_config.get('position_tolerance', 50),
                loitering_threshold=algorithm_config.get('loitering_threshold', 300.0),
                movement_threshold=algorithm_config.get('movement_threshold', 5.0),
                enable_stationary_alert=enable_stationary,
                enable_loitering_alert=enable_loitering
            )
            behavior_result = self.loitering_service.process_detections(
                detections,
                use_tracking=enable_loitering  # tracking only for loitering detection
            )
            result_data['detections'] = behavior_result['detections']
            result_data['alerts'] = behavior_result['alerts']
            result_data['behavior_stats'] = behavior_result['stats']
        except Exception as e:
            logger.exception("行为分析失败: %s", e)
            result_data['behavior_error'] = str(e)
        return result_data

    @classmethod
    def _load_label_font(cls):
        """Return the first loadable candidate font, else PIL's default."""
        for path in cls._FONT_CANDIDATES:
            try:
                return ImageFont.truetype(path, cls._FONT_SIZE)
            except (OSError, ImportError):
                # Missing/unreadable font file, or FreeType support absent.
                continue
        logger.warning(
            "No CJK-capable font found; falling back to the PIL default font"
        )
        return ImageFont.load_default()

    def _get_label_font(self):
        """Return the cached label font, loading it on first use."""
        font = getattr(self, '_label_font', None)
        if font is None:
            font = self._load_label_font()
            self._label_font = font
        return font

    def draw_detections(
        self,
        frame: np.ndarray,
        detections: List[Dict],
        fps: float = 0,
        algorithm_config: Optional[Dict] = None
    ) -> np.ndarray:
        """Draw detection boxes, labels and behavior alerts on a frame.

        Args:
            frame: BGR image frame.
            detections: Detection dicts; each may carry ``stationary_info``
                and/or ``loitering_info`` from behavior analysis.
            fps: Frame rate; an overlay line is drawn when it is > 0.
            algorithm_config: Deprecated and ignored; kept so existing callers
                keep working.  Alerts are drawn whenever a detection carries
                behavior info.

        Returns:
            The annotated BGR frame, or the input frame if drawing fails.
        """
        try:
            img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(img_rgb)
            draw = ImageDraw.Draw(pil_img)
            font = self._get_label_font()
            for det in detections:
                x1, y1, x2, y2 = det['bbox']
                conf = det['confidence']
                label = det['label']
                color = self._CLASS_COLORS.get(det['class'], self._DEFAULT_COLOR)
                # Behavior alerts override the class color and label; a
                # loitering alert takes precedence over a stationary one.
                stationary = det.get('stationary_info') or {}
                if stationary.get('is_stationary'):
                    color = self._STATIONARY_COLOR
                    label = f"静止{int(stationary.get('duration', 0))}s"
                loitering = det.get('loitering_info') or {}
                if loitering.get('is_loitering'):
                    color = self._LOITERING_COLOR
                    label = f"徘徊{int(loitering.get('loitering_duration', 0) // 60)}min"
                draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
                label_text = f"{label} {conf:.2f}"
                left, top, right, bottom = draw.textbbox((0, 0), label_text, font=font)
                text_w = right - left
                text_h = bottom - top
                # Put the label above the box; when the box touches the top of
                # the frame there is no room, so put it just inside instead.
                label_top = y1 - text_h - 4
                if label_top < 0:
                    label_top = y1
                draw.rectangle(
                    [x1, label_top, x1 + text_w + 4, label_top + text_h + 4],
                    fill=color
                )
                draw.text(
                    (x1 + 2, label_top + 2),
                    label_text,
                    fill=self._LABEL_TEXT_COLOR,
                    font=font
                )
            if fps > 0:
                fps_text = f"FPS: {fps:.1f} | Detections: {len(detections)}"
                draw.text((10, 10), fps_text, fill=(0, 255, 0), font=font)
            return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
        except Exception as e:
            logger.exception("绘制检测结果失败: %s", e)
            return frame