AI Agent 进阶 Agent记忆 向量数据库 知识图谱 RAG

Agent记忆系统设计:从短期记忆到长期知识管理

AIEng Hub
阅读约 30 分钟

引言

记忆是AI Agent的核心能力之一。一个没有记忆的Agent就像金鱼一样,每次交互都是全新的开始。设计一个高效、可靠的Agent记忆系统,是构建实用Agent的关键。

本文将系统性地介绍Agent记忆系统的设计原理、存储方案和实现技巧,帮助你构建一个具备”长期记忆”的智能Agent。

为什么Agent需要记忆?

人类记忆 vs Agent记忆

人类记忆Agent记忆实现挑战
短期记忆(几秒-几分钟)上下文窗口Token限制、信息压缩
工作记忆(当前任务)对话历史多轮对话管理
长期记忆(知识、经验)向量存储、知识库检索准确性、知识更新
情景记忆(个人经历)用户画像、交互记录隐私保护、数据安全
程序性记忆(技能)工具使用记录技能学习、迁移

记忆类型对比

┌─────────────────────────────────────────────────────────────┐
│                    Agent 记忆层次架构                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    感官记忆                          │   │
│  │  (Sensory Memory) - 原始输入缓存                     │   │
│  │  • 用户输入文本                                      │   │
│  │  • 工具返回结果                                      │   │
│  │  • 系统状态快照                                      │   │
│  │  生命周期: 毫秒级                                     │   │
│  └─────────────────────────────────────────────────────┘   │
│                         ↓                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    短期记忆                          │   │
│  │  (Short-term Memory) - 当前上下文                    │   │
│  │  • 对话历史 (ConversationBuffer)                      │   │
│  │  • 当前任务状态                                      │   │
│  │  • 临时变量                                          │   │
│  │  生命周期: 单次对话 / 受限于上下文窗口                 │   │
│  └─────────────────────────────────────────────────────┘   │
│                         ↓                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    工作记忆                          │   │
│  │  (Working Memory) - 任务执行中                       │   │
│  │  • 任务规划 (Plan)                                    │   │
│  │  • 中间结果缓存                                      │   │
│  │  • 工具调用链                                        │   │
│  │  生命周期: 任务执行期间                               │   │
│  └─────────────────────────────────────────────────────┘   │
│                         ↓                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    长期记忆                          │   │
│  │  (Long-term Memory) - 持久化存储                     │   │
│  │  • 语义记忆: 向量数据库 (Chroma, Pinecone)            │   │
│  │  • 情景记忆: 用户交互历史                             │   │
│  │  • 程序记忆: 工具使用模式                             │   │
│  │  • 知识图谱: 结构化知识                              │   │
│  │  生命周期: 永久 / 可更新                              │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

短期记忆设计

1. 对话历史管理

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass
class Message:
    """单条消息"""
    role: str  # "user", "assistant", "system", "tool"
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "role": self.role,
            "content": self.content,
            "metadata": self.metadata,
            "timestamp": self.timestamp.isoformat(),
            "message_id": self.message_id
        }

class ConversationBuffer:
    """基础对话缓冲区"""
    
    def __init__(self, max_messages: int = 100):
        self.messages: List[Message] = []
        self.max_messages = max_messages
    
    def add_message(self, role: str, content: str, 
                    metadata: Dict = None) -> Message:
        """添加消息"""
        message = Message(role=role, content=content, metadata=metadata or {})
        self.messages.append(message)
        
        # 保持历史在限制范围内
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]
        
        return message
    
    def get_messages(self, last_n: int = None) -> List[Message]:
        """获取最近的消息"""
        if last_n:
            return self.messages[-last_n:]
        return self.messages.copy()
    
    def clear(self):
        """清空历史"""
        self.messages = []
    
    def to_string(self, last_n: int = 10) -> str:
        """转换为字符串格式"""
        messages = self.get_messages(last_n)
        return "\n".join([
            f"{msg.role}: {msg.content}"
            for msg in messages
        ])

2. 智能摘要记忆

from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage

