Files
jc-video-recognize/apps/server/services/detection_service.py

629 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import cv2
import numpy as np
import time
import uuid
import logging
from typing import Dict, List, Optional
from PIL import Image, ImageDraw, ImageFont
import torch
from .loitering_service import get_loitering_service
from .adapters import DetectionAdapter
from .event import AlertRuleEngine, EventAggregator, EventDecisionEngine
from core.settings import get_settings
from models.event_schemas import DetectionSource
logger = logging.getLogger(__name__)
class DetectionService:
def __init__(self, model_service):
    """Store the model service, prepare output folders and build the event pipeline.

    Args:
        model_service: Object used by the detection methods to load models
            and to look up per-model label maps.
    """
    self.model_service = model_service

    # Two directory levels above this module; "static/results" and
    # "static/temp" are created beneath it if they do not exist yet.
    self.base_dir = os.path.dirname(os.path.dirname(__file__))
    static_dir = os.path.join(self.base_dir, "static")
    self.results_dir = os.path.join(static_dir, "results")
    self.temp_dir = os.path.join(static_dir, "temp")
    for directory in (self.results_dir, self.temp_dir):
        os.makedirs(directory, exist_ok=True)

    # Loitering service handle; its actual initialisation is deferred until
    # the first call that needs it.
    self.loitering_service = get_loitering_service()

    # Event pipeline (MVP-1 / P1-P2-P5): decision engine, rule engine, aggregator.
    settings = get_settings()
    self.decision_engine = EventDecisionEngine(
        min_confidence=settings.detection.min_confidence,
    )
    engine_settings = settings.event_engine
    self.rule_engine = AlertRuleEngine.from_directory(
        os.path.join(self.base_dir, engine_settings.rules_dir)
    )
    self.event_aggregator = EventAggregator(
        dedup_window_seconds=engine_settings.dedup_window_seconds,
        max_active_events=engine_settings.max_active_events,
    )
async def detect_image(
    self,
    image: np.ndarray,
    model_id: str,
    confidence: float = 0.5,
    iou: float = 0.45,
    algorithm_config: Optional[Dict] = None
) -> Dict:
    """Run single-model detection on one image and post-process the result.

    Args:
        image: Image array handed straight to the model.
        model_id: Key used to load the model and to look up its label map.
        confidence: Confidence threshold forwarded to the model as ``conf``.
        iou: IoU threshold forwarded to the model as ``iou``.
        algorithm_config: Optional behaviour-analysis settings. When given
            and at least one detection exists, ``_apply_behavior_analysis``
            is applied before the event pipeline.

    Returns:
        A dict with ``success``, ``message``, ``detections`` and ``stats``.
        On success the event pipeline output is merged in as well. On
        failure ``detections`` is empty and ``stats`` is ``None``.
    """
    start_time = time.perf_counter()
    model = await self.model_service.load_model(model_id)
    if not model:
        return {
            'success': False,
            'message': f'模型加载失败: {model_id}',
            'detections': [],
            'stats': None
        }
    try:
        results = model(image, conf=confidence, iou=iou, verbose=False)
        detections = self._collect_detections(results, model_id, log_empty=True)
        processing_time = time.perf_counter() - start_time
        avg_confidence = sum(d['confidence'] for d in detections) / len(detections) if detections else 0
        result_data = {
            'success': True,
            'message': '检测完成',
            'detections': detections,
            'stats': {
                'total_detections': len(detections),
                'avg_confidence': round(avg_confidence, 3),
                'processing_time': round(processing_time, 3),
                'model_used': model_id
            }
        }
        # Optional behaviour analysis, only when there is something to analyse.
        if algorithm_config and detections:
            result_data = self._apply_behavior_analysis(
                result_data, algorithm_config
            )
        # Event pipeline (MVP-1): decision -> rules -> aggregation.
        result_data = self._apply_event_pipeline(result_data, model_id)
        return result_data
    except Exception as e:
        logger.error(f"图片检测失败: {e}")
        return {
            'success': False,
            'message': f'检测失败: {str(e)}',
            'detections': [],
            'stats': None
        }


