""" YOLO 格式的抽烟检测模型适配器(V2 - 常驻进程版) 使用 Docker 容器内的常驻 Python 进程,避免每次检测都启动新进程 """ import os import cv2 import numpy as np import subprocess import tempfile import logging import time import json from typing import List, Dict, Optional from pathlib import Path logger = logging.getLogger(__name__) class SmokingDetectionYOLO: """ 模拟 YOLO 接口的抽烟检测模型(V2 - 常驻进程版) 使用 Docker 容器内的常驻 Python 进程 """ _container_name = "smoking-detection-v2" _process = None _initialized = False def __init__(self, model_path=None): self.model_name = "smoking_detection" self.docker_image = "smoking-detection:test" self.model_dir = "output_inference/ppyoloe_crn_s_80e_smoking_visdrone" self.threshold = 0.1 # YOLO 兼容属性 self.names = {0: 'cigarette'} self.model = self # 检查 Docker 并启动常驻进程 self._check_docker() if self.available: self._start_server() logger.info(f"抽烟检测模型 V2 初始化完成,Docker可用: {self.available}") def _check_docker(self): """检查 Docker 环境""" try: result = subprocess.run( ["docker", "info"], capture_output=True, text=True, timeout=5 ) self.available = result.returncode == 0 if self.available: result = subprocess.run( ["docker", "image", "inspect", self.docker_image], capture_output=True, text=True, timeout=5 ) self.available = result.returncode == 0 except Exception as e: logger.error(f"Docker 检查失败: {e}") self.available = False def _start_server(self): """启动常驻服务器进程""" try: # 检查是否已有进程在运行 if SmokingDetectionYOLO._process is not None: logger.info("常驻进程已在运行") return # 检查容器是否已存在 result = subprocess.run( ["docker", "ps", "-aq", "-f", f"name={self._container_name}"], capture_output=True, text=True, timeout=5 ) if result.stdout.strip(): # 删除旧容器 logger.info("删除旧容器") subprocess.run( ["docker", "rm", "-f", self._container_name], capture_output=True, timeout=10 ) # 启动新容器并运行服务器 logger.info("启动常驻服务器...") # 获取 smoking_server.py 的绝对路径 server_script_path = "/Users/wwh/project/video-model/PaddlePaddle/PaddleDetection-release-2.9/smoking_server.py" # 使用 Popen 保持进程运行,挂载 server 脚本 SmokingDetectionYOLO._process = subprocess.Popen( [ "docker", "run", "-i", "--rm", "--name", self._container_name, "-v", "/tmp:/workspace/input", "-v", f"{server_script_path}:/workspace/PaddleDetection/smoking_server.py", "-w", "/workspace/PaddleDetection", self.docker_image, "python", "smoking_server.py" ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) # 等待服务器启动(读取模型加载完成的消息) logger.info("等待服务器启动...") start_wait = time.time() while time.time() - start_wait < 30: # 最多等待30秒 if SmokingDetectionYOLO._process.poll() is not None: # 进程已退出 stderr = SmokingDetectionYOLO._process.stderr.read() logger.error(f"服务器启动失败: {stderr}") SmokingDetectionYOLO._process = None return # 尝试读取 stderr 看是否加载完成 import select if SmokingDetectionYOLO._process.stderr: ready, _, _ = select.select([SmokingDetectionYOLO._process.stderr], [], [], 0.5) if ready: line = SmokingDetectionYOLO._process.stderr.readline() if line: logger.info(f"Server: {line.strip()}") if "模型加载完成" in line: break time.sleep(0.1) # 检查进程是否还在运行 if SmokingDetectionYOLO._process.poll() is None: SmokingDetectionYOLO._initialized = True logger.info("常驻服务器启动成功") else: stderr = SmokingDetectionYOLO._process.stderr.read() logger.error(f"服务器启动失败: {stderr}") SmokingDetectionYOLO._process = None except Exception as e: logger.error(f"启动常驻服务器失败: {e}") SmokingDetectionYOLO._process = None def __call__(self, source, conf=0.1, iou=0.45, verbose=False, stream=False): """模拟 YOLO 模型的调用接口""" if not self.available: logger.error("Docker 不可用,无法运行检测") return [YOLOResult([])] if not SmokingDetectionYOLO._initialized: logger.error("常驻服务器未初始化") return [YOLOResult([])] # 处理不同类型的输入 if isinstance(source, str): image = cv2.imread(source) if image is None: logger.error(f"无法读取图片: {source}") return [YOLOResult([])] return self._detect_single(image, conf, verbose) elif isinstance(source, np.ndarray): return self._detect_single(source, conf, verbose) elif isinstance(source, list): results = [] for img in source: if isinstance(img, str): img = cv2.imread(img) if img is not None: results.extend(self._detect_single(img, conf, verbose)) return results else: logger.error(f"不支持的输入类型: {type(source)}") return [YOLOResult([])] def _detect_single(self, image: np.ndarray, conf: float, verbose: bool) -> List['YOLOResult']: """检测单张图片(使用常驻进程)""" start_time = time.time() try: # 创建临时文件 input_filename = f"smoking_v2_{int(time.time()*1000)}.jpg" temp_input = f"/tmp/{input_filename}" # 保存输入图片 cv2.imwrite(temp_input, image) if verbose: logger.info(f"正在检测: {temp_input}") # 发送请求到常驻进程 request = { 'image_path': f'/workspace/input/{input_filename}', 'threshold': conf } SmokingDetectionYOLO._process.stdin.write(json.dumps(request) + '\n') SmokingDetectionYOLO._process.stdin.flush() # 读取响应 response_line = SmokingDetectionYOLO._process.stdout.readline() response = json.loads(response_line) elapsed = time.time() - start_time if verbose: logger.info(f"检测完成,耗时: {elapsed:.2f}秒") # 解析结果 if response.get('success'): detections = response.get('detections', []) else: logger.error(f"检测失败: {response.get('error')}") detections = [] # 清理临时文件 try: os.remove(temp_input) except: pass return [YOLOResult(detections)] except Exception as e: logger.error(f"检测失败: {e}") return [YOLOResult([])] def predict(self, source, **kwargs): """兼容 predict 方法""" return self.__call__(source, **kwargs) @classmethod def stop_server(cls): """停止常驻服务器""" if cls._process is not None: cls._process.terminate() cls._process.wait() cls._process = None cls._initialized = False logger.info("常驻服务器已停止") # YOLOResult, Boxes, Box 类(与之前相同) class YOLOResult: """模拟 YOLO 检测结果对象""" def __init__(self, detections: List[Dict]): self.detections = detections self.names = {0: 'cigarette'} self.boxes = Boxes(detections) self.probs = None self.keypoints = None self.obb = None self.speed = {'preprocess': 0, 'inference': 0, 'postprocess': 0} def __len__(self): return len(self.detections) def __getitem__(self, idx): if idx < len(self.detections): return YOLOResult([self.detections[idx]]) return YOLOResult([]) def plot(self, **kwargs): return None class Boxes: """模拟 YOLO boxes 对象""" def __init__(self, detections: List[Dict]): self.detections = detections try: import torch if detections: xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections] conf_list = [[d['confidence']] for d in detections] cls_list = [[d['class']] for d in detections] self.xyxy = torch.tensor(xyxy_list, dtype=torch.float32) self.conf = torch.tensor(conf_list, dtype=torch.float32) self.cls = torch.tensor(cls_list, dtype=torch.int64) self.id = None else: self.xyxy = torch.empty((0, 4)) self.conf = torch.empty((0, 1)) self.cls = torch.empty((0, 1), dtype=torch.int64) self.id = None except ImportError: import numpy as np if detections: xyxy_list = [[d['bbox'][0], d['bbox'][1], d['bbox'][2], d['bbox'][3]] for d in detections] conf_list = [[d['confidence']] for d in detections] cls_list = [[d['class']] for d in detections] self.xyxy = np.array(xyxy_list, dtype=np.float32) self.conf = np.array(conf_list, dtype=np.float32) self.cls = np.array(cls_list, dtype=np.int64) self.id = None else: self.xyxy = np.empty((0, 4), dtype=np.float32) self.conf = np.empty((0, 1), dtype=np.float32) self.cls = np.empty((0, 1), dtype=np.int64) self.id = None def __len__(self): return len(self.detections) def __iter__(self): for i in range(len(self.detections)): yield Box(self, i) def cpu(self): return self def numpy(self): if hasattr(self.xyxy, 'numpy'): return type('Boxes', (), { 'xyxy': self.xyxy.numpy(), 'conf': self.conf.numpy(), 'cls': self.cls.numpy(), 'id': self.id })() return self class Box: """模拟单个检测框对象""" def __init__(self, boxes: Boxes, index: int): self._boxes = boxes self._index = index @property def xyxy(self): import torch import numpy as np coords = self._boxes.xyxy[self._index] if isinstance(coords, torch.Tensor): return coords.unsqueeze(0) else: return np.array([coords]) @property def conf(self): import torch import numpy as np conf_val = self._boxes.conf[self._index] if isinstance(conf_val, torch.Tensor): return conf_val.view(1) else: return np.array([conf_val]) @property def cls(self): import torch import numpy as np cls_val = self._boxes.cls[self._index] if isinstance(cls_val, torch.Tensor): return cls_val.view(1) else: return np.array([cls_val])