class ConversationSummaryMemory:
    """基于摘要的对话记忆"""
    
    def __init__(self, llm: ChatOpenAI = None, 
                 buffer_size: int = 6,
                 summary_token_limit: int = 500):
        self.llm = llm or ChatOpenAI(model="gpt-3.5-turbo")
        self.buffer = ConversationBuffer(max_messages=buffer_size)
        self.summary = ""
        self.summary_token_limit = summary_token_limit
    
    def add_message(self, role: str, content: str, 
                    metadata: Dict = None) -> None:
        """添加消息并维护摘要"""
        self.buffer.add_message(role, content, metadata)
        
        # 当缓冲区满时,生成摘要
        if len(self.buffer.messages) >= self.buffer.max_messages:
            self._summarize_buffer()
    
    async def _summarize_buffer(self) -> None:
        """生成对话摘要"""
        conversation = self.buffer.to_string()
        
        prompt = f"""
        请将以下对话内容总结为简洁的摘要,保留关键信息:
        
        当前摘要:
        {self.summary}
        
        新对话:
        {conversation}
        
        要求:
        1. 总结主要讨论的话题和结论
        2. 保留用户明确提到的偏好和约束
        3. 控制在{self.summary_token_limit}字以内
        4. 如果与当前摘要相关,进行合并
        
        输出摘要:
        """
        
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        self.summary = response.content
        
        # 清空缓冲区,保留最近2条
        recent = self.buffer.messages[-2:]
        self.buffer.clear()
        for msg in recent:
            self.buffer.messages.append(msg)
    
    def get_context(self) -> str:
        """获取完整的上下文(摘要 + 近期消息)"""
        context = []
        
        if self.summary:
            context.append(f"对话摘要: {self.summary}")
        
        recent = self.buffer.to_string()
        if recent:
            context.append(f"\n近期对话:\n{recent}")
        
        return "\n".join(context)

3. 滑动窗口记忆

import tiktoken

class SlidingWindowMemory:
    """基于Token的滑动窗口记忆"""
    
    def __init__(self, max_tokens: int = 4000, 
                 model: str = "gpt-4"):
        self.max_tokens = max_tokens
        self.model = model
        self.messages: List[Message] = []
        self.encoding = tiktoken.encoding_for_model(model)
    
    def count_tokens(self, text: str) -> int:
        """计算文本的token数"""
        return len(self.encoding.encode(text))
    
    def add_message(self, role: str, content: str, 
                    metadata: Dict = None) -> None:
        """添加消息,自动管理窗口"""
        message = Message(role=role, content=content, metadata=metadata or {})
        self.messages.append(message)
        
        # 确保不超过token限制
        self._trim_to_limit()
    
    def _trim_to_limit(self) -> None:
        """裁剪到token限制内"""
        total_tokens = sum(
            self.count_tokens(msg.content) + 4  # 角色标记开销
            for msg in self.messages
        )
        
        # 保留系统消息和最近的对话
        system_messages = [m for m in self.messages if m.role == "system"]
        other_messages = [m for m in self.messages if m.role != "system"]
        
        # 从最早的消息开始删除
        while total_tokens > self.max_tokens and len(other_messages) > 2:
            removed = other_messages.pop(0)
            total_tokens -= self.count_tokens(removed.content) + 4
        
        self.messages = system_messages + other_messages
    
    def get_messages_for_llm(self) -> List[Dict[str, str]]:
        """获取LLM可用的消息格式"""
        return [
            {"role": msg.role, "content": msg.content}
            for msg in self.messages
        ]

长期记忆设计

1. 向量存储记忆

from typing import List, Dict, Any, Optional
import numpy as np
import chromadb
from chromadb.config import Settings
import hashlib

