analyze.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. #!/usr/bin/env python3
  2. # .aceflow/scripts/analyze.py
  3. """
  4. AceFlow 执行分析器
  5. 生成迭代分析报告。
  6. """
  7. import json
  8. from datetime import datetime
  9. from pathlib import Path
  10. import argparse
  11. import re
  12. class IterationAnalyzer:
  13. """分析指定迭代的所有产出物"""
  14. def __init__(self, iteration_id: str):
  15. self.iteration_id = iteration_id
  16. self.base_path = Path("aceflow_result") / iteration_id
  17. self.state_file = Path(".aceflow/state.json")
  18. self.state = self._load_state()
  19. def _load_state(self) -> dict:
  20. """加载状态文件"""
  21. if self.state_file.exists():
  22. return json.loads(self.state_file.read_text(encoding='utf-8'))
  23. return {}
  24. def analyze_completeness(self) -> dict:
  25. """分析各阶段完成情况"""
  26. stage_dirs = [d.name for d in self.base_path.iterdir() if d.is_dir()]
  27. return {
  28. "completed_stages": self.state.get("completed_stages", []),
  29. "total_stages": len(stage_dirs),
  30. "completion_rate": len(self.state.get("completed_stages", [])) / len(stage_dirs) * 100 if stage_dirs else 0
  31. }
  32. def analyze_test_results(self) -> dict:
  33. """分析S5测试报告"""
  34. test_dir = self.base_path / "S5_test_report"
  35. stats = {"total_files": 0, "pass_rate_avg": 0, "coverage_avg": 0, "failed_count": 0}
  36. rates, coverages = [], []
  37. if not test_dir.exists(): return stats
  38. files = list(test_dir.glob("s5_test_*.md"))
  39. stats["total_files"] = len(files)
  40. for file in files:
  41. content = file.read_text(encoding='utf-8')
  42. pass_rate_match = re.search(r"通过率:\s*([\d\.]+)%", content)
  43. coverage_match = re.search(r"覆盖率:\s*([\d\.]+)%", content)
  44. stats["failed_count"] += content.count("❌")
  45. if pass_rate_match: rates.append(float(pass_rate_match.group(1)))
  46. if coverage_match: coverages.append(float(coverage_match.group(1)))
  47. if rates: stats["pass_rate_avg"] = sum(rates) / len(rates)
  48. if coverages: stats["coverage_avg"] = sum(coverages) / len(coverages)
  49. return stats
  50. def analyze_code_review(self) -> dict:
  51. """分析S6代码评审报告"""
  52. review_file = self.base_path / "S6_codereview" / "s6_codereview.md"
  53. stats = {"critical": 0, "major": 0, "minor": 0}
  54. if not review_file.exists(): return stats
  55. content = review_file.read_text(encoding='utf-8')
  56. stats["critical"] = content.count("🔴")
  57. stats["major"] = content.count("🟡")
  58. stats["minor"] = content.count("🔵")
  59. return stats
  60. def generate_report(self) -> str:
  61. """生成Markdown格式的分析报告"""
  62. completeness = self.analyze_completeness()
  63. test_stats = self.analyze_test_results()
  64. review_stats = self.analyze_code_review()
  65. metrics = self.state.get("metrics", {})
  66. report = f"""
  67. # AceFlow 迭代分析报告: {self.iteration_id}
  68. **分析时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
  69. ## 1. 整体进度
  70. - **完成度**: {completeness['completion_rate']:.1f}% ({len(completeness['completed_stages'])}/{completeness['total_stages']} 阶段)
  71. - **已完成阶段**: {', '.join(completeness['completed_stages'])}
  72. - **总任务数**: {metrics.get('total_tasks', 'N/A')}
  73. - **已完成任务**: {metrics.get('completed_tasks', 'N/A')}
  74. ## 2. 质量分析
  75. ### 测试概览
  76. - **平均通过率**: {test_stats['pass_rate_avg']:.1f}%
  77. - **平均覆盖率**: {test_stats['coverage_avg']:.1f}%
  78. - **总失败用例数**: {test_stats['failed_count']}
  79. ### 代码评审问题
  80. - **🔴 严重问题**: {review_stats['critical']}
  81. - **🟡 一般问题**: {review_stats['major']}
  82. - **🔵 优化建议**: {review_stats['minor']}
  83. ## 3. 智能建议
  84. """
  85. suggestions = []
  86. if completeness['completion_rate'] < 100: suggestions.append("- ⚠️ 流程未完成,请检查卡点阶段。")
  87. if test_stats['pass_rate_avg'] < 90: suggestions.append("- 📉 测试通过率偏低,建议加强单元测试和代码审查。")
  88. if review_stats['critical'] > 0: suggestions.append("- 🚨 存在严重代码问题,需要优先修复。")
  89. if not suggestions: suggestions.append("- ✅ 整体表现良好,可总结经验并归档。")
  90. report += "\n".join(suggestions)
  91. return report.strip()
  92. def save_report(self, report: str):
  93. """保存报告到S8目录"""
  94. report_path = self.base_path / "S8_summary" / "analysis_report.md"
  95. report_path.parent.mkdir(parents=True, exist_ok=True)
  96. report_path.write_text(report, encoding='utf-8')
  97. print(f"✅ 分析报告已保存至: {report_path}")
  98. def main():
  99. """命令行接口"""
  100. parser = argparse.ArgumentParser(description='AceFlow 迭代分析器')
  101. parser.add_argument('--iteration', '-i', required=True, help='要分析的迭代ID')
  102. parser.add_argument('--save', '-s', action='store_true', help='保存报告到文件')
  103. args = parser.parse_args()
  104. analyzer = IterationAnalyzer(args.iteration)
  105. report = analyzer.generate_report()
  106. print(report)
  107. if args.save:
  108. analyzer.save_report(report)
  109. if __name__ == "__main__":
  110. main()