LangFlow是一个针对LangChain的GUI,它采用了反应流设计,提供了一种轻松的方式,通过拖放组件和聊天框来实验和原型化流程。 LangFlow允许您探索不同的大语言模型、prompt参数、链配置和代理行为,并跟踪代理的思维过程。
Chainlit是一个开源Python包,旨在彻底改变构建和共享语言模型(LM)应用程序的方式。Chainlit可以创建用户界面(UI),类似于由OpenAI开发的ChatGPT用户界面,Chainlit可以开发类
本文将介绍 Chainlit 使用 LangFlow构建 LLM 的工作流,将 LangFlow 发布的服务展现到Chainlit 前端
环境设置
代码首先导入必要的模块,包括 chainlit 用于处理对话界面和 aiohttp 用于异步 HTTP 请求。它定义了 BASE_API_URL 和 LANGFLOW_API_KEY 等常量,用于连接 Langflow 的 API。根据 Langflow API 密钥文档,API 密钥是必需的,用于认证请求。
消息处理
on_message 函数通过 @cl.on_message 装饰器触发,当用户发送新消息时调用。它将聊天上下文转换为 Langflow 期望的格式,准备发送请求。
发送请求到 Langflow
代码构建了 Langflow API 的 URL,包含 stream=true 参数以启用流式响应。请求负载(payload)包括输入消息内容,输入和输出类型均设置为“chat”。然后,它使用 aiohttp.ClientSession 异步发送 POST 请求,包含 JSON 负载和认证头。根据 Langflow API 示例,这种方式与 Langflow 的 /api/v1/run/{FLOW_ID} 端点兼容。
处理流式响应
Langflow 的响应以流式方式返回,代码通过异步迭代 response.content 处理每个事件。事件类型包括:
- “add_message”:处理新消息的添加;
- “token”:解码并处理单个令牌,区分“思考内容”和“最终答案”;
- “end”:结束处理,发送最终答案。
支持“思考”模式
对于“token”事件,代码使用 get_thinking_content 检查是否为思考内容。如果是,则更新 Chainlit 中的“Thinking”步骤;否则,使用 filter_content 处理普通内容并流式传输。
集成的优势
这种 Chainlit 与 Langflow 的集成提供了以下优势:
增强功能:结合 Chainlit 的界面和 Langflow 的后端,开发者可以构建具有复杂逻辑和工作流的应用。实时交互:流式处理确保用户实时接收响应,提升对话体验。
灵活性和定制化:两者均为开源 Python 工具,支持广泛的定制和与其他服务的集成。
快速开发:开发者可以利用两者的优势,快速原型设计和部署对话式 AI 应用。
这种集成特别适合需要多代理系统或 RAG 功能的聊天应用开发,例如客户支持机器人或基于知识库的问答系统。
示例代码如下:
@cl.on_message
async def on_message(msg: cl.Message):
messages = cl.chat_context.to_openai()
print("Historical messages:", messages)
start = time.time()
url = f"{BASE_API_URL}/api/v1/run/{FLOW_ID}?stream=true"
payload = {
"input_value": msg.content,
"output_type": "chat",
"input_type": "chat"
}
print(f"Request URL: {url}")
print(f"Request payload: {json.dumps(payload, ensure_ascii=False)}")
thinking = False
final_answer = cl.Message(content="")
thinking_step = None
async with aiohttp.ClientSession() as session:
try:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {LANGFLOW_API_KEY}"
}
async with session.post(url, json=payload, headers=headers) as response:
print(f"Response status: {response.status}")
if response.status != 200:
error_text = await response.text()
print(f"Error response: {error_text}")
raise Exception(f"API request failed: {response.status} - {error_text}")
buffer = ""
async for line in response.content:
if not line:
continue
try:
line_text = line.decode('utf-8').strip()
if not line_text:
continue
event_data = json.loads(line_text)
event_type = event_data["event"]
if event_type == "add_message":
message_id = event_data["data"]["id"]
elif event_type == "token":
chunk = event_data["data"].get("chunk", "")
if chunk: # 过滤空心跳包
try:
# 首先尝试直接解码
decoded = chunk
if '\\u' in chunk:
decoded = chunk.encode().decode('unicode_escape')
# 使用 get_thinking_content 检查是否包含思考内容
thinking_content = get_thinking_content(type('Delta', (), {'content': decoded})())
if thinking_content:
# 如果有思考内容,创建或更新 thinking_step
if not thinking_step:
thinking_step = cl.Step(name="Thinking")
await thinking_step.__aenter__()
thinking = True
await thinking_step.stream_token(thinking_content)
else:
# 如果没有思考内容,使用 filter_content 处理普通内容
filtered_content = filter_content(decoded)
if filtered_content:
buffer += filtered_content
await final_answer.stream_token(filtered_content)
except Exception as decode_error:
print(f"解码错误: {decode_error}, 原始chunk: {chunk}")
elif event_type == "end":
if thinking and thinking_step:
thought_for = round(time.time() - start)
thinking_step.name = f"Thought for {thought_for}s"
await thinking_step.update()
await thinking_step.__aexit__(None, None, None)
thinking = False
# 设置最终答案
final_answer.content = buffer
await final_answer.send()
except json.JSONDecodeError as e:
print(f"解析失败的行: {line_text}, 错误: {str(e)}")
continue
except Exception as e:
print(f"请求错误: {str(e)}")
await cl.Message(content=f"抱歉,发生错误: {str(e)}").send()