class VectorStoreMemory:
    """基于向量数据库的长期记忆"""
    
    def __init__(self, 
                 collection_name: str = "agent_memory",
                 embedding_model = None,
                 persist_directory: str = "./memory_db"):
        # 初始化ChromaDB
        self.client = chromadb.Client(Settings(
            chroma_db_impl="duckdb+parquet",
            persist_directory=persist_directory
        ))
        
        # 获取或创建集合
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )
        
        self.embedding_model = embedding_model
    
    async def add_memory(self, 
                        content: str,
                        memory_type: str = "fact",
                        metadata: Dict[str, Any] = None) -> str:
        """添加记忆"""
        # 生成唯一ID
        memory_id = hashlib.md5(
            f"{content}:{datetime.now().isoformat()}".encode()
        ).hexdigest()
        
        # 生成embedding
        if self.embedding_model:
            embedding = await self.embedding_model.embed_query(content)
        else:
            # 使用默认的embedding
            embedding = self._simple_embed(content)
        
        # 存储到向量数据库
        self.collection.add(
            ids=[memory_id],
            embeddings=[embedding],
            documents=[content],
            metadatas=[{
                "type": memory_type,
                "timestamp": datetime.now().isoformat(),
                **(metadata or {})
            }]
        )
        
        return memory_id
    
    async def search_memories(self, 
                             query: str,
                             top_k: int = 5,
                             memory_type: str = None,
                             filter_dict: Dict = None) -> List[Dict]:
        """搜索相关记忆"""
        # 生成查询embedding
        if self.embedding_model:
            query_embedding = await self.embedding_model.embed_query(query)
        else:
            query_embedding = self._simple_embed(query)
        
        # 构建过滤条件
        where_clause = filter_dict or {}
        if memory_type:
            where_clause["type"] = memory_type
        
        # 执行搜索
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=where_clause if where_clause else None
        )
        
        # 格式化结果
        memories = []
        for i in range(len(results["ids"][0])):
            memories.append({
                "id": results["ids"][0][i],
                "content": results["documents"][0][i],
                "distance": results["distances"][0][i],
                "metadata": results["metadatas"][0][i]
            })
        
        return memories
    
    def _simple_embed(self, text: str) -> List[float]:
        """简单的embedding实现(仅用于演示)"""
        # 实际应用中应使用专业的embedding模型
        # 如OpenAI的text-embedding-ada-002
        import random
        random.seed(hash(text) % 10000)
        return [random.random() for _ in range(1536)]
    
    def delete_memory(self, memory_id: str) -> None:
        """删除记忆"""
        self.collection.delete(ids=[memory_id])
    
    def update_memory(self, memory_id: str, 
                     content: str,
                     metadata: Dict = None) -> None:
        """更新记忆"""
        # 先删除旧记忆
        self.delete_memory(memory_id)
        # 添加新记忆
        self.add_memory(content, metadata=metadata)

2. 分层记忆系统

