Files
jc-video-recognize/apps/server/services/detection_service.py
wwh a16e684e46 feat: 新增车辆检测Paddle模型及相关服务,优化依赖与代码兼容性
1. 新增3套PaddlePaddle车辆检测相关模型文件
2. 新增车辆检测服务类与违停检测功能
3. 更新服务依赖并添加环境初始化脚本与文档
4. 修复YOLO检测tensor转换兼容问题
5. 新增PyTorch版本兼容性修复逻辑
6. 扩展模型服务支持Paddle模型加载
2026-05-21 16:26:26 +08:00

420 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import cv2
import numpy as np
import time
import uuid
import logging
import torch
from typing import Dict, List, Optional
from PIL import Image, ImageDraw, ImageFont
from .loitering_service import get_loitering_service
logger = logging.getLogger(__name__)


class DetectionService:
    """Run detection models on images / video frames and annotate the output.

    The service asks ``model_service`` for a callable model, converts the
    model's box results into plain dictionaries, optionally hands them to the
    loitering service for behaviour analysis, and can draw the detections
    onto a frame.
    """

    # Fonts that can render the Chinese labels, tried in order.  The first
    # entry is the path the service has always used (macOS); the others are
    # common Linux / Windows locations so labels also render there.
    _FONT_CANDIDATES = (
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "C:/Windows/Fonts/msyh.ttc",
    )

    # Alert colours are RGB triples because drawing happens on a PIL image
    # that was converted from BGR to RGB first.
    _STATIONARY_ALERT_COLOR = (255, 0, 0)  # red
    _LOITERING_ALERT_COLOR = (0, 0, 255)   # blue

    def __init__(self, model_service):
        """Store the model service and make sure the output folders exist.

        Args:
            model_service: Object providing ``load_model(model_id)`` (awaited)
                and a ``model_configs`` mapping with per-model ``'labels'``.
        """
        self.model_service = model_service
        self.base_dir = os.path.dirname(os.path.dirname(__file__))
        self.results_dir = os.path.join(self.base_dir, "static", "results")
        self.temp_dir = os.path.join(self.base_dir, "static", "temp")
        os.makedirs(self.results_dir, exist_ok=True)
        os.makedirs(self.temp_dir, exist_ok=True)
        # The loitering service is only fetched here; it is initialised
        # lazily the first time behaviour analysis is actually needed.
        self.loitering_service = get_loitering_service()

    # ------------------------------------------------------------------
    # Shared helpers for turning model output into detection dictionaries
    # ------------------------------------------------------------------

    @staticmethod
    def _flatten_values(value) -> List[float]:
        """Return the numbers held by ``value`` as a flat list of floats.

        Accepts lists/tuples, anything exposing ``tolist()`` (torch tensors
        and numpy arrays both do) and bare scalars.
        """
        if isinstance(value, (list, tuple)):
            return [float(v) for v in value]
        if hasattr(value, "tolist"):
            return np.asarray(value.tolist(), dtype=float).reshape(-1).tolist()
        return [float(value)]

    @staticmethod
    def _mean_confidence(detections: List[Dict]) -> float:
        """Average the ``'confidence'`` values; 0 when there are none."""
        if not detections:
            return 0
        return sum(d['confidence'] for d in detections) / len(detections)

    def _label_map_for(self, model_id: str) -> Dict:
        """Return the display-label mapping configured for ``model_id``.

        Falls back to an empty mapping (so raw class names are used as
        labels) when the model has no label configuration.
        """
        try:
            labels = self.model_service.model_configs[model_id]['labels']
        except (KeyError, TypeError, AttributeError):
            logger.warning(f"模型 {model_id} 未配置标签映射,使用原始类别名")
            return {}
        return labels if labels is not None else {}

    def _parse_box(self, box, names, label_map: Dict) -> Optional[Dict]:
        """Convert one model box into a detection dict.

        Returns ``None`` when the box carries fewer than four coordinates.
        Any other problem (missing attribute, unknown class id, ...) raises
        and is handled by the caller.
        """
        coords = self._flatten_values(box.xyxy)
        if len(coords) < 4:
            return None
        x1, y1, x2, y2 = coords[:4]
        conf = self._flatten_values(box.conf)[0]
        cls_id = int(self._flatten_values(box.cls)[0])
        class_name = names[cls_id]
        return {
            'class': class_name,
            'label': label_map.get(class_name, class_name),
            'confidence': round(conf, 3),
            'bbox': [int(x1), int(y1), int(x2), int(y2)]
        }

    def _collect_detections(
        self,
        results,
        model_id: str,
        *,
        log_prefix: str = "",
        log_empty: bool = False
    ) -> List[Dict]:
        """Build the detection list from the model's results.

        A box that cannot be parsed is logged and skipped; it never aborts
        the remaining boxes.

        Args:
            results: Iterable of result objects exposing ``boxes`` and
                ``names``.
            model_id: Model identifier, used for label lookup and logging.
            log_prefix: Text prepended to per-box error log messages.
            log_empty: Log an info message for results without boxes.
        """
        label_map = self._label_map_for(model_id)  # loop-invariant
        detections = []
        for result in results:
            boxes = result.boxes
            if boxes is None or len(boxes) == 0:
                if log_empty:
                    logger.info(f"模型 {model_id} 没有检测到目标")
                continue
            for box in boxes:
                try:
                    detection = self._parse_box(box, result.names, label_map)
                except Exception as e:
                    # logger.exception records the stack trace as well.
                    logger.exception(f"{log_prefix}访问 box 属性失败: {e}, box 类型: {type(box)}")
                    logger.error(f"{log_prefix}box 属性: {vars(box) if hasattr(box, '__dict__') else '无法获取'}")
                    continue
                if detection is not None:
                    detections.append(detection)
        return detections

    # ------------------------------------------------------------------
    # Public detection entry points
    # ------------------------------------------------------------------

    async def detect_image(
        self,
        image: np.ndarray,
        model_id: str,
        confidence: float = 0.5,
        iou: float = 0.45,
        algorithm_config: Optional[Dict] = None
    ) -> Dict:
        """Detect objects in a single image.

        Args:
            image: Image array passed straight to the model.
            model_id: Identifier of the model to load.
            confidence: Confidence threshold forwarded as ``conf``.
            iou: IoU threshold forwarded as ``iou``.
            algorithm_config: Optional behaviour-analysis settings; applied
                only when at least one object was detected.

        Returns:
            Dict with ``success``, ``message``, ``detections`` and ``stats``
            (``stats`` is ``None`` on failure).  Behaviour analysis may add
            ``alerts``, ``behavior_stats`` or ``behavior_error``.
        """
        start_time = time.time()
        model = await self.model_service.load_model(model_id)
        if not model:
            return {
                'success': False,
                'message': f'模型加载失败: {model_id}',
                'detections': [],
                'stats': None
            }
        try:
            # NOTE(review): the model call is synchronous and therefore runs
            # on the event loop - confirm that is acceptable for callers.
            results = model(image, conf=confidence, iou=iou, verbose=False)
            detections = self._collect_detections(results, model_id, log_empty=True)
            processing_time = time.time() - start_time
            result_data = {
                'success': True,
                'message': '检测完成',
                'detections': detections,
                'stats': {
                    'total_detections': len(detections),
                    'avg_confidence': round(self._mean_confidence(detections), 3),
                    'processing_time': round(processing_time, 3),
                    'model_used': model_id
                }
            }
            # Run behaviour analysis when a configuration was supplied.
            if algorithm_config and detections:
                result_data = self._apply_behavior_analysis(
                    result_data, algorithm_config
                )
            return result_data
        except Exception as e:
            logger.exception(f"图片检测失败: {e}")
            return {
                'success': False,
                'message': f'检测失败: {str(e)}',
                'detections': [],
                'stats': None
            }

    async def detect_frame(
        self,
        frame: np.ndarray,
        model_id: str,
        confidence: float = 0.5,
        iou: float = 0.45,
        draw: bool = True
    ) -> tuple:
        """Detect objects in one video frame and optionally annotate it.

        For the ``'loitering_detection'`` model the detections are also
        passed through the loitering service; a failure there is reported
        under ``behavior_error`` without discarding the detections.

        Args:
            frame: Frame array passed straight to the model.
            model_id: Identifier of the model to load.
            confidence: Confidence threshold forwarded as ``conf``.
            iou: IoU threshold forwarded as ``iou``.
            draw: Draw the detections onto the returned frame.

        Returns:
            ``(frame, result_data)``.  ``result_data`` has ``success``,
            ``detections`` and ``stats`` (``None`` on failure) and, after
            behaviour analysis, ``alerts`` and ``behavior_stats``.
        """
        start_time = time.time()
        model = await self.model_service.load_model(model_id)
        if not model:
            return frame, {
                'success': False,
                'detections': [],
                'stats': None
            }
        try:
            results = model(frame, conf=confidence, iou=iou, verbose=False)
            detections = self._collect_detections(
                results, model_id, log_prefix="VIDEO DEBUG: "
            )
            processing_time = time.time() - start_time
            fps = 1.0 / processing_time if processing_time > 0 else 0
            result_data = {
                'success': True,
                'detections': detections,
                'stats': {
                    'total_detections': len(detections),
                    'avg_confidence': round(self._mean_confidence(detections), 3),
                    'processing_time': round(processing_time, 3),
                    'fps': round(fps, 2),
                    'model_used': model_id
                }
            }
            # The person-detection model additionally gets behaviour analysis.
            logger.info(f"[DetectionService] 模型: {model_id}, 检测目标: {len(detections)}")
            if model_id == 'loitering_detection' and detections:
                logger.info("[DetectionService] 调用行为分析...")
                try:
                    # Make sure the service has been initialised.
                    if not self.loitering_service.is_initialized:
                        logger.info("[DetectionService] 初始化徘徊检测服务...")
                        self.loitering_service.initialize(
                            # Detection thresholds (stationary / loitering).
                            stationary_threshold=10.0,
                            position_tolerance=50,
                            loitering_threshold=300.0,
                            movement_threshold=5.0,
                            # Alert thresholds; higher than the detection
                            # thresholds above.
                            stationary_alert_threshold=30.0,
                            loitering_alert_threshold=600.0,
                            # Enable both alert kinds.
                            enable_stationary_alert=True,
                            enable_loitering_alert=True
                        )
                    behavior_result = self.loitering_service.process_detections(
                        detections,
                        use_tracking=False  # may be switched to True to use tracking
                    )
                    detections = behavior_result['detections']
                    # Keep the returned payload in sync with what is drawn.
                    result_data['detections'] = detections
                    result_data['alerts'] = behavior_result['alerts']
                    result_data['behavior_stats'] = behavior_result['stats']
                    logger.info(f"[DetectionService] 行为分析完成: alerts={len(behavior_result['alerts'])}, stats={behavior_result['stats']}")
                except Exception as e:
                    logger.exception(f"行为分析失败: {e}")
                    result_data['behavior_error'] = str(e)
            if draw:
                frame = self.draw_detections(frame, detections, fps)
            return frame, result_data
        except Exception as e:
            logger.exception(f"帧检测失败: {e}")
            return frame, {
                'success': False,
                'detections': [],
                'stats': None
            }

    def _apply_behavior_analysis(
        self,
        result_data: Dict,
        algorithm_config: Dict
    ) -> Dict:
        """Apply the behaviour-analysis algorithms to a detection result.

        Args:
            result_data: Detection result containing ``'detections'``.
            algorithm_config: Algorithm settings, for example::

                {
                    "enable_stationary_detection": true,
                    "enable_loitering_detection": false,
                    "stationary_threshold": 10.0,
                    "position_tolerance": 50,
                    ...
                }

        Returns:
            ``result_data`` with ``detections`` replaced and ``alerts`` /
            ``behavior_stats`` added, or with ``behavior_error`` set when
            the analysis raised.  Returned unchanged when neither analysis
            is enabled.
        """
        detections = result_data['detections']
        # Nothing to do unless at least one analysis is switched on.
        enable_stationary = algorithm_config.get('enable_stationary_detection', False)
        enable_loitering = algorithm_config.get('enable_loitering_detection', False)
        if not enable_stationary and not enable_loitering:
            return result_data
        try:
            # Initialise the service with the caller-supplied configuration.
            self.loitering_service.initialize(
                stationary_threshold=algorithm_config.get('stationary_threshold', 10.0),
                position_tolerance=algorithm_config.get('position_tolerance', 50),
                loitering_threshold=algorithm_config.get('loitering_threshold', 300.0),
                movement_threshold=algorithm_config.get('movement_threshold', 5.0),
                enable_stationary_alert=enable_stationary,
                enable_loitering_alert=enable_loitering
            )
            behavior_result = self.loitering_service.process_detections(
                detections,
                use_tracking=enable_loitering  # tracking only for loitering detection
            )
            result_data['detections'] = behavior_result['detections']
            result_data['alerts'] = behavior_result['alerts']
            result_data['behavior_stats'] = behavior_result['stats']
        except Exception as e:
            logger.exception(f"行为分析失败: {e}")
            result_data['behavior_error'] = str(e)
        return result_data

    # ------------------------------------------------------------------
    # Drawing
    # ------------------------------------------------------------------

    @classmethod
    def _load_label_font(cls, size: int):
        """Load the first available candidate font at ``size``.

        Falls back to PIL's built-in default font, which may not be able to
        render Chinese characters.
        """
        for path in cls._FONT_CANDIDATES:
            try:
                return ImageFont.truetype(path, size)
            except (OSError, ImportError):
                continue
        return ImageFont.load_default()

    def draw_detections(
        self,
        frame: np.ndarray,
        detections: List[Dict],
        fps: float = 0,
        algorithm_config: Optional[Dict] = None
    ) -> np.ndarray:
        """Draw detection boxes, labels and behaviour alerts onto a frame.

        Args:
            frame: BGR image frame.
            detections: Detection dicts; entries may carry
                ``stationary_info`` / ``loitering_info``, in which case the
                box colour and label reflect the alert.
            fps: Frame rate; an FPS / count overlay is drawn when > 0.
            algorithm_config: Deprecated and ignored; kept for backward
                compatibility.

        Returns:
            The annotated BGR frame, or the original ``frame`` when drawing
            failed.
        """
        try:
            pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            draw = ImageDraw.Draw(pil_img)
            font = self._load_label_font(20)
            # NOTE(review): these triples are interpreted as RGB here; some
            # (e.g. 'cigarette') look like BGR values - confirm the intended
            # colours.
            class_colors = {
                'Fire': (255, 0, 0),
                'Smoke': (128, 128, 128),
                'person': (0, 255, 0),
                'helmet': (255, 255, 0),
                'no_helmet': (255, 0, 255),
                'cigarette': (0, 165, 255)
            }
            for det in detections:
                x1, y1, x2, y2 = det['bbox']
                class_name = det['class']
                conf = det['confidence']
                label = det['label']
                color = class_colors.get(class_name, (0, 255, 0))
                # Behaviour alerts override the class colour and label; a
                # loitering alert takes precedence over a stationary one.
                stationary = det.get('stationary_info') or {}
                if stationary.get('is_stationary'):
                    color = self._STATIONARY_ALERT_COLOR
                    label = f"静止{int(stationary.get('duration', 0))}s"
                loitering = det.get('loitering_info') or {}
                if loitering.get('is_loitering'):
                    color = self._LOITERING_ALERT_COLOR
                    label = f"徘徊{int(loitering.get('loitering_duration', 0) // 60)}min"
                draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
                label_text = f"{label} {conf:.2f}"
                # Measure the text so the filled label background fits it.
                bbox = draw.textbbox((0, 0), label_text, font=font)
                text_w = bbox[2] - bbox[0]
                text_h = bbox[3] - bbox[1]
                draw.rectangle([x1, y1 - text_h - 4, x1 + text_w + 4, y1], fill=color)
                draw.text((x1 + 2, y1 - text_h - 2), label_text, fill=(255, 255, 255), font=font)
            if fps > 0:
                fps_text = f"FPS: {fps:.1f} | Detections: {len(detections)}"
                draw.text((10, 10), fps_text, fill=(0, 255, 0), font=font)
            return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
        except Exception as e:
            logger.exception(f"绘制检测结果失败: {e}")
            return frame