async def detect_frame(
    self,
    frame: np.ndarray,
    model_id: str,
    confidence: float = 0.5,
    iou: float = 0.45,
    draw: bool = True,
    composite: bool = False
) -> tuple:
    """Run detection on one video frame and optionally draw the result on it.

    Args:
        frame: Frame array handed to the model (and to ``draw_detections``).
        model_id: Key used to load the model and to look up its label map.
        confidence: Confidence threshold forwarded to the model as ``conf``.
        iou: IoU threshold forwarded to the model as ``iou``.
        draw: When true, the returned frame has the detections drawn on it.
        composite: When true and ``model_id == 'fire_detection'``, the
            composite fire/smoke detector is tried first.

    Returns:
        ``(frame, result_data)``. ``result_data`` has ``success``,
        ``detections`` and ``stats``; for ``loitering_detection`` it also
        carries ``alerts`` and ``behavior_stats``. On failure the input
        frame is returned together with an unsuccessful result.
    """
    start_time = time.perf_counter()
    try:
        # Composite fire detection (flame + smoke) when explicitly requested.
        if composite and model_id == 'fire_detection':
            result_data = await self.detect_fire_composite(frame, confidence=confidence, iou=iou)
            if result_data['success']:
                detections = result_data['detections']
                processing_time = time.perf_counter() - start_time
                fps = 1.0 / processing_time if processing_time > 0 else 0
                # Overwrite timing so it covers this whole call.
                result_data['stats']['fps'] = round(fps, 2)
                result_data['stats']['processing_time'] = round(processing_time, 3)
                if draw:
                    frame = self.draw_detections(frame, detections, fps)
                # detect_fire_composite already ran the event pipeline.
                return frame, result_data
            # An unsuccessful composite run falls through to the
            # single-model path below.

        # Regular single-model detection.
        model = await self.model_service.load_model(model_id)
        if not model:
            return frame, {
                'success': False,
                'detections': [],
                'stats': None
            }
        results = model(frame, conf=confidence, iou=iou, verbose=False)
        detections = self._collect_detections(
            results, model_id, log_prefix="VIDEO DEBUG: "
        )
        processing_time = time.perf_counter() - start_time
        fps = 1.0 / processing_time if processing_time > 0 else 0
        avg_confidence = sum(d['confidence'] for d in detections) / len(detections) if detections else 0
        result_data = {
            'success': True,
            'detections': detections,
            'stats': {
                'total_detections': len(detections),
                'avg_confidence': round(avg_confidence, 3),
                'processing_time': round(processing_time, 3),
                'fps': round(fps, 2),
                'model_used': model_id
            }
        }
        # Behaviour analysis for the person/loitering model.
        logger.info(f"[DetectionService] 模型: {model_id}, 检测目标: {len(detections)}")
        if model_id == 'loitering_detection' and detections:
            logger.info("[DetectionService] 调用行为分析...")
            # Make sure the service has been initialised before first use.
            if not self.loitering_service.is_initialized:
                logger.info("[DetectionService] 初始化徘徊检测服务...")
                self.loitering_service.initialize(
                    # Detection thresholds (decide stationary / loitering).
                    stationary_threshold=10.0,
                    position_tolerance=50,
                    loitering_threshold=300.0,
                    movement_threshold=5.0,
                    # Alert thresholds (trigger alerts; higher than the
                    # detection thresholds above).
                    stationary_alert_threshold=30.0,
                    loitering_alert_threshold=600.0,
                    # Enable both alert kinds.
                    enable_stationary_alert=True,
                    enable_loitering_alert=True
                )
            behavior_result = self.loitering_service.process_detections(
                detections,
                use_tracking=False  # may be switched to True when tracking is used
            )
            detections = behavior_result['detections']
            # Keep the returned payload in sync with what is drawn and with
            # what _apply_behavior_analysis does for still images.
            result_data['detections'] = detections
            result_data['alerts'] = behavior_result['alerts']
            result_data['behavior_stats'] = behavior_result['stats']
            logger.info(f"[DetectionService] 行为分析完成: alerts={len(behavior_result['alerts'])}, stats={behavior_result['stats']}")
        if draw:
            frame = self.draw_detections(frame, detections, fps)
        # Event pipeline (MVP-1): decision -> rules -> aggregation.
        result_data = self._apply_event_pipeline(result_data, model_id=model_id)
        return frame, result_data
    except Exception as e:
        logger.error(f"帧检测失败: {e}")
        return frame, {
            'success': False,
            'detections': [],
            'stats': None
        }


