Files
jc-video-recognize/apps/server/models/smoking_yolo_adapter_v2.py
wwh 8fb58c75fe Initial commit: Video detection platform with YOLO models
Features:
- Fire detection (YOLOv10)
- Helmet detection (YOLOv8)
- Crowd detection (YOLOv8)
- Smoking detection (YOLOv8)
- Loitering detection (YOLOv8)

Tech Stack:
- Frontend: Vue 3 + Vite + Element Plus
- Backend: FastAPI + WebSocket
- Monorepo: pnpm workspace + Turbo
- Docker support included
2026-05-18 10:54:10 +08:00

380 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
YOLO 格式的抽烟检测模型适配器V2 - 常驻进程版)
使用 Docker 容器内的常驻 Python 进程,避免每次检测都启动新进程
"""
import os
import cv2
import numpy as np
import subprocess
import tempfile
import logging
import time
import json
from typing import List, Dict, Optional
from pathlib import Path
logger = logging.getLogger(__name__)
class SmokingDetectionYOLO:
"""
模拟 YOLO 接口的抽烟检测模型V2 - 常驻进程版)
使用 Docker 容器内的常驻 Python 进程
"""
_container_name = "smoking-detection-v2"
_process = None
_initialized = False
def __init__(self, model_path=None):
self.model_name = "smoking_detection"
self.docker_image = "smoking-detection:test"
self.model_dir = "output_inference/ppyoloe_crn_s_80e_smoking_visdrone"
self.threshold = 0.1
# YOLO 兼容属性
self.names = {0: 'cigarette'}
self.model = self
# 检查 Docker 并启动常驻进程
self._check_docker()
if self.available:
self._start_server()
logger.info(f"抽烟检测模型 V2 初始化完成Docker可用: {self.available}")
def _check_docker(self):
"""检查 Docker 环境"""
try:
result = subprocess.run(
["docker", "info"],
capture_output=True,
text=True,
timeout=5
)
self.available = result.returncode == 0
if self.available:
result = subprocess.run(
["docker", "image", "inspect", self.docker_image],
capture_output=True,
text=True,
timeout=5
)
self.available = result.returncode == 0
except Exception as e:
logger.error(f"Docker 检查失败: {e}")
self.available = False
def _start_server(self):
"""启动常驻服务器进程"""
try:
# 检查是否已有进程在运行
if SmokingDetectionYOLO._process is not None:
logger.info("常驻进程已在运行")
return
# 检查容器是否已存在
result = subprocess.run(
["docker", "ps", "-aq", "-f", f"name={self._container_name}"],
capture_output=True,
text=True,
timeout=5
)
if result.stdout.strip():
# 删除旧容器
logger.info("删除旧容器")
subprocess.run(
["docker", "rm", "-f", self._container_name],
capture_output=True,
timeout=10
)
# 启动新容器并运行服务器
logger.info("启动常驻服务器...")
# 获取 smoking_server.py 的绝对路径
server_script_path = "/Users/wwh/project/video-model/PaddlePaddle/PaddleDetection-release-2.9/smoking_server.py"
# 使用 Popen 保持进程运行,挂载 server 脚本
SmokingDetectionYOLO._process = subprocess.Popen(
[
"docker", "run", "-i", "--rm",
"--name", self._container_name,
"-v", "/tmp:/workspace/input",
"-v", f"{server_script_path}:/workspace/PaddleDetection/smoking_server.py",
"-w", "/workspace/PaddleDetection",
self.docker_image,
"python", "smoking_server.py"
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
# 等待服务器启动(读取模型加载完成的消息)
logger.info("等待服务器启动...")
start_wait = time.time()
while time.time() - start_wait < 30: # 最多等待30秒
if SmokingDetectionYOLO._process.poll() is not None:
# 进程已退出
stderr = SmokingDetectionYOLO._process.stderr.read()
logger.error(f"服务器启动失败: {stderr}")
SmokingDetectionYOLO._process = None
return
# 尝试读取 stderr 看是否加载完成
import select
if SmokingDetectionYOLO._process.stderr:
ready, _, _ = select.select([SmokingDetectionYOLO._process.stderr], [], [], 0.5)
if ready:
line = SmokingDetectionYOLO._process.stderr.readline()
if line:
logger.info(f"Server: {line.strip()}")
if "模型加载完成" in line:
break
time.sleep(0.1)
# 检查进程是否还在运行
if SmokingDetectionYOLO._process.poll() is None:
SmokingDetectionYOLO._initialized = True
logger.info("常驻服务器启动成功")
else:
stderr = SmokingDetectionYOLO._process.stderr.read()
logger.error(f"服务器启动失败: {stderr}")
SmokingDetectionYOLO._process = None
except Exception as e:
logger.error(f"启动常驻服务器失败: {e}")
SmokingDetectionYOLO._process = None
def __call__(self, source, conf=0.1, iou=0.45, verbose=False, stream=False):
"""模拟 YOLO 模型的调用接口"""
if not self.available:
logger.error("Docker 不可用,无法运行检测")
return [YOLOResult([])]
if not SmokingDetectionYOLO._initialized:
logger.error("常驻服务器未初始化")
return [YOLOResult([])]
# 处理不同类型的输入
if isinstance(source, str):
image = cv2.imread(source)
if image is None:
logger.error(f"无法读取图片: {source}")
return [YOLOResult([])]
return self._detect_single(image, conf, verbose)
elif isinstance(source, np.ndarray):
return self._detect_single(source, conf, verbose)
elif isinstance(source, list):
results = []
for img in source:
if isinstance(img, str):
img = cv2.imread(img)
if img is not None:
results.extend(self._detect_single(img, conf, verbose))
return results
else:
logger.error(f"不支持的输入类型: {type(source)}")
return [YOLOResult([])]
def _detect_single(self, image: np.ndarray, conf: float, verbose: bool) -> List['YOLOResult']:
"""检测单张图片(使用常驻进程)"""
start_time = time.time()
try:
# 创建临时文件
input_filename = f"smoking_v2_{int(time.time()*1000)}.jpg"
temp_input = f"/tmp/{input_filename}"
# 保存输入图片
cv2.imwrite(temp_input, image)
if verbose:
logger.info(f"正在检测: {temp_input}")
# 发送请求到常驻进程
request = {
'image_path': f'/workspace/input/{input_filename}',
'threshold': conf
}
SmokingDetectionYOLO._process.stdin.write(json.dumps(request) + '\n')
SmokingDetectionYOLO._process.stdin.flush()
# 读取响应
response_line = SmokingDetectionYOLO._process.stdout.readline()
response = json.loads(response_line)
elapsed = time.time() - start_time
if verbose:
logger.info(f"检测完成,耗时: {elapsed:.2f}")
# 解析结果
if response.get('success'):
detections = response.get('detections', [])
else:
logger.error(f"检测失败: {response.get('error')}")
detections = []
# 清理临时文件
try:
os.remove(temp_input)
except:
pass
return [YOLOResult(detections)]
except Exception as e:
logger.error(f"检测失败: {e}")
return [YOLOResult([])]
def predict(self, source, **kwargs):
"""兼容 predict 方法"""
return self.__call__(source, **kwargs)
@classmethod
def stop_server(cls):
"""停止常驻服务器"""
if cls._process is not None:
cls._process.terminate()
cls._process.wait()
cls._process = None
cls._initialized = False
logger.info("常驻服务器已停止")
# YOLOResult, Boxes, Box 类(与之前相同)
class YOLOResult:
"""模拟 YOLO 检测结果对象"""
def __init__(self, detections: List[Dict]):
self.detections = detections
self.names = {0: 'cigarette'}
self.boxes = Boxes(detections)
self.probs = None
self.keypoints = None
self.obb = None
self.speed = {'preprocess': 0, 'inference': 0, 'postprocess': 0}
def __len__(self):
return len(self.detections)
def __getitem__(self, idx):
if idx < len(self.detections):
return YOLOResult([self.detections[idx]])
return YOLOResult([])
def plot(self, **kwargs):
return None
class Boxes:
"""模拟 YOLO boxes 对象"""
def __init__(self, detections: List[Dict]):
self.detections = detections
try:
import torch
if detections:
xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections]
conf_list = [[d['confidence']] for d in detections]
cls_list = [[d['class']] for d in detections]
self.xyxy = torch.tensor(xyxy_list, dtype=torch.float32)
self.conf = torch.tensor(conf_list, dtype=torch.float32)
self.cls = torch.tensor(cls_list, dtype=torch.int64)
self.id = None
else:
self.xyxy = torch.empty((0, 4))
self.conf = torch.empty((0, 1))
self.cls = torch.empty((0, 1), dtype=torch.int64)
self.id = None
except ImportError:
import numpy as np
if detections:
xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections]
conf_list = [[d['confidence']] for d in detections]
cls_list = [[d['class']] for d in detections]
self.xyxy = np.array(xyxy_list, dtype=np.float32)
self.conf = np.array(conf_list, dtype=np.float32)
self.cls = np.array(cls_list, dtype=np.int64)
self.id = None
else:
self.xyxy = np.empty((0, 4), dtype=np.float32)
self.conf = np.empty((0, 1), dtype=np.float32)
self.cls = np.empty((0, 1), dtype=np.int64)
self.id = None
def __len__(self):
return len(self.detections)
def __iter__(self):
for i in range(len(self.detections)):
yield Box(self, i)
def cpu(self):
return self
def numpy(self):
if hasattr(self.xyxy, 'numpy'):
return type('Boxes', (), {
'xyxy': self.xyxy.numpy(),
'conf': self.conf.numpy(),
'cls': self.cls.numpy(),
'id': self.id
})()
return self
class Box:
"""模拟单个检测框对象"""
def __init__(self, boxes: Boxes, index: int):
self._boxes = boxes
self._index = index
@property
def xyxy(self):
import torch
import numpy as np
coords = self._boxes.xyxy[self._index]
if isinstance(coords, torch.Tensor):
return coords.unsqueeze(0)
else:
return np.array([coords])
@property
def conf(self):
import torch
import numpy as np
conf_val = self._boxes.conf[self._index]
if isinstance(conf_val, torch.Tensor):
return conf_val.view(1)
else:
return np.array([conf_val])
@property
def cls(self):
import torch
import numpy as np
cls_val = self._boxes.cls[self._index]
if isinstance(cls_val, torch.Tensor):
return cls_val.view(1)
else:
return np.array([cls_val])