123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import json
- import os
- from datetime import datetime
- from utils.config_loader import load_config
- class PATEOASStateEngineEnhanced:
- def __init__(self, project_root='.'):
- self.project_root = project_root
- self.config = load_config('dynamic_thresholds.json')
- self.state_file = os.path.join(project_root, '.aceflow', 'current_state.json')
- self.stage_definitions = {
- 'S1': {'name': '用户故事细化', 'next_stage': 'S2', 'required_output': 's1_user_story.md', 'dependencies': []},
- 'S2': {'name': '任务拆分', 'next_stage': 'S3', 'required_output': 's2_tasks.md', 'dependencies': ['S1']},
- 'S3': {'name': '测试用例设计', 'next_stage': 'S4', 'required_output': 's3_testcases.md', 'dependencies': ['S2']},
- 'S4': {'name': '功能实现', 'next_stage': 'S5', 'required_output': 's4_implementation.md', 'dependencies': ['S3']},
- 'S5': {'name': '测试报告', 'next_stage': 'S6', 'required_output': 's5_test_report.md', 'dependencies': ['S4']},
- 'S6': {'name': '代码评审', 'next_stage': 'S7', 'required_output': 's6_codereview.md', 'dependencies': ['S5']},
- 'S7': {'name': '演示与反馈', 'next_stage': 'S8', 'required_output': 's7_feedback.md', 'dependencies': ['S6']},
- 'S8': {'name': '进度汇总', 'next_stage': None, 'required_output': 's8_summary.md', 'dependencies': ['S7']}
- }
-
- # 初始化状态目录
- os.makedirs(os.path.dirname(self.state_file), exist_ok=True)
-
- # 初始化状态文件
- if not os.path.exists(self.state_file):
- self.initialize_state()
- def initialize_state(self):
- """初始化项目状态"""
- initial_state = {
- 'current_stage': 'S1',
- 'stage_status': {stage_id: 'not_started' for stage_id in self.stage_definitions.keys()},
- 'progress': {stage_id: 0 for stage_id in self.stage_definitions.keys()},
- 'memory_ids': [],
- 'last_updated': datetime.now().isoformat(),
- 'abnormalities': [],
- 'associated_outputs': {stage_id: [] for stage_id in self.stage_definitions.keys()},
- 'review_status': {stage_id: 'pending' for stage_id in self.stage_definitions.keys()}
- }
- initial_state['stage_status']['S1'] = 'in_progress'
-
- self.save_state(initial_state)
- return initial_state
- def get_current_state(self):
- """获取当前状态"""
- with open(self.state_file, 'r', encoding='utf-8') as f:
- return json.load(f)
- def save_state(self, state_data):
- """保存状态数据"""
- state_data['last_updated'] = datetime.now().isoformat()
- with open(self.state_file, 'w', encoding='utf-8') as f:
- json.dump(state_data, f, ensure_ascii=False, indent=2)
- def update_stage_progress(self, stage_id, progress, memory_ids=None):
- """更新阶段进度,包含前置条件检查"""
- state = self.get_current_state()
-
- if stage_id not in state['progress']:
- raise ValueError(f"无效的阶段ID: {stage_id}")
-
- # 暂时禁用依赖性检查,以便测试
- # if not self.check_dependencies(stage_id):
- # raise ValueError(f"阶段 {stage_id} 的依赖性未满足,无法更新进度")
-
- # 检查阶段产物
- if progress >= 100 and not self.validate_stage_output(stage_id):
- raise ValueError(f"阶段 {stage_id} 的输出产物未通过校验,无法完成进度")
-
- state['progress'][stage_id] = progress
- state['current_stage'] = stage_id
-
- # 更新状态
- if progress >= 100:
- state['stage_status'][stage_id] = 'completed'
- # 自动进入下一阶段
- next_stage = self.stage_definitions[stage_id]['next_stage']
- if next_stage:
- state['current_stage'] = next_stage
- state['stage_status'][next_stage] = 'in_progress'
- else:
- state['stage_status'][stage_id] = 'in_progress'
-
- # 添加记忆ID
- if memory_ids:
- for mem_id in memory_ids:
- if mem_id not in state['memory_ids']:
- state['memory_ids'].append(mem_id)
-
- self.save_state(state)
- return state
- def check_dependencies(self, stage_id):
- """检查阶段依赖性是否满足"""
- state = self.get_current_state()
- dependencies = self.stage_definitions[stage_id]['dependencies']
-
- for dep in dependencies:
- status = state.get('stage_status', {}).get(dep, 'not_started')
- progress = state.get('iterations', {}).get('1', {}).get(dep, {}).get('progress', 0)
- if status != 'completed' or progress < 100:
- print(f"依赖性检查:阶段 {dep} 状态为 {status},进度为 {progress}%,未完成。")
- return False
- return True
- def validate_stage_output(self, stage_id):
- """验证阶段输出产物是否完整"""
- required_output = self.stage_definitions[stage_id]['required_output']
- iteration_dirs = [d for d in os.listdir(os.path.join(self.project_root, 'aceflow_result', 'iterations')) if os.path.isdir(os.path.join(self.project_root, 'aceflow_result', 'iterations', d))]
-
- for iteration_dir in iteration_dirs:
- output_path = os.path.join(self.project_root, 'aceflow_result', 'iterations', iteration_dir, required_output)
- if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
- return True
- return False
- def record_abnormality(self, stage_id, issue_description, severity='medium'):
- """记录异常状态"""
- state = self.get_current_state()
-
- abnormality = {
- 'id': f"ABN-{datetime.now().strftime('%Y%m%d%H%M%S')}",
- 'stage_id': stage_id,
- 'description': issue_description,
- 'severity': severity,
- 'detected_at': datetime.now().isoformat(),
- 'status': 'unresolved'
- }
-
- state['abnormalities'].append(abnormality)
- self.save_state(state)
- return abnormality
- def resolve_abnormality(self, abnormality_id):
- """解决异常状态"""
- state = self.get_current_state()
-
- for abn in state['abnormalities']:
- if abn['id'] == abnormality_id:
- abn['status'] = 'resolved'
- abn['resolved_at'] = datetime.now().isoformat()
- self.save_state(state)
- return True
-
- return False
- def get_navigation_suggestion(self):
- """获取导航建议,明确区分状态描述与操作建议"""
- state = self.get_current_state()
- current_stage = state['current_stage']
- progress = state['progress'].get(current_stage, 0)
- abnormalities = [a for a in state['abnormalities'] if a['status'] == 'unresolved' and a['stage_id'] == current_stage]
-
- suggestions = []
-
- # 异常处理建议
- if abnormalities:
- for abn in abnormalities:
- suggestions.append({
- 'type': 'abnormality',
- 'priority': 'high' if abn['severity'] == 'high' else 'medium',
- 'message': f"【状态提示】需要处理异常: {abn['description']}",
- 'action_suggestion': f"resolve_abnormality('{abn['id']}')",
- 'requires_confirmation': True
- })
- return suggestions
-
- # 进度建议
- if progress < 100:
- remaining = 100 - progress
- suggestions.append({
- 'type': 'progress',
- 'priority': 'medium',
- 'message': f"【状态提示】当前阶段 {current_stage} 进度: {progress}%,还需完成 {remaining}%",
- 'action_suggestion': f"update_stage_progress('{current_stage}', {min(progress + 10, 100)})",
- 'requires_confirmation': True,
- 'rationale': f"建议逐步更新进度,每次增加不超过10%"
- })
- else:
- next_stage = self.stage_definitions[current_stage]['next_stage']
- if next_stage:
- suggestions.append({
- 'type': 'transition',
- 'priority': 'high',
- 'message': f"【状态提示】阶段 {current_stage} 已完成,准备进入 {next_stage}",
- 'action_suggestion': f"update_stage_progress('{next_stage}', 0)",
- 'requires_confirmation': True,
- 'rationale': "请确认是否已完成当前阶段所有工作"
- })
-
- return suggestions
- def associate_output_to_stage(self, stage_id, output_path):
- """关联输出产物到阶段"""
- state = self.get_current_state()
- if stage_id not in state['associated_outputs']:
- raise ValueError(f"无效的阶段ID: {stage_id}")
-
- if output_path not in state['associated_outputs'][stage_id]:
- state['associated_outputs'][stage_id].append(output_path)
- self.save_state(state)
- return True
- def record_stage_review(self, stage_id, review_result):
- """记录阶段评审结果"""
- state = self.get_current_state()
- if stage_id not in state['review_status']:
- raise ValueError(f"无效的阶段ID: {stage_id}")
-
- state['review_status'][stage_id] = review_result
- self.save_state(state)
- return True
- def revert_to_stage(self, target_stage):
- """回退到指定阶段"""
- state = self.get_current_state()
- if target_stage not in self.stage_definitions:
- raise ValueError(f"无效的阶段ID: {target_stage}")
-
- state['current_stage'] = target_stage
- state['stage_status'][target_stage] = 'in_progress'
- # 重置后续阶段的状态
- current_index = list(self.stage_definitions.keys()).index(target_stage)
- for stage_id in list(self.stage_definitions.keys())[current_index+1:]:
- state['stage_status'][stage_id] = 'not_started'
- state['progress'][stage_id] = 0
- self.save_state(state)
- return True
|