def _collect_detections(
    self,
    results,
    model_id: str,
    log_prefix: str = "",
    log_empty: bool = False
) -> List[Dict]:
    """Convert raw model results into the service's detection dicts.

    Shared by ``detect_image`` and ``detect_frame``. A box that cannot be
    parsed is logged and skipped; it never aborts the whole call.

    Args:
        results: Iterable of result objects exposing ``boxes`` and ``names``.
        model_id: Key into ``model_service.model_configs`` for the label map.
        log_prefix: Text prepended to per-box error log lines.
        log_empty: When true, log an info line for a result without boxes.

    Returns:
        A list of dicts with ``class``, ``label``, ``confidence`` (rounded
        to 3 places) and ``bbox`` (``[x1, y1, x2, y2]`` as ints).
    """
    # Resolve the label map once per call. A model without a configured map
    # still yields detections, labelled with the raw class name.
    try:
        label_map = self.model_service.model_configs[model_id]['labels'] or {}
    except (KeyError, TypeError, AttributeError):
        logger.warning(f"模型 {model_id} 未配置标签映射,使用原始类别名")
        label_map = {}

    def first_scalar(value):
        # conf/cls may be a 0-dim tensor, a 1-element tensor/sequence or a
        # plain number; return something float()/int() can consume.
        if isinstance(value, torch.Tensor):
            return value if value.dim() == 0 else value[0]
        if hasattr(value, '__getitem__'):
            return value[0]
        return value

    detections = []
    for result in results:
        boxes = result.boxes
        # Some result objects carry no boxes at all; treat that as empty.
        if boxes is None or len(boxes) == 0:
            if log_empty:
                logger.info(f"模型 {model_id} 没有检测到目标")
            continue
        for box in boxes:
            try:
                xyxy = box.xyxy
                if isinstance(xyxy, torch.Tensor) and xyxy.dim() > 0:
                    coords = xyxy.squeeze().tolist()
                elif isinstance(xyxy, (list, tuple)):
                    coords = xyxy
                else:
                    continue
                if len(coords) < 4:
                    continue
                x1, y1, x2, y2 = (float(v) for v in coords[:4])
                conf = float(first_scalar(box.conf))
                cls = int(first_scalar(box.cls))
                class_name = result.names[cls]
                detections.append({
                    'class': class_name,
                    'label': label_map.get(class_name, class_name),
                    'confidence': round(conf, 3),
                    'bbox': [int(x1), int(y1), int(x2), int(y2)]
                })
            except Exception as e:
                import traceback
                logger.error(f"{log_prefix}访问 box 属性失败: {e}, box 类型: {type(box)}")
                logger.error(f"{log_prefix}错误堆栈: {traceback.format_exc()}")
                logger.error(f"{log_prefix}box 属性: {vars(box) if hasattr(box, '__dict__') else '无法获取'}")
                continue
    return detections
