# Spaces: Running on A10G
import json
import os

import cv2
import numpy as np
from fastdtw import fastdtw
from scipy.spatial.distance import cdist
# 读取视频文件 | |
def read_video_frames(video_path):
    """Decode every frame of a video file into memory.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        A list of the frames in the order they were read. The list is empty
        when the capture does not open or the first read fails.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # A failed read marks the end of the stream.
                break
            frames.append(frame)
    finally:
        # Release the capture even if decoding raises part-way through.
        cap.release()
    return frames
# 提取关键点 | |
def extract_keypoints(sequence):
    """Collect the keypoints used for alignment from every frame.

    Only the first instance of each frame is read. From its ``keypoints``
    list the entries at indices 5-12 and 91-132 are kept (8 + 42 = 50 points;
    the original comment describes them as upper-body and hand points).

    Args:
        sequence: Iterable of frame dicts shaped like
            ``{"instances": [{"keypoints": [...]}, ...]}``.

    Returns:
        A list with one ``numpy.ndarray`` per frame holding the selected
        keypoints.

    Raises:
        IndexError: If a frame has an empty ``instances`` list.
        KeyError: If ``instances`` or ``keypoints`` is missing.
    """

    def _select(frame):
        # Look the keypoint list up once, then join the two index ranges.
        points = frame['instances'][0]['keypoints']
        return np.array(points[5:13] + points[91:133])

    return [_select(frame) for frame in sequence]
# 计算两帧之间的距离(这里使用欧氏距离) | |
def calculate_distance_matrix(seq1, seq2):
    """Compute the pairwise frame distance matrix between two sequences.

    The distance between two frames is the mean, over keypoints, of the
    Euclidean distance between corresponding keypoints.

    Args:
        seq1: Sequence of per-frame keypoint arrays, each of shape
            ``(num_keypoints, num_coords)``.
        seq2: Sequence of per-frame keypoint arrays with the same per-frame
            shape as ``seq1`` (all frames of ``seq2`` must share one shape so
            they can be stacked).

    Returns:
        ``numpy.ndarray`` of shape ``(len(seq1), len(seq2))`` where entry
        ``[i, j]`` is the distance between ``seq1[i]`` and ``seq2[j]``.
        An empty ``seq1`` yields an array of shape ``(0,)`` and an empty
        ``seq2`` yields shape ``(len(seq1), 0)``.
    """
    if len(seq2) == 0:
        # Nothing to compare against: one empty row per frame of seq1.
        return np.array([[] for _ in seq1])

    # Stack seq2 once so each row is a single vectorised NumPy call
    # instead of a Python-level loop over every frame pair.
    targets = np.stack([np.asarray(frame) for frame in seq2])
    rows = [
        np.linalg.norm(np.asarray(frame) - targets, axis=-1).mean(axis=-1)
        for frame in seq1
    ]
    return np.array(rows)
# 计算两个手语序列的最佳对齐路径 | |
def align_sequences(seq1, seq2):
    """Find the FastDTW alignment between two keypoint sequences.

    Keypoints are extracted from both sequences with ``extract_keypoints``
    and aligned with ``fastdtw``. The cost of matching two frames is
    ``np.linalg.norm`` of the difference between their keypoint arrays.

    Args:
        seq1: First sequence of frame dicts (see ``extract_keypoints``).
        seq2: Second sequence of frame dicts.

    Returns:
        The ``(distance, path)`` tuple returned by ``fastdtw``. Callers in
        this file iterate ``path`` as ``(index_in_seq1, index_in_seq2)``
        pairs.
    """
    keypoints_seq1 = extract_keypoints(seq1)
    keypoints_seq2 = extract_keypoints(seq2)

    # The full pairwise matrix from calculate_distance_matrix used to be
    # computed here but was never used; fastdtw evaluates the costs it needs
    # through ``dist``, so that wasted O(n*m) step was removed.
    distance, path = fastdtw(
        keypoints_seq1,
        keypoints_seq2,
        dist=lambda x, y: np.linalg.norm(x - y),
    )
    return distance, path
def filter_sequence_by_alignment(sequence, alignment_path, index):
    """Re-sample a sequence along an alignment path.

    Args:
        sequence: The input sequence (sequence1 or sequence2).
        alignment_path: Iterable of index pairs, one per aligned step.
        index: Which element of each pair indexes ``sequence``
            (0 for sequence1, 1 for sequence2).

    Returns:
        A list with one entry per step of ``alignment_path``; an element of
        ``sequence`` appears as many times as the path visits it.
    """
    return [sequence[step[index]] for step in alignment_path]
def scale_keypoints(standard, user, seq1_frames, seq2_frames):
    """Rescale stored keypoints so both videos share one frame size.

    The unified size is the larger width and the larger height of the two
    videos, taken from the first frame of each. Keypoints 5-12 and 91-132 of
    the first instance in every frame are multiplied by the per-axis ratio
    between the unified size and that video's own size. Both JSON files are
    rewritten in place, including one whose ratios are exactly 1.

    Args:
        standard: Path of the standard sequence without extension;
            ``standard + ".json"`` is read and overwritten.
        user: Path of the user sequence without extension;
            ``user + ".json"`` is read and overwritten.
        seq1_frames: Frames of the standard video; only the shape of the
            first frame is used.
        seq2_frames: Frames of the user video; only the shape of the first
            frame is used.

    Raises:
        IndexError: If either frame list is empty or a JSON frame has no
            instances.
    """
    height1, width1, _ = seq1_frames[0].shape
    height2, width2, _ = seq2_frames[0].shape

    # Context managers make sure the handles are closed (and, for the
    # writes below, flushed) deterministically.
    with open(standard + ".json", 'r') as handle:
        sequence1 = json.load(handle)
    with open(user + ".json", 'r') as handle:
        sequence2 = json.load(handle)

    unified_width = int(max(width1, width2))
    unified_height = int(max(height1, height2))

    # Per-axis scale factors (floats) for the standard and the user video.
    scale_x_standard = unified_width / width1
    scale_y_standard = unified_height / height1
    scale_x_user = unified_width / width2
    scale_y_user = unified_height / height2

    if scale_x_standard != 1.0 or scale_y_standard != 1.0:
        _scale_selected_keypoints(sequence1, scale_x_standard, scale_y_standard)
    if scale_x_user != 1.0 or scale_y_user != 1.0:
        _scale_selected_keypoints(sequence2, scale_x_user, scale_y_user)

    with open(standard + ".json", 'w') as handle:
        json.dump(sequence1, handle, indent=4)
    with open(user + ".json", 'w') as handle:
        json.dump(sequence2, handle, indent=4)


def _scale_selected_keypoints(sequence, scale_x, scale_y):
    """Scale keypoints 5-12 and 91-132 of each frame's first instance in place."""
    for frame in sequence:
        keypoints = frame["instances"][0]["keypoints"]
        # Each scaled point is rebuilt as [x, y]; any further components of
        # the original point are dropped, as in the original implementation.
        keypoints[5:13] = [
            [point[0] * scale_x, point[1] * scale_y] for point in keypoints[5:13]
        ]
        keypoints[91:133] = [
            [point[0] * scale_x, point[1] * scale_y] for point in keypoints[91:133]
        ]
