# Why use LCEL
We recommend reading the LCEL Get started section first.
LCEL makes it easy to build complex chains from basic components. It does this by providing:
- **A unified interface**: Every LCEL object implements the `Runnable` interface, which defines a common set of invocation methods (`invoke`, `batch`, `stream`, `ainvoke`, and more). This makes it possible for a chain of LCEL objects to also automatically support these invocations; that is, every chain of LCEL objects is itself an LCEL object (see the sketch after this list).
- **Composition primitives**: LCEL provides a number of primitives that make it easy to compose chains, parallelize components, add fallbacks, dynamically configure chain internals, and more.
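To make the first point concrete, here is a minimal sketch of the unified interface. It uses `RunnableLambda` from `langchain_core` to wrap plain Python functions, so no model or API key is needed; the `add_one` and `double` names are purely illustrative:

```python
from langchain_core.runnables import RunnableLambda

# Wrap two plain Python functions as Runnables.
add_one = RunnableLambda(lambda x: x + 1)
double = RunnableLambda(lambda x: x * 2)

# Composing with `|` produces a new Runnable, so the composed chain
# supports the same invocation methods as its parts.
composed = add_one | double

composed.invoke(3)           # -> 8
composed.batch([1, 2, 3])    # -> [4, 6, 8]
for chunk in composed.stream(3):
    print(chunk)             # -> 8
# Async variants (ainvoke, abatch, astream) are available as well,
# e.g. `await composed.ainvoke(3)`.
```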
To better understand the value of LCEL, it's helpful to see it in action and think about how we might recreate similar functionality without it. In this walkthrough we'll do just that with our basic example from the get started section. We'll take our simple prompt + model chain, which under the hood already defines a lot of functionality, and see what it would take to recreate all of it.
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
model = ChatOpenAI(model="gpt-3.5-turbo")
output_parser = StrOutputParser()

chain = prompt | model | output_parser
```
## Invoke
In the simplest case, we just want to pass in a topic string and get back a joke string:
#### Without LCEL
```python
from typing import List

import openai


prompt_template = "Tell me a short joke about {topic}"
client = openai.OpenAI()

def call_chat_model(messages: List[dict]) -> str:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

def invoke_chain(topic: str) -> str:
    prompt_value = prompt_template.format(topic=topic)
    messages = [{"role": "user", "content": prompt_value}]
    return call_chat_model(messages)

invoke_chain("ice cream")
```

#### LCEL

```python
from langchain_core.runnables import RunnablePassthrough


prompt = ChatPromptTemplate.from_template(
    "Tell me a short joke about {topic}"
)
output_parser = StrOutputParser()
model = ChatOpenAI(model="gpt-3.5-turbo")
chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | model
    | output_parser
)

chain.invoke("ice cream")
```

## Stream
If we want to stream results, we'll need to change our function:
#### Without LCEL

```python
from typing import Iterator


def stream_chat_model(messages: List[dict]) -> Iterator[str]:
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        stream=True,
    )
    for response in stream:
        content = response.choices[0].delta.content
        if content is not None:
            yield content

def stream_chain(topic: str) -> Iterator[str]:
    prompt_value = prompt_template.format(topic=topic)
    return stream_chat_model([{"role": "user", "content": prompt_value}])


for chunk in stream_chain("ice cream"):
    print(chunk, end="", flush=True)
```

#### LCEL

```python
for chunk in chain.stream("ice cream"):
    print(chunk, end="", flush=True)
```

## Batch
If we want to run a batch of inputs in parallel, we'll again need a new function:
#### Without LCEL

```python
from concurrent.futures import ThreadPoolExecutor


def batch_chain(topics: list) -> list:
    with ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(invoke_chain, topics))

batch_chain(["ice cream", "spaghetti", "dumplings"])
```

#### LCEL

```python
chain.batch(["ice cream", "spaghetti", "dumplings"])
```

## Async
If we need an asynchronous version:
#### Without LCEL

```python
async_client = openai.AsyncOpenAI()

async def acall_chat_model(messages: List[dict]) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

async def ainvoke_chain(topic: str) -> str:
    prompt_value = prompt_template.format(topic=topic)
    messages = [{"role": "user", "content": prompt_value}]
    return await acall_chat_model(messages)


await ainvoke_chain("ice cream")
```

#### LCEL

```python
await chain.ainvoke("ice cream")
```

## LLM instead of chat model
If we want to use the completions endpoint instead of the chat endpoint:
#### Without LCEL

```python
def call_llm(prompt_value: str) -> str:
    response = client.completions.create(
        model="gpt-3.5-turbo-instruct",
        prompt=prompt_value,
    )
    return response.choices[0].text

def invoke_llm_chain(topic: str) -> str:
    prompt_value = prompt_template.format(topic=topic)
    return call_llm(prompt_value)

invoke_llm_chain("ice cream")
```

#### LCEL

```python
from langchain_openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo-instruct")
llm_chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | llm
    | output_parser
)

llm_chain.invoke("ice cream")
```

## Different model provider
If we want to use Anthropic instead of OpenAI:
#### Without LCEL

```python
import anthropic

anthropic_template = f"Human:\n\n{prompt_template}\n\nAssistant:"
anthropic_client = anthropic.Anthropic()

def call_anthropic(prompt_value: str) -> str:
    response = anthropic_client.completions.create(
        model="claude-2",
        prompt=prompt_value,
        max_tokens_to_sample=256,
    )
    return response.completion

def invoke_anthropic_chain(topic: str) -> str:
    prompt_value = anthropic_template.format(topic=topic)
    return call_anthropic(prompt_value)

invoke_anthropic_chain("ice cream")
```

#### LCEL

```python
from langchain_community.chat_models import ChatAnthropic

anthropic = ChatAnthropic(model="claude-2")
anthropic_chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | anthropic
    | output_parser
)

anthropic_chain.invoke("ice cream")
```

## Runtime configurability
If we want to make the choice of chat model or LLM configurable at runtime:
#### Without LCEL

```python
def invoke_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> str:
    if model == "chat_openai":
        return invoke_chain(topic)
    elif model == "openai":
        return invoke_llm_chain(topic)
    elif model == "anthropic":
        return invoke_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def stream_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> Iterator[str]:
    if model == "chat_openai":
        return stream_chain(topic)
    elif model == "openai":
        # Note we haven't implemented this yet.
        return stream_llm_chain(topic)
    elif model == "anthropic":
        # Note we haven't implemented this yet.
        return stream_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def batch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    # You get the idea
    ...

async def abatch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    ...

invoke_configurable_chain("ice cream", model="openai")
stream = stream_configurable_chain(
    "ice cream",
    model="anthropic"
)
for chunk in stream:
    print(chunk, end="", flush=True)

# batch_configurable_chain(["ice cream", "spaghetti", "dumplings"])
# await ainvoke_configurable_chain("ice cream")
```

#### LCEL

```python
from langchain_core.runnables import ConfigurableField


configurable_model = model.configurable_alternatives(
    ConfigurableField(id="model"),
    default_key="chat_openai",
    openai=llm,
    anthropic=anthropic,
)
configurable_chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | configurable_model
    | output_parser
)

configurable_chain.invoke(
    "ice cream",
    config={"model": "openai"}
)
stream = configurable_chain.stream(
    "ice cream",
    config={"model": "anthropic"}
)
for chunk in stream:
    print(chunk, end="", flush=True)

configurable_chain.batch(["ice cream", "spaghetti", "dumplings"])

# await configurable_chain.ainvoke("ice cream")
```
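If we'd rather bind the choice once instead of passing it on every call, the same selection can be attached with `with_config`. This is a minimal sketch, reusing the `configurable_chain` defined above:

```python
# Bind the "openai" alternative once; the result is a normal runnable.
openai_chain = configurable_chain.with_config(configurable={"model": "openai"})

openai_chain.invoke("ice cream")
openai_chain.batch(["ice cream", "spaghetti", "dumplings"])
```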
## Logging
If we want to log intermediate results:
#### Without LCEL

We'll `print` intermediate steps for illustrative purposes:

```python
def invoke_anthropic_chain_with_logging(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = anthropic_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_anthropic(prompt_value)
    print(f"Output: {output}")
    return output

invoke_anthropic_chain_with_logging("ice cream")
```

#### LCEL
Every component has built-in integrations with LangSmith. If we set the following two environment variables, all chain traces are logged to LangSmith.

```python
import os
os.environ["LANGCHAIN_API_KEY"] = "..."
os.environ["LANGCHAIN_TRACING_V2"] = "true"
anthropic_chain.invoke("ice cream")
```

Here's what our LangSmith trace looks like: https://smith.langchain.com/public/e4de52f8-bcd9-4732-b950-deee4b04e313/r
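If you're not using LangSmith, one lightweight way to inspect intermediate values in an LCEL chain is to splice in a passthrough step. This is a minimal sketch, not part of the original example; the `log_value` helper name is illustrative:

```python
from langchain_core.runnables import RunnableLambda

def log_value(value):
    # Print the intermediate value and pass it through unchanged.
    print(f"Intermediate: {value}")
    return value

# Splice the logging step between the prompt and the model.
logged_chain = prompt | RunnableLambda(log_value) | model | output_parser
logged_chain.invoke({"topic": "ice cream"})
```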
## Fallbacks
If we want to add fallback logic, in case one model API is down:
#### Without LCEL

```python
def invoke_chain_with_fallback(topic: str) -> str:
    try:
        return invoke_chain(topic)
    except Exception:
        return invoke_anthropic_chain(topic)

async def ainvoke_chain_with_fallback(topic: str) -> str:
    try:
        return await ainvoke_chain(topic)
    except Exception:
        # Note: we haven't actually implemented this.
        return await ainvoke_anthropic_chain(topic)

async def batch_chain_with_fallback(topics: List[str]) -> str:
    try:
        return batch_chain(topics)
    except Exception:
        # Note: we haven't actually implemented this.
        return batch_anthropic_chain(topics)

invoke_chain_with_fallback("ice cream")
# await ainvoke_chain_with_fallback("ice cream")
batch_chain_with_fallback(["ice cream", "spaghetti", "dumplings"])
```

#### LCEL

```python
fallback_chain = chain.with_fallbacks([anthropic_chain])

fallback_chain.invoke("ice cream")
# await fallback_chain.ainvoke("ice cream")
fallback_chain.batch(["ice cream", "spaghetti", "dumplings"])
```

## Full code comparison
Even in this simple case, our LCEL chain succinctly packs in a lot of functionality. As chains become more complex, this becomes especially valuable.
#### Without LCEL

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List, Tuple

import anthropic
import openai


prompt_template = "Tell me a short joke about {topic}"
anthropic_template = f"Human:\n\n{prompt_template}\n\nAssistant:"
client = openai.OpenAI()
async_client = openai.AsyncOpenAI()
anthropic_client = anthropic.Anthropic()

def call_chat_model(messages: List[dict]) -> str:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

def invoke_chain(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    messages = [{"role": "user", "content": prompt_value}]
    output = call_chat_model(messages)
    print(f"Output: {output}")
    return output

def stream_chat_model(messages: List[dict]) -> Iterator[str]:
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        stream=True,
    )
    for response in stream:
        content = response.choices[0].delta.content
        if content is not None:
            yield content

def stream_chain(topic: str) -> Iterator[str]:
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    stream = stream_chat_model([{"role": "user", "content": prompt_value}])
    for chunk in stream:
        print(f"Token: {chunk}", end="")
        yield chunk

def batch_chain(topics: list) -> list:
    with ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(invoke_chain, topics))

def call_llm(prompt_value: str) -> str:
    response = client.completions.create(
        model="gpt-3.5-turbo-instruct",
        prompt=prompt_value,
    )
    return response.choices[0].text

def invoke_llm_chain(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_llm(prompt_value)
    print(f"Output: {output}")
    return output

def call_anthropic(prompt_value: str) -> str:
    response = anthropic_client.completions.create(
        model="claude-2",
        prompt=prompt_value,
        max_tokens_to_sample=256,
    )
    return response.completion

def invoke_anthropic_chain(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = anthropic_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_anthropic(prompt_value)
    print(f"Output: {output}")
    return output

async def ainvoke_anthropic_chain(topic: str) -> str:
    ...

def stream_anthropic_chain(topic: str) -> Iterator[str]:
    ...

def batch_anthropic_chain(topics: List[str]) -> List[str]:
    ...

def invoke_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> str:
    if model == "chat_openai":
        return invoke_chain(topic)
    elif model == "openai":
        return invoke_llm_chain(topic)
    elif model == "anthropic":
        return invoke_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def stream_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> Iterator[str]:
    if model == "chat_openai":
        return stream_chain(topic)
    elif model == "openai":
        # Note we haven't implemented this yet.
        return stream_llm_chain(topic)
    elif model == "anthropic":
        # Note we haven't implemented this yet.
        return stream_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def batch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    if model == "chat_openai":
        return batch_chain(topics)
    elif model == "openai":
        return batch_llm_chain(topics)
    elif model == "anthropic":
        return batch_anthropic_chain(topics)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )
async def abatch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    ...

def invoke_chain_with_fallback(topic: str) -> str:
    try:
        return invoke_chain(topic)
    except Exception:
        return invoke_anthropic_chain(topic)

async def ainvoke_chain_with_fallback(topic: str) -> str:
    try:
        return await ainvoke_chain(topic)
    except Exception:
        return await ainvoke_anthropic_chain(topic)

async def batch_chain_with_fallback(topics: List[str]) -> str:
    try:
        return batch_chain(topics)
    except Exception:
        return batch_anthropic_chain(topics)
```
#### LCEL
```python
import os
from langchain_community.chat_models import ChatAnthropic
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, ConfigurableField
os.environ["LANGCHAIN_API_KEY"] = "..."
os.environ["LANGCHAIN_TRACING_V2"] = "true"
prompt = ChatPromptTemplate.from_template(
"Tell me a short joke about {topic}"
)
chat_openai = ChatOpenAI(model="gpt-3.5-turbo")
openai = OpenAI(model="gpt-3.5-turbo-instruct")
anthropic = ChatAnthropic(model="claude-2")
model = (
    chat_openai
    .with_fallbacks([anthropic])
    .configurable_alternatives(
        ConfigurableField(id="model"),
        default_key="chat_openai",
        openai=openai,
        anthropic=anthropic,
    )
)

chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)
```

## Next steps
To continue learning about LCEL, we recommend: