123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- import os
- import uuid
- import time
- from flask import Flask, request, jsonify, send_from_directory, Response
- from flask_cors import CORS
- import cv2
- import numpy as np
- import json
- import tempfile
- from werkzeug.utils import secure_filename
- import threading
- from models.player_detector import PlayerDetector
- from models.player_tracker import PlayerTracker
- from utils.field_transformer import FieldTransformer
- app = Flask(__name__, static_folder='uploads')
- CORS(app)
- # 配置上传文件夹
- UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads')
- os.makedirs(UPLOAD_FOLDER, exist_ok=True)
- os.makedirs(os.path.join(UPLOAD_FOLDER, 'videos'), exist_ok=True)
- os.makedirs(os.path.join(UPLOAD_FOLDER, 'results'), exist_ok=True)
- os.makedirs(os.path.join(UPLOAD_FOLDER, 'frames'), exist_ok=True) # 存储视频帧
- os.makedirs(os.path.join(UPLOAD_FOLDER, 'visualizations'), exist_ok=True) # 存储可视化结果
- # 初始化模型
- player_detector = PlayerDetector(confidence=0.6) # 提高置信度阈值
- player_tracker = PlayerTracker()
- field_transformer = FieldTransformer()
- # 存储处理任务状态
- processing_tasks = {}
- @app.route('/api/upload', methods=['POST'])
- def upload_file():
- if 'video' not in request.files:
- return jsonify({'error': '没有上传文件'}), 400
-
- file = request.files['video']
- if file.filename == '':
- return jsonify({'error': '没有选择文件'}), 400
-
- # 生成唯一ID并保存视频
- video_id = str(uuid.uuid4())
- filename = secure_filename(file.filename)
- video_path = os.path.join(UPLOAD_FOLDER, 'videos', f"{video_id}_{filename}")
- file.save(video_path)
-
- # 创建处理任务
- processing_tasks[video_id] = {
- 'status': 'processing',
- 'progress': 0,
- 'video_path': video_path,
- 'filename': filename
- }
-
- # 异步处理视频分析
- threading.Thread(target=process_video_async, args=(video_path, video_id)).start()
-
- return jsonify({
- 'success': True,
- 'videoId': video_id,
- 'message': '视频上传成功,正在处理中',
- 'status': 'processing'
- })
- def process_video_async(video_path, video_id):
- """异步处理视频"""
- try:
- result = process_video(video_path, video_id)
- processing_tasks[video_id]['status'] = 'completed'
- processing_tasks[video_id]['result'] = result
- except Exception as e:
- processing_tasks[video_id]['status'] = 'error'
- processing_tasks[video_id]['error'] = str(e)
- print(f"处理视频时出错: {str(e)}")
- def process_video(video_path, video_id):
- """处理视频并提取球员轨迹"""
- # 读取视频
- cap = cv2.VideoCapture(video_path)
-
- # 获取视频基本信息
- fps = cap.get(cv2.CAP_PROP_FPS)
- total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
- width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
- height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-
- # 创建帧文件夹
- frames_dir = os.path.join(UPLOAD_FOLDER, 'frames', video_id)
- os.makedirs(frames_dir, exist_ok=True)
-
- # 创建可视化文件夹
- vis_dir = os.path.join(UPLOAD_FOLDER, 'visualizations', video_id)
- os.makedirs(vis_dir, exist_ok=True)
-
- # 存储轨迹数据
- all_tracks = []
- frame_count = 0
-
- # 第一帧用于检测场地(可选)
- ret, first_frame = cap.read()
- if ret:
- # 尝试自动检测场地
- field_detected = field_transformer.auto_detect_field(first_frame)
-
- # 如果自动检测失败,使用默认值
- if not field_detected:
- print("场地自动检测失败,使用默认变换")
-
- # 重置视频读取
- cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
-
- while cap.isOpened():
- ret, frame = cap.read()
- if not ret:
- break
-
- # 更新进度
- if video_id in processing_tasks:
- processing_tasks[video_id]['progress'] = min(95, int((frame_count / total_frames) * 100))
-
- # 保存帧
- frame_path = os.path.join(frames_dir, f"frame_{frame_count:06d}.jpg")
- cv2.imwrite(frame_path, frame)
-
- # 检测球员
- detections = player_detector.detect(frame)
-
- # 跟踪球员
- tracks = player_tracker.update(detections, frame)
-
- # 变换坐标到标准足球场
- field_positions = field_transformer.transform_to_field(tracks, frame)
-
- # 可视化跟踪结果
- vis_frame = player_tracker.visualize(frame, draw_history=True)
- vis_path = os.path.join(vis_dir, f"vis_{frame_count:06d}.jpg")
- cv2.imwrite(vis_path, vis_frame)
-
- # 保存当前帧的轨迹数据
- frame_data = {
- 'frame': frame_count,
- 'tracks': []
- }
-
- for track_id, position in field_positions.items():
- # 从原始跟踪数据获取球员位置和尺寸
- if track_id in tracks:
- x, y, w, h = tracks[track_id]
- x1, y1 = int(x - w/2), int(y - h/2)
- x2, y2 = int(x + w/2), int(y + h/2)
-
- frame_data['tracks'].append({
- 'id': int(track_id),
- 'x': float(position[0]),
- 'y': float(position[1]),
- 'z': 0.0, # 高度默认为0
- 'image_pos': {
- 'x1': x1,
- 'y1': y1,
- 'x2': x2,
- 'y2': y2
- }
- })
-
- all_tracks.append(frame_data)
- frame_count += 1
-
- cap.release()
-
- # 保存结果到JSON文件
- result_path = os.path.join(UPLOAD_FOLDER, 'results', f"{video_id}_tracks.json")
- with open(result_path, 'w') as f:
- json.dump(all_tracks, f)
-
- # 保存视频信息
- video_info = {
- 'fps': fps,
- 'total_frames': total_frames,
- 'width': width,
- 'height': height,
- 'filename': os.path.basename(video_path)
- }
-
- video_info_path = os.path.join(UPLOAD_FOLDER, 'results', f"{video_id}_info.json")
- with open(video_info_path, 'w') as f:
- json.dump(video_info, f)
-
- return {
- 'trackFile': f"/api/results/{video_id}_tracks.json",
- 'infoFile': f"/api/results/{video_id}_info.json",
- 'totalFrames': frame_count,
- 'framesDir': f"/api/frames/{video_id}",
- 'visualizationsDir': f"/api/visualizations/{video_id}",
- 'fps': fps
- }
- @app.route('/api/status/<video_id>', methods=['GET'])
- def get_status(video_id):
- """获取视频处理状态"""
- if video_id not in processing_tasks:
- return jsonify({'error': '找不到处理任务'}), 404
-
- task = processing_tasks[video_id]
- response = {
- 'status': task['status'],
- 'progress': task['progress']
- }
-
- if task['status'] == 'completed' and 'result' in task:
- response['result'] = task['result']
- elif task['status'] == 'error' and 'error' in task:
- response['error'] = task['error']
-
- return jsonify(response)
- @app.route('/api/results/<filename>', methods=['GET'])
- def get_result(filename):
- return send_from_directory(os.path.join(UPLOAD_FOLDER, 'results'), filename)
- @app.route('/api/frames/<video_id>/<frame_filename>', methods=['GET'])
- def get_frame(video_id, frame_filename):
- return send_from_directory(os.path.join(UPLOAD_FOLDER, 'frames', video_id), frame_filename)
- @app.route('/api/visualizations/<video_id>/<vis_filename>', methods=['GET'])
- def get_visualization(video_id, vis_filename):
- return send_from_directory(os.path.join(UPLOAD_FOLDER, 'visualizations', video_id), vis_filename)
- @app.route('/api/video/<video_id>', methods=['GET'])
- def get_video(video_id):
- """获取原始视频"""
- # 查找对应的视频文件
- for filename in os.listdir(os.path.join(UPLOAD_FOLDER, 'videos')):
- if filename.startswith(video_id):
- return send_from_directory(os.path.join(UPLOAD_FOLDER, 'videos'), filename)
-
- return jsonify({'error': '找不到视频文件'}), 404
- @app.route('/api/generate_visualization_video/<video_id>', methods=['GET'])
- def generate_visualization_video(video_id):
- """生成包含轨迹可视化的视频"""
- if video_id not in processing_tasks or processing_tasks[video_id]['status'] != 'completed':
- return jsonify({'error': '视频尚未处理完成'}), 400
-
- vis_dir = os.path.join(UPLOAD_FOLDER, 'visualizations', video_id)
- if not os.path.exists(vis_dir) or len(os.listdir(vis_dir)) == 0:
- return jsonify({'error': '找不到可视化结果'}), 404
-
- try:
- # 获取视频信息
- with open(os.path.join(UPLOAD_FOLDER, 'results', f"{video_id}_info.json"), 'r') as f:
- video_info = json.load(f)
-
- fps = video_info.get('fps', 30)
- width = video_info.get('width', 1280)
- height = video_info.get('height', 720)
-
- # 输出视频路径
- output_path = os.path.join(UPLOAD_FOLDER, 'videos', f"{video_id}_visualization.mp4")
-
- # 获取所有可视化帧文件
- frame_files = sorted([f for f in os.listdir(vis_dir) if f.startswith('vis_')])
-
- if not frame_files:
- return jsonify({'error': '没有找到可视化帧'}), 404
-
- # 读取第一帧获取尺寸
- first_frame = cv2.imread(os.path.join(vis_dir, frame_files[0]))
- h, w, _ = first_frame.shape
-
- # 创建视频写入器
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
- out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
-
- # 写入每一帧
- for frame_file in frame_files:
- frame = cv2.imread(os.path.join(vis_dir, frame_file))
- out.write(frame)
-
- out.release()
-
- return jsonify({
- 'success': True,
- 'video_url': f"/api/videos/{video_id}_visualization.mp4"
- })
-
- except Exception as e:
- return jsonify({'error': f'生成可视化视频失败: {str(e)}'}), 500
- @app.route('/api/videos/<filename>', methods=['GET'])
- def get_visualization_video(filename):
- return send_from_directory(os.path.join(UPLOAD_FOLDER, 'videos'), filename)
- if __name__ == '__main__':
- app.run(debug=True, host='0.0.0.0', port=5000)
|