import os import uuid import time from flask import Flask, request, jsonify, send_from_directory, Response from flask_cors import CORS import cv2 import numpy as np import json import tempfile from werkzeug.utils import secure_filename import threading from models.player_detector import PlayerDetector from models.player_tracker import PlayerTracker from utils.field_transformer import FieldTransformer app = Flask(__name__, static_folder='uploads') CORS(app) # 配置上传文件夹 UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(os.path.join(UPLOAD_FOLDER, 'videos'), exist_ok=True) os.makedirs(os.path.join(UPLOAD_FOLDER, 'results'), exist_ok=True) os.makedirs(os.path.join(UPLOAD_FOLDER, 'frames'), exist_ok=True) # 存储视频帧 os.makedirs(os.path.join(UPLOAD_FOLDER, 'visualizations'), exist_ok=True) # 存储可视化结果 # 初始化模型 player_detector = PlayerDetector(confidence=0.6) # 提高置信度阈值 player_tracker = PlayerTracker() field_transformer = FieldTransformer() # 存储处理任务状态 processing_tasks = {} @app.route('/api/upload', methods=['POST']) def upload_file(): if 'video' not in request.files: return jsonify({'error': '没有上传文件'}), 400 file = request.files['video'] if file.filename == '': return jsonify({'error': '没有选择文件'}), 400 # 生成唯一ID并保存视频 video_id = str(uuid.uuid4()) filename = secure_filename(file.filename) video_path = os.path.join(UPLOAD_FOLDER, 'videos', f"{video_id}_{filename}") file.save(video_path) # 创建处理任务 processing_tasks[video_id] = { 'status': 'processing', 'progress': 0, 'video_path': video_path, 'filename': filename } # 异步处理视频分析 threading.Thread(target=process_video_async, args=(video_path, video_id)).start() return jsonify({ 'success': True, 'videoId': video_id, 'message': '视频上传成功,正在处理中', 'status': 'processing' }) def process_video_async(video_path, video_id): """异步处理视频""" try: result = process_video(video_path, video_id) processing_tasks[video_id]['status'] = 'completed' processing_tasks[video_id]['result'] = result except Exception as e: processing_tasks[video_id]['status'] = 'error' processing_tasks[video_id]['error'] = str(e) print(f"处理视频时出错: {str(e)}") def process_video(video_path, video_id): """处理视频并提取球员轨迹""" # 读取视频 cap = cv2.VideoCapture(video_path) # 获取视频基本信息 fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 创建帧文件夹 frames_dir = os.path.join(UPLOAD_FOLDER, 'frames', video_id) os.makedirs(frames_dir, exist_ok=True) # 创建可视化文件夹 vis_dir = os.path.join(UPLOAD_FOLDER, 'visualizations', video_id) os.makedirs(vis_dir, exist_ok=True) # 存储轨迹数据 all_tracks = [] frame_count = 0 # 第一帧用于检测场地(可选) ret, first_frame = cap.read() if ret: # 尝试自动检测场地 field_detected = field_transformer.auto_detect_field(first_frame) # 如果自动检测失败,使用默认值 if not field_detected: print("场地自动检测失败,使用默认变换") # 重置视频读取 cap.set(cv2.CAP_PROP_POS_FRAMES, 0) while cap.isOpened(): ret, frame = cap.read() if not ret: break # 更新进度 if video_id in processing_tasks: processing_tasks[video_id]['progress'] = min(95, int((frame_count / total_frames) * 100)) # 保存帧 frame_path = os.path.join(frames_dir, f"frame_{frame_count:06d}.jpg") cv2.imwrite(frame_path, frame) # 检测球员 detections = player_detector.detect(frame) # 跟踪球员 tracks = player_tracker.update(detections, frame) # 变换坐标到标准足球场 field_positions = field_transformer.transform_to_field(tracks, frame) # 可视化跟踪结果 vis_frame = player_tracker.visualize(frame, draw_history=True) vis_path = os.path.join(vis_dir, f"vis_{frame_count:06d}.jpg") cv2.imwrite(vis_path, vis_frame) # 保存当前帧的轨迹数据 frame_data = { 'frame': frame_count, 'tracks': [] } for track_id, position in field_positions.items(): # 从原始跟踪数据获取球员位置和尺寸 if track_id in tracks: x, y, w, h = tracks[track_id] x1, y1 = int(x - w/2), int(y - h/2) x2, y2 = int(x + w/2), int(y + h/2) frame_data['tracks'].append({ 'id': int(track_id), 'x': float(position[0]), 'y': float(position[1]), 'z': 0.0, # 高度默认为0 'image_pos': { 'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2 } }) all_tracks.append(frame_data) frame_count += 1 cap.release() # 保存结果到JSON文件 result_path = os.path.join(UPLOAD_FOLDER, 'results', f"{video_id}_tracks.json") with open(result_path, 'w') as f: json.dump(all_tracks, f) # 保存视频信息 video_info = { 'fps': fps, 'total_frames': total_frames, 'width': width, 'height': height, 'filename': os.path.basename(video_path) } video_info_path = os.path.join(UPLOAD_FOLDER, 'results', f"{video_id}_info.json") with open(video_info_path, 'w') as f: json.dump(video_info, f) return { 'trackFile': f"/api/results/{video_id}_tracks.json", 'infoFile': f"/api/results/{video_id}_info.json", 'totalFrames': frame_count, 'framesDir': f"/api/frames/{video_id}", 'visualizationsDir': f"/api/visualizations/{video_id}", 'fps': fps } @app.route('/api/status/', methods=['GET']) def get_status(video_id): """获取视频处理状态""" if video_id not in processing_tasks: return jsonify({'error': '找不到处理任务'}), 404 task = processing_tasks[video_id] response = { 'status': task['status'], 'progress': task['progress'] } if task['status'] == 'completed' and 'result' in task: response['result'] = task['result'] elif task['status'] == 'error' and 'error' in task: response['error'] = task['error'] return jsonify(response) @app.route('/api/results/', methods=['GET']) def get_result(filename): return send_from_directory(os.path.join(UPLOAD_FOLDER, 'results'), filename) @app.route('/api/frames//', methods=['GET']) def get_frame(video_id, frame_filename): return send_from_directory(os.path.join(UPLOAD_FOLDER, 'frames', video_id), frame_filename) @app.route('/api/visualizations//', methods=['GET']) def get_visualization(video_id, vis_filename): return send_from_directory(os.path.join(UPLOAD_FOLDER, 'visualizations', video_id), vis_filename) @app.route('/api/video/', methods=['GET']) def get_video(video_id): """获取原始视频""" # 查找对应的视频文件 for filename in os.listdir(os.path.join(UPLOAD_FOLDER, 'videos')): if filename.startswith(video_id): return send_from_directory(os.path.join(UPLOAD_FOLDER, 'videos'), filename) return jsonify({'error': '找不到视频文件'}), 404 @app.route('/api/generate_visualization_video/', methods=['GET']) def generate_visualization_video(video_id): """生成包含轨迹可视化的视频""" if video_id not in processing_tasks or processing_tasks[video_id]['status'] != 'completed': return jsonify({'error': '视频尚未处理完成'}), 400 vis_dir = os.path.join(UPLOAD_FOLDER, 'visualizations', video_id) if not os.path.exists(vis_dir) or len(os.listdir(vis_dir)) == 0: return jsonify({'error': '找不到可视化结果'}), 404 try: # 获取视频信息 with open(os.path.join(UPLOAD_FOLDER, 'results', f"{video_id}_info.json"), 'r') as f: video_info = json.load(f) fps = video_info.get('fps', 30) width = video_info.get('width', 1280) height = video_info.get('height', 720) # 输出视频路径 output_path = os.path.join(UPLOAD_FOLDER, 'videos', f"{video_id}_visualization.mp4") # 获取所有可视化帧文件 frame_files = sorted([f for f in os.listdir(vis_dir) if f.startswith('vis_')]) if not frame_files: return jsonify({'error': '没有找到可视化帧'}), 404 # 读取第一帧获取尺寸 first_frame = cv2.imread(os.path.join(vis_dir, frame_files[0])) h, w, _ = first_frame.shape # 创建视频写入器 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (w, h)) # 写入每一帧 for frame_file in frame_files: frame = cv2.imread(os.path.join(vis_dir, frame_file)) out.write(frame) out.release() return jsonify({ 'success': True, 'video_url': f"/api/videos/{video_id}_visualization.mp4" }) except Exception as e: return jsonify({'error': f'生成可视化视频失败: {str(e)}'}), 500 @app.route('/api/videos/', methods=['GET']) def get_visualization_video(filename): return send_from_directory(os.path.join(UPLOAD_FOLDER, 'videos'), filename) if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=5000)