import logging
import os
import time
import uuid
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont

logger = logging.getLogger(__name__)


class DetectionService:
    """Run object-detection models on images/frames and render the results.

    The service delegates model loading to ``model_service`` (which must expose
    an awaitable ``load_model(model_id)`` and a ``model_configs`` mapping whose
    entries carry a ``'labels'`` dict), converts raw model output into plain
    dictionaries, and can draw the detections onto a BGR frame.
    """

    # TrueType fonts tried in order when rendering labels. The first entry is
    # the original macOS path; the others are common CJK-capable fonts on
    # Linux and Windows so labels still render off macOS. Missing files are
    # skipped and Pillow's built-in bitmap font is the final fallback.
    FONT_CANDIDATES = (
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
        "C:/Windows/Fonts/msyh.ttc",
    )
    FONT_SIZE = 20

    # Colors are RGB because drawing happens on a PIL image converted from
    # BGR to RGB (see draw_detections).
    CLASS_COLORS = {
        'Fire': (255, 0, 0),
        'Smoke': (128, 128, 128),
        'person': (0, 255, 0),
        'helmet': (255, 255, 0),
        'no_helmet': (255, 0, 255),
        'cigarette': (255, 165, 0),  # orange, used for smoking detection
    }
    DEFAULT_COLOR = (0, 255, 0)

    def __init__(self, model_service):
        """Store the model service and make sure the output directories exist.

        Args:
            model_service: Object used to load models and look up label maps.
        """
        self.model_service = model_service
        self.base_dir = os.path.dirname(os.path.dirname(__file__))
        self.results_dir = os.path.join(self.base_dir, "static", "results")
        self.temp_dir = os.path.join(self.base_dir, "static", "temp")
        os.makedirs(self.results_dir, exist_ok=True)
        os.makedirs(self.temp_dir, exist_ok=True)
        self._font = None  # loaded lazily by _get_font()

    def _get_font(self):
        """Return the label font, loading it once and caching it on the instance."""
        # getattr keeps this working for instances created without __init__.
        cached = getattr(self, "_font", None)
        if cached is not None:
            return cached

        font = None
        for path in self.FONT_CANDIDATES:
            try:
                font = ImageFont.truetype(path, self.FONT_SIZE)
                break
            except (OSError, ImportError) as exc:
                # OSError: font file missing/unreadable; ImportError: Pillow
                # built without FreeType. Either way, try the next candidate.
                logger.debug("Font %s unavailable: %s", path, exc)
        if font is None:
            font = ImageFont.load_default()

        self._font = font
        return font

    def draw_detections(self, frame: np.ndarray, detections: List[Dict], fps: float = 0) -> np.ndarray:
        """Draw bounding boxes, labels and an optional FPS banner on a frame.

        Args:
            frame: Image in BGR channel order (it is converted with
                ``cv2.COLOR_BGR2RGB`` before drawing).
            detections: Dicts with ``'bbox'`` (x1, y1, x2, y2), ``'class'``,
                ``'confidence'`` and ``'label'`` keys.
            fps: When greater than zero, an FPS/detection-count line is drawn
                in the top-left corner.

        Returns:
            A new BGR image with the annotations, or the unmodified ``frame``
            if anything goes wrong while drawing (the error is logged).
        """
        try:
            pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            draw = ImageDraw.Draw(pil_img)
            font = self._get_font()

            for det in detections:
                x1, y1, x2, y2 = det['bbox']
                class_name = det['class']
                conf = det['confidence']
                label = det['label']

                color = self.CLASS_COLORS.get(class_name, self.DEFAULT_COLOR)
                draw.rectangle([x1, y1, x2, y2], outline=color, width=3)

                label_text = f"{label} {conf:.2f}"
                left, top, right, bottom = draw.textbbox((0, 0), label_text, font=font)
                text_w = right - left
                text_h = bottom - top

                # Label strip normally sits just above the box. If that would
                # fall off the top of the image, place it inside the box
                # instead so it stays visible.
                label_top = y1 - text_h - 4
                if label_top < 0:
                    label_top = max(y1, 0)

                draw.rectangle(
                    [x1, label_top, x1 + text_w + 4, label_top + text_h + 4],
                    fill=color,
                )
                draw.text(
                    (x1 + 2, label_top + 2),
                    label_text,
                    fill=(255, 255, 255),
                    font=font,
                )

            if fps > 0:
                fps_text = f"FPS: {fps:.1f} | Detections: {len(detections)}"
                draw.text((10, 10), fps_text, fill=(0, 255, 0), font=font)

            return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
        except Exception as e:
            # Drawing is best-effort: never let a rendering problem break the
            # detection pipeline; hand back the untouched frame instead.
            logger.exception("绘制检测结果失败: %s", e)
            return frame

    def _run_inference(
        self,
        model,
        image: np.ndarray,
        model_id: str,
        confidence: float,
        iou: float,
    ) -> List[Dict]:
        """Run ``model`` on ``image`` and convert its boxes to plain dicts.

        Each returned dict has ``'class'`` (model class name), ``'label'``
        (display label from the model's label map, falling back to the class
        name), ``'confidence'`` (rounded to 3 decimals) and ``'bbox'``
        (``[x1, y1, x2, y2]`` as ints). Exceptions propagate to the caller.
        """
        results = model(image, conf=confidence, iou=iou, verbose=False)

        detections: List[Dict] = []
        label_map = None  # resolved once, and only if at least one box exists
        for result in results:
            for box in result.boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                conf = float(box.conf[0].cpu().numpy())
                cls = int(box.cls[0].cpu().numpy())
                class_name = result.names[cls]

                if label_map is None:
                    label_map = self.model_service.model_configs[model_id]['labels']

                detections.append({
                    'class': class_name,
                    'label': label_map.get(class_name, class_name),
                    'confidence': round(conf, 3),
                    'bbox': [int(x1), int(y1), int(x2), int(y2)],
                })
        return detections

    @staticmethod
    def _average_confidence(detections: List[Dict]) -> float:
        """Return the mean ``'confidence'`` of ``detections`` (0 when empty)."""
        if not detections:
            return 0
        return sum(d['confidence'] for d in detections) / len(detections)

    async def detect_image(
        self,
        image: np.ndarray,
        model_id: str,
        confidence: float = 0.5,
        iou: float = 0.45
    ) -> Dict:
        """Detect objects in a single image.

        Args:
            image: Image array passed straight to the model.
            model_id: Identifier handed to ``model_service.load_model`` and
                used to look up the label map.
            confidence: Confidence threshold forwarded to the model as ``conf``.
            iou: IoU threshold forwarded to the model as ``iou``.

        Returns:
            A dict with ``'success'``, ``'message'``, ``'detections'`` and
            ``'stats'``. On failure ``'success'`` is False, ``'detections'``
            is empty and ``'stats'`` is None; errors are logged, not raised.
        """
        # Monotonic clock: immune to wall-clock adjustments. The measured time
        # includes model loading, as before.
        start_time = time.perf_counter()

        model = await self.model_service.load_model(model_id)
        if not model:
            return {
                'success': False,
                'message': f'模型加载失败: {model_id}',
                'detections': [],
                'stats': None
            }

        try:
            # NOTE(review): inference is a plain synchronous call inside this
            # coroutine, so it runs to completion before control returns to
            # the event loop.
            detections = self._run_inference(model, image, model_id, confidence, iou)
            processing_time = time.perf_counter() - start_time
            avg_confidence = self._average_confidence(detections)

            return {
                'success': True,
                'message': '检测完成',
                'detections': detections,
                'stats': {
                    'total_detections': len(detections),
                    'avg_confidence': round(avg_confidence, 3),
                    'processing_time': round(processing_time, 3),
                    'model_used': model_id
                }
            }
        except Exception as e:
            logger.exception("图片检测失败: %s", e)
            return {
                'success': False,
                'message': f'检测失败: {str(e)}',
                'detections': [],
                'stats': None
            }

    async def detect_frame(
        self,
        frame: np.ndarray,
        model_id: str,
        confidence: float = 0.5,
        iou: float = 0.45,
        draw: bool = True
    ) -> Tuple[np.ndarray, Dict]:
        """Detect objects in a video frame and optionally annotate it.

        Args:
            frame: Frame array passed to the model (and to
                ``draw_detections`` when ``draw`` is True).
            model_id: Identifier handed to ``model_service.load_model`` and
                used to look up the label map.
            confidence: Confidence threshold forwarded to the model as ``conf``.
            iou: IoU threshold forwarded to the model as ``iou``.
            draw: When True, the returned frame has the detections drawn on it.

        Returns:
            ``(frame, result)`` where ``result`` has ``'success'``,
            ``'detections'`` and ``'stats'`` (including ``'fps'``). On failure
            the original frame is returned with ``'success'`` False, empty
            ``'detections'`` and ``'stats'`` None; errors are logged, not raised.
        """
        start_time = time.perf_counter()

        model = await self.model_service.load_model(model_id)
        if not model:
            return frame, {
                'success': False,
                'detections': [],
                'stats': None
            }

        try:
            detections = self._run_inference(model, frame, model_id, confidence, iou)
            processing_time = time.perf_counter() - start_time
            fps = 1.0 / processing_time if processing_time > 0 else 0
            avg_confidence = self._average_confidence(detections)

            result_data = {
                'success': True,
                'detections': detections,
                'stats': {
                    'total_detections': len(detections),
                    'avg_confidence': round(avg_confidence, 3),
                    'processing_time': round(processing_time, 3),
                    'fps': round(fps, 2),
                    'model_used': model_id
                }
            }

            if draw:
                frame = self.draw_detections(frame, detections, fps)

            return frame, result_data
        except Exception as e:
            logger.exception("帧检测失败: %s", e)
            return frame, {
                'success': False,
                'detections': [],
                'stats': None
            }