# Features:
# - Fire detection (YOLOv10)
# - Helmet detection (YOLOv8)
# - Crowd detection (YOLOv8)
# - Smoking detection (YOLOv8)
# - Loitering detection (YOLOv8)
#
# Tech Stack:
# - Frontend: Vue 3 + Vite + Element Plus
# - Backend: FastAPI + WebSocket
# - Monorepo: pnpm workspace + Turbo
# - Docker support included
400 lines
13 KiB
Python
400 lines
13 KiB
Python
"""
|
||
YOLO-format adapter for the smoking detection model (optimized version).
Uses a background Docker container to avoid repeated container start-ups.
|
||
"""
|
||
|
||
import logging
import os
import re
import subprocess
import tempfile
import time
import uuid
from pathlib import Path
from typing import Dict, List, Optional

import cv2
import numpy as np
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class SmokingDetectionYOLO:
    """Smoking (cigarette) detector exposing a YOLO-like calling interface.

    Optimized variant: inference is executed with ``docker exec`` inside one
    long-lived background container, so a new container does not have to be
    started for every detection. When the background container is not
    running, each detection falls back to a one-shot ``docker run --rm``.

    Instances are callable (and offer ``predict``) and return a list of
    ``YOLOResult`` objects, one per input image.
    """

    # Name of the background container; class-level so every instance
    # shares the same container.
    _container_name = "smoking-detection-server"
    # True once the background container is believed to be running.
    _container_started = False
    # Host directory bind-mounted into the background container as both
    # /workspace/input and /workspace/output; temp images are written here.
    _host_dir = "/tmp"
    # One detection line of the inference script's stdout, e.g.
    # "class_id:0, confidence:0.87, left_top:[10.5,20.2],right_bottom:[110.7,220.0]".
    # A leading minus sign is accepted on coordinates so that such lines are
    # parsed instead of being silently dropped.
    _DETECTION_RE = re.compile(
        r'class_id:\d+,\s*confidence:([\d.]+),\s*'
        r'left_top:\[(-?[\d.]+),\s*(-?[\d.]+)\],\s*'
        r'right_bottom:\[(-?[\d.]+),\s*(-?[\d.]+)\]'
    )

    def __init__(self, model_path=None):
        """Set model metadata, probe Docker and start the background container.

        Args:
            model_path: Accepted for signature compatibility; not used.
        """
        self.model_name = "smoking_detection"
        self.docker_image = "smoking-detection:test"
        self.model_dir = "output_inference/ppyoloe_crn_s_80e_smoking_visdrone"
        self.threshold = 0.1

        # YOLO-compatible attributes.
        self.names = {0: 'cigarette'}
        self.model = self

        # Probe Docker, then start the background container if usable.
        self.available = False
        self._check_docker()
        if self.available:
            self._start_background_container()

        logger.info(f"抽烟检测模型初始化完成,Docker可用: {self.available}")

    @staticmethod
    def _run_docker(args, timeout):
        """Run ``docker <args>`` with captured text output and a timeout."""
        return subprocess.run(
            ["docker", *args],
            capture_output=True,
            text=True,
            timeout=timeout,
        )

    def _check_docker(self):
        """Set ``self.available`` to whether Docker and the image are usable.

        ``available`` is True only when ``docker info`` succeeds and
        ``docker image inspect`` finds ``self.docker_image``.
        """
        try:
            self.available = self._run_docker(["info"], timeout=5).returncode == 0
            if self.available:
                inspect = self._run_docker(
                    ["image", "inspect", self.docker_image], timeout=5
                )
                self.available = inspect.returncode == 0
        except Exception as e:
            # Broad on purpose: a missing docker binary, a timeout or an
            # undecodable output must never make construction fail.
            logger.error(f"Docker 检查失败: {e}")
            self.available = False

    def _start_background_container(self):
        """Ensure the shared background container is running.

        Reuses a running container, restarts a stopped one, or creates a new
        one. ``_container_started`` is set to True only when the container is
        running or the start/create command exited with code 0.
        """
        name = self._container_name
        try:
            # Already running?
            running = self._run_docker(["ps", "-q", "-f", f"name={name}"], timeout=5)
            if running.stdout.strip():
                logger.info(f"后台容器已在运行: {name}")
                SmokingDetectionYOLO._container_started = True
                return

            # Exists but stopped?
            existing = self._run_docker(["ps", "-aq", "-f", f"name={name}"], timeout=5)
            if existing.stdout.strip():
                logger.info(f"启动已存在的容器: {name}")
                launch = self._run_docker(["start", name], timeout=10)
            else:
                logger.info(f"创建后台容器: {name}")
                launch = self._run_docker(
                    [
                        "run", "-d",
                        "--name", name,
                        "-v", f"{self._host_dir}:/workspace/input",
                        "-v", f"{self._host_dir}:/workspace/output",
                        self.docker_image,
                        # Long-running no-op command so that `docker exec`
                        # can be used against the container later.
                        "tail", "-f", "/dev/null",
                    ],
                    timeout=10,
                )

            # A non-zero exit code means the container is NOT running; do not
            # pretend otherwise, so detection uses the `docker run` fallback.
            if launch.returncode != 0:
                logger.error(f"启动后台容器失败: {launch.stderr.strip()}")
                SmokingDetectionYOLO._container_started = False
                return

            SmokingDetectionYOLO._container_started = True
            logger.info("后台容器启动成功")

        except Exception as e:
            logger.error(f"启动后台容器失败: {e}")
            SmokingDetectionYOLO._container_started = False

    @staticmethod
    def _load_image(source):
        """Return an image for ``source``.

        Paths (``str`` or ``os.PathLike``) are read with ``cv2.imread``; an
        unreadable path is logged and yields ``None``. Any other value is
        returned unchanged.
        """
        if isinstance(source, (str, os.PathLike)):
            image = cv2.imread(os.fspath(source))
            if image is None:
                logger.error(f"无法读取图片: {source}")
            return image
        return source

    def __call__(self, source, conf=0.1, iou=0.45, verbose=False, stream=False, **kwargs):
        """Run detection, mimicking the YOLO model call interface.

        Args:
            source: Image path (``str`` / ``os.PathLike``), image array
                (``np.ndarray``), or a list/tuple of those.
            conf: Confidence threshold forwarded to the inference script.
            iou: Accepted for YOLO compatibility; not used.
            verbose: When True, log the temp file path and elapsed time.
            stream: Accepted for YOLO compatibility; not used.
            **kwargs: Further YOLO-style keyword arguments; ignored so that
                callers passing them do not get a ``TypeError``.

        Returns:
            A list of ``YOLOResult``. For a list/tuple input there is exactly
            one result per input item (an empty result for unreadable
            items); otherwise the list has one element. On any failure the
            result is empty rather than an exception.
        """
        if not self.available:
            logger.error("Docker 不可用,无法运行检测")
            return [YOLOResult([])]

        if isinstance(source, (list, tuple)):
            results = []
            for item in source:
                image = self._load_image(item)
                if image is None:
                    # Keep results aligned with inputs.
                    results.append(YOLOResult([]))
                else:
                    results.extend(self._detect_single(image, conf, verbose))
            return results

        if isinstance(source, (str, os.PathLike, np.ndarray)):
            image = self._load_image(source)
            if image is None:
                return [YOLOResult([])]
            return self._detect_single(image, conf, verbose)

        logger.error(f"不支持的输入类型: {type(source)}")
        return [YOLOResult([])]

    def _build_infer_command(self, host_path, filename, conf):
        """Build the docker command line that runs inference on one image.

        Uses ``docker exec`` on the background container when it is started,
        otherwise a one-shot ``docker run --rm`` that mounts only the image.
        """
        if SmokingDetectionYOLO._container_started:
            return [
                "docker", "exec",
                self._container_name,
                "python", "deploy/python/infer.py",
                f"--model_dir={self.model_dir}",
                f"--image_file=/workspace/input/{filename}",
                "--device=CPU",
                "--output_dir=/workspace/output",
                f"--threshold={conf}",
            ]
        return [
            "docker", "run", "--rm",
            "-v", f"{host_path}:/workspace/input.jpg",
            self.docker_image,
            "python", "deploy/python/infer.py",
            f"--model_dir={self.model_dir}",
            "--image_file=/workspace/input.jpg",
            "--device=CPU",
            "--output_dir=/workspace",
            f"--threshold={conf}",
        ]

    def _detect_single(self, image: np.ndarray, conf: float, verbose: bool) -> List['YOLOResult']:
        """Detect on a single image through Docker.

        The image is written to a uniquely named temp file in the shared host
        directory, the inference script is run, its stdout is parsed, and the
        temp file is always removed afterwards.

        Returns:
            A one-element list with the ``YOLOResult``; the result is empty
            on timeout or any other failure.
        """
        start_time = time.time()
        # uuid-based name: a timestamp can collide when calls overlap.
        input_filename = f"smoking_input_{uuid.uuid4().hex}.jpg"
        temp_input = os.path.join(self._host_dir, input_filename)

        try:
            if not cv2.imwrite(temp_input, image):
                logger.error(f"检测失败: 无法写入临时图片 {temp_input}")
                return [YOLOResult([])]

            if verbose:
                logger.info(f"正在检测: {temp_input}")

            result = subprocess.run(
                self._build_infer_command(temp_input, input_filename, conf),
                capture_output=True,
                text=True,
                timeout=60,
            )

            elapsed = time.time() - start_time
            if verbose:
                logger.info(f"检测完成,耗时: {elapsed:.2f}秒")

            if result.returncode != 0:
                # Surface the failure; stdout is still parsed below.
                logger.error(
                    f"检测失败: 推理命令返回码 {result.returncode}: {result.stderr.strip()}"
                )

            return [YOLOResult(self._parse_output(result.stdout))]

        except subprocess.TimeoutExpired:
            logger.error("检测超时")
            return [YOLOResult([])]
        except Exception as e:
            logger.error(f"检测失败: {e}")
            return [YOLOResult([])]
        finally:
            # Best-effort cleanup on every path (success, timeout, error).
            try:
                os.remove(temp_input)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(f"清理临时文件失败: {e}")

    def _parse_output(self, output: str) -> List[Dict]:
        """Parse the inference script's stdout into detection dicts.

        Args:
            output: Captured stdout; lines not matching the detection
                pattern are ignored.

        Returns:
            A list of dicts with keys ``bbox`` (``[x1, y1, x2, y2]`` truncated
            to int), ``confidence`` (float), ``class`` (always 0) and
            ``name`` (always ``'cigarette'``).
        """
        detections = []

        for line in output.splitlines():
            match = self._DETECTION_RE.search(line)
            if not match:
                continue
            try:
                confidence, x1, y1, x2, y2 = (float(g) for g in match.groups())
            except ValueError as e:
                # e.g. a malformed number such as "1.2.3".
                logger.warning(f"解析检测结果失败: {e}, line: {line}")
                continue

            detections.append({
                'bbox': [int(x1), int(y1), int(x2), int(y2)],
                'confidence': confidence,
                'class': 0,
                'name': 'cigarette',
            })

        return detections

    def predict(self, source, **kwargs):
        """YOLO-compatible alias for calling the instance."""
        return self(source, **kwargs)

    @classmethod
    def stop_background_container(cls):
        """Stop the background container and disable the ``docker exec`` path.

        ``_container_started`` is cleared first, so later detections use the
        one-shot ``docker run`` fallback instead of a stopped container.
        """
        SmokingDetectionYOLO._container_started = False
        try:
            result = subprocess.run(
                ["docker", "stop", cls._container_name],
                capture_output=True,
                text=True,
                timeout=10,
            )
        except Exception as e:
            logger.error(f"停止后台容器失败: {e}")
            return

        if result.returncode != 0:
            logger.error(f"停止后台容器失败: {result.stderr.strip()}")
            return
        logger.info("后台容器已停止")
