Features: - Fire detection (YOLOv10) - Helmet detection (YOLOv8) - Crowd detection (YOLOv8) - Smoking detection (YOLOv8) - Loitering detection (YOLOv8) Tech Stack: - Frontend: Vue 3 + Vite + Element Plus - Backend: FastAPI + WebSocket - Monorepo: pnpm workspace + Turbo - Docker support included
200 lines
7.3 KiB
Python
200 lines
7.3 KiB
Python
import os
|
|
import cv2
|
|
import numpy as np
|
|
import time
|
|
import uuid
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DetectionService:
|
|
def __init__(self, model_service):
|
|
self.model_service = model_service
|
|
self.base_dir = os.path.dirname(os.path.dirname(__file__))
|
|
self.results_dir = os.path.join(self.base_dir, "static", "results")
|
|
self.temp_dir = os.path.join(self.base_dir, "static", "temp")
|
|
|
|
os.makedirs(self.results_dir, exist_ok=True)
|
|
os.makedirs(self.temp_dir, exist_ok=True)
|
|
|
|
def draw_detections(self, frame: np.ndarray, detections: List[Dict], fps: float = 0) -> np.ndarray:
|
|
try:
|
|
img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|
pil_img = Image.fromarray(img_rgb)
|
|
draw = ImageDraw.Draw(pil_img)
|
|
|
|
try:
|
|
font = ImageFont.truetype("/System/Library/Fonts/PingFang.ttc", 20)
|
|
font_large = ImageFont.truetype("/System/Library/Fonts/PingFang.ttc", 24)
|
|
except:
|
|
font = ImageFont.load_default()
|
|
font_large = font
|
|
|
|
class_colors = {
|
|
'Fire': (255, 0, 0),
|
|
'Smoke': (128, 128, 128),
|
|
'person': (0, 255, 0),
|
|
'helmet': (255, 255, 0),
|
|
'no_helmet': (255, 0, 255),
|
|
'cigarette': (0, 165, 255) # 橙色,用于抽烟检测
|
|
}
|
|
|
|
for det in detections:
|
|
x1, y1, x2, y2 = det['bbox']
|
|
class_name = det['class']
|
|
conf = det['confidence']
|
|
label = det['label']
|
|
|
|
color = class_colors.get(class_name, (0, 255, 0))
|
|
|
|
draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
|
|
|
|
label_text = f"{label} {conf:.2f}"
|
|
bbox = draw.textbbox((0, 0), label_text, font=font)
|
|
text_w = bbox[2] - bbox[0]
|
|
text_h = bbox[3] - bbox[1]
|
|
draw.rectangle([x1, y1 - text_h - 4, x1 + text_w + 4, y1], fill=color)
|
|
draw.text((x1 + 2, y1 - text_h - 2), label_text, fill=(255, 255, 255), font=font)
|
|
|
|
if fps > 0:
|
|
fps_text = f"FPS: {fps:.1f} | Detections: {len(detections)}"
|
|
draw.text((10, 10), fps_text, fill=(0, 255, 0), font=font)
|
|
|
|
return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
|
|
except Exception as e:
|
|
logger.error(f"绘制检测结果失败: {e}")
|
|
return frame
|
|
|
|
async def detect_image(
|
|
self,
|
|
image: np.ndarray,
|
|
model_id: str,
|
|
confidence: float = 0.5,
|
|
iou: float = 0.45
|
|
) -> Dict:
|
|
start_time = time.time()
|
|
|
|
model = await self.model_service.load_model(model_id)
|
|
if not model:
|
|
return {
|
|
'success': False,
|
|
'message': f'模型加载失败: {model_id}',
|
|
'detections': [],
|
|
'stats': None
|
|
}
|
|
|
|
try:
|
|
results = model(image, conf=confidence, iou=iou, verbose=False)
|
|
|
|
detections = []
|
|
for result in results:
|
|
boxes = result.boxes
|
|
for box in boxes:
|
|
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
|
|
conf = float(box.conf[0].cpu().numpy())
|
|
cls = int(box.cls[0].cpu().numpy())
|
|
class_name = result.names[cls]
|
|
|
|
label_map = self.model_service.model_configs[model_id]['labels']
|
|
label = label_map.get(class_name, class_name)
|
|
|
|
detections.append({
|
|
'class': class_name,
|
|
'label': label,
|
|
'confidence': round(conf, 3),
|
|
'bbox': [int(x1), int(y1), int(x2), int(y2)]
|
|
})
|
|
|
|
processing_time = time.time() - start_time
|
|
avg_confidence = sum(d['confidence'] for d in detections) / len(detections) if detections else 0
|
|
|
|
return {
|
|
'success': True,
|
|
'message': '检测完成',
|
|
'detections': detections,
|
|
'stats': {
|
|
'total_detections': len(detections),
|
|
'avg_confidence': round(avg_confidence, 3),
|
|
'processing_time': round(processing_time, 3),
|
|
'model_used': model_id
|
|
}
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"图片检测失败: {e}")
|
|
return {
|
|
'success': False,
|
|
'message': f'检测失败: {str(e)}',
|
|
'detections': [],
|
|
'stats': None
|
|
}
|
|
|
|
async def detect_frame(
|
|
self,
|
|
frame: np.ndarray,
|
|
model_id: str,
|
|
confidence: float = 0.5,
|
|
iou: float = 0.45,
|
|
draw: bool = True
|
|
) -> tuple:
|
|
start_time = time.time()
|
|
|
|
model = await self.model_service.load_model(model_id)
|
|
if not model:
|
|
return frame, {
|
|
'success': False,
|
|
'detections': [],
|
|
'stats': None
|
|
}
|
|
|
|
try:
|
|
results = model(frame, conf=confidence, iou=iou, verbose=False)
|
|
|
|
detections = []
|
|
for result in results:
|
|
boxes = result.boxes
|
|
for box in boxes:
|
|
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
|
|
conf = float(box.conf[0].cpu().numpy())
|
|
cls = int(box.cls[0].cpu().numpy())
|
|
class_name = result.names[cls]
|
|
|
|
label_map = self.model_service.model_configs[model_id]['labels']
|
|
label = label_map.get(class_name, class_name)
|
|
|
|
detections.append({
|
|
'class': class_name,
|
|
'label': label,
|
|
'confidence': round(conf, 3),
|
|
'bbox': [int(x1), int(y1), int(x2), int(y2)]
|
|
})
|
|
|
|
processing_time = time.time() - start_time
|
|
fps = 1.0 / processing_time if processing_time > 0 else 0
|
|
avg_confidence = sum(d['confidence'] for d in detections) / len(detections) if detections else 0
|
|
|
|
result_data = {
|
|
'success': True,
|
|
'detections': detections,
|
|
'stats': {
|
|
'total_detections': len(detections),
|
|
'avg_confidence': round(avg_confidence, 3),
|
|
'processing_time': round(processing_time, 3),
|
|
'fps': round(fps, 2),
|
|
'model_used': model_id
|
|
}
|
|
}
|
|
|
|
if draw:
|
|
frame = self.draw_detections(frame, detections, fps)
|
|
|
|
return frame, result_data
|
|
except Exception as e:
|
|
logger.error(f"帧检测失败: {e}")
|
|
return frame, {
|
|
'success': False,
|
|
'detections': [],
|
|
'stats': None
|
|
}
|