# 根据对齐路径提取帧并创建新视频 | |
def create_aligned_videos(seq1_frames, seq2_frames, alignment_path, output_combined_path, output_seq1_path, output_seq2_path, fps=30):
    """Write the aligned videos: one per sequence plus a side-by-side one.

    For every ``(idx1, idx2)`` pair of ``alignment_path`` the matching frames
    are resized to a common size and written to three files. The common size
    is the larger width and larger height of the two videos, taken from the
    first frame of each. Frames are stretched to that size, so the aspect
    ratio is not preserved when the two videos differ in shape.

    Args:
        seq1_frames: Frames of the first video.
        seq2_frames: Frames of the second video.
        alignment_path: Iterable of ``(idx1, idx2)`` frame-index pairs.
        output_combined_path: Output path of the side-by-side video
            (sequence 1 on the left, sequence 2 on the right).
        output_seq1_path: Output path of the aligned first video.
        output_seq2_path: Output path of the aligned second video.
        fps: Frame rate of all three outputs.

    Raises:
        IndexError: If either frame list is empty or a path index is out of
            range.
    """
    height1, width1, _ = seq1_frames[0].shape
    height2, width2, _ = seq2_frames[0].shape

    unified_width = int(max(width1, width2))
    unified_height = int(max(height1, height2))
    frame_size = (unified_width, unified_height)

    # NOTE(review): whether the 'H264' fourcc is available depends on the
    # local OpenCV/FFmpeg build - confirm on the deployment target.
    fourcc = cv2.VideoWriter_fourcc(*'H264')
    combined_out = cv2.VideoWriter(output_combined_path, fourcc, fps, (unified_width * 2, unified_height))
    seq1_out = cv2.VideoWriter(output_seq1_path, fourcc, fps, frame_size)
    seq2_out = cv2.VideoWriter(output_seq2_path, fourcc, fps, frame_size)

    try:
        for idx1, idx2 in alignment_path:
            frame1_resized = cv2.resize(seq1_frames[idx1], frame_size, interpolation=cv2.INTER_AREA)
            frame2_resized = cv2.resize(seq2_frames[idx2], frame_size, interpolation=cv2.INTER_AREA)

            # Place the two frames side by side for the combined video.
            combined_out.write(np.hstack((frame1_resized, frame2_resized)))
            seq1_out.write(frame1_resized)
            seq2_out.write(frame2_resized)
    finally:
        # Release the writers even if a frame fails to process.
        combined_out.release()
        seq1_out.release()
        seq2_out.release()