|
||
|
||
|
||
# YOLOResult, Boxes and Box classes, copied from the original version
|
||
class YOLOResult:
    """Stand-in for a YOLO result object holding one image's detections.

    Attributes mirror the names a YOLO result exposes: ``boxes`` wraps the
    detections, ``names`` maps class id to label, and ``probs``,
    ``keypoints`` and ``obb`` are always ``None``.
    """

    def __init__(self, detections: List[Dict]):
        """Wrap ``detections`` (dicts with ``bbox``/``confidence``/``class``)."""
        self.detections = detections
        self.names = {0: 'cigarette'}
        self.boxes = Boxes(detections)
        self.probs = None
        self.keypoints = None
        self.obb = None
        # Timing is not measured; zeros keep the attribute shape available.
        self.speed = {'preprocess': 0, 'inference': 0, 'postprocess': 0}

    def __len__(self):
        """Return the number of detections."""
        return len(self.detections)

    def __iter__(self):
        """Yield one single-detection ``YOLOResult`` per detection.

        Defined explicitly because ``__getitem__`` never raises
        ``IndexError``; without it, iteration through the sequence protocol
        would never terminate.
        """
        for detection in self.detections:
            yield YOLOResult([detection])

    def __getitem__(self, idx):
        """Return a ``YOLOResult`` for the selected detection(s).

        Args:
            idx: An integer index (negative values count from the end) or a
                slice.

        Returns:
            A new ``YOLOResult``; an out-of-range integer yields an empty
            result instead of raising.
        """
        if isinstance(idx, slice):
            return YOLOResult(self.detections[idx])
        count = len(self.detections)
        if -count <= idx < count:
            return YOLOResult([self.detections[idx]])
        return YOLOResult([])

    def plot(self, **kwargs):
        """Plotting is not supported; always returns ``None``."""
        return None