async def detect_fire_composite(
    self,
    image: np.ndarray,
    confidence: float = 0.1,
    iou: float = 0.45
) -> Dict:
    """Composite fire detection: look for flames and smoke in the same image.

    The ``fire_detection`` model contributes only its ``Fire`` boxes and the
    ``smoke_detection`` model only its ``Smoke`` boxes. Either kind being
    present marks the image as a suspected fire.

    Args:
        image: Image array handed to both models.
        confidence: Confidence threshold forwarded to both models as ``conf``.
        iou: IoU threshold forwarded to both models as ``iou``.

    Returns:
        A dict with ``success``, ``message``, ``detections`` and ``stats``
        (including ``fire_count``, ``smoke_count`` and ``suspected_fire``),
        extended by the event pipeline. On any failure, including a model
        that cannot be loaded, ``success`` is false, ``detections`` is empty
        and ``stats`` is ``None``.
    """
    start_time = time.perf_counter()
    try:
        # 1. Flames.
        fire_detections = await self._detect_single_class(
            image, 'fire_detection', 'Fire', '火焰', 'fire', confidence, iou
        )
        # 2. Smoke (dedicated model; only the capitalised "Smoke" class is kept).
        smoke_detections = await self._detect_single_class(
            image, 'smoke_detection', 'Smoke', '烟雾', 'smoke', confidence, iou
        )
        # 3. Merge both lists, flames first.
        all_detections = fire_detections + smoke_detections
        # 4. Any flame or smoke box counts as a suspected fire.
        suspected_fire = bool(all_detections)
        processing_time = time.perf_counter() - start_time
        avg_confidence = sum(d['confidence'] for d in all_detections) / len(all_detections) if all_detections else 0
        result_data = {
            'success': True,
            'message': '复合火灾检测完成',
            'detections': all_detections,
            'stats': {
                'total_detections': len(all_detections),
                'fire_count': len(fire_detections),
                'smoke_count': len(smoke_detections),
                'avg_confidence': round(avg_confidence, 3),
                'suspected_fire': suspected_fire,
                # Both branches used to be the same empty string, so the
                # label never reflected the verdict.
                # NOTE(review): confirm the exact wording expected by the UI.
                'suspected_fire_label': '是' if suspected_fire else '否',
                'processing_time': round(processing_time, 3),
                'model_used': 'fire_composite'
            }
        }
        # Event pipeline (MVP-1): composite detection yields candidate/alert events.
        result_data = self._apply_event_pipeline(
            result_data,
            model_id='fire_composite',
            source=DetectionSource.COMPOSITE,
        )
        return result_data
    except Exception as e:
        logger.error(f"复合火灾检测失败: {e}")
        import traceback
        logger.error(f"错误堆栈: {traceback.format_exc()}")
        return {
            'success': False,
            'message': f'复合火灾检测失败: {str(e)}',
            'detections': [],
            'stats': None
        }


async def _detect_single_class(
    self,
    image: np.ndarray,
    model_id: str,
    class_name: str,
    label: str,
    det_type: str,
    confidence: float,
    iou: float
) -> List[Dict]:
    """Run one model and keep only the boxes of a single class.

    Args:
        image: Image array handed to the model.
        model_id: Model to load through ``model_service.load_model``.
        class_name: Model class name to keep; other classes are dropped.
        label: Display label stored on each kept detection and used in the
            per-box error log line.
        det_type: Value stored under the ``type`` key.
        confidence: Forwarded to the model as ``conf``.
        iou: Forwarded to the model as ``iou``.

    Returns:
        Detection dicts with ``class``, ``label``, ``confidence``, ``bbox``
        and ``type``.

    Raises:
        RuntimeError: If the model cannot be loaded.
    """
    model = await self.model_service.load_model(model_id)
    if not model:
        # Fail with a readable reason instead of calling None.
        raise RuntimeError(f'模型加载失败: {model_id}')
    kept = []
    for result in model(image, conf=confidence, iou=iou, verbose=False):
        boxes = result.boxes
        if boxes is None:
            continue
        for box in boxes:
            try:
                coords = box.xyxy.squeeze().tolist()
                if len(coords) < 4:
                    continue
                x1, y1, x2, y2 = (float(v) for v in coords[:4])
                conf = float(box.conf[0]) if hasattr(box.conf, '__getitem__') else float(box.conf)
                cls = int(box.cls[0]) if hasattr(box.cls, '__getitem__') else int(box.cls)
                if result.names[cls] != class_name:
                    continue
                kept.append({
                    'class': class_name,
                    'label': label,
                    'confidence': round(conf, 3),
                    'bbox': [int(x1), int(y1), int(x2), int(y2)],
                    'type': det_type
                })
            except Exception as e:
                logger.error(f"{label}检测解析失败: {e}")
                continue
    return kept