class HierarchicalMemory:
    """分层记忆系统"""
    
    def __init__(self, llm: ChatOpenAI = None):
        self.llm = llm or ChatOpenAI()
        
        # 三层记忆
        self.episodic_memory = VectorStoreMemory(collection_name="episodic")  # 情景记忆
        self.semantic_memory = VectorStoreMemory(collection_name="semantic")  # 语义记忆
        self.procedural_memory = VectorStoreMemory(collection_name="procedural")  # 程序记忆
        
        # 工作记忆(短期)
        self.working_memory = ConversationBuffer(max_messages=20)
    
    async def memorize_interaction(self, 
                                  user_input: str,
                                  agent_response: str,
                                  context: Dict = None) -> None:
        """记忆一次交互"""
        # 1. 存储到工作记忆
        self.working_memory.add_message("user", user_input)
        self.working_memory.add_message("assistant", agent_response)
        
        # 2. 提取关键信息并分类存储
        await self._categorize_and_store(
            user_input, 
            agent_response, 
            context
        )
    
    async def _categorize_and_store(self,
                                   user_input: str,
                                   agent_response: str,
                                   context: Dict = None) -> None:
        """对记忆进行分类存储"""
        
        # 使用LLM分析记忆类型
        prompt = f"""
        分析以下对话,提取关键信息并分类:
        
        用户: {user_input}
        Agent: {agent_response}
        
        请提取:
        1. 事实性知识(客观信息)
        2. 用户偏好(主观喜好)
        3. 交互模式(如何完成任务)
        
        输出JSON格式:
        {{
            "facts": ["事实1", "事实2"],
            "preferences": ["偏好1", "偏好2"],
            "patterns": ["模式1", "模式2"]
        }}
        """
        
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        
        try:
            categorized = json.loads(response.content)
            
            # 存储到不同记忆层
            for fact in categorized.get("facts", []):
                await self.semantic_memory.add_memory(
                    fact, 
                    memory_type="fact",
                    metadata={"source": "conversation"}
                )
            
            for pref in categorized.get("preferences", []):
                await self.episodic_memory.add_memory(
                    pref,
                    memory_type="preference",
                    metadata={"user_context": context}
                )
            
            for pattern in categorized.get("patterns", []):
                await self.procedural_memory.add_memory(
                    pattern,
                    memory_type="pattern",
                    metadata={"task_type": context.get("task_type")}
                )
                
        except json.JSONDecodeError:
            # 如果解析失败,直接存储原始内容
            await self.episodic_memory.add_memory(
                f"User: {user_input}\nAgent: {agent_response}",
                memory_type="raw_conversation"
            )
    
    async def recall(self, 
                    query: str,
                    memory_types: List[str] = None) -> Dict[str, List]:
        """回忆相关信息"""
        memory_types = memory_types or ["semantic", "episodic", "procedural"]
        
        results = {}
        
        if "semantic" in memory_types:
            results["facts"] = await self.semantic_memory.search_memories(
                query, memory_type="fact"
            )
        
        if "episodic" in memory_types:
            results["experiences"] = await self.episodic_memory.search_memories(
                query, memory_type="preference"
            )
        
        if "procedural" in memory_types:
            results["patterns"] = await self.procedural_memory.search_memories(
                query, memory_type="pattern"
            )
        
        return results
    
    def get_working_context(self) -> str:
        """获取工作记忆上下文"""
        return self.working_memory.to_string(last_n=10)

3. 知识图谱记忆

from neo4j import GraphDatabase
from typing import List, Dict, Any

class KnowledgeGraphMemory:
    """基于知识图谱的记忆系统"""
    
    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
    
    def close(self):
        self.driver.close()
    
    def add_entity(self, entity_name: str, 
                   entity_type: str,
                   properties: Dict[str, Any] = None) -> None:
        """添加实体"""
        with self.driver.session() as session:
            session.run("""
                MERGE (e:Entity {name: $name, type: $type})
                SET e += $properties
                SET e.updated_at = datetime()
            """, name=entity_name, type=entity_type, 
                 properties=properties or {})
    
    def add_relation(self, from_entity: str,
                    relation_type: str,
                    to_entity: str,
                    properties: Dict = None) -> None:
        """添加关系"""
        with self.driver.session() as session:
            session.run("""
                MATCH (from:Entity {name: $from})
                MATCH (to:Entity {name: $to})
                MERGE (from)-[r:RELATION {type: $rel_type}]->(to)
                SET r += $properties
                SET r.updated_at = datetime()
            """, from=from_entity, to=to_entity, 
                 rel_type=relation_type, properties=properties or {})
    
    def query_knowledge(self, entity_name: str, 
                       depth: int = 2) -> Dict:
        """查询实体相关知识"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH path = (start:Entity {name: $name})-[:RELATION*1..$depth]-(related)
                RETURN start, relationships(path), nodes(path)
            """, name=entity_name, depth=depth)
            
            knowledge = {"entity": entity_name, "relations": []}
            for record in result:
                rels = record["relationships(path)"]
                nodes = record["nodes(path)"]
                
                for rel in rels:
                    knowledge["relations"].append({
                        "from": rel.start_node["name"],
                        "type": rel["type"],
                        "to": rel.end_node["name"],
                        "properties": dict(rel)
                    })
            
            return knowledge
    
    def find_path(self, from_entity: str, 
                 to_entity: str) -> List[Dict]:
        """查找两个实体间的关系路径"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH path = shortestPath(
                    (a:Entity {name: $from})-[:RELATION*]-(b:Entity {name: $to})
                )
                RETURN relationships(path), nodes(path)
            """, from=from_entity, to=to_entity)
            
            paths = []
            for record in result:
                path_info = []
                nodes = record["nodes(path)"]
                rels = record["relationships(path)"]
                
                for i, rel in enumerate(rels):
                    path_info.append({
                        "from": nodes[i]["name"],
                        "relation": rel["type"],
                        "to": nodes[i+1]["name"]
                    })
                paths.append(path_info)
            
            return paths

