Files
jc-video-recognize/apps/server/services/camera_service.py
wwh 7aa71c5f83 feat: 新增人员徘徊/静止行为分析功能
本次提交实现了完整的人员行为分析系统,包括:
1. 新增基于位置和跟踪ID的两种行为检测算法
2. 新增徘徊检测服务与行为处理器模块
3. 前后端集成算法配置界面与告警展示
4. 支持图片和视频流场景下的行为分析
5. 新增算法配置接口与文档说明

具体改动:
- 新增loitering_detection模型目录与算法实现
- 新增AlgorithmConfig组件实现可视化配置
- 扩展图片/视频检测接口支持算法参数传递
- 新增行为告警推送与前端展示页面
- 优化检测服务,集成行为分析逻辑
- 移除冗余日志输出,完善代码注释
2026-05-19 09:17:09 +08:00

362 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import cv2
import json
import logging
import asyncio
import os
import signal
import subprocess
import platform
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Optional
import numpy as np
logger = logging.getLogger(__name__)
class CameraService:
def __init__(self, model_service):
self.model_service = model_service
self.active_connections: Dict[str, WebSocket] = {}
self.camera_captures: Dict[str, cv2.VideoCapture] = {}
self.running = False
self.camera_configs: Dict[str, Dict] = {}
self._locks: Dict[str, asyncio.Lock] = {}
self._stop_events: Dict[str, asyncio.Event] = {}
@staticmethod
def force_release_cameras():
"""强制释放被占用的摄像头资源
在以下场景调用:
1. 服务启动前 - 清理之前异常退出残留的占用
2. 服务关闭时 - 确保资源被释放
3. 信号处理时 - 异常退出前的清理
"""
logger.info("强制释放摄像头资源...")
# 1. 尝试释放当前进程中的摄像头
try:
# 在macOS上摄像头设备通常是 /dev/video* 或 AVFoundation 设备
# 尝试打开并立即释放来清理状态
for i in range(10): # 检查前10个可能的摄像头索引
try:
cap = cv2.VideoCapture(i)
if cap.isOpened():
cap.release()
logger.info(f"已释放摄像头索引 {i}")
except Exception:
pass
except Exception as e:
logger.error(f"释放当前进程摄像头失败: {e}")
# 2. 终止占用摄像头的其他Python进程
current_pid = os.getpid()
system = platform.system()
try:
if system == "Darwin": # macOS
# 使用 lsof 查找占用摄像头的进程
result = subprocess.run(
['lsof', '-c', 'python'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
pids_to_kill = set()
for line in result.stdout.split('\n'):
# 查找包含摄像头设备 (/dev/video* 或 V4L 相关) 的行
if any(x in line for x in ['/dev/video', 'Camera', 'V4L']):
parts = line.split()
if len(parts) >= 2:
try:
pid = int(parts[1])
if pid != current_pid:
pids_to_kill.add(pid)
except (ValueError, IndexError):
pass
# 终止找到的进程
for pid in pids_to_kill:
try:
logger.info(f"终止占用摄像头的进程: {pid}")
os.kill(pid, signal.SIGTERM)
# 给进程一点时间优雅退出
import time
time.sleep(0.5)
# 检查是否还在运行,如果是则强制终止
try:
os.kill(pid, 0) # 检查进程是否存在
logger.warning(f"进程 {pid} 未响应 SIGTERM使用 SIGKILL")
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass # 进程已退出
except ProcessLookupError:
pass # 进程已不存在
except Exception as e:
logger.error(f"终止进程 {pid} 失败: {e}")
elif system == "Linux":
# Linux 系统使用 fuser 或 lsof
for device in ['/dev/video0', '/dev/video1', '/dev/video2']:
try:
result = subprocess.run(
['fuser', '-k', device],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
logger.info(f"已释放设备 {device}")
except Exception:
pass
except Exception as e:
logger.error(f"终止占用进程失败: {e}")
# 3. 给系统一点时间完成资源释放
import time
time.sleep(0.5)
logger.info("摄像头资源释放完成")
async def handle_connection(self, websocket: WebSocket):
await websocket.accept()
connection_id = id(websocket)
self.active_connections[connection_id] = websocket
logger.info(f"新连接: {connection_id}")
try:
while True:
try:
data = await websocket.receive_text()
message = json.loads(data)
if message.get('action') == 'start':
await self.start_camera(connection_id, websocket, message.get('config'))
elif message.get('action') == 'stop':
await self.stop_camera(connection_id)
elif message.get('action') == 'update_config':
await self.update_config(connection_id, message.get('config', {}))
except WebSocketDisconnect:
logger.info(f"WebSocket连接断开: {connection_id}")
break
except WebSocketDisconnect:
logger.info(f"连接断开: {connection_id}")
except Exception as e:
logger.error(f"连接错误: {connection_id}, {e}")
finally:
await self.stop_camera(connection_id)
if connection_id in self.active_connections:
del self.active_connections[connection_id]
logger.info(f"连接清理完成: {connection_id}")
async def start_camera(self, connection_id: str, websocket: WebSocket, config: Dict = None):
try:
camera_source = 0 # 默认本地摄像头
if not config:
config = {
'model_id': 'fire_detection',
'confidence': 0.5,
'iou': 0.45,
'camera_source': 0 # 默认本地摄像头
}
# 支持多种摄像头源
if 'camera_source' in config:
camera_source = config['camera_source']
# 如果该连接已有摄像头在运行,先停止它
if connection_id in self.camera_captures:
await self.stop_camera(connection_id)
# 初始化锁和停止事件
self._locks[connection_id] = asyncio.Lock()
self._stop_events[connection_id] = asyncio.Event()
# 尝试打开摄像头
cap = cv2.VideoCapture(camera_source)
if not cap.isOpened():
await websocket.send_json({
'type': 'error',
'message': f'无法打开摄像头源: {camera_source}'
})
return
# 只对本地摄像头设置分辨率
if isinstance(camera_source, int):
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
self.camera_captures[connection_id] = cap
self.camera_configs[connection_id] = config
await websocket.send_json({
'type': 'camera_started',
'message': f'摄像头已启动: {camera_source}'
})
await self.process_frames(connection_id, websocket, cap)
except Exception as e:
logger.error(f"启动摄像头失败: {e}")
await websocket.send_json({
'type': 'error',
'message': f'启动摄像头失败: {str(e)}'
})
async def process_frames(self, connection_id: str, websocket: WebSocket, cap: cv2.VideoCapture):
from .detection_service import DetectionService
detection_service = DetectionService(self.model_service)
try:
frame_count = 0
stop_event = self._stop_events.get(connection_id)
while connection_id in self.active_connections:
# 检查是否收到停止信号
if stop_event and stop_event.is_set():
logger.info(f"收到停止信号,结束帧处理: {connection_id}")
break
ret, frame = cap.read()
if not ret:
await asyncio.sleep(0.1)
continue
config = self.camera_configs.get(connection_id, {
'model_id': 'fire_detection',
'confidence': 0.5,
'iou': 0.45
})
model_id = config.get('model_id', 'fire_detection')
confidence = config.get('confidence', 0.5)
iou = config.get('iou', 0.45)
draw = True
processed_frame, result = await detection_service.detect_frame(
frame,
model_id=model_id,
confidence=confidence,
iou=iou,
draw=draw
)
if result['success']:
frame_count += 1
logger.info(f"发送检测结果: {len(result['detections'])} 个目标, {result['stats']}")
detection_message = {
'type': 'detection',
'detections': result['detections'],
'stats': result['stats']
}
# 包含行为告警信息
if 'alerts' in result and result['alerts']:
detection_message['alerts'] = result['alerts']
logger.info(f"发送告警: {len(result['alerts'])}")
if 'behavior_stats' in result:
detection_message['behavior_stats'] = result['behavior_stats']
await websocket.send_json(detection_message)
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
import base64
original_frame_data = base64.b64encode(buffer).decode('utf-8')
_, buffer = cv2.imencode('.jpg', processed_frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
frame_data = base64.b64encode(buffer).decode('utf-8')
try:
await websocket.send_json({
'type': 'original_frame',
'frame': original_frame_data
})
await websocket.send_json({
'type': 'annotated_frame',
'frame': frame_data
})
except Exception as e:
logger.error(f"发送帧数据失败: {e}")
break
await asyncio.sleep(0.03)
except Exception as e:
logger.error(f"处理帧错误: {e}")
finally:
# 只在摄像头仍存在于字典中时才释放(避免重复释放)
if connection_id in self.camera_captures:
cap.release()
del self.camera_captures[connection_id]
logger.info(f"帧处理结束,摄像头已释放: {connection_id}")
async def stop_camera(self, connection_id: str):
# 使用锁确保同一时间只有一个协程在操作该连接的摄像头资源
lock = self._locks.get(connection_id)
if lock:
async with lock:
await self._do_stop_camera(connection_id)
else:
await self._do_stop_camera(connection_id)
async def _do_stop_camera(self, connection_id: str):
"""实际执行停止摄像头的操作(内部方法,应在获取锁后调用)"""
# 设置停止事件,通知帧处理循环退出
if connection_id in self._stop_events:
self._stop_events[connection_id].set()
if connection_id in self.camera_captures:
cap = self.camera_captures[connection_id]
cap.release()
del self.camera_captures[connection_id]
if connection_id in self.active_connections:
try:
await self.active_connections[connection_id].send_json({
'type': 'camera_stopped',
'message': '摄像头已停止'
})
except:
pass
logger.info(f"摄像头已停止: {connection_id}")
# 清理锁和事件
if connection_id in self._locks:
del self._locks[connection_id]
if connection_id in self._stop_events:
del self._stop_events[connection_id]
async def update_config(self, connection_id: str, config: Dict):
if connection_id in self.camera_configs:
self.camera_configs[connection_id].update(config)
model_id = self.camera_configs[connection_id].get('model_id', 'fire_detection')
confidence = self.camera_configs[connection_id].get('confidence', 0.5)
iou = self.camera_configs[connection_id].get('iou', 0.45)
logger.info(f"配置更新: model_id={model_id}, confidence={confidence}, iou={iou}")
if connection_id in self.active_connections:
try:
await self.active_connections[connection_id].send_json({
'type': 'config_updated',
'message': '配置已更新'
})
except:
pass
async def stop(self):
for connection_id in list(self.camera_captures.keys()):
await self.stop_camera(connection_id)
self.running = False
logger.info("摄像头服务已停止")