def _apply_behavior_analysis(
self,
result_data: Dict,
algorithm_config: Dict
) -> Dict:
"""
应用行为分析算法
Args:
result_data: 检测结果
algorithm_config: 算法配置
{
"enable_stationary_detection": true,
"enable_loitering_detection": false,
"stationary_threshold": 10.0,
"position_tolerance": 50,
...
}
Returns:
添加行为分析结果的检测结果
"""
detections = result_data['detections']
# 检查是否需要行为分析
enable_stationary = algorithm_config.get('enable_stationary_detection', False)
enable_loitering = algorithm_config.get('enable_loitering_detection', False)
if not enable_stationary and not enable_loitering:
return result_data
try:
# 使用前端传入的配置初始化服务
self.loitering_service.initialize(
stationary_threshold=algorithm_config.get('stationary_threshold', 10.0),
position_tolerance=algorithm_config.get('position_tolerance', 50),
loitering_threshold=algorithm_config.get('loitering_threshold', 300.0),
movement_threshold=algorithm_config.get('movement_threshold', 5.0),
enable_stationary_alert=enable_stationary,
enable_loitering_alert=enable_loitering
)
# 处理检测
behavior_result = self.loitering_service.process_detections(
detections,
use_tracking=enable_loitering # 只有启用徘徊检测时才使用跟踪
)
result_data['detections'] = behavior_result['detections']
result_data['alerts'] = behavior_result['alerts']
result_data['behavior_stats'] = behavior_result['stats']
except Exception as e:
logger.error(f"行为分析失败: {e}")
result_data['behavior_error'] = str(e)
return result_data
def _apply_event_pipeline(
    self,
    result_data: Dict,
    model_id: Optional[str] = None,
    source_id: Optional[str] = None,
    source: DetectionSource = DetectionSource.YOLO,
) -> Dict:
    """Run the decision -> rules -> aggregation pipeline on a detection result.

    Two fields are always written to ``result_data``:
    - ``candidate_events``: list of dicts produced by the decision engine
    - ``alert_events``: list of dicts for rule hits emitted by the aggregator

    Both lists are empty when the result is unsuccessful, has no
    detections, or the pipeline raised. In the last case
    ``event_pipeline_error`` holds the error text.

    Args:
        result_data: Detection result to extend in place.
        model_id: Forwarded to ``DetectionAdapter.from_yolo``.
        source_id: Forwarded to the decision engine.
        source: Detection source tag forwarded to the adapter.

    Returns:
        The same ``result_data`` object.
    """
    # Nothing to evaluate for failed or empty results.
    if not (result_data.get('success') and result_data.get('detections')):
        result_data['candidate_events'] = []
        result_data['alert_events'] = []
        return result_data

    try:
        unified = DetectionAdapter.from_yolo(
            result_data, model_id=model_id, source=source
        )
        candidate_events = self.decision_engine.decide(unified, source_id=source_id)
        matched_alerts = self.rule_engine.evaluate(candidate_events)
        emitted_events = self.event_aggregator.aggregate(matched_alerts)

        serialized_candidates = []
        for event in candidate_events:
            serialized_candidates.append(event.model_dump(mode='json'))
        result_data['candidate_events'] = serialized_candidates

        serialized_alerts = []
        for event in emitted_events:
            serialized_alerts.append(event.model_dump(mode='json'))
        result_data['alert_events'] = serialized_alerts
    except Exception as exc:  # noqa: BLE001
        # A pipeline failure must not break detection: report it and leave
        # both event lists empty.
        logger.error(f"事件管道执行失败: {exc}")
        result_data['candidate_events'] = []
        result_data['alert_events'] = []
        result_data['event_pipeline_error'] = str(exc)
    return result_data