# 使用示例
async def demo_knowledge_graph():
    kg = KnowledgeGraphMemory(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )
    
    # 添加用户知识
    kg.add_entity("张三", "User", {"age": 30, "job": "工程师"})
    kg.add_entity("Python", "Technology", {"type": "编程语言"})
    kg.add_entity("机器学习", "Field", {"category": "AI"})
    
    # 添加关系
    kg.add_relation("张三", "擅长", "Python")
    kg.add_relation("张三", "学习", "机器学习")
    kg.add_relation("机器学习", "使用", "Python")
    
    # 查询知识
    knowledge = kg.query_knowledge("张三", depth=2)
    print(knowledge)
    
    kg.close()

实战:完整的Agent记忆系统

import asyncio
import json
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta

class AgentMemorySystem:
    """完整的Agent记忆系统"""
    
    def __init__(self, 
                 llm: ChatOpenAI = None,
                 vector_db_path: str = "./memory_db",
                 enable_kg: bool = False):
        self.llm = llm or ChatOpenAI(model="gpt-4")
        
        # 初始化各层记忆
        self.short_term = ConversationSummaryMemory(self.llm)
        self.long_term = HierarchicalMemory(self.llm)
        self.vector_store = VectorStoreMemory(
            persist_directory=vector_db_path
        )
        
        # 可选的知识图谱
        self.knowledge_graph = None
        if enable_kg:
            self.knowledge_graph = KnowledgeGraphMemory(
                "bolt://localhost:7687",
                "neo4j",
                "password"
            )
        
        # 用户画像
        self.user_profiles: Dict[str, Dict] = {}
    
    async def process_interaction(self,
                                 user_id: str,
                                 user_input: str,
                                 agent_response: str,
                                 task_context: Dict = None) -> None:
        """处理一次交互,更新所有记忆层"""
        
        # 1. 更新短期记忆
        self.short_term.add_message("user", user_input)
        self.short_term.add_message("assistant", agent_response)
        
        # 2. 更新长期记忆
        await self.long_term.memorize_interaction(
            user_input, 
            agent_response, 
            task_context
        )
        
        # 3. 更新用户画像
        await self._update_user_profile(user_id, user_input, agent_response)
        
        # 4. 定期总结(每10轮)
        if len(self.short_term.buffer.messages) % 10 == 0:
            await self._periodic_summary(user_id)
    
    async def _update_user_profile(self, user_id: str,
                                  user_input: str,
                                  agent_response: str) -> None:
        """更新用户画像"""
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = {
                "preferences": {},
                "interaction_count": 0,
                "common_topics": [],
                "communication_style": "",
                "last_updated": datetime.now().isoformat()
            }
        
        profile = self.user_profiles[user_id]
        profile["interaction_count"] += 1
        
        # 每5次交互更新一次画像
        if profile["interaction_count"] % 5 == 0:
            # 分析用户偏好
            recent_interactions = self.short_term.buffer.to_string(last_n=10)
            
            prompt = f"""
            根据以下交互记录,总结用户画像:
            
            {recent_interactions}
            
            请提取:
            1. 用户的主要兴趣领域
            2. 沟通风格偏好(简洁/详细/正式/随意)
            3. 特殊偏好或约束
            
            输出JSON格式。
            """
            
            response = await self.llm.ainvoke([HumanMessage(content=prompt)])
            try:
                new_profile = json.loads(response.content)
                profile.update(new_profile)
                profile["last_updated"] = datetime.now().isoformat()
            except:
                pass
    
    async def _periodic_summary(self, user_id: str) -> None:
        """定期总结记忆"""
        # 生成对话摘要
        await self.short_term._summarize_buffer()
        
        # 将摘要存入长期记忆
        if self.short_term.summary:
            await self.vector_store.add_memory(
                self.short_term.summary,
                memory_type="conversation_summary",
                metadata={
                    "user_id": user_id,
                    "timestamp": datetime.now().isoformat()
                }
            )
    
    async def retrieve_context(self,
                              query: str,
                              user_id: str = None) -> Dict[str, Any]:
        """检索相关上下文"""
        context = {
            "short_term": self.short_term.get_context(),
            "user_profile": self.user_profiles.get(user_id, {}),
            "relevant_memories": {},
            "knowledge_graph": None
        }
        
        # 检索长期记忆
        memories = await self.long_term.recall(query)
        context["relevant_memories"] = memories
        
        # 检索向量存储
        vector_results = await self.vector_store.search_memories(query, top_k=3)
        context["vector_memories"] = vector_results
        
        # 知识图谱查询
        if self.knowledge_graph and user_id:
            kg_result = self.knowledge_graph.query_knowledge(user_id, depth=2)
            context["knowledge_graph"] = kg_result
        
        return context
    
    def format_context_for_prompt(self, context: Dict) -> str:
        """将上下文格式化为Prompt可用的字符串"""
        parts = []
        
        # 用户画像
        if context["user_profile"]:
            parts.append("用户画像:")
            for key, value in context["user_profile"].items():
                if key != "last_updated":
                    parts.append(f"  - {key}: {value}")
        
        # 短期记忆
        if context["short_term"]:
            parts.append(f"\n近期对话:\n{context['short_term']}")
        
        # 相关记忆
        if context["relevant_memories"]:
            parts.append("\n相关历史信息:")
            for mem_type, memories in context["relevant_memories"].items():
                if memories:
                    parts.append(f"  [{mem_type}]")
                    for mem in memories[:2]:  # 每类最多2条
                        parts.append(f"    - {mem.get('content', mem)}")
        
        return "\n".join(parts)