|
||
|
||
|
||
class Boxes:
    """Stand-in for a YOLO ``boxes`` object.

    Holds ``xyxy`` (N x 4, float32), ``conf`` (N x 1, float32) and ``cls``
    (N x 1, int64) as torch tensors when torch can be imported, otherwise as
    numpy arrays. ``id`` is always ``None``.
    """

    def __init__(self, detections: List[Dict]):
        """Build the coordinate/confidence/class arrays from ``detections``.

        Args:
            detections: Dicts with keys ``bbox`` (four numbers),
                ``confidence`` and ``class``.
        """
        self.detections = detections
        self.id = None

        count = len(detections)
        xyxy_rows = [
            [d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]]
            for d in detections
        ]
        conf_rows = [[d['confidence']] for d in detections]
        cls_rows = [[d['class']] for d in detections]

        # Explicit reshape keeps the (0, 4) / (0, 1) shapes for empty input.
        try:
            import torch
        except ImportError:
            self.xyxy = np.array(xyxy_rows, dtype=np.float32).reshape(count, 4)
            self.conf = np.array(conf_rows, dtype=np.float32).reshape(count, 1)
            self.cls = np.array(cls_rows, dtype=np.int64).reshape(count, 1)
        else:
            self.xyxy = torch.tensor(xyxy_rows, dtype=torch.float32).reshape(count, 4)
            self.conf = torch.tensor(conf_rows, dtype=torch.float32).reshape(count, 1)
            self.cls = torch.tensor(cls_rows, dtype=torch.int64).reshape(count, 1)

    def __len__(self):
        """Return the number of boxes."""
        return len(self.detections)

    def __iter__(self):
        """Yield a ``Box`` view for every detection, in order."""
        for index in range(len(self.detections)):
            yield Box(self, index)

    def cpu(self):
        """Return ``self``; present for YOLO API compatibility."""
        return self

    def numpy(self):
        """Return a ``Boxes`` whose arrays are numpy arrays.

        Returns ``self`` when the arrays are already numpy. Otherwise returns
        a new ``Boxes`` sharing ``detections``, so ``len()``, iteration,
        ``cpu()`` and ``numpy()`` keep working on the converted object.
        """
        if not hasattr(self.xyxy, 'numpy'):
            return self

        converted = type(self).__new__(type(self))
        converted.detections = self.detections
        converted.xyxy = self.xyxy.numpy()
        converted.conf = self.conf.numpy()
        converted.cls = self.cls.numpy()
        converted.id = self.id
        return converted
