LangGraph入门案例详解:Reflexion架构
Reflexion 是由 Shinn 等人设计的一种架构,旨在通过口头反馈和自我反思进行学习。该代理会对任务响应进行明确的批评,以生成更高质量的最终响应,但这会以较长的执行时间为代价。本文将详细讲解如何使用LangGraph框架构建一个Reflexion架构的入门案例。
框架概述
Reflexion架构主要包含以下三个组件:
- Actor(代理)与自我反思:负责生成初始响应,并基于自我反思进行修正。
- 外部评估器:针对特定任务进行评估,如代码编译步骤等。
- 情节记忆:存储来自Actor的反思内容。
在本案例中,我们将重点构建Actor组件,后两个组件(外部评估器和情节记忆)因任务特定性较强,将在实际应用中根据需求进行定制。
环境搭建
首先,需要安装相关的Python包:
%pip install -U --quiet langgraph langchain_anthropic tavily-python
然后,配置API密钥:
import getpass
import osdef _set_if_undefined(var: str) -> None:if os.environ.get(var):returnos.environ[var] = getpass.getpass(var)_set_if_undefined("ANTHROPIC_API_KEY")
_set_if_undefined("TAVILY_API_KEY")
LangSmith 是一个帮助开发者调试和监控LangGraph项目的工具,建议注册以便更好地管理项目。
定义LLM(大型语言模型)
选择并配置一个LLM,比如使用Anthropic的ChatAnthropic模型:
from langchain_anthropic import ChatAnthropicllm = ChatAnthropic(model="claude-3-5-sonnet-20240620")
# 也可以使用OpenAI或其他提供商
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4-turbo-preview")
构建Actor(包含自我反思)
Actor是Reflexion架构的核心,负责生成初始响应、进行自我批评并修正响应。主要包括以下子组件:
- 工具执行上下文
- 初始响应生成器
- 修正器
构建工具
使用TavilySearchAPIWrapper作为搜索工具:
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities.tavily_search import TavilySearchAPIWrappersearch = TavilySearchAPIWrapper()
tavily_tool = TavilySearchResults(api_wrapper=search, max_results=5)
定义Pydantic模型
使用Pydantic定义响应和反思的结构:
from pydantic import BaseModel, Fieldclass Reflection(BaseModel):missing: str = Field(description="对缺失部分的批评。")superfluous: str = Field(description="对多余部分的批评。")class AnswerQuestion(BaseModel):"""回答问题。提供答案、反思,然后提出搜索查询以改进答案。"""answer: str = Field(description="问题的详细答案,约250字。")reflection: Reflection = Field(description="你对初始答案的反思。")search_queries: list[str] = Field(description="1-3个用于研究改进当前答案的搜索查询。")
创建响应器
定义一个具有重试机制的响应器,确保生成的响应符合Pydantic模型的验证:
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.output_parsers.openai_tools import PydanticToolsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import ValidationErrorclass ResponderWithRetries:def __init__(self, runnable, validator):self.runnable = runnableself.validator = validatordef respond(self, state: dict):response = []for attempt in range(3):response = self.runnable.invoke({"messages": state["messages"]}, {"tags": [f"attempt:{attempt}"]})try:self.validator.invoke(response)return {"messages": response}except ValidationError as e:state = state + [response,ToolMessage(content=f"{repr(e)}\n\n请密切注意函数模式。\n\n"+ self.validator.schema_json()+ " 请通过修正所有验证错误来响应。",tool_call_id=response.tool_calls[0]["id"],),]return {"messages": response}
定义提示模板
创建一个聊天提示模板,引导模型生成回答并进行反思:
import datetimeactor_prompt_template = ChatPromptTemplate.from_messages([("system","""你是一名资深研究员。
当前时间:{time}1. {first_instruction}
2. 反思并批评你的答案。请严厉批评以最大限度地提高质量。
3. 推荐搜索查询以研究信息并改进你的答案。""",),MessagesPlaceholder(variable_name="messages"),("user","\n\n<system>反思用户最初的问题以及迄今为止采取的行动。使用 {function_name} 函数进行响应。</reminder>",),]
).partial(time=lambda: datetime.datetime.now().isoformat(),
)
生成初始回答
绑定LLM和工具,创建初始回答链:
initial_answer_chain = actor_prompt_template.partial(first_instruction="提供一个详细的约250字的答案。",function_name=AnswerQuestion.__name__,
) | llm.bind_tools(tools=[AnswerQuestion])validator = PydanticToolsParser(tools=[AnswerQuestion])first_responder = ResponderWithRetries(runnable=initial_answer_chain, validator=validator
)
示例问题
测试初始回答生成器:
example_question = "为什么反思对AI有用?"
initial = first_responder.respond({"messages": [HumanMessage(content=example_question)]}
)
修正回答
根据初始回答的反思,生成修正后的回答:
revise_instructions = """使用新信息修正你之前的答案。- 你应该利用之前的批评来添加重要信息到你的答案中。- 你必须在修正后的答案中包含数字引用,以确保其可验证性。- 在答案底部添加一个“参考文献”部分(不计入字数限制),形式如下:- [1] https://example.com- [2] https://example.com- 你应该利用之前的批评来删除答案中的多余信息,并确保其不超过250字。
"""class ReviseAnswer(AnswerQuestion):"""修正你对问题的原始答案。提供一个答案、反思,使用参考文献引用你的反思,并最后添加搜索查询以改进答案。"""references: list[str] = Field(description="引用以支持你更新后的答案。")revision_chain = actor_prompt_template.partial(first_instruction=revise_instructions,function_name=ReviseAnswer.__name__,
) | llm.bind_tools(tools=[ReviseAnswer])revision_validator = PydanticToolsParser(tools=[ReviseAnswer])revisor = ResponderWithRetries(runnable=revision_chain, validator=revision_validator)
执行修正
import jsonrevised = revisor.respond({"messages": [HumanMessage(content=example_question),initial["messages"],ToolMessage(tool_call_id=initial["messages"].tool_calls[0]["id"],content=json.dumps(tavily_tool.invoke({"query": initial["messages"].tool_calls[0]["args"]["search_queries"][0]})),),]}
)
revised["messages"]
创建工具节点
定义工具节点以执行工具调用:
from langchain_core.tools import StructuredTool
from langgraph.prebuilt import ToolNodedef run_queries(search_queries: list[str], **kwargs):"""运行生成的查询。"""return tavily_tool.batch([{"query": query} for query in search_queries])tool_node = ToolNode([StructuredTool.from_function(run_queries, name=AnswerQuestion.__name__),StructuredTool.from_function(run_queries, name=ReviseAnswer.__name__),]
)
构建图谱
将所有组件连接在一起,形成执行流程:
from typing import Literal
from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDictclass State(TypedDict):messages: Annotated[list, add_messages]MAX_ITERATIONS = 5
builder = StateGraph(State)
builder.add_node("draft", first_responder.respond)
builder.add_node("execute_tools", tool_node)
builder.add_node("revise", revisor.respond)# 连接节点
builder.add_edge("draft", "execute_tools")
builder.add_edge("execute_tools", "revise")# 定义循环逻辑
def _get_num_iterations(state: list):i = 0for m in state[::-1]:if m.type not in {"tool", "ai"}:breaki += 1return idef event_loop(state: list):num_iterations = _get_num_iterations(state["messages"])if num_iterations > MAX_ITERATIONS:return ENDreturn "execute_tools"# 修正后可能继续执行工具或结束
builder.add_conditional_edges("revise", event_loop, ["execute_tools", END])
builder.add_edge(START, "draft")
graph = builder.compile()
执行并观察结果
启动图谱执行,并观察每一步的输出:
from IPython.display import Image, displaytry:display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:passevents = graph.stream({"messages": [("user", "我们应该如何应对气候危机?")]},stream_mode="values",
)
for i, step in enumerate(events):print(f"步骤 {i}")step["messages"][-1].pretty_print()
示例输出分析:
- 步骤 0:用户提出问题。
- 步骤 1:AI生成初始回答,并提出反思和改进建议。
- 步骤 2:工具执行(如搜索相关信息)。
- 步骤 3:AI基于工具提供的信息修正回答。
通过这种多轮反思和修正,AI能够生成更加全面和准确的回答。
总结
本案例展示了如何使用LangGraph构建一个具备自我反思能力的AI代理。通过以下步骤实现:
- 环境搭建:安装必要的Python包并配置API密钥。
- 定义LLM:选择并配置一个大型语言模型。
- 构建Actor:包括工具执行上下文、初始回答生成器和修正器。
- 构建工具:利用外部搜索工具获取补充信息。
- 构建图谱:将所有组件连接,定义执行流程和循环逻辑。
- 执行并观察:运行图谱,观察AI如何通过反思不断改进回答。
通过这种方法,可以创建一个能够通过自我反思和外部工具不断优化响应质量的智能代理,适用于各种复杂任务和应用场景。
汇总
以下是将上述所有代码汇总到一个Python文件中的完整脚本。请确保在运行之前已安装所有必要的包,并配置好API密钥。
# reflexion_architecture.pyimport getpass
import os
from typing import List, Annotated
from typing_extensions import TypedDict
import datetime
import jsonfrom langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities.tavily_search import TavilySearchAPIWrapper
from pydantic import BaseModel, Field, ValidationError
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.output_parsers.openai_tools import PydanticToolsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import StructuredTool
from langgraph.prebuilt import ToolNode
from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import add_messages
from IPython.display import Image, display# 配置API密钥
def _set_if_undefined(var: str) -> None:if os.environ.get(var):returnos.environ[var] = getpass.getpass(f"请输入 {var} 的值: ")_set_if_undefined("ANTHROPIC_API_KEY")
_set_if_undefined("TAVILY_API_KEY")# 定义Pydantic模型
class Reflection(BaseModel):missing: str = Field(description="对缺失部分的批评。")superfluous: str = Field(description="对多余部分的批评。")class AnswerQuestion(BaseModel):"""回答问题。提供答案、反思,然后提出搜索查询以改进答案。"""answer: str = Field(description="问题的详细答案,约250字。")reflection: Reflection = Field(description="你对初始答案的反思。")search_queries: List[str] = Field(description="1-3个用于研究改进当前答案的搜索查询。")class ReviseAnswer(AnswerQuestion):"""修正你对问题的原始答案。提供一个答案、反思,使用参考文献引用你的反思,并最后添加搜索查询以改进答案。"""references: List[str] = Field(description="引用以支持你更新后的答案。")# 定义ResponderWithRetries类
class ResponderWithRetries:def __init__(self, runnable, validator):self.runnable = runnableself.validator = validatordef respond(self, state: dict):response = []for attempt in range(3):response = self.runnable.invoke({"messages": state["messages"]}, {"tags": [f"attempt:{attempt}"]})try:self.validator.invoke(response)return {"messages": response}except ValidationError as e:state = state + [response,ToolMessage(content=f"{repr(e)}\n\n请密切注意函数模式。\n\n"+ self.validator.schema_json()+ " 请通过修正所有验证错误来响应。",tool_call_id=response.tool_calls[0]["id"],),]return {"messages": response}# 定义提示模板
actor_prompt_template = ChatPromptTemplate.from_messages([("system","""你是一名资深研究员。
当前时间:{time}1. {first_instruction}
2. 反思并批评你的答案。请严厉批评以最大限度地提高质量。
3. 推荐搜索查询以研究信息并改进你的答案。""",),MessagesPlaceholder(variable_name="messages"),("user","\n\n<system>反思用户最初的问题以及迄今为止采取的行动。使用 {function_name} 函数进行响应。</reminder>",),]
).partial(time=lambda: datetime.datetime.now().isoformat(),
)# 定义LLM
llm = ChatAnthropic(model="claude-3-5-sonnet-20240620")
# 如果使用OpenAI,可以使用以下代码:
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4-turbo-preview")# 创建工具
search = TavilySearchAPIWrapper()
tavily_tool = TavilySearchResults(api_wrapper=search, max_results=5)# 创建初始回答生成器
initial_answer_chain = actor_prompt_template.partial(first_instruction="提供一个详细的约250字的答案。",function_name=AnswerQuestion.__name__,
) | llm.bind_tools(tools=[AnswerQuestion])validator = PydanticToolsParser(tools=[AnswerQuestion])first_responder = ResponderWithRetries(runnable=initial_answer_chain, validator=validator
)# 定义示例问题
example_question = "为什么反思对AI有用?"
initial = first_responder.respond({"messages": [HumanMessage(content=example_question)]}
)# 定义修正回答的指令
revise_instructions = """使用新信息修正你之前的答案。- 你应该利用之前的批评来添加重要信息到你的答案中。- 你必须在修正后的答案中包含数字引用,以确保其可验证性。- 在答案底部添加一个“参考文献”部分(不计入字数限制),形式如下:- [1] https://example.com- [2] https://example.com- 你应该利用之前的批评来删除答案中的多余信息,并确保其不超过250字。
"""# 创建修正链
revision_chain = actor_prompt_template.partial(first_instruction=revise_instructions,function_name=ReviseAnswer.__name__,
) | llm.bind_tools(tools=[ReviseAnswer])revision_validator = PydanticToolsParser(tools=[ReviseAnswer])revisor = ResponderWithRetries(runnable=revision_chain, validator=revision_validator)# 执行修正
revised = revisor.respond({"messages": [HumanMessage(content=example_question),initial["messages"],ToolMessage(tool_call_id=initial["messages"].tool_calls[0]["id"],content=json.dumps(tavily_tool.invoke({"query": initial["messages"].tool_calls[0]["args"]["search_queries"][0]})),),]}
)
print(revised["messages"])# 创建工具节点
def run_queries(search_queries: List[str], **kwargs):"""运行生成的查询。"""return tavily_tool.batch([{"query": query} for query in search_queries])tool_node = ToolNode([StructuredTool.from_function(run_queries, name=AnswerQuestion.__name__),StructuredTool.from_function(run_queries, name=ReviseAnswer.__name__),]
)# 构建图谱
class State(TypedDict):messages: Annotated[List, add_messages]MAX_ITERATIONS = 5
builder = StateGraph(State)
builder.add_node("draft", first_responder.respond)
builder.add_node("execute_tools", tool_node)
builder.add_node("revise", revisor.respond)# 连接节点
builder.add_edge("draft", "execute_tools")
builder.add_edge("execute_tools", "revise")# 定义循环逻辑
def _get_num_iterations(state: list):i = 0for m in state[::-1]:if m.type not in {"tool", "ai"}:breaki += 1return idef event_loop(state: list):num_iterations = _get_num_iterations(state["messages"])if num_iterations > MAX_ITERATIONS:return ENDreturn "execute_tools"# 修正后可能继续执行工具或结束
builder.add_conditional_edges("revise", event_loop, ["execute_tools", END])
builder.add_edge(START, "draft")
graph = builder.compile()# 执行并观察结果
try:display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:# 这需要一些额外的依赖项,是可选的passevents = graph.stream({"messages": [("user", "我们应该如何应对气候危机?")]},stream_mode="values",
)
for i, step in enumerate(events):print(f"步骤 {i}")step["messages"][-1].pretty_print()
脚本说明
-
环境搭建和API密钥配置:
- 安装必要的Python包。
- 配置Anthropic和Tavily的API密钥。
-
导入库:
- 导入所有需要的库和模块,包括LangChain、LangGraph、Pydantic等。
-
定义数据模型:
- 使用Pydantic定义
Reflection
和AnswerQuestion
模型,以确保数据结构的有效性。 - 定义
ReviseAnswer
模型,继承自AnswerQuestion
,并增加references
字段。
- 使用Pydantic定义
-
定义响应器类:
ResponderWithRetries
类负责生成响应,并在验证失败时进行重试和修正。
-
创建提示模板:
- 使用
ChatPromptTemplate
定义系统和用户的交互模板,引导模型生成回答和进行反思。
- 使用
-
定义LLM:
- 配置并实例化一个大型语言模型,如Anthropic的
ChatAnthropic
。
- 配置并实例化一个大型语言模型,如Anthropic的
-
创建工具:
- 使用Tavily的搜索工具
TavilySearchAPIWrapper
来辅助生成更准确的回答。
- 使用Tavily的搜索工具
-
创建初始回答生成器:
- 结合提示模板和LLM,创建一个初始回答生成器,并绑定Pydantic验证器。
-
定义示例问题并生成初始回答:
- 通过调用
first_responder.respond
生成对示例问题的初始回答。
- 通过调用
-
定义修正指令和修正模型:
- 设定修正回答的指令,确保答案符合质量要求。
- 定义
ReviseAnswer
模型,增加references
字段以支持引用。
-
创建修正链:
- 结合提示模板和LLM,创建一个修正回答的链,并绑定Pydantic验证器。
-
执行修正:
- 通过调用
revisor.respond
,基于初始回答的反思,生成修正后的回答。
- 通过调用
-
创建工具节点:
- 定义一个工具节点,用于执行搜索查询以辅助回答的改进。
-
构建图谱:
- 使用
StateGraph
将所有组件连接起来,定义执行流程和循环逻辑,确保在达到最大迭代次数前持续优化回答。
- 使用
-
执行并观察结果:
- 启动图谱执行,观察AI如何通过多轮反思和修正,不断优化最终回答。
执行说明
-
准备工作:
- 确保已安装所有必要的Python包。
- 获取并配置Anthropic和Tavily的API密钥。
-
运行脚本:
- 将上述代码保存为一个Python文件,例如
reflexion_architecture.py
。 - 在支持Jupyter显示功能的环境中运行此脚本,以便能够看到图谱的可视化结果。
- 将上述代码保存为一个Python文件,例如
-
观察输出:
- 脚本将输出多步执行的结果,每一步包括AI生成的回答、反思和修正。
- 如果环境支持,图谱的可视化图像将显示,帮助理解执行流程。