# 使用示例
async def demo_memory_system():
    memory_system = AgentMemorySystem(
        vector_db_path="./demo_memory_db"
    )
    
    user_id = "user_001"
    
    # 模拟多次交互
    interactions = [
        ("你好,我想学习Python编程", "你好!Python是一门非常优秀的编程语言..."),
        ("我更喜欢通过项目实战来学习", "很好的学习方式!项目驱动能让你更快掌握实际技能..."),
        ("请推荐一些适合初学者的项目", "对于初学者,我推荐以下几个项目:1. 计算器 2. 待办事项应用..."),
        ("我对Web开发比较感兴趣", "Web开发是Python的热门应用领域,你可以学习Flask或Django框架..."),
    ]
    
    for user_input, agent_response in interactions:
        await memory_system.process_interaction(
            user_id, user_input, agent_response,
            task_context={"task_type": "learning_assistant"}
        )
        print(f"已记忆交互: {user_input[:30]}...")
    
    # 检索上下文
    query = "推荐Python学习资源"
    context = await memory_system.retrieve_context(query, user_id)
    
    print("\n" + "="*60)
    print("检索到的上下文:")
    print("="*60)
    formatted = memory_system.format_context_for_prompt(context)
    print(formatted)

if __name__ == "__main__:
    asyncio.run(demo_memory_system())

记忆管理最佳实践

1. 记忆重要性评分

class MemoryScorer:
    """记忆重要性评分器"""
    
    def __init__(self, llm: ChatOpenAI = None):
        self.llm = llm or ChatOpenAI()
    
    async def score_memory(self, content: str, 
                          context: Dict = None) -> float:
        """评估记忆的重要性分数 (0-1)"""
        
        prompt = f"""
        评估以下信息的重要性(0-1分):
        
        信息内容: {content}
        上下文: {context or ''}
        
        评分标准:
        - 0.9-1.0: 关键信息(用户身份、重要偏好、关键约束)
        - 0.7-0.8: 重要信息(任务目标、重要上下文)
        - 0.4-0.6: 一般信息(普通对话内容)
        - 0.1-0.3: 次要信息(寒暄、重复内容)
        
        只输出数字分数。
        """
        
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        try:
            score = float(response.content.strip())
            return max(0.0, min(1.0, score))
        except:
            return 0.5
    
    async def should_remember(self, content: str,
                             threshold: float = 0.5) -> bool:
        """判断是否应该记住"""
        score = await self.score_memory(content)
        return score >= threshold

