一、系统概述
该系统基于计算机视觉技术,实现对视频或摄像头画面中的人员进行检测、跟踪,并生成轨迹数据。支持透视变换校准(鸟瞰图显示)、多目标跟踪、轨迹存储及视频录制功能,适用于安防监控、行为分析等场景。
二、依赖库
python
运行
import cv2 # 计算机视觉处理(OpenCV库)
import numpy as np # 数值计算
import time # 时间处理
import os # 文件与目录操作
from datetime import datetime # 日期时间处理
三、类定义:PersonTracker
3.1 构造函数 __init__
功能
初始化人员跟踪器,配置视频源、输出参数、背景减除器及跟踪参数。
参数说明
参数名 | 类型 | 默认值 | 描述 |
---|---|---|---|
video_source | int/str | 0 | 视频源(0 为默认摄像头,或指定视频文件路径) |
save_video | bool | False | 是否保存处理后的视频 |
show_warped | bool | True | 是否显示透视变换后的鸟瞰图 |
内部属性
- 视频源与基础参数:
cap
:视频捕获对象(cv2.VideoCapture
实例)frame_width
/frame_height
:视频帧宽高fps
:帧率
- 输出配置:
output_folder
:输出文件夹(默认output
)out
:视频写入对象(cv2.VideoWriter
实例,仅当save_video=True
时创建)
- 背景减除:
fgbg
:使用MOG2
算法的背景减除器,支持阴影检测
- 跟踪参数:
min_contour_area
/max_contour_area
:过滤轮廓的面积阈值(单位像素)trajectories
:存储轨迹的字典(键为人员 ID,值为轨迹信息)max_disappeared_frames
:允许目标消失的最大帧数(超过则删除轨迹)max_distance
:轨迹匹配的最大距离(像素)
- 透视变换:
perspective_transform
:透视变换矩阵(校准后生成)warped_width
/warped_height
:鸟瞰图尺寸(宽度固定 500,高度与原始帧一致)
3.2 方法列表
3.2.1 calibrate_perspective()
- 功能:通过鼠标点击选择 4 个点,校准透视变换矩阵,生成鸟瞰图。
- 操作说明:
- 显示视频第一帧,按顺序点击左上、右上、右下、左下四个点,形成矩形区域。
- 按
q
键退出校准。
- 返回值:
bool
(True
为校准成功,False
为取消或失败)
3.2.2 detect_persons(frame)
- 功能:在输入帧中检测人员,返回检测结果和二值化掩码。
- 输入:
frame
(BGR 格式图像) - 处理流程:
- 应用背景减除,生成前景掩码。
- 形态学操作(开运算 + 闭运算)去除噪声。
- 查找轮廓,过滤面积不符合阈值的轮廓。
- 计算每个轮廓的中心点和边界框。
- 返回值:
(persons, thresh)
,其中:persons
:检测到的人员列表(每个元素为字典,包含bbox
、center
、contour
、area
)thresh
:二值化掩码图像
3.2.3 track_persons(detected_persons)
- 功能:根据检测结果更新人员轨迹。
- 输入:
detected_persons
(detect_persons
返回的人员列表) - 算法逻辑:
- 计算现有轨迹与新检测的匹配距离(欧氏距离),优先匹配近距离目标。
- 未匹配的轨迹:若连续消失超过
max_disappeared_frames
,则删除。 - 未匹配的检测:创建新轨迹,分配唯一 ID。
3.2.4 draw_results(frame, persons, thresh)
- 功能:在图像上绘制检测框、轨迹、ID 及统计信息,支持鸟瞰图显示。
- 输入:
frame
:原始帧persons
:检测到的人员列表thresh
:二值化掩码(未使用,仅保留接口)
- 输出:绘制后的结果图像(若
show_warped=True
,则为原始帧与鸟瞰图的横向拼接图)
3.2.5 save_trajectories()
- 功能:将当前所有轨迹数据保存到文本文件,包含 ID、起始时间、轨迹点坐标等。
- 存储路径:
output_folder/trajectories_时间戳.txt
3.2.6 run()
- 功能:运行跟踪主循环,处理视频流并实时显示结果。
- 操作说明:
- 按
q
键退出程序。 - 按
s
键保存当前轨迹数据。
- 按
- 流程:
- 调用
calibrate_perspective()
进行透视校准(可选)。 - 逐帧读取视频,检测、跟踪人员,绘制结果。
- 释放资源并关闭窗口。
- 调用
四、主程序入口
python
运行
if __name__ == "__main__":tracker = PersonTracker(video_source=0, # 0为摄像头,或指定视频文件路径(如"video.mp4")save_video=True, # 启用视频录制show_warped=True # 显示鸟瞰图)tracker.run()
五、使用说明
5.1 环境配置
- 安装依赖库:
bash
pip install opencv-python numpy
- 确保摄像头或视频文件可用。
5.2 透视校准操作
- 运行程序后,会弹出窗口提示选择 4 个点。
- 按顺序点击视频中的矩形区域四角(如地面区域),生成鸟瞰图。
- 校准完成后,右侧会显示鸟瞰图中的轨迹。
5.3 输出文件
- 视频文件:若
save_video=True
,生成output/tracking_时间戳.avi
。 - 轨迹文件:按
s
键生成output/trajectories_时间戳.txt
,包含各 ID 的坐标序列。
六、参数调整建议
参数名 | 作用 | 调整场景 |
---|---|---|
min_contour_area | 过滤小目标(如噪声) | 目标较小时调小,反之调大 |
max_contour_area | 过滤大目标(如多人重叠) | 目标较大时调大,反之调小 |
max_disappeared_frames | 目标消失后保留轨迹的帧数 | 目标运动间隔较长时调大 |
max_distance | 轨迹匹配的最大允许距离 | 目标运动速度快时调大 |
warped_width | 鸟瞰图宽度 | 显示区域宽窄调整 |
七、注意事项
- 背景减除器
MOG2
需要一定时间学习背景(前几秒可能检测不稳定)。 - 透视校准的四点应选择实际场景中的矩形区域(如地面边框),以确保鸟瞰图坐标准确。
- 若视频帧率较低,可尝试降低
warped_width
或关闭show_warped
以减少计算量。
完成代码
import cv2
import numpy as np
import time
import os
from datetime import datetimeclass PersonTracker:def __init__(self, video_source=0, save_video=False, show_warped=True):"""初始化人员跟踪器"""# 视频源设置self.video_source = video_sourceself.cap = cv2.VideoCapture(video_source)if not self.cap.isOpened():raise ValueError("无法打开视频源", video_source)# 获取视频的宽度、高度和帧率self.frame_width = int(self.cap.get(3))self.frame_height = int(self.cap.get(4))self.fps = self.cap.get(cv2.CAP_PROP_FPS)# 输出设置self.save_video = save_videoself.output_folder = "output"self.show_warped = show_warped# 创建输出文件夹if not os.path.exists(self.output_folder):os.makedirs(self.output_folder)# 背景减除器self.fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=100, detectShadows=True)# 人员检测参数self.min_contour_area = 1000 # 最小轮廓面积self.max_contour_area = 50000 # 最大轮廓面积# 轨迹存储self.trajectories = {} # 存储每个人的轨迹self.next_person_id = 1 # 下一个可用的人员IDself.max_disappeared_frames = 10 # 最大消失帧数self.max_distance = 100 # 最大匹配距离# 透视变换参数self.perspective_transform = Noneself.warped_width = 500self.warped_height = self.frame_height # 与原始帧高度一致# 录制设置self.out = Noneif save_video:timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")output_path = os.path.join(self.output_folder, f"tracking_{timestamp}.avi")fourcc = cv2.VideoWriter_fourcc(*'XVID')self.out = cv2.VideoWriter(output_path, fourcc, self.fps, (self.frame_width, self.frame_height))def calibrate_perspective(self):"""校准透视变换,创建鸟瞰图"""print("请在图像中选择4个点,形成一个矩形区域,用于透视变换")print("按顺序点击:左上、右上、右下、左下")# 读取一帧用于选择点ret, frame = self.cap.read()if not ret:print("无法读取视频帧")return False# 创建窗口并设置鼠标回调cv2.namedWindow("选择透视变换点 (按 'q' 退出)")points = []def click_event(event, x, y, flags, param):if event == cv2.EVENT_LBUTTONDOWN:points.append((x, y))cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)cv2.imshow("选择透视变换点 (按 'q' 退出)", frame)cv2.setMouseCallback("选择透视变换点 (按 'q' 退出)", click_event)# 显示图像并等待点击cv2.imshow("选择透视变换点 (按 'q' 退出)", frame)while len(points) < 4:key = cv2.waitKey(1) & 0xFFif key == ord('q'):cv2.destroyAllWindows()return Falsecv2.destroyAllWindows()# 定义目标矩形src = np.float32(points)dst = np.float32([[0, 0],[self.warped_width, 0],[self.warped_width, self.warped_height],[0, self.warped_height]])# 计算透视变换矩阵self.perspective_transform = cv2.getPerspectiveTransform(src, dst)return Truedef detect_persons(self, frame):"""检测图像中的人物"""# 应用背景减除fgmask = self.fgbg.apply(frame)# 图像预处理_, thresh = cv2.threshold(fgmask, 127, 255, cv2.THRESH_BINARY)kernel = np.ones((5, 5), np.uint8)thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=2)thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel, iterations=3)# 查找轮廓contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)persons = []for contour in contours:area = cv2.contourArea(contour)if area < self.min_contour_area or area > self.max_contour_area:continue# 计算边界框x, y, w, h = cv2.boundingRect(contour)center = (int(x + w/2), int(y + h/2))# 计算轮廓的中心点M = cv2.moments(contour)if M["m00"] != 0:cX = int(M["m10"] / M["m00"])cY = int(M["m01"] / M["m00"])center = (cX, cY)persons.append({'bbox': (x, y, w, h),'center': center,'contour': contour,'area': area})return persons, threshdef track_persons(self, detected_persons):"""跟踪检测到的人员"""# 计算当前检测点与现有轨迹的距离unmatched_tracks = list(self.trajectories.keys())unmatched_detections = list(range(len(detected_persons)))matches = []# 计算所有可能的匹配for track_id in self.trajectories:trajectory = self.trajectories[track_id]last_position = trajectory['positions'][-1]min_distance = float('inf')min_index = -1for i, person in enumerate(detected_persons):if i in unmatched_detections:distance = np.sqrt((last_position[0] - person['center'][0])**2 + (last_position[1] - person['center'][1])**2)if distance < min_distance and distance < self.max_distance:min_distance = distancemin_index = i# 如果找到匹配if min_index != -1:matches.append((track_id, min_index, min_distance))# 按距离排序,优先处理距离近的匹配matches.sort(key=lambda x: x[2])# 应用匹配for match in matches:track_id, detection_index, _ = matchif track_id in unmatched_tracks and detection_index in unmatched_detections:# 更新轨迹self.trajectories[track_id]['positions'].append(detected_persons[detection_index]['center'])self.trajectories[track_id]['last_seen'] = 0self.trajectories[track_id]['bbox'] = detected_persons[detection_index]['bbox']# 从待匹配列表中移除unmatched_tracks.remove(track_id)unmatched_detections.remove(detection_index)# 处理未匹配的轨迹for track_id in unmatched_tracks:self.trajectories[track_id]['last_seen'] += 1if self.trajectories[track_id]['last_seen'] > self.max_disappeared_frames:del self.trajectories[track_id]# 处理未匹配的检测结果for detection_index in unmatched_detections:# 创建新轨迹self.trajectories[self.next_person_id] = {'positions': [detected_persons[detection_index]['center']],'last_seen': 0,'bbox': detected_persons[detection_index]['bbox'],'start_time': time.time()}self.next_person_id += 1def draw_results(self, frame, persons, thresh):"""在图像上绘制检测和跟踪结果"""output = frame.copy()# 绘制检测到的人物for person in persons:x, y, w, h = person['bbox']cv2.rectangle(output, (x, y), (x + w, y + h), (0, 255, 0), 2)cv2.circle(output, person['center'], 5, (0, 0, 255), -1)# 绘制轨迹for track_id, trajectory in self.trajectories.items():positions = trajectory['positions']# 绘制轨迹线for i in range(1, len(positions)):cv2.line(output, positions[i-1], positions[i], (255, 0, 0), 2)# 绘制轨迹点for pos in positions:cv2.circle(output, pos, 3, (255, 0, 0), -1)# 绘制ID和轨迹长度if len(positions) > 0:last_pos = positions[-1]cv2.putText(output, f"ID: {track_id}", (last_pos[0] + 10, last_pos[1] - 20),cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)cv2.putText(output, f"Points: {len(positions)}", (last_pos[0] + 10, last_pos[1]),cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)# 显示统计信息cv2.putText(output, f"Persons: {len(self.trajectories)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)cv2.putText(output, f"FPS: {int(self.fps)}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)# 创建结果显示窗口if self.show_warped and self.perspective_transform is not None:# 创建鸟瞰图warped = cv2.warpPerspective(output, self.perspective_transform, (self.warped_width, self.warped_height))# 在鸟瞰图上绘制轨迹for track_id, trajectory in self.trajectories.items():positions = trajectory['positions']for i in range(1, len(positions)):# 将原始坐标转换为鸟瞰图坐标pos1 = np.array([[positions[i-1][0], positions[i-1][1]]], dtype=np.float32).reshape(-1, 1, 2)pos2 = np.array([[positions[i][0], positions[i][1]]], dtype=np.float32).reshape(-1, 1, 2)warped_pos1 = cv2.perspectiveTransform(pos1, self.perspective_transform)[0][0]warped_pos2 = cv2.perspectiveTransform(pos2, self.perspective_transform)[0][0]cv2.line(warped, (int(warped_pos1[0]), int(warped_pos1[1])),(int(warped_pos2[0]), int(warped_pos2[1])), (255, 0, 0), 2)# 合并显示combined = np.hstack((output, warped))return combinedreturn outputdef save_trajectories(self):"""保存轨迹数据到文件"""timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")output_path = os.path.join(self.output_folder, f"trajectories_{timestamp}.txt")with open(output_path, 'w') as f:f.write("Person Trajectories\n")f.write(f"Recorded on: {datetime.now()}\n\n")for track_id, trajectory in self.trajectories.items():f.write(f"Person ID: {track_id}\n")f.write(f"Start Time: {time.ctime(trajectory['start_time'])}\n")f.write(f"Duration: {time.time() - trajectory['start_time']:.2f} seconds\n")f.write(f"Trajectory Points: {len(trajectory['positions'])}\n")f.write("Positions:\n")for pos in trajectory['positions']:f.write(f" ({pos[0]}, {pos[1]})\n")f.write("\n")print(f"轨迹数据已保存到: {output_path}")def run(self):"""运行人员跟踪系统"""# 首先进行透视校准if not self.calibrate_perspective():print("透视校准失败,使用原始视角")print("开始人员跟踪...")print("按 'q' 退出,按 's' 保存轨迹数据")frame_count = 0start_time = time.time()while True:ret, frame = self.cap.read()if not ret:break# 计算实际帧率frame_count += 1if frame_count % 10 == 0:elapsed_time = time.time() - start_timeself.fps = frame_count / elapsed_time# 检测人员persons, thresh = self.detect_persons(frame)# 跟踪人员self.track_persons(persons)# 绘制结果result = self.draw_results(frame, persons, thresh)# 保存视频if self.save_video:self.out.write(result)# 显示结果cv2.imshow("人员轨迹跟踪系统 (按 'q' 退出,按 's' 保存轨迹)", result)# 按键处理key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('s'):self.save_trajectories()# 释放资源self.cap.release()if self.out:self.out.release()cv2.destroyAllWindows()print("人员跟踪系统已关闭")# 主程序入口
if __name__ == "__main__":# 创建人员跟踪器实例tracker = PersonTracker(video_source=0, # 0表示默认摄像头,也可以指定视频文件路径save_video=True, # 是否保存视频show_warped=True # 是否显示鸟瞰图)# 运行跟踪器tracker.run()