Initial commit: Video detection platform with YOLO models

Features:
- Fire detection (YOLOv10)
- Helmet detection (YOLOv8)
- Crowd detection (YOLOv8)
- Smoking detection (YOLOv8)
- Loitering detection (YOLOv8)

Tech Stack:
- Frontend: Vue 3 + Vite + Element Plus
- Backend: FastAPI + WebSocket
- Monorepo: pnpm workspace + Turbo
- Docker support included
This commit is contained in:
wwh
2026-05-18 10:54:10 +08:00
commit 8fb58c75fe
42 changed files with 6663 additions and 0 deletions

View File

@@ -0,0 +1,357 @@
"""
YOLO 格式的抽烟检测模型适配器
将 PaddleDetection 模型包装为 YOLO 接口
"""
import os
import cv2
import numpy as np
import subprocess
import tempfile
import logging
from typing import List, Dict, Union
from pathlib import Path
logger = logging.getLogger(__name__)
class SmokingDetectionYOLO:
"""
模拟 YOLO 接口的抽烟检测模型
底层使用 PaddleDetection Docker 容器
"""
def __init__(self, model_path=None):
"""
初始化模型
Args:
model_path: 模型路径(可选,仅用于兼容性)
"""
self.model_name = "smoking_detection"
self.docker_image = "smoking-detection:test"
self.model_dir = "output_inference/ppyoloe_crn_s_80e_smoking_visdrone"
self.threshold = 0.1
# YOLO 兼容属性
self.names = {0: 'cigarette'}
self.model = self # 自我引用,保持与 YOLO 相同的接口
# 检查 Docker
self._check_docker()
logger.info(f"抽烟检测模型初始化完成Docker可用: {self.available}")
def _check_docker(self):
"""检查 Docker 环境"""
try:
result = subprocess.run(
["docker", "info"],
capture_output=True,
text=True,
timeout=5
)
self.available = result.returncode == 0
if self.available:
# 检查镜像
result = subprocess.run(
["docker", "image", "inspect", self.docker_image],
capture_output=True,
text=True,
timeout=5
)
self.available = result.returncode == 0
except Exception as e:
logger.error(f"Docker 检查失败: {e}")
self.available = False
def __call__(self, source, conf=0.1, iou=0.45, verbose=False, stream=False):
"""
模拟 YOLO 模型的调用接口
Args:
source: 图片路径、OpenCV 图片、或图片列表
conf: 置信度阈值
iou: IoU 阈值PaddleDetection 不支持,仅用于兼容)
verbose: 是否输出详细信息
stream: 是否流式输出(仅用于兼容)
Returns:
YOLOResult 对象列表
"""
if not self.available:
logger.error("Docker 不可用,无法运行检测")
return [YOLOResult([])]
# 处理不同类型的输入
if isinstance(source, str):
# 图片路径
image = cv2.imread(source)
if image is None:
logger.error(f"无法读取图片: {source}")
return [YOLOResult([])]
return self._detect_single(image, conf, verbose)
elif isinstance(source, np.ndarray):
# OpenCV 图片
return self._detect_single(source, conf, verbose)
elif isinstance(source, list):
# 图片列表
results = []
for img in source:
if isinstance(img, str):
img = cv2.imread(img)
if img is not None:
results.extend(self._detect_single(img, conf, verbose))
return results
else:
logger.error(f"不支持的输入类型: {type(source)}")
return [YOLOResult([])]
def _detect_single(self, image: np.ndarray, conf: float, verbose: bool) -> List['YOLOResult']:
"""检测单张图片"""
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
temp_input = f.name
# 保存输入图片
cv2.imwrite(temp_input, image)
if verbose:
logger.info(f"正在检测: {temp_input}")
# 构建 Docker 命令
cmd = [
"docker", "run", "--rm",
"-v", f"{temp_input}:/workspace/input.jpg",
self.docker_image,
"python", "deploy/python/infer.py",
f"--model_dir={self.model_dir}",
"--image_file=/workspace/input.jpg",
"--device=CPU",
"--output_dir=/workspace",
f"--threshold={conf}"
]
# 执行检测
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60
)
if verbose:
logger.info(f"检测完成,返回码: {result.returncode}")
# 解析结果
detections = self._parse_output(result.stdout)
if verbose and detections:
logger.info(f"检测到 {len(detections)} 个目标")
# 清理临时文件
try:
os.remove(temp_input)
except:
pass
return [YOLOResult(detections)]
except subprocess.TimeoutExpired:
logger.error("检测超时")
return [YOLOResult([])]
except Exception as e:
logger.error(f"检测失败: {e}")
return [YOLOResult([])]
def _parse_output(self, output: str) -> List[Dict]:
"""解析检测输出"""
detections = []
import re
# 使用正则表达式匹配检测行
# 格式: class_id:0, confidence:0.8921, left_top:[268.66,231.64],right_bottom:[351.87,258.66]
pattern = r'class_id:\d+,\s*confidence:([\d.]+),\s*left_top:\[([\d.]+),\s*([\d.]+)\],\s*right_bottom:\[([\d.]+),\s*([\d.]+)\]'
for line in output.split('\n'):
match = re.search(pattern, line)
if match:
try:
confidence = float(match.group(1))
x1 = float(match.group(2))
y1 = float(match.group(3))
x2 = float(match.group(4))
y2 = float(match.group(5))
detections.append({
'bbox': [int(x1), int(y1), int(x2), int(y2)],
'confidence': confidence,
'class': 0,
'name': 'cigarette'
})
except Exception as e:
logger.warning(f"解析检测结果失败: {e}, line: {line}")
continue
return detections
def predict(self, source, **kwargs):
"""兼容 predict 方法"""
return self.__call__(source, **kwargs)
class YOLOResult:
"""
模拟 YOLO 检测结果对象
提供与 ultralytics YOLO 结果相同的接口
"""
def __init__(self, detections: List[Dict]):
self.detections = detections
self.names = {0: 'cigarette'}
# 创建 boxes 对象
self.boxes = Boxes(detections)
# 其他 YOLO 结果属性
self.probs = None
self.keypoints = None
self.obb = None
self.speed = {'preprocess': 0, 'inference': 0, 'postprocess': 0}
def __len__(self):
return len(self.detections)
def __getitem__(self, idx):
"""支持索引访问"""
if idx < len(self.detections):
return YOLOResult([self.detections[idx]])
return YOLOResult([])
def plot(self, **kwargs):
"""绘制检测结果(兼容方法)"""
return None
class Boxes:
"""
模拟 YOLO boxes 对象
提供 xyxy, conf, cls 等属性
"""
def __init__(self, detections: List[Dict]):
self.detections = detections
# 尝试使用 torch如果没有则使用 numpy
try:
import torch
if detections:
xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections]
conf_list = [[d['confidence']] for d in detections]
cls_list = [[d['class']] for d in detections]
self.xyxy = torch.tensor(xyxy_list, dtype=torch.float32)
self.conf = torch.tensor(conf_list, dtype=torch.float32)
self.cls = torch.tensor(cls_list, dtype=torch.int64)
self.id = None
else:
self.xyxy = torch.empty((0, 4))
self.conf = torch.empty((0, 1))
self.cls = torch.empty((0, 1), dtype=torch.int64)
self.id = None
except ImportError:
# 如果没有 torch使用 numpy
import numpy as np
if detections:
xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections]
conf_list = [[d['confidence']] for d in detections]
cls_list = [[d['class']] for d in detections]
self.xyxy = np.array(xyxy_list, dtype=np.float32)
self.conf = np.array(conf_list, dtype=np.float32)
self.cls = np.array(cls_list, dtype=np.int64)
self.id = None
else:
self.xyxy = np.empty((0, 4), dtype=np.float32)
self.conf = np.empty((0, 1), dtype=np.float32)
self.cls = np.empty((0, 1), dtype=np.int64)
self.id = None
def __len__(self):
return len(self.detections)
def __iter__(self):
"""使 Boxes 可迭代"""
for i in range(len(self.detections)):
yield Box(self, i)
def cpu(self):
"""兼容方法"""
return self
def numpy(self):
"""转换为 numpy"""
if hasattr(self.xyxy, 'numpy'):
return type('Boxes', (), {
'xyxy': self.xyxy.numpy(),
'conf': self.conf.numpy(),
'cls': self.cls.numpy(),
'id': self.id
})()
return self
class Box:
"""
模拟单个检测框对象
"""
def __init__(self, boxes: Boxes, index: int):
self._boxes = boxes
self._index = index
@property
def xyxy(self):
"""返回 xyxy 坐标 (1, 4) 形状"""
import torch
import numpy as np
coords = self._boxes.xyxy[self._index]
if isinstance(coords, torch.Tensor):
return coords.unsqueeze(0)
else:
return np.array([coords])
@property
def conf(self):
"""返回置信度 (1,) 形状 - 与 YOLO 兼容"""
import torch
import numpy as np
conf_val = self._boxes.conf[self._index]
# 返回 (1,) 形状,与 YOLO 一致
if isinstance(conf_val, torch.Tensor):
return conf_val.view(1)
else:
return np.array([conf_val])
@property
def cls(self):
"""返回类别 (1,) 形状 - 与 YOLO 兼容"""
import torch
import numpy as np
cls_val = self._boxes.cls[self._index]
# 返回 (1,) 形状,与 YOLO 一致
if isinstance(cls_val, torch.Tensor):
return cls_val.view(1)
else:
return np.array([cls_val])