|
||
|
||
|
||
class Box:
    """View of a single detection inside a ``Boxes`` container.

    The properties return torch tensors when the container holds torch
    tensors and numpy arrays otherwise. torch is never imported here, so the
    class also works when torch is not installed.
    """

    def __init__(self, boxes: 'Boxes', index: int):
        """Remember the parent container and the row this box refers to."""
        self._boxes = boxes
        self._index = index

    @staticmethod
    def _is_tensor(value):
        """Tell torch tensors from numpy arrays without importing torch.

        ``unsqueeze`` is used as the marker because numpy arrays do not have
        it (unlike ``view``, which both types define).
        """
        return hasattr(value, 'unsqueeze')

    @property
    def xyxy(self):
        """Coordinates ``[x1, y1, x2, y2]`` of this box, with shape (1, 4)."""
        coords = self._boxes.xyxy[self._index]
        if self._is_tensor(coords):
            return coords.unsqueeze(0)
        return np.array([coords])

    @property
    def conf(self):
        """Confidence of this box, with shape (1,)."""
        conf_val = self._boxes.conf[self._index]
        if self._is_tensor(conf_val):
            return conf_val.view(1)
        # reshape(1) matches the torch branch's shape.
        return np.asarray(conf_val).reshape(1)

    @property
    def cls(self):
        """Class id of this box, with shape (1,)."""
        cls_val = self._boxes.cls[self._index]
        if self._is_tensor(cls_val):
            return cls_val.view(1)
        return np.asarray(cls_val).reshape(1)
|