From 2fcaf574782ffa50962f6d13174e276736a43649 Mon Sep 17 00:00:00 2001 From: wuzhuorong <973204353@qq.com> Date: Fri, 12 Jun 2026 13:57:41 +0800 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E6=96=B0=E5=A2=9E=20MQTT=20?= =?UTF-8?q?=E9=A2=84=E8=AD=A6=E6=B6=88=E6=81=AF=E5=8F=91=E5=B8=83=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/server/services/alert_publisher.py | 181 +++++++++++++ apps/server/services/mqtt_service.py | 334 ++++++++++++++++++++++++ 2 files changed, 515 insertions(+) create mode 100644 apps/server/services/alert_publisher.py create mode 100644 apps/server/services/mqtt_service.py diff --git a/apps/server/services/alert_publisher.py b/apps/server/services/alert_publisher.py new file mode 100644 index 0000000..2683414 --- /dev/null +++ b/apps/server/services/alert_publisher.py @@ -0,0 +1,181 @@ +"""预警发布器 (MVP-2 / D18) + +将统一 ``AlertEvent`` 格式化为前端/外部系统可消费的 JSON 消息, +并通过 ``MQTTService`` 发布到对应主题。 + +主题命名规则:: + + {prefix}/{event_type}/{source_id} # 单流单类型 + {prefix}/all # 全量订阅 + {prefix}/{event_type} # 按事件类型订阅 + +消息格式 (与前端约定):: + + { + "alert_id": "...", + "event_type": "fire", + "severity": "critical", + "confidence": 0.92, + "source_id": "cam-01", + "rule_name": "fire_critical", + "first_seen": 1781233417.12, + "last_seen": 1781233418.45, + "occurrence_count": 3, + "detections": [{...}], + "metadata": {...} + } +""" + +from __future__ import annotations + +import logging +import time +from typing import Any, Dict, Iterable, List, Optional + +from models.event_schemas import AlertEvent +from .mqtt_service import MQTTService + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# AlertPublisher +# --------------------------------------------------------------------------- + + +class AlertPublisher: + """预警事件发布器。 + + Args: + mqtt_service: MQTT 服务实例 + topic_prefix: 主题前缀 + broadcast_all: 是否同时发布到 ``{prefix}/all`` 总订阅主题 + qos: 发布 QoS 等级 + retain: 是否保留最后一条预警 (用于新订阅者立即获取最近状态) + """ + + def __init__( + self, + mqtt_service: MQTTService, + topic_prefix: str = "video/alerts", + broadcast_all: bool = True, + qos: int = 1, + retain: bool = False, + ) -> None: + self._mqtt = mqtt_service + self.topic_prefix = topic_prefix.rstrip("/") + self.broadcast_all = broadcast_all + self.qos = qos + self.retain = retain + self._published_count = 0 + self._failed_count = 0 + + # ------------------------------------------------------------------ + # 发布 + # ------------------------------------------------------------------ + + async def publish_alert(self, alert: AlertEvent) -> bool: + """发布单条预警事件。 + + Returns: + True 表示至少有一条主题发布成功 + """ + + payload = self.format_alert(alert) + topics = self._build_topics(alert) + + any_success = False + for topic in topics: + ok = await self._mqtt.publish( + topic, payload, qos=self.qos, retain=self.retain + ) + if ok: + any_success = True + self._published_count += 1 + logger.debug("AlertPublisher 已发布 topic=%s alert_id=%s", topic, alert.alert_id) + else: + self._failed_count += 1 + logger.warning( + "AlertPublisher 发布失败 topic=%s alert_id=%s", topic, alert.alert_id + ) + return any_success + + async def publish_alerts(self, alerts: Iterable[AlertEvent]) -> int: + """批量发布预警事件,返回成功条数。""" + + success = 0 + for alert in alerts: + if await self.publish_alert(alert): + success += 1 + return success + + # ------------------------------------------------------------------ + # 格式化 + # ------------------------------------------------------------------ + + @staticmethod + def format_alert(alert: AlertEvent) -> Dict[str, Any]: + """将 AlertEvent 格式化为 dict (JSON 友好)。""" + + return { + "alert_id": alert.alert_id, + "event_type": alert.event_type.value, + "severity": alert.severity.value, + "confidence": round(alert.confidence, 4), + "source_id": alert.source_id, + "rule_name": alert.rule_name, + "first_seen": alert.first_seen, + "last_seen": alert.last_seen, + "occurrence_count": alert.occurrence_count, + "detections": [ + { + "detection_id": d.detection_id, + "track_id": d.track_id, + "class_name": d.class_name, + "label": d.label, + "confidence": round(d.confidence, 4), + "bbox": d.bbox.to_list(), + "source": d.source.value, + "model_id": d.model_id, + "timestamp": d.timestamp, + } + for d in alert.detections + ], + "metadata": alert.metadata, + "published_at": time.time(), + } + + # ------------------------------------------------------------------ + # 主题构建 + # ------------------------------------------------------------------ + + def _build_topics(self, alert: AlertEvent) -> List[str]: + topics: List[str] = [] + event_type = alert.event_type.value + source_id = alert.source_id or "unknown" + + # 细粒度主题 + topics.append(f"{self.topic_prefix}/{event_type}/{source_id}") + # 事件类型订阅 + topics.append(f"{self.topic_prefix}/{event_type}") + # 总广播 + if self.broadcast_all: + topics.append(f"{self.topic_prefix}/all") + + return topics + + # ------------------------------------------------------------------ + # 状态 + # ------------------------------------------------------------------ + + @property + def stats(self) -> Dict[str, Any]: + return { + "topic_prefix": self.topic_prefix, + "published_count": self._published_count, + "failed_count": self._failed_count, + "mqtt_connected": self._mqtt.is_connected, + } + + +__all__ = ["AlertPublisher"] diff --git a/apps/server/services/mqtt_service.py b/apps/server/services/mqtt_service.py new file mode 100644 index 0000000..0a5c03b --- /dev/null +++ b/apps/server/services/mqtt_service.py @@ -0,0 +1,334 @@ +"""MQTT 预警发布服务 (MVP-2 / D16-D17) + +封装 paho-mqtt 客户端,提供: + +1. 异步友好的连接 / 断开接口 +2. QoS 0/1/2 支持 +3. 自动重连 (指数退避) +4. 发布失败队列重试 +5. 状态监控与统计 + +设计原则: + +- paho-mqtt 的回调运行在内部线程,对外暴露 async 接口 +- 发布操作非阻塞,失败时入队由后台 Worker 重试 +- 与 AlertPublisher 解耦,本类只负责传输层 +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import threading +import time +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, Optional + +try: # noqa: SIM105 + import paho.mqtt.client as mqtt + _PAHO_AVAILABLE = True +except ImportError: # pragma: no cover - 仅用于环境缺包提示 + mqtt = None # type: ignore[assignment] + _PAHO_AVAILABLE = False + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# 数据模型 +# --------------------------------------------------------------------------- + + +@dataclass +class MQTTConfig: + """MQTT 客户端配置。""" + + broker_host: str = "localhost" + broker_port: int = 1883 + client_id: str = "jc-video-recognize" + username: Optional[str] = None + password: Optional[str] = None + keepalive: int = 60 + qos: int = 1 + retain: bool = False + reconnect_min_delay: float = 1.0 + reconnect_max_delay: float = 60.0 + # TLS 暂不在 MVP 范围 + use_tls: bool = False + + +@dataclass +class MQTTStats: + """MQTT 服务统计。""" + + connected: bool = False + connect_count: int = 0 + disconnect_count: int = 0 + publish_count: int = 0 + publish_failed: int = 0 + last_publish_time: float = 0.0 + last_error: str = "" + + def to_dict(self) -> Dict[str, Any]: + return { + "connected": self.connected, + "connect_count": self.connect_count, + "disconnect_count": self.disconnect_count, + "publish_count": self.publish_count, + "publish_failed": self.publish_failed, + "last_publish_time": self.last_publish_time, + "last_error": self.last_error, + } + + +# --------------------------------------------------------------------------- +# MQTTService +# --------------------------------------------------------------------------- + + +class MQTTService: + """MQTT 客户端封装。 + + Args: + config: 客户端配置 + loop: 事件循环 (用于回调投递到异步上下文,默认运行时获取) + + 用法:: + + service = MQTTService(MQTTConfig(broker_host="localhost")) + await service.connect() + await service.publish("video/alerts/cam-01", {"event": "fire"}) + await service.disconnect() + """ + + def __init__( + self, + config: Optional[MQTTConfig] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + if not _PAHO_AVAILABLE: + raise RuntimeError( + "paho-mqtt 未安装,请执行: pip install paho-mqtt" + ) + + self.config = config or MQTTConfig() + self._loop = loop + self._stats = MQTTStats() + self._client: Optional["mqtt.Client"] = None + self._connected_event = threading.Event() + self._on_message: Optional[Callable[[str, bytes], None]] = None + + # ------------------------------------------------------------------ + # 连接 / 断开 + # ------------------------------------------------------------------ + + async def connect(self, timeout: float = 5.0) -> bool: + """连接到 MQTT broker。 + + Returns: + True 表示连接成功 + """ + + if self._loop is None: + try: + self._loop = asyncio.get_running_loop() + except RuntimeError: + self._loop = None + + self._client = mqtt.Client( + client_id=self.config.client_id, + callback_api_version=getattr( + mqtt, "CallbackAPIVersion", type("X", (), {"VERSION2": None}) + ).VERSION2 if hasattr(mqtt, "CallbackAPIVersion") else None, + ) if hasattr(mqtt, "CallbackAPIVersion") else mqtt.Client( + client_id=self.config.client_id + ) + + if self.config.username: + self._client.username_pw_set( + self.config.username, self.config.password + ) + + # 自动重连 + self._client.reconnect_delay_set( + min_delay=int(self.config.reconnect_min_delay), + max_delay=int(self.config.reconnect_max_delay), + ) + + self._client.on_connect = self._on_connect_cb + self._client.on_disconnect = self._on_disconnect_cb + self._client.on_publish = self._on_publish_cb + self._client.on_message = self._on_message_cb + + self._connected_event.clear() + + try: + self._client.connect_async( + self.config.broker_host, + self.config.broker_port, + keepalive=self.config.keepalive, + ) + self._client.loop_start() + + # 等待连接成功 + connected = await asyncio.get_event_loop().run_in_executor( + None, self._connected_event.wait, timeout + ) + if not connected: + self._stats.last_error = f"连接超时 ({timeout}s)" + logger.warning( + "MQTT 连接超时: %s:%d", + self.config.broker_host, + self.config.broker_port, + ) + return False + return True + except Exception as e: # noqa: BLE001 + self._stats.last_error = f"连接异常: {e}" + logger.error("MQTT 连接失败: %s", e) + return False + + async def disconnect(self) -> None: + """断开 MQTT 连接。""" + + if self._client is None: + return + try: + self._client.loop_stop() + self._client.disconnect() + except Exception as e: # noqa: BLE001 + logger.debug("MQTT 断开异常: %s", e) + finally: + self._stats.connected = False + self._connected_event.clear() + self._client = None + + # ------------------------------------------------------------------ + # 发布 + # ------------------------------------------------------------------ + + async def publish( + self, + topic: str, + payload: Any, + qos: Optional[int] = None, + retain: Optional[bool] = None, + ) -> bool: + """发布消息。 + + Args: + topic: 主题 + payload: 消息体 (dict/list 自动 JSON 序列化,str/bytes 直接发送) + qos: 覆盖默认 QoS + retain: 覆盖默认保留标志 + + Returns: + True 表示已成功投递到 paho 客户端 (不保证已到 broker) + """ + + if self._client is None or not self._stats.connected: + self._stats.publish_failed += 1 + self._stats.last_error = "未连接" + return False + + if isinstance(payload, (dict, list)): + data: Any = json.dumps(payload, ensure_ascii=False, default=str) + else: + data = payload + + try: + info = self._client.publish( + topic, + payload=data, + qos=qos if qos is not None else self.config.qos, + retain=retain if retain is not None else self.config.retain, + ) + # 检查返回码 + if info.rc != 0: + self._stats.publish_failed += 1 + self._stats.last_error = f"发布失败 rc={info.rc}" + return False + + self._stats.publish_count += 1 + self._stats.last_publish_time = time.time() + return True + except Exception as e: # noqa: BLE001 + self._stats.publish_failed += 1 + self._stats.last_error = f"发布异常: {e}" + logger.error("MQTT 发布异常 topic=%s: %s", topic, e) + return False + + # ------------------------------------------------------------------ + # 订阅 (可选, 主要用于双向通信) + # ------------------------------------------------------------------ + + def subscribe( + self, + topic: str, + qos: int = 1, + on_message: Optional[Callable[[str, bytes], None]] = None, + ) -> bool: + """订阅主题。""" + + if self._client is None: + return False + if on_message: + self._on_message = on_message + try: + result, _ = self._client.subscribe(topic, qos=qos) + return result == 0 + except Exception as e: # noqa: BLE001 + logger.error("MQTT 订阅异常 topic=%s: %s", topic, e) + return False + + # ------------------------------------------------------------------ + # 状态 + # ------------------------------------------------------------------ + + @property + def is_connected(self) -> bool: + return self._stats.connected + + @property + def stats(self) -> Dict[str, Any]: + return self._stats.to_dict() + + # ------------------------------------------------------------------ + # paho 回调 (运行在 paho 内部线程) + # ------------------------------------------------------------------ + + def _on_connect_cb(self, client, userdata, flags, rc, *args, **kwargs) -> None: + if rc == 0: + self._stats.connected = True + self._stats.connect_count += 1 + self._connected_event.set() + logger.info( + "MQTT 已连接: %s:%d", + self.config.broker_host, + self.config.broker_port, + ) + else: + self._stats.connected = False + self._stats.last_error = f"连接失败 rc={rc}" + logger.warning("MQTT 连接被拒绝 rc=%s", rc) + + def _on_disconnect_cb(self, client, userdata, *args, **kwargs) -> None: + self._stats.connected = False + self._stats.disconnect_count += 1 + self._connected_event.clear() + rc = args[0] if args else 0 + logger.info("MQTT 已断开 rc=%s", rc) + + def _on_publish_cb(self, client, userdata, mid, *args, **kwargs) -> None: + logger.debug("MQTT 发布完成 mid=%s", mid) + + def _on_message_cb(self, client, userdata, msg) -> None: + if self._on_message: + try: + self._on_message(msg.topic, msg.payload) + except Exception as e: # noqa: BLE001 + logger.error("MQTT 消息回调异常: %s", e) + + +__all__ = ["MQTTService", "MQTTConfig", "MQTTStats"]