# decision_engine.py
"""
AceFlow v2.0 intelligent decision engine.

Upgrades the rule engine into a machine-learning-driven intelligent system.
"""
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml

# Optional AI/ML libraries (they must be installed for the ML code paths).
try:
    import numpy as np
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    HAS_ML_LIBS = True
except ImportError:
    # Without the ML libraries, fall back to the simplified rule engine.
    HAS_ML_LIBS = False
    np = None
  25. class TaskType(Enum):
  26. """任务类型枚举"""
  27. FEATURE_DEVELOPMENT = "feature_development"
  28. BUG_FIX = "bug_fix"
  29. REFACTORING = "refactoring"
  30. TESTING = "testing"
  31. DOCUMENTATION = "documentation"
  32. RESEARCH = "research"
  33. ARCHITECTURE = "architecture"
  34. DEPLOYMENT = "deployment"
  35. class ProjectComplexity(Enum):
  36. """项目复杂度枚举"""
  37. SIMPLE = "simple"
  38. MODERATE = "moderate"
  39. COMPLEX = "complex"
  40. ENTERPRISE = "enterprise"
  41. @dataclass
  42. class ProjectContext:
  43. """项目上下文信息"""
  44. name: str
  45. team_size: int
  46. duration_estimate: str
  47. technology_stack: List[str]
  48. project_type: str
  49. current_stage: str
  50. completion_percentage: float
  51. historical_velocity: float
  52. risk_factors: List[str]
  53. def to_features(self) -> Dict[str, Any]:
  54. """转换为ML特征"""
  55. return {
  56. 'team_size': self.team_size,
  57. 'duration_days': self._parse_duration_to_days(self.duration_estimate),
  58. 'completion_percentage': self.completion_percentage,
  59. 'historical_velocity': self.historical_velocity,
  60. 'has_frontend': any('frontend' in tech.lower() or 'react' in tech.lower() or 'vue' in tech.lower()
  61. for tech in self.technology_stack),
  62. 'has_backend': any('backend' in tech.lower() or 'api' in tech.lower() or 'django' in tech.lower()
  63. for tech in self.technology_stack),
  64. 'has_database': any('database' in tech.lower() or 'sql' in tech.lower() or 'mongo' in tech.lower()
  65. for tech in self.technology_stack),
  66. 'risk_count': len(self.risk_factors),
  67. 'is_web_project': self.project_type.lower() in ['web', 'webapp', 'website'],
  68. 'is_mobile_project': self.project_type.lower() in ['mobile', 'app', 'ios', 'android'],
  69. 'is_api_project': self.project_type.lower() in ['api', 'microservice', 'backend']
  70. }
  71. def _parse_duration_to_days(self, duration: str) -> int:
  72. """解析持续时间为天数"""
  73. duration = duration.lower()
  74. if 'week' in duration:
  75. weeks = int(''.join(filter(str.isdigit, duration)) or 1)
  76. return weeks * 7
  77. elif 'month' in duration:
  78. months = int(''.join(filter(str.isdigit, duration)) or 1)
  79. return months * 30
  80. elif 'day' in duration:
  81. return int(''.join(filter(str.isdigit, duration)) or 1)
  82. return 7 # 默认1周
  83. @dataclass
  84. class TaskContext:
  85. """任务上下文信息"""
  86. description: str
  87. priority: str
  88. estimated_effort: str
  89. dependencies: List[str]
  90. technical_complexity: str
  91. user_impact: str
  92. def to_features(self) -> Dict[str, Any]:
  93. """转换为ML特征"""
  94. return {
  95. 'description_length': len(self.description),
  96. 'priority_high': self.priority.lower() == 'high',
  97. 'priority_medium': self.priority.lower() == 'medium',
  98. 'priority_low': self.priority.lower() == 'low',
  99. 'has_dependencies': len(self.dependencies) > 0,
  100. 'dependency_count': len(self.dependencies),
  101. 'complexity_high': self.technical_complexity.lower() == 'high',
  102. 'complexity_medium': self.technical_complexity.lower() == 'medium',
  103. 'complexity_low': self.technical_complexity.lower() == 'low',
  104. 'impact_high': self.user_impact.lower() == 'high',
  105. 'impact_medium': self.user_impact.lower() == 'medium',
  106. 'impact_low': self.user_impact.lower() == 'low',
  107. 'has_keywords_bug': 'bug' in self.description.lower() or 'fix' in self.description.lower(),
  108. 'has_keywords_feature': 'feature' in self.description.lower() or 'add' in self.description.lower(),
  109. 'has_keywords_test': 'test' in self.description.lower() or 'testing' in self.description.lower(),
  110. 'has_keywords_doc': 'doc' in self.description.lower() or 'documentation' in self.description.lower()
  111. }
  112. @dataclass
  113. class AIDecision:
  114. """AI决策结果"""
  115. recommended_flow: str
  116. confidence: float
  117. reasoning: str
  118. alternatives: List[Dict[str, Any]]
  119. estimated_duration: int
  120. suggested_tasks: List[str]
  121. risk_assessment: Dict[str, float]
  122. def to_dict(self) -> Dict[str, Any]:
  123. """转换为字典格式"""
  124. return {
  125. 'recommended_flow': self.recommended_flow,
  126. 'confidence': self.confidence,
  127. 'reasoning': self.reasoning,
  128. 'alternatives': self.alternatives,
  129. 'estimated_duration': self.estimated_duration,
  130. 'suggested_tasks': self.suggested_tasks,
  131. 'risk_assessment': self.risk_assessment,
  132. 'timestamp': datetime.now().isoformat()
  133. }
  134. class TaskClassificationModel:
  135. """任务分类模型"""
  136. def __init__(self):
  137. self.model = None
  138. self.vectorizer = None
  139. self.is_trained = False
  140. def train(self, training_data: List[Tuple[str, TaskType]]):
  141. """训练任务分类模型"""
  142. if not HAS_ML_LIBS:
  143. logging.warning("ML libraries not available, using rule-based classification")
  144. return
  145. descriptions, labels = zip(*training_data)
  146. # 文本向量化
  147. self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
  148. X = self.vectorizer.fit_transform(descriptions)
  149. # 训练分类器
  150. self.model = RandomForestClassifier(n_estimators=100, random_state=42)
  151. self.model.fit(X, labels)
  152. self.is_trained = True
  153. logging.info(f"Task classification model trained with {len(training_data)} samples")
  154. def predict(self, task_context: TaskContext) -> TaskType:
  155. """预测任务类型"""
  156. if not self.is_trained or not HAS_ML_LIBS:
  157. return self._rule_based_classification(task_context)
  158. # 使用ML模型预测
  159. X = self.vectorizer.transform([task_context.description])
  160. prediction = self.model.predict(X)[0]
  161. return TaskType(prediction)
  162. def _rule_based_classification(self, task_context: TaskContext) -> TaskType:
  163. """基于规则的任务分类(备用方案)"""
  164. description = task_context.description.lower()
  165. # 关键词匹配规则
  166. if any(word in description for word in ['bug', 'fix', 'error', 'issue']):
  167. return TaskType.BUG_FIX
  168. elif any(word in description for word in ['test', 'testing', 'unit test', 'integration']):
  169. return TaskType.TESTING
  170. elif any(word in description for word in ['doc', 'documentation', 'readme', 'guide']):
  171. return TaskType.DOCUMENTATION
  172. elif any(word in description for word in ['refactor', 'refactoring', 'cleanup', 'optimize']):
  173. return TaskType.REFACTORING
  174. elif any(word in description for word in ['research', 'investigate', 'spike', 'poc']):
  175. return TaskType.RESEARCH
  176. elif any(word in description for word in ['architecture', 'design', 'framework']):
  177. return TaskType.ARCHITECTURE
  178. elif any(word in description for word in ['deploy', 'deployment', 'release', 'publish']):
  179. return TaskType.DEPLOYMENT
  180. else:
  181. return TaskType.FEATURE_DEVELOPMENT
  182. class FlowRecommendationModel:
  183. """流程推荐模型"""
  184. def __init__(self):
  185. self.model = None
  186. self.is_trained = False
  187. def train(self, training_data: List[Tuple[Dict[str, Any], str]]):
  188. """训练流程推荐模型"""
  189. if not HAS_ML_LIBS:
  190. logging.warning("ML libraries not available, using rule-based recommendation")
  191. return
  192. # 准备训练数据
  193. features = []
  194. labels = []
  195. for feature_dict, flow_mode in training_data:
  196. feature_vector = list(feature_dict.values())
  197. features.append(feature_vector)
  198. labels.append(flow_mode)
  199. X = np.array(features)
  200. y = np.array(labels)
  201. # 训练模型
  202. self.model = RandomForestClassifier(n_estimators=100, random_state=42)
  203. self.model.fit(X, y)
  204. self.is_trained = True
  205. logging.info(f"Flow recommendation model trained with {len(training_data)} samples")
  206. def suggest(self, task_type: TaskType, project_context: ProjectContext) -> str:
  207. """推荐工作流模式"""
  208. if not self.is_trained or not HAS_ML_LIBS:
  209. return self._rule_based_recommendation(task_type, project_context)
  210. # 使用ML模型推荐
  211. features = list(project_context.to_features().values())
  212. prediction = self.model.predict([features])[0]
  213. return prediction
  214. def _rule_based_recommendation(self, task_type: TaskType, project_context: ProjectContext) -> str:
  215. """基于规则的流程推荐(备用方案)"""
  216. # 简单项目或快速修复
  217. if (task_type == TaskType.BUG_FIX or
  218. project_context.team_size <= 3 or
  219. 'quick' in project_context.duration_estimate.lower()):
  220. return 'minimal'
  221. # 企业级或复杂项目
  222. if (project_context.team_size > 10 or
  223. task_type == TaskType.ARCHITECTURE or
  224. len(project_context.risk_factors) > 3):
  225. return 'complete'
  226. # 默认标准模式
  227. return 'standard'
  228. class ProgressPredictionModel:
  229. """进度预测模型"""
  230. def __init__(self):
  231. self.model = None
  232. self.is_trained = False
  233. def train(self, training_data: List[Tuple[Dict[str, Any], int]]):
  234. """训练进度预测模型"""
  235. if not HAS_ML_LIBS:
  236. logging.warning("ML libraries not available, using rule-based prediction")
  237. return
  238. features = []
  239. durations = []
  240. for feature_dict, actual_duration in training_data:
  241. feature_vector = list(feature_dict.values())
  242. features.append(feature_vector)
  243. durations.append(actual_duration)
  244. X = np.array(features)
  245. y = np.array(durations)
  246. # 使用回归模型预测持续时间
  247. from sklearn.ensemble import RandomForestRegressor
  248. self.model = RandomForestRegressor(n_estimators=100, random_state=42)
  249. self.model.fit(X, y)
  250. self.is_trained = True
  251. logging.info(f"Progress prediction model trained with {len(training_data)} samples")
  252. def estimate(self, task_type: TaskType, project_context: ProjectContext) -> int:
  253. """估算任务持续时间(小时)"""
  254. if not self.is_trained or not HAS_ML_LIBS:
  255. return self._rule_based_estimation(task_type, project_context)
  256. # 使用ML模型预测
  257. features = list(project_context.to_features().values())
  258. prediction = self.model.predict([features])[0]
  259. return max(1, int(prediction)) # 至少1小时
  260. def _rule_based_estimation(self, task_type: TaskType, project_context: ProjectContext) -> int:
  261. """基于规则的时间估算(备用方案)"""
  262. base_hours = {
  263. TaskType.BUG_FIX: 4,
  264. TaskType.FEATURE_DEVELOPMENT: 16,
  265. TaskType.REFACTORING: 8,
  266. TaskType.TESTING: 6,
  267. TaskType.DOCUMENTATION: 4,
  268. TaskType.RESEARCH: 12,
  269. TaskType.ARCHITECTURE: 24,
  270. TaskType.DEPLOYMENT: 8
  271. }
  272. hours = base_hours.get(task_type, 8)
  273. # 根据团队规模调整
  274. if project_context.team_size > 5:
  275. hours *= 1.2 # 大团队协调成本
  276. elif project_context.team_size == 1:
  277. hours *= 0.8 # 单人项目效率高
  278. # 根据复杂度调整
  279. if len(project_context.risk_factors) > 2:
  280. hours *= 1.5 # 高风险项目
  281. return int(hours)
  282. class AIDecisionEngine:
  283. """AI决策引擎主类"""
  284. def __init__(self, aceflow_dir: Path):
  285. self.aceflow_dir = aceflow_dir
  286. self.ai_dir = aceflow_dir / "ai"
  287. self.models_dir = self.ai_dir / "models"
  288. self.data_dir = self.ai_dir / "data"
  289. # 初始化模型
  290. self.task_classifier = TaskClassificationModel()
  291. self.flow_recommender = FlowRecommendationModel()
  292. self.progress_predictor = ProgressPredictionModel()
  293. # 创建目录
  294. self.models_dir.mkdir(parents=True, exist_ok=True)
  295. self.data_dir.mkdir(parents=True, exist_ok=True)
  296. # 设置日志
  297. self._setup_logging()
  298. # 加载预训练模型
  299. self._load_models()
  300. def _setup_logging(self):
  301. """设置日志系统"""
  302. log_file = self.ai_dir / "ai_engine.log"
  303. logging.basicConfig(
  304. level=logging.INFO,
  305. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  306. handlers=[
  307. logging.FileHandler(log_file),
  308. logging.StreamHandler()
  309. ]
  310. )
  311. self.logger = logging.getLogger(__name__)
  312. def _load_models(self):
  313. """加载预训练模型"""
  314. try:
  315. # 尝试加载训练好的模型
  316. # 这里可以从文件加载保存的模型
  317. self.logger.info("Models loaded successfully")
  318. except Exception as e:
  319. self.logger.warning(f"Could not load pre-trained models: {e}")
  320. # 使用默认的训练数据训练模型
  321. self._initialize_with_default_data()
  322. def _initialize_with_default_data(self):
  323. """使用默认数据初始化模型"""
  324. # 默认任务分类训练数据
  325. task_training_data = [
  326. ("Fix login bug", TaskType.BUG_FIX),
  327. ("Add user authentication", TaskType.FEATURE_DEVELOPMENT),
  328. ("Refactor database layer", TaskType.REFACTORING),
  329. ("Write unit tests for API", TaskType.TESTING),
  330. ("Update API documentation", TaskType.DOCUMENTATION),
  331. ("Research new framework", TaskType.RESEARCH),
  332. ("Design system architecture", TaskType.ARCHITECTURE),
  333. ("Deploy to production", TaskType.DEPLOYMENT)
  334. ]
  335. # 训练任务分类模型
  336. self.task_classifier.train(task_training_data)
  337. self.logger.info("Models initialized with default training data")
  338. def make_decision(self,
  339. task_context: TaskContext,
  340. project_context: ProjectContext) -> AIDecision:
  341. """做出AI决策"""
  342. try:
  343. # 1. 任务分类
  344. task_type = self.task_classifier.predict(task_context)
  345. # 2. 流程推荐
  346. recommended_flow = self.flow_recommender.suggest(task_type, project_context)
  347. # 3. 时间估算
  348. estimated_duration = self.progress_predictor.estimate(task_type, project_context)
  349. # 4. 计算置信度
  350. confidence = self._calculate_confidence(task_context, project_context)
  351. # 5. 生成推理解释
  352. reasoning = self._explain_decision(task_type, recommended_flow, project_context)
  353. # 6. 提供替代方案
  354. alternatives = self._suggest_alternatives(recommended_flow, task_type)
  355. # 7. 生成任务建议
  356. suggested_tasks = self._generate_task_suggestions(task_type, recommended_flow)
  357. # 8. 风险评估
  358. risk_assessment = self._assess_risks(task_context, project_context)
  359. decision = AIDecision(
  360. recommended_flow=recommended_flow,
  361. confidence=confidence,
  362. reasoning=reasoning,
  363. alternatives=alternatives,
  364. estimated_duration=estimated_duration,
  365. suggested_tasks=suggested_tasks,
  366. risk_assessment=risk_assessment
  367. )
  368. # 记录决策
  369. self._log_decision(decision, task_context, project_context)
  370. return decision
  371. except Exception as e:
  372. self.logger.error(f"Error in AI decision making: {e}")
  373. # 返回默认决策
  374. return self._get_default_decision()
  375. def _calculate_confidence(self, task_context: TaskContext, project_context: ProjectContext) -> float:
  376. """计算决策置信度"""
  377. confidence = 0.7 # 基础置信度
  378. # 根据项目信息完整性调整
  379. if project_context.historical_velocity > 0:
  380. confidence += 0.1
  381. if len(project_context.technology_stack) > 0:
  382. confidence += 0.1
  383. if task_context.priority in ['high', 'medium', 'low']:
  384. confidence += 0.05
  385. # 根据描述详细程度调整
  386. if len(task_context.description) > 50:
  387. confidence += 0.05
  388. return min(0.95, confidence) # 最高95%置信度
  389. def _explain_decision(self, task_type: TaskType, recommended_flow: str, project_context: ProjectContext) -> str:
  390. """生成决策推理解释"""
  391. explanations = {
  392. 'minimal': f"推荐轻量级流程,因为{self._get_minimal_reason(task_type, project_context)}",
  393. 'standard': f"推荐标准流程,因为{self._get_standard_reason(task_type, project_context)}",
  394. 'complete': f"推荐完整流程,因为{self._get_complete_reason(task_type, project_context)}"
  395. }
  396. return explanations.get(recommended_flow, "基于项目特性推荐此流程模式")
  397. def _get_minimal_reason(self, task_type: TaskType, project_context: ProjectContext) -> str:
  398. """获取轻量级流程推荐原因"""
  399. reasons = []
  400. if task_type == TaskType.BUG_FIX:
  401. reasons.append("这是一个紧急修复任务")
  402. if project_context.team_size <= 3:
  403. reasons.append("团队规模较小")
  404. if 'week' in project_context.duration_estimate and '1' in project_context.duration_estimate:
  405. reasons.append("项目周期较短")
  406. return ",".join(reasons) if reasons else "项目适合快速迭代"
  407. def _get_standard_reason(self, task_type: TaskType, project_context: ProjectContext) -> str:
  408. """获取标准流程推荐原因"""
  409. reasons = []
  410. if task_type == TaskType.FEATURE_DEVELOPMENT:
  411. reasons.append("这是功能开发任务")
  412. if 3 < project_context.team_size <= 10:
  413. reasons.append("团队规模适中")
  414. if 'month' in project_context.duration_estimate:
  415. reasons.append("项目需要适度的质量控制")
  416. return ",".join(reasons) if reasons else "项目需要平衡效率和质量"
  417. def _get_complete_reason(self, task_type: TaskType, project_context: ProjectContext) -> str:
  418. """获取完整流程推荐原因"""
  419. reasons = []
  420. if project_context.team_size > 10:
  421. reasons.append("大型团队需要严格协调")
  422. if len(project_context.risk_factors) > 3:
  423. reasons.append("项目风险较高")
  424. if task_type == TaskType.ARCHITECTURE:
  425. reasons.append("架构设计需要严格流程")
  426. return ",".join(reasons) if reasons else "项目需要严格的质量控制"
  427. def _suggest_alternatives(self, recommended_flow: str, task_type: TaskType) -> List[Dict[str, Any]]:
  428. """提供替代流程方案"""
  429. all_flows = ['minimal', 'standard', 'complete']
  430. alternatives = []
  431. for flow in all_flows:
  432. if flow != recommended_flow:
  433. alternatives.append({
  434. 'flow': flow,
  435. 'reason': self._get_alternative_reason(flow, task_type),
  436. 'confidence': 0.3 if flow == 'complete' else 0.5
  437. })
  438. return alternatives[:2] # 最多2个替代方案
  439. def _get_alternative_reason(self, flow: str, task_type: TaskType) -> str:
  440. """获取替代方案推荐原因"""
  441. reasons = {
  442. 'minimal': "如果需要快速交付,可考虑轻量级流程",
  443. 'standard': "如果需要平衡效率和质量,可考虑标准流程",
  444. 'complete': "如果质量要求极高,可考虑完整流程"
  445. }
  446. return reasons.get(flow, "可根据具体情况选择")
  447. def _generate_task_suggestions(self, task_type: TaskType, recommended_flow: str) -> List[str]:
  448. """生成任务建议"""
  449. base_suggestions = {
  450. TaskType.FEATURE_DEVELOPMENT: [
  451. "明确功能需求和验收标准",
  452. "设计用户界面和交互流程",
  453. "编写核心功能代码",
  454. "进行功能测试和用户验证"
  455. ],
  456. TaskType.BUG_FIX: [
  457. "重现和分析问题",
  458. "定位问题根本原因",
  459. "实施修复方案",
  460. "验证修复效果"
  461. ],
  462. TaskType.REFACTORING: [
  463. "分析当前代码结构",
  464. "制定重构计划",
  465. "逐步重构实施",
  466. "回归测试验证"
  467. ]
  468. }
  469. suggestions = base_suggestions.get(task_type, [
  470. "分析任务需求",
  471. "制定实施计划",
  472. "执行具体工作",
  473. "验证完成效果"
  474. ])
  475. # 根据流程模式调整建议数量
  476. if recommended_flow == 'minimal':
  477. return suggestions[:3]
  478. elif recommended_flow == 'standard':
  479. return suggestions
  480. else: # complete
  481. return suggestions + ["详细文档记录", "团队评审确认"]
  482. def _assess_risks(self, task_context: TaskContext, project_context: ProjectContext) -> Dict[str, float]:
  483. """评估项目风险"""
  484. risks = {
  485. 'schedule_risk': 0.3, # 进度风险
  486. 'technical_risk': 0.2, # 技术风险
  487. 'quality_risk': 0.2, # 质量风险
  488. 'resource_risk': 0.1 # 资源风险
  489. }
  490. # 根据项目特征调整风险
  491. if task_context.technical_complexity == 'high':
  492. risks['technical_risk'] += 0.3
  493. if len(task_context.dependencies) > 2:
  494. risks['schedule_risk'] += 0.2
  495. if project_context.team_size < 2:
  496. risks['resource_risk'] += 0.3
  497. if len(project_context.risk_factors) > 2:
  498. for risk_type in risks:
  499. risks[risk_type] += 0.1
  500. # 确保风险值在0-1范围内
  501. return {k: min(1.0, v) for k, v in risks.items()}
  502. def _log_decision(self, decision: AIDecision, task_context: TaskContext, project_context: ProjectContext):
  503. """记录AI决策用于后续学习"""
  504. log_entry = {
  505. 'timestamp': datetime.now().isoformat(),
  506. 'task_context': {
  507. 'description': task_context.description,
  508. 'priority': task_context.priority,
  509. 'complexity': task_context.technical_complexity
  510. },
  511. 'project_context': {
  512. 'team_size': project_context.team_size,
  513. 'duration_estimate': project_context.duration_estimate,
  514. 'current_stage': project_context.current_stage
  515. },
  516. 'decision': decision.to_dict()
  517. }
  518. # 保存到决策日志文件
  519. log_file = self.data_dir / "decision_log.jsonl"
  520. with open(log_file, 'a', encoding='utf-8') as f:
  521. f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
  522. def _get_default_decision(self) -> AIDecision:
  523. """获取默认决策(错误情况下的备用方案)"""
  524. return AIDecision(
  525. recommended_flow='standard',
  526. confidence=0.5,
  527. reasoning='使用默认标准流程',
  528. alternatives=[],
  529. estimated_duration=16,
  530. suggested_tasks=['分析需求', '实施开发', '测试验证'],
  531. risk_assessment={'schedule_risk': 0.3, 'technical_risk': 0.3, 'quality_risk': 0.2, 'resource_risk': 0.2}
  532. )
  533. def get_personalized_suggestions(self, project_context: ProjectContext) -> List[str]:
  534. """获取个性化建议"""
  535. suggestions = []
  536. # 基于历史表现的建议
  537. if project_context.historical_velocity < 0.8:
  538. suggestions.append("🚀 建议优化团队效率,当前进度略低于预期")
  539. if project_context.completion_percentage > 0.8:
  540. suggestions.append("🎯 项目即将完成,建议重点关注质量验证")
  541. # 基于团队规模的建议
  542. if project_context.team_size == 1:
  543. suggestions.append("👤 单人项目建议使用轻量级流程提高效率")
  544. elif project_context.team_size > 10:
  545. suggestions.append("👥 大型团队建议加强沟通协调机制")
  546. # 基于技术栈的建议
  547. if any('new' in tech.lower() or 'experimental' in tech.lower() for tech in project_context.technology_stack):
  548. suggestions.append("🔬 使用新技术建议增加研究和测试时间")
  549. return suggestions[:3] # 最多返回3条建议
  550. # 全局AI引擎实例
  551. _ai_engine_instance = None
  552. def get_ai_engine(aceflow_dir: Path = None) -> AIDecisionEngine:
  553. """获取AI引擎单例"""
  554. global _ai_engine_instance
  555. if _ai_engine_instance is None:
  556. if aceflow_dir is None:
  557. aceflow_dir = Path.cwd() / ".aceflow"
  558. _ai_engine_instance = AIDecisionEngine(aceflow_dir)
  559. return _ai_engine_instance