Agent搭建与应用
Ollama
本地部署模型+调用,可以使用云服务代替
LangChain
依赖安装:
1 2 3 4 5 6
| pip install langchain langchain-community langchain-ollama dashscope langchain-chroma chromadb
|
PromptTemplate
基于 PromptTemplate 可以得到提示词模板,支持基于模板注入变量得到最终提示词
zero-shot 思想下,可以基于 PromptTemplate 直接完成
few-shot 思想下,需要更换为 FewShotPromptTemplate
Q:使用PromptTemplate还不如自己手动拼接字符串?
A: 使用Template模板构建提示词,在大型工程中更容易做标准化模板,同时支持LangChian框架的链式调用(Runnable接口)
样例:
1 2 3 4 5 6 7
| prompt_template = PromptTemplate.from_template( "我的邻居今年生了个{gender},请你帮他取名" ) prompt = prompt_template.format(gender= "女孩")
print(model.invoke(prompt))
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| prompt_template = PromptTemplate.from_template( "单词: {word} 反义词: {antonym}" )
examples = [ {"word": "大", "antonym": "小"}, {"word": "上", "antonym": "下"}, ]
few_shot_prompt = FewShotPromptTemplate( examples= examples, example_prompt= prompt_template, prefix= "给出给定词的反义词,示例如下", suffix= "{input}的反义词是?", input_variables= ["input"], ) prompt = few_shot_prompt.invoke({"input": "左"}).to_string() ''' prompt 给出给定词的反义词,示例如下
单词: 大 反义词: 小
单词: 上 反义词: 下
左的反义词是? ''' print(model.invoke(prompt))
|
不难发现,在 prompt 中,invoke 比 format 多了个 to_string,这是因为:
| 功能 |
纯字符串替换,解析占位符 生成提示词 |
纯字符串替换,解析占位符 生成提示词 |
| 返回值 |
字符串 |
PromptValue对象 |
| 传参 |
.format(k= v, k= v, …) |
.invoke({k= v, k= v, …}) |
| 解析 |
支持解析{}占位符 |
支持解析占位符和MessagesPlaceholder 结构化占位符 |
样例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| chat_prompt_template = ChatPromptTemplate.from_messages( [ ("system", "你是一个诗人,能够作诗且不说废话"), MessagesPlaceholder(variable_name= "history"), ("human", "来一首唐诗"), ] )
history = [ ("human", "来一首唐诗"), ("ai", "登鹳雀楼\n 白日依山尽,黄河入海流。\n 欲穷千里目,更上一层楼。") ]
prompt = chat_prompt_template.invoke({"history": history}).to_string() ''' System: 你是一个诗人,能够作诗且不说废话 Human: 来一首唐诗 AI: 登鹳雀楼 白日依山尽,黄河入海流。 欲穷千里目,更上一层楼。 Human: 来一首唐诗 ''' print(prompt)
|
Chain
将组件串联,上一个组件的输出作为下一个组件的输入是Langchain链(尤其是|管道链)的核心工作 原理,这也是链式调用的核心价值:实现数据的自动化流转与组件的协同工作,如下
1
| chain = prompt_template | model
|
核心前提:即 Runnable子类对象才能入链(以及callable、Mapping接口子类对象也可加入(用的不多))。
样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| chat_prompt_template = ChatPromptTemplate.from_messages( [ ("system", "你是一个诗人,能够作诗且不说废话"), MessagesPlaceholder(variable_name= "history"), ("human", "来一首唐诗"), ] )
history = [ ("human", "来一首唐诗"), ("ai", "登鹳雀楼\n 白日依山尽,黄河入海流。\n 欲穷千里目,更上一层楼。") ]
chain = chat_prompt_template | model print(chain.invoke({"history": history})) for chunk in chain.stream({"history": history}): print(chunk, end= "", flush= True)
|
StrOutputParser
在构建 chain 时,例如 chain = prompt | model | model 结构会引发错误,这是因为 model 的输出结果是 AIMessage 类,而 model 只能接收 PromptValue | str | Sequence[MessageLikeRepresentation] 类,显然 AIMessage 不能作为 model 的输入,此时需要内置解析器 StrOutputParser 来解析为简单字符串
样例:
1 2 3 4 5 6 7 8 9 10
| parser = StrOutputParser()
prompt_template = PromptTemplate.from_template( "我的邻居姓{lastname},刚生了个{gender},帮我起个名,仅告知我名字,无需其他内容" )
chain = prompt_template | model | parser | model | parser
for chunk in chain.stream({"lastname": "王", "gender": "男"}): print(chunk, end= "", flush= True)
|
JsonOutputParser
上述样例的链其实并不标准,因为上一个模型的输入没有被处理就送入下一个模型,正常情况下,处理逻辑为:invoke | stream (初始输入) → 提示词模板 → 模型 → 数据处理 → 提示词模板 → 模型 → 解析器 → 结果
JsonOutputParser 能够将 AImessage 类转换为字典,便于注入提示词模板
样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| str_parser = StrOutputParser() json_parser = JsonOutputParser()
first_prompt_template = PromptTemplate.from_template( "我的邻居姓{lastname},刚生了个{gender},请起名,并封装为JSON格式返回给我," "要求key为name,value为名字。请严格遵循JSON格式要求,请勿返回其他内容" )
second_prompt_template = PromptTemplate.from_template( "分析一下'{name}'这个名字的含义" )
chain = first_prompt_template | model | json_parser | second_prompt_template | model | str_parser
for chunk in chain.stream({"lastname": "王", "gender": "男"}): print(chunk, end= "", flush= True)
|
RunnableLambda
RunnableLambda 类能将普通函数转换为 Runnable 接口实例,方便自定义函数加入 chain
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| str_parser = StrOutputParser()
prompt_template_first = PromptTemplate.from_template( "我的邻居姓{lastname},刚生了个{gender},请起名,仅告诉我名字,无需其他内容" )
prompt_template_second = PromptTemplate.from_template( "简单分析一下'{name}'这个名字的含义" )
func = RunnableLambda(lambda ai_msg: {"name": ai_msg.content})
chain = prompt_template_first | model | func | prompt_template_second | model | str_parser
for chunk in chain.stream({"lastname": "王", "gender": "男"}): print(chunk, end= "", flush= True)
|
通过 RunnableLambda 可以自定义 Debug 函数,便于查找和修复
Memory
临时会话记忆
在内存中存储对话记录,用于临时会话记忆
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
prompt_template = ChatPromptTemplate.from_messages( [ ("system", "根据会话历史信息回应用户问题,对话历史:"), MessagesPlaceholder("chat_history"), ("human", "{input}") ] )
str_parser = StrOutputParser()
base_chain = prompt_template | model | str_parser
store = {}
def get_history(session_id): if session_id not in store: store[session_id] = InMemoryChatMessageHistory() return store[session_id]
conversation_history = RunnableWithMessageHistory( base_chain, get_history, input_messages_key= "input", history_messages_key= "chat_history" )
if __name__ == '__main__': session_config = { "configurable": { "session_id": "user_001" } } for chunk in conversation_history.stream({"input": "我姓李,怎么给孩子起名?给出三个名字备选,只要名字"}, session_config): print(chunk, end= "", flush= True) print("\n" + "*" * 20) for chunk in conversation_history.stream({"input": "分析哪个名字更好"}, session_config): print(chunk, end= "", flush= True)
|
长期会话记忆
将对话历史存入文件,实现长期会话记忆
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| class FileChatMessageHistory(BaseChatMessageHistory): def __init__(self, session_id, storage_path): self.session_id = session_id self.storage_path = storage_path self.file_path = os.path.join(self.storage_path, self.session_id) os.makedirs(os.path.dirname(self.file_path), exist_ok=True) def add_messages(self, messages: Sequence[BaseMessage]) -> None: all_messages = list(self.messages) all_messages.extend(messages) new_messages = [message_to_dict(m) for m in all_messages] with open(self.file_path, "w", encoding= 'utf-8') as f: json.dump(new_messages, f, indent= 4) @property def messages(self) -> list[BaseMessage]: try: with open(self.file_path, "r", encoding= 'utf-8') as f: messages_data = json.load(f) return messages_from_dict(messages_data) except FileNotFoundError: return [] def clear(self) -> None: with open(self.file_path, "w", encoding= 'utf-8') as f: json.dump([], f) def get_history(session_id): return FileChatMessageHistory(session_id, "./history")
|
Document Loader
使用 Document Loader 来加载各种数据文件,确保无论数据来源如何,都能对其进行一致性处理
CSVLoader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| loader = CSVLoader( file_path= "./data/data.csv", csv_args={ "delimiter": ",", "quotechar": '"', "fieldnames": ["name", "age", "gender"], }, encoding= "utf-8" )
documents = loader.load()
for document in documents: print(document) print("*" * 100)
for ducument in loader.lazy_load(): print(ducument)
|
JsonLoader
使用 JsonLoader 需要额外安装依赖:
1 2 3 4 5 6 7 8 9
| loader = JSONLoader( file_path= "./data/stus.json", jq_schema= ".[].name", text_content= True, )
print(loader.load())
|
TextLoader
文本分割有利于关键字检索
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| loader = TextLoader("./data/data.txt", encoding= "utf-8")
docs = loader.load()
spilitter = RecursiveCharacterTextSplitter( chunk_size= 100, chunk_overlap= 20, separators= ["\n\n", "\n", " ", "", "。", ",", ",", ".", "!", "?", "!", "?"], length_function= len, ) splited_docs = spilitter.split_documents(docs) print(len(splited_docs)) for doc in splited_docs: print(doc)
|
PyPDFLoader
使用 PyPDFLoader 需要额外安装依赖:
1 2 3 4 5 6
| loader = PyPDFLoader( file_path= "./data/data.pdf", mode= "page", password= "", )
|
VectorStores
向量的存储与关键词相似性检索是实现 RAG 的重要部分
在内存内储存变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
vector_store = InMemoryVectorStore( embedding= embedding_model )
vector_store.add_documents( documents= documents, ids= [f"id_{i}" for i in range(1, len(documents) + 1)] )
vector_store.delete(['id_1', 'id_2'])
results = vector_store.similarity_search( query= "怎么创建一个Scrapy项目?", k= 3, )
for result in results: print(result)
|
外部向量持久化保存
本文使用 Chroma 向量数据库(轻量级)
依赖:langchain-chroma chromadb
vscode可以安装mysql扩展进行数据库可视化(推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
vector_store = Chroma( collection_name= "test", embedding_function= embedding_model, persist_directory= "./chroma_db", )
vector_store.add_documents( documents= documents, ids= [f"id_{i}" for i in range(1, len(documents) + 1)] )
vector_store.delete(['id_1', 'id_2'])
results = vector_store.similarity_search( query= "怎么创建一个Scrapy项目?", k= 3 )
for result in results: print(result)
|
结合LLM案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| prompt = ChatPromptTemplate.from_messages( [ ("system", "以提供的参考资料为主,简洁和专业地回答用户问题,参考资料:{reference}"), ("human", "{question}") ] )
vector_store = Chroma( collection_name= "test", embedding_function= embedding_model, persist_directory= "./chroma_db" )
question = "怎么创建一个scrapy项目?" references = vector_store.similarity_search(question, k= 3) references_text = [reference.page_content for reference in references]
def print_prompt(prompt): print("*" * 20 + "Prompt" + "*" * 20) print(prompt) print("*" * 20 + "******" + "*" * 20) return prompt
chain = prompt | RunnableLambda(lambda prompt: print_prompt(prompt)) |chat_model
for chunk in chain.stream({"question": question, "reference": references_text}): print(chunk, end= "", flush= True)
|
RunnablePassthrough
在上述代码中,手动调用关键词比对过于繁琐,且不易于工程化,我们可以将其加入链
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| retriever = vector_store.as_retriever(search_kwargs={"k": 3}) ''' retriever : - 输入: 用户的问题 str - 输出: 引用的参考资料 list[Document] prompt : - 输入: 用户的问题和参考资料 dict - 输出: 完整提示词 PromptValue ''' format_doc = RunnableLambda(lambda references: [reference.page_content for reference in references])
chain = ( {"question": RunnablePassthrough(), "reference": retriever | format_doc } | prompt | chat_model )
for chunk in chain.stream("response的属性与方法都有哪些"): print(chunk, end= "", flush= True)
|
Agent
ReAct
Agent ReAct是大模型智能体的核心思考与行动框架,全称Reasoning+Acting(推理+行动),是让Agent像人类一样「思考问题 → 制定策略 → 执行行动 → 验证结果」的关键逻辑。
简单来说:ReAct让Agent不再是“直接回答问题”,而是通过“自然语言思考过程”指导工具调用,一步步解决复杂问题,完美适配需要多步推理、工具协作的场景(如智能客服、报告生成、任务规划等)。
一个典型的 ReAct 范式包括:
思考 Reasoning:分析问题,判断现有信息是否足够,明确下一步
- 即模型决策是否需要调用外部工具获取更多信息用来回答
行动 Action:执行思考阶段指定的策略
观察 Observation:获取行动的结果,提取有效信息
- 即获取工具返回值即判断工具是否正常工作位下一轮思考提供信息
重复 Repeat:重复上述行为直到结束
下面是一个简单的 ReAct 代码示例:
langchain 支持工具并行运行,下文要求 每轮仅能思考并调用1个工具 是为了演示 ReAct 的实际运行过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @tool(description= "获取体重,返回值是int,单位为kg") def get_weight() -> int: return 70
@tool(description= "获取身高,返回值是int,单位为cm") def get_height() -> int: return 175
agent = create_agent( model= model, tools= [get_weight, get_height], system_prompt= ''' 你是严格遵循ReAct框架的智能体,必须按「思考→行动→观察→再思考」的流程解决问题, 且**每轮仅能思考并调用1个工具**,禁止单次调用多个工具。 并告知我你的思考过程,工具的调用原因,按思考、行动、观察三个结构告知我 ''' )
for chunk in agent.stream( { "messages": [ {"role": "user", "content": "计算一下我的BMI是多少?"} ] }, stream_mode= "values" ): latest_message = chunk["messages"][-1] if latest_message.content: print(f"{type(latest_message).__name__}: {latest_message.content}") try: if latest_message.tool_calls: print(f"Calling tool: {[tc['name'] for tc in latest_message.tool_calls]}") except: pass
|
Middleware
中间件的作用是对智能体的每一步工作进行控制和自定义的执行
作用场景:
- 日志记录、分析、调试
- 转换提示词、工具选择
- 重试、备用、提前终止等逻辑控制
- 安全防护、个人身份检测等
LangChain中内置了一些基础的中间件,中间件通过Hooks钩子来实现拦截,自定义中间件可以简单的使用装饰器来定义。
详细参见:https://docs.langchain.com/oss/python/langchain/middleware/built-in
节点式钩子(执行点顺序拦截):
before_agent:agent 执行之前拦截
after_agent:agent执行后拦截
before_model: 模型执行前拦截
after_model: 模型执行后拦截
针对工具和模型的包装式钩子:
wrap_model_call:每个模型调用时拦截
wrap_tool_call:每个工具调用时拦截
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @tool(description= "查询天气,传入城市名称(str),返回天气信息(str)") def get_weather(city: str) -> str: return f"{city}: 今天是晴天,温度是25度"
@before_agent def log_before_agent(state: AgentState, runtime: Runtime) -> None: print(f"[Before agent]agent启动,附带{len(state['messages'])}条信息") @after_agent def log_after_agent(state: AgentState, runtime: Runtime) -> None: print(f"[After agent]agent结束,附带{len(state['messages'])}条信息") @before_model def log_before_model(state: AgentState, runtime: Runtime) -> None: print(f"[Before model]model启动,附带{len(state['messages'])}条信息") @after_model def log_after_model(state: AgentState, runtime: Runtime) -> None: print(f"[After model]model结束,附带{len(state['messages'])}条信息") @wrap_model_call def model_call_hook(request, handler): print("[hook]模型正在调用") return handler(request)
@wrap_tool_call def tool_call_hook(request, handler): print(f"[hook]工具执行:{request.tool_call['name']}") print(f"[hook]工具参数:{request.tool_call['args']}") return handler(request)
agent = create_agent( model= ChatOllama(model= "qwen3:4b"), tools= [get_weather], middleware= [ log_before_agent, log_after_agent, log_before_model, log_after_model, model_call_hook, tool_call_hook, ], system_prompt= "你是一个聊天助手,可以简介精炼地回答用户问题" ) =================================================================
HumanMessage: 今天深圳天气如何? [Before agent]agent启动,附带1条信息 [Before model]model启动,附带1条信息 [hook]模型正在调用 Calling tool: ['get_weather'] [After model]model结束,附带2条信息 [hook]工具执行:get_weather [hook]工具参数:{'city': '深圳'} ToolMessage: 深圳: 今天是晴天,温度是25度 [Before model]model启动,附带3条信息 [hook]模型正在调用 AIMessage: 今天深圳晴天,气温25度。 [After model]model结束,附带4条信息 [After agent]agent结束,附带4条信息
|