""" YOLO 格式的抽烟检测模型适配器(优化版) 使用后台 Docker 容器,避免重复启动 """ import os import cv2 import numpy as np import subprocess import tempfile import logging import time from typing import List, Dict, Optional from pathlib import Path logger = logging.getLogger(__name__) class SmokingDetectionYOLO: """ 模拟 YOLO 接口的抽烟检测模型(优化版) 使用后台 Docker 容器,避免每次检测都启动新容器 """ _container_name = "smoking-detection-server" _container_started = False def __init__(self, model_path=None): self.model_name = "smoking_detection" self.docker_image = "smoking-detection:test" self.model_dir = "output_inference/ppyoloe_crn_s_80e_smoking_visdrone" self.threshold = 0.1 # YOLO 兼容属性 self.names = {0: 'cigarette'} self.model = self # 检查 Docker 并启动后台容器 self._check_docker() if self.available: self._start_background_container() logger.info(f"抽烟检测模型初始化完成,Docker可用: {self.available}") def _check_docker(self): """检查 Docker 环境""" try: result = subprocess.run( ["docker", "info"], capture_output=True, text=True, timeout=5 ) self.available = result.returncode == 0 if self.available: result = subprocess.run( ["docker", "image", "inspect", self.docker_image], capture_output=True, text=True, timeout=5 ) self.available = result.returncode == 0 except Exception as e: logger.error(f"Docker 检查失败: {e}") self.available = False def _start_background_container(self): """启动后台容器""" try: # 检查容器是否已在运行 result = subprocess.run( ["docker", "ps", "-q", "-f", f"name={self._container_name}"], capture_output=True, text=True, timeout=5 ) if result.stdout.strip(): logger.info(f"后台容器已在运行: {self._container_name}") SmokingDetectionYOLO._container_started = True return # 检查容器是否存在但已停止 result = subprocess.run( ["docker", "ps", "-aq", "-f", f"name={self._container_name}"], capture_output=True, text=True, timeout=5 ) if result.stdout.strip(): # 启动已存在的容器 logger.info(f"启动已存在的容器: {self._container_name}") subprocess.run( ["docker", "start", self._container_name], capture_output=True, timeout=10 ) else: # 创建新容器 logger.info(f"创建后台容器: {self._container_name}") subprocess.run( [ "docker", "run", "-d", "--name", self._container_name, "-v", "/tmp:/workspace/input", "-v", "/tmp:/workspace/output", self.docker_image, "tail", "-f", "/dev/null" ], capture_output=True, timeout=10 ) SmokingDetectionYOLO._container_started = True logger.info("后台容器启动成功") except Exception as e: logger.error(f"启动后台容器失败: {e}") SmokingDetectionYOLO._container_started = False def __call__(self, source, conf=0.1, iou=0.45, verbose=False, stream=False): """模拟 YOLO 模型的调用接口""" if not self.available: logger.error("Docker 不可用,无法运行检测") return [YOLOResult([])] # 处理不同类型的输入 if isinstance(source, str): image = cv2.imread(source) if image is None: logger.error(f"无法读取图片: {source}") return [YOLOResult([])] return self._detect_single(image, conf, verbose) elif isinstance(source, np.ndarray): return self._detect_single(source, conf, verbose) elif isinstance(source, list): results = [] for img in source: if isinstance(img, str): img = cv2.imread(img) if img is not None: results.extend(self._detect_single(img, conf, verbose)) return results else: logger.error(f"不支持的输入类型: {type(source)}") return [YOLOResult([])] def _detect_single(self, image: np.ndarray, conf: float, verbose: bool) -> List['YOLOResult']: """检测单张图片(使用后台容器)""" start_time = time.time() try: # 创建临时文件 input_filename = f"smoking_input_{int(time.time()*1000)}.jpg" output_filename = f"smoking_output_{int(time.time()*1000)}.jpg" temp_input = f"/tmp/{input_filename}" temp_output = f"/tmp/{output_filename}" # 保存输入图片 cv2.imwrite(temp_input, image) if verbose: logger.info(f"正在检测: {temp_input}") # 使用后台容器执行检测 if SmokingDetectionYOLO._container_started: # 使用 exec 在运行中的容器内执行 cmd = [ "docker", "exec", self._container_name, "python", "deploy/python/infer.py", f"--model_dir={self.model_dir}", f"--image_file=/workspace/input/{input_filename}", "--device=CPU", f"--output_dir=/workspace/output", f"--threshold={conf}" ] else: # 回退到原来的方式 cmd = [ "docker", "run", "--rm", "-v", f"{temp_input}:/workspace/input.jpg", self.docker_image, "python", "deploy/python/infer.py", f"--model_dir={self.model_dir}", "--image_file=/workspace/input.jpg", "--device=CPU", "--output_dir=/workspace", f"--threshold={conf}" ] # 执行检测 result = subprocess.run( cmd, capture_output=True, text=True, timeout=60 ) elapsed = time.time() - start_time if verbose: logger.info(f"检测完成,耗时: {elapsed:.2f}秒") # 解析结果 detections = self._parse_output(result.stdout) # 清理临时文件 try: os.remove(temp_input) except: pass return [YOLOResult(detections)] except subprocess.TimeoutExpired: logger.error("检测超时") return [YOLOResult([])] except Exception as e: logger.error(f"检测失败: {e}") return [YOLOResult([])] def _parse_output(self, output: str) -> List[Dict]: """解析检测输出""" detections = [] import re pattern = r'class_id:\d+,\s*confidence:([\d.]+),\s*left_top:\[([\d.]+),\s*([\d.]+)\],\s*right_bottom:\[([\d.]+),\s*([\d.]+)\]' for line in output.split('\n'): match = re.search(pattern, line) if match: try: confidence = float(match.group(1)) x1 = float(match.group(2)) y1 = float(match.group(3)) x2 = float(match.group(4)) y2 = float(match.group(5)) detections.append({ 'bbox': [int(x1), int(y1), int(x2), int(y2)], 'confidence': confidence, 'class': 0, 'name': 'cigarette' }) except Exception as e: logger.warning(f"解析检测结果失败: {e}, line: {line}") continue return detections def predict(self, source, **kwargs): """兼容 predict 方法""" return self.__call__(source, **kwargs) @classmethod def stop_background_container(cls): """停止后台容器""" try: subprocess.run( ["docker", "stop", cls._container_name], capture_output=True, timeout=10 ) logger.info("后台容器已停止") except Exception as e: logger.error(f"停止后台容器失败: {e}") # 复制 YOLOResult, Boxes, Box 类(与原版相同) class YOLOResult: """模拟 YOLO 检测结果对象""" def __init__(self, detections: List[Dict]): self.detections = detections self.names = {0: 'cigarette'} self.boxes = Boxes(detections) self.probs = None self.keypoints = None self.obb = None self.speed = {'preprocess': 0, 'inference': 0, 'postprocess': 0} def __len__(self): return len(self.detections) def __getitem__(self, idx): if idx < len(self.detections): return YOLOResult([self.detections[idx]]) return YOLOResult([]) def plot(self, **kwargs): return None class Boxes: """模拟 YOLO boxes 对象""" def __init__(self, detections: List[Dict]): self.detections = detections try: import torch if detections: xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections] conf_list = [[d['confidence']] for d in detections] cls_list = [[d['class']] for d in detections] self.xyxy = torch.tensor(xyxy_list, dtype=torch.float32) self.conf = torch.tensor(conf_list, dtype=torch.float32) self.cls = torch.tensor(cls_list, dtype=torch.int64) self.id = None else: self.xyxy = torch.empty((0, 4)) self.conf = torch.empty((0, 1)) self.cls = torch.empty((0, 1), dtype=torch.int64) self.id = None except ImportError: import numpy as np if detections: xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections] conf_list = [[d['confidence']] for d in detections] cls_list = [[d['class']] for d in detections] self.xyxy = np.array(xyxy_list, dtype=np.float32) self.conf = np.array(conf_list, dtype=np.float32) self.cls = np.array(cls_list, dtype=np.int64) self.id = None else: self.xyxy = np.empty((0, 4), dtype=np.float32) self.conf = np.empty((0, 1), dtype=np.float32) self.cls = np.empty((0, 1), dtype=np.int64) self.id = None def __len__(self): return len(self.detections) def __iter__(self): for i in range(len(self.detections)): yield Box(self, i) def cpu(self): return self def numpy(self): if hasattr(self.xyxy, 'numpy'): return type('Boxes', (), { 'xyxy': self.xyxy.numpy(), 'conf': self.conf.numpy(), 'cls': self.cls.numpy(), 'id': self.id })() return self class Box: """模拟单个检测框对象""" def __init__(self, boxes: Boxes, index: int): self._boxes = boxes self._index = index @property def xyxy(self): import torch import numpy as np coords = self._boxes.xyxy[self._index] if isinstance(coords, torch.Tensor): return coords.unsqueeze(0) else: return np.array([coords]) @property def conf(self): import torch import numpy as np conf_val = self._boxes.conf[self._index] if isinstance(conf_val, torch.Tensor): return conf_val.view(1) else: return np.array([conf_val]) @property def cls(self): import torch import numpy as np cls_val = self._boxes.cls[self._index] if isinstance(cls_val, torch.Tensor): return cls_val.view(1) else: return np.array([cls_val])