memory_pool.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import os
  2. import json
  3. import hashlib
  4. from datetime import datetime, timedelta
  5. from utils.config_loader import load_config
  6. class GlobalMemoryPool:
  def __init__(self):
      """Load the workflow configuration and prepare the storage layout.

      The configuration is read from ``workflow_rules.json`` via
      ``load_config``. The storage root comes from the
      ``memory_pool_config.storage_path`` setting and falls back to
      ``./.aceflow/memory_pool``. The root directory and one sub-directory
      per memory type are created if they do not exist yet.
      """
      self.config = load_config('workflow_rules.json')
      pool_settings = self.config.get('memory_pool_config', {})
      self.storage_path = pool_settings.get('storage_path', './.aceflow/memory_pool')

      # Memory type code -> description stored with every record of that type.
      self.memory_types = {
          'REQ': '需求记忆',     # requirement memory
          'CON': '约束记忆',     # constraint memory
          'TASK': '任务记忆',    # task memory
          'CODE': '代码记忆',    # code memory
          'TEST': '测试记忆',    # test memory
          'DEFECT': '缺陷记忆',  # defect memory
          'FDBK': '反馈记忆',    # feedback memory
      }

      # Create the storage root first, then one folder per memory type.
      os.makedirs(self.storage_path, exist_ok=True)
      for type_code in self.memory_types:
          type_dir = os.path.join(self.storage_path, type_code)
          os.makedirs(type_dir, exist_ok=True)
  def generate_memory_id(self, mem_type, content):
      """Build a memory ID of the form ``MEM-<type>-<timestamp>-<hash>``.

      Args:
          mem_type: Memory type code embedded verbatim in the ID.
          content: Text of the memory; must be a ``str``.

      Returns:
          The ID string. ``<timestamp>`` is the current time formatted as
          ``YYYYMMDDHHMMSS`` and ``<hash>`` is the first six hex digits of
          the MD5 digest of the UTF-8 encoded ``content``.
      """
      stamp = datetime.now().strftime('%Y%m%d%H%M%S')
      digest = hashlib.md5(content.encode('utf-8')).hexdigest()
      return "MEM-{}-{}-{}".format(mem_type, stamp, digest[:6])
  29. def store_memory(self, mem_type, content, metadata=None):
  30. """存储记忆片段"""
  31. if mem_type not in self.memory_types:
  32. raise ValueError(f"不支持的记忆类型: {mem_type}")
  33. memory_id = self.generate_memory_id(mem_type, content)
  34. memory_data = {
  35. 'id': memory_id,
  36. 'type': mem_type,
  37. 'description': self.memory_types[mem_type],
  38. 'content': content,
  39. 'metadata': metadata or {},
  40. 'created_at': datetime.now().isoformat(),
  41. 'expires_at': (datetime.now() + timedelta(days=7)).isoformat() if mem_type != 'REQ' else None
  42. }
  43. # 保存到文件
  44. file_path = os.path.join(self.storage_path, mem_type, f"{memory_id}.json")
  45. with open(file_path, 'w', encoding='utf-8') as f:
  46. json.dump(memory_data, f, ensure_ascii=False, indent=2)
  47. return memory_id
  48. def retrieve_memory(self, memory_id=None, mem_type=None, keywords=None):
  49. """检索记忆片段"""
  50. results = []
  51. # 确定要搜索的目录
  52. search_dirs = [os.path.join(self.storage_path, t) for t in self.memory_types.keys()] if not mem_type else [os.path.join(self.storage_path, mem_type)]
  53. # 搜索文件
  54. for search_dir in search_dirs:
  55. if not os.path.exists(search_dir):
  56. continue
  57. for filename in os.listdir(search_dir):
  58. if filename.endswith('.json'):
  59. file_path = os.path.join(search_dir, filename)
  60. with open(file_path, 'r', encoding='utf-8') as f:
  61. memory = json.load(f)
  62. # 检查过期
  63. if memory['expires_at'] and datetime.fromisoformat(memory['expires_at']) < datetime.now():
  64. continue
  65. # 筛选条件
  66. if memory_id and memory['id'] != memory_id:
  67. continue
  68. if keywords and not any(keyword in memory['content'] for keyword in keywords):
  69. continue
  70. results.append(memory)
  71. return results
  72. def link_memory_to_stage(self, memory_id, stage_id):
  73. """将记忆关联到阶段"""
  74. link_file = os.path.join(self.storage_path, 'stage_links.json')
  75. links = {}
  76. if os.path.exists(link_file):
  77. with open(link_file, 'r', encoding='utf-8') as f:
  78. links = json.load(f)
  79. if stage_id not in links:
  80. links[stage_id] = []
  81. if memory_id not in links[stage_id]:
  82. links[stage_id].append(memory_id)
  83. with open(link_file, 'w', encoding='utf-8') as f:
  84. json.dump(links, f, ensure_ascii=False, indent=2)
  85. def get_stage_memories(self, stage_id):
  86. """获取阶段关联的记忆"""
  87. link_file = os.path.join(self.storage_path, 'stage_links.json')
  88. if not os.path.exists(link_file):
  89. return []
  90. with open(link_file, 'r', encoding='utf-8') as f:
  91. links = json.load(f)
  92. memory_ids = links.get(stage_id, [])
  93. return [self.retrieve_memory(memory_id=mid)[0] for mid in memory_ids if self.retrieve_memory(memory_id=mid)]