def draw_detections(
    self,
    frame: np.ndarray,
    detections: List[Dict],
    fps: float = 0,
    algorithm_config: Optional[Dict] = None
) -> np.ndarray:
    """
    Draw detection boxes, labels and behaviour alerts onto a frame.

    Args:
        frame: BGR image array (converted to RGB for drawing and back again).
        detections: Detection dicts with ``bbox``, ``class``, ``confidence``
            and ``label``; may also carry ``stationary_info`` /
            ``loitering_info`` from behaviour analysis.
        fps: Frame rate; an overlay line is drawn when it is greater than 0.
        algorithm_config: Deprecated and ignored; kept for backward
            compatibility. Behaviour alerts are drawn whenever a detection
            carries the corresponding info.

    Returns:
        A new BGR image with the overlay, or the input ``frame`` unchanged
        if drawing failed.
    """
    try:
        canvas = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        painter = ImageDraw.Draw(canvas)
        font = self._load_label_font(20)
        # The canvas is RGB, so these tuples are read as (R, G, B).
        # NOTE(review): some entries (e.g. 'violence', 'fight', 'cigarette')
        # look like BGR values; confirm the intended colours.
        class_colors = {
            'Fire': (255, 0, 0),
            'Smoke': (128, 128, 128),
            'person': (0, 255, 0),
            'helmet': (255, 255, 0),
            'no_helmet': (255, 0, 255),
            'cigarette': (0, 165, 255),
            # Classes of older models.
            'violence': (0, 0, 255),
            'fight': (0, 0, 255),
            'normal': (0, 200, 0),
            'non_violence': (0, 200, 0)
        }
        for det in detections:
            x1, y1, x2, y2 = det['bbox']
            conf = det['confidence']
            label = det['label']
            color = class_colors.get(det['class'], (0, 255, 0))

            # Behaviour alerts override the class colour and the label.
            stationary = det.get('stationary_info') or {}
            if stationary.get('is_stationary'):
                color = (255, 0, 0)  # red warning (RGB)
                label = f"静止{int(stationary.get('duration', 0))}s"
            loitering = det.get('loitering_info') or {}
            if loitering.get('is_loitering'):
                color = (0, 0, 255)  # blue warning (RGB)
                label = f"徘徊{int(loitering.get('loitering_duration', 0) // 60)}min"

            painter.rectangle([x1, y1, x2, y2], outline=color, width=3)
            label_text = f"{label} {conf:.2f}"
            left, top, right, bottom = painter.textbbox((0, 0), label_text, font=font)
            text_w = right - left
            text_h = bottom - top
            # Keep the label strip inside the image for boxes at the top edge.
            label_top = max(y1 - text_h - 4, 0)
            painter.rectangle(
                [x1, label_top, x1 + text_w + 4, label_top + text_h + 4],
                fill=color
            )
            painter.text((x1 + 2, label_top + 2), label_text, fill=(255, 255, 255), font=font)
        if fps > 0:
            fps_text = f"FPS: {fps:.1f} | Detections: {len(detections)}"
            painter.text((10, 10), fps_text, fill=(0, 255, 0), font=font)
        return cv2.cvtColor(np.array(canvas), cv2.COLOR_RGB2BGR)
    except Exception as e:
        logger.error(f"绘制检测结果失败: {e}")
        return frame


def _load_label_font(self, size: int = 20):
    """Return a label font of the given size, loading it at most once per instance.

    The macOS PingFang font is tried first, followed by common CJK fonts on
    Linux and Windows; Pillow's default font is the last resort.
    """
    cache = getattr(self, '_label_font_cache', None)
    if cache is None:
        cache = self._label_font_cache = {}
    if size not in cache:
        candidates = (
            "/System/Library/Fonts/PingFang.ttc",
            "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
            "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
            "C:/Windows/Fonts/msyh.ttc",
        )
        for path in candidates:
            try:
                cache[size] = ImageFont.truetype(path, size)
                break
            except (OSError, ImportError):
                # Font file missing/unreadable, or FreeType support absent.
                continue
        else:
            cache[size] = ImageFont.load_default()
    return cache[size]