2. 记忆遗忘机制

class MemoryForgetting:
    """记忆遗忘管理"""
    
    def __init__(self, 
                 decay_rate: float = 0.95,
                 access_boost: float = 0.1):
        self.decay_rate = decay_rate  # 遗忘速率
        self.access_boost = access_boost  # 访问提升
        self.memory_strengths: Dict[str, float] = {}
    
    def update_strength(self, memory_id: str, 
                       accessed: bool = False) -> float:
        """更新记忆强度"""
        current = self.memory_strengths.get(memory_id, 1.0)
        
        # 时间衰减
        current *= self.decay_rate
        
        # 访问提升
        if accessed:
            current = min(1.0, current + self.access_boost)
        
        self.memory_strengths[memory_id] = current
        return current
    
    def should_forget(self, memory_id: str, 
                     threshold: float = 0.1) -> bool:
        """判断是否应该遗忘"""
        strength = self.memory_strengths.get(memory_id, 1.0)
        return strength < threshold
    
    async def cleanup_memories(self, 
                              vector_store: VectorStoreMemory) -> int:
        """清理遗忘的记忆"""
        forgotten = 0
        for memory_id, strength in list(self.memory_strengths.items()):
            if self.should_forget(memory_id):
                vector_store.delete_memory(memory_id)
                del self.memory_strengths[memory_id]
                forgotten += 1
        return forgotten

3. 记忆冲突解决

class MemoryConflictResolver:
    """记忆冲突解决器"""
    
    def __init__(self, llm: ChatOpenAI = None):
        self.llm = llm or ChatOpenAI()
    
    async def detect_conflict(self, 
                             existing: Dict,
                             new_memory: Dict) -> bool:
        """检测记忆冲突"""
        # 简单的关键词匹配
        existing_keywords = set(existing.get("content", "").lower().split())
        new_keywords = set(new_memory.get("content", "").lower().split())
        
        overlap = existing_keywords & new_keywords
        total = existing_keywords | new_keywords
        
        # 相似度超过阈值视为潜在冲突
        return len(overlap) / len(total) > 0.5 if total else False
    
    async def resolve_conflict(self,
                              existing: Dict,
                              new_memory: Dict) -> Dict:
        """解决记忆冲突"""
        
        prompt = f"""
        以下两条记忆存在冲突,请整合为一条准确的记忆:
        
        记忆1(旧): {existing.get("content")}
        时间: {existing.get("metadata", {}).get("timestamp")}
        
        记忆2(新): {new_memory.get("content")}
        时间: {new_memory.get("metadata", {}).get("timestamp")}
        
        要求:
        1. 保留两条记忆的关键信息
        2. 如果信息矛盾,以新记忆为准
        3. 保持简洁明了
        
        整合后的记忆:
        """
        
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        
        return {
            "content": response.content.strip(),
            "metadata": {
                **existing.get("metadata", {}),
                "updated_at": datetime.now().isoformat(),
                "merged_from": [existing.get("id"), new_memory.get("id")]
            }
        }

总结

本文系统介绍了Agent记忆系统的设计与实现,核心要点包括:

  1. 记忆分层:感官记忆 → 短期记忆 → 工作记忆 → 长期记忆
  2. 短期记忆:对话历史、智能摘要、滑动窗口管理
  3. 长期记忆:向量存储、分层记忆、知识图谱
  4. 记忆管理:重要性评分、遗忘机制、冲突解决
  5. 实战集成:完整的记忆系统实现

记忆系统的设计原则:

  • 分层存储:不同重要性、时效性的信息存储在不同层级
  • 动态管理:自动摘要、遗忘、更新
  • 高效检索:语义搜索 + 结构化查询
  • 隐私保护:敏感信息加密、访问控制

相关资源


本文最后更新于 2024-02-15,如有问题欢迎在社区讨论。