def align_filter(standard, user):
    """Align a standard and a user recording and rewrite their files.

    Steps, in order:
      1. Load ``standard + ".json"`` and ``user + ".json"`` and align them
         with ``align_sequences``.
      2. Overwrite both JSON files with the sequences re-sampled along the
         alignment path.
      3. Read ``standard + ".mp4"`` and ``user + ".mp4"`` fully into memory.
      4. If the two videos differ in size, rescale the stored keypoints with
         ``scale_keypoints``.
      5. Overwrite both ``.mp4`` files with their aligned versions and write
         a side-by-side video to ``tmp/aligned_combined_output.mp4``.

    Args:
        standard: Path of the standard recording without extension.
        user: Path of the user recording without extension.

    Raises:
        IndexError: If either video yields no frames.
    """
    with open(standard + ".json", 'r') as handle:
        sequence1 = json.load(handle)
    with open(user + ".json", 'r') as handle:
        sequence2 = json.load(handle)

    distance, alignment_path = align_sequences(sequence1, sequence2)

    filtered_sequence1 = filter_sequence_by_alignment(sequence1, alignment_path, index=0)
    filtered_sequence2 = filter_sequence_by_alignment(sequence2, alignment_path, index=1)
    print(f"DTW 最佳对齐路径: {alignment_path}")
    print(f"DTW 最小对齐距离: {distance}")

    # Store the aligned keypoints. The files must be closed before
    # scale_keypoints re-reads them below, hence the context managers.
    with open(standard + ".json", 'w') as handle:
        json.dump(filtered_sequence1, handle, indent=4)
    with open(user + ".json", 'w') as handle:
        json.dump(filtered_sequence2, handle, indent=4)

    # All frames are loaded before the writers reopen the same .mp4 paths.
    seq1_frames = read_video_frames(standard + '.mp4')
    seq2_frames = read_video_frames(user + '.mp4')

    output_combined_path = 'tmp/aligned_combined_output.mp4'
    output_seq1_path = standard + '.mp4'
    output_seq2_path = user + '.mp4'
    # Make sure the directory for the combined video exists before the
    # writer tries to create the file there.
    os.makedirs(os.path.dirname(output_combined_path), exist_ok=True)

    height1, width1, _ = seq1_frames[0].shape
    height2, width2, _ = seq2_frames[0].shape
    # When the videos differ in size, rescale the keypoint coordinates.
    if height1 != height2 or width1 != width2:
        scale_keypoints(standard, user, seq1_frames, seq2_frames)

    create_aligned_videos(seq1_frames, seq2_frames, alignment_path, output_combined_path, output_seq1_path, output_seq2_path)
    print(f"Combined video created at {output_combined_path}")
    print(f"Aligned Sequence 1 video created at {output_seq1_path}")
    print(f"Aligned Sequence 2 video created at {output_seq2_path}")