中文
luckfu的潦草笔记

最初的梦想绝对会到达!


  • 首页

  • 归档

  • 关于我

  • 公益404

  • 搜索

Chainlit 使用 Langflow 后端

时间: 2025-04-24   |   分类: GPT   chatGPT   | 字数: 1359 字 | 阅读: 3分钟 | 阅读次数:
  • LangFlow是一个针对LangChain的GUI,它采用了反应流设计,提供了一种轻松的方式,通过拖放组件和聊天框来实验和原型化流程。 LangFlow允许您探索不同的大语言模型、prompt参数、链配置和代理行为,并跟踪代理的思维过程。

  • Chainlit是一个开源Python包,旨在彻底改变构建和共享语言模型(LM)应用程序的方式。Chainlit可以创建用户界面(UI),类似于由OpenAI开发的ChatGPT用户界面,Chainlit可以开发类

本文将介绍 Chainlit 使用 LangFlow构建 LLM 的工作流,将 LangFlow 发布的服务展现到Chainlit 前端

图片

环境设置

代码首先导入必要的模块,包括 chainlit 用于处理对话界面和 aiohttp 用于异步 HTTP 请求。它定义了 BASE_API_URL 和 LANGFLOW_API_KEY 等常量,用于连接 Langflow 的 API。根据 Langflow API 密钥文档,API 密钥是必需的,用于认证请求。

消息处理

on_message 函数通过 @cl.on_message 装饰器触发,当用户发送新消息时调用。它将聊天上下文转换为 Langflow 期望的格式,准备发送请求。

发送请求到 Langflow

代码构建了 Langflow API 的 URL,包含 stream=true 参数以启用流式响应。请求负载(payload)包括输入消息内容,输入和输出类型均设置为“chat”。然后,它使用 aiohttp.ClientSession 异步发送 POST 请求,包含 JSON 负载和认证头。根据 Langflow API 示例,这种方式与 Langflow 的 /api/v1/run/{FLOW_ID} 端点兼容。

处理流式响应

Langflow 的响应以流式方式返回,代码通过异步迭代 response.content 处理每个事件。事件类型包括:

  • “add_message”:处理新消息的添加;
  • “token”:解码并处理单个令牌,区分“思考内容”和“最终答案”;
  • “end”:结束处理,发送最终答案。

支持“思考”模式

对于“token”事件,代码使用 get_thinking_content 检查是否为思考内容。如果是,则更新 Chainlit 中的“Thinking”步骤;否则,使用 filter_content 处理普通内容并流式传输。

集成的优势

  • 这种 Chainlit 与 Langflow 的集成提供了以下优势:
    增强功能:结合 Chainlit 的界面和 Langflow 的后端,开发者可以构建具有复杂逻辑和工作流的应用。

  • 实时交互:流式处理确保用户实时接收响应,提升对话体验。

  • 灵活性和定制化:两者均为开源 Python 工具,支持广泛的定制和与其他服务的集成。

  • 快速开发:开发者可以利用两者的优势,快速原型设计和部署对话式 AI 应用。

这种集成特别适合需要多代理系统或 RAG 功能的聊天应用开发,例如客户支持机器人或基于知识库的问答系统。

示例代码如下:

@cl.on_message
async def on_message(msg: cl.Message):
    messages = cl.chat_context.to_openai()
    print("Historical messages:", messages)
    start = time.time()
    
    url = f"{BASE_API_URL}/api/v1/run/{FLOW_ID}?stream=true"
    payload = {
        "input_value": msg.content,
        "output_type": "chat",
        "input_type": "chat"
    }
    
    print(f"Request URL: {url}")
    print(f"Request payload: {json.dumps(payload, ensure_ascii=False)}")
    
    thinking = False
    final_answer = cl.Message(content="")
    thinking_step = None
    
    async with aiohttp.ClientSession() as session:
        try:
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {LANGFLOW_API_KEY}" 
            }
            
            async with session.post(url, json=payload, headers=headers) as response:
                print(f"Response status: {response.status}")
                
                if response.status != 200:
                    error_text = await response.text()
                    print(f"Error response: {error_text}")
                    raise Exception(f"API request failed: {response.status} - {error_text}")
                
                buffer = ""
                async for line in response.content:
                    if not line:
                        continue
                        
                    try:
                        line_text = line.decode('utf-8').strip()
                        if not line_text:
                            continue
                            
                        event_data = json.loads(line_text)
                        event_type = event_data["event"]
                        
                        if event_type == "add_message":
                            message_id = event_data["data"]["id"]
                            
                        elif event_type == "token":
                            chunk = event_data["data"].get("chunk", "")
                            if chunk:  # 过滤空心跳包
                                try:
                                    # 首先尝试直接解码
                                    decoded = chunk
                                    if '\\u' in chunk:
                                        decoded = chunk.encode().decode('unicode_escape')
                                        
                                    # 使用 get_thinking_content 检查是否包含思考内容
                                    thinking_content = get_thinking_content(type('Delta', (), {'content': decoded})())
                                    
                                    if thinking_content:
                                        # 如果有思考内容,创建或更新 thinking_step
                                        if not thinking_step:
                                            thinking_step = cl.Step(name="Thinking")
                                            await thinking_step.__aenter__()
                                            thinking = True
                                        await thinking_step.stream_token(thinking_content)
                                    else:
                                        # 如果没有思考内容,使用 filter_content 处理普通内容
                                        filtered_content = filter_content(decoded)
                                        if filtered_content:
                                            buffer += filtered_content
                                            await final_answer.stream_token(filtered_content)
                                            
                                except Exception as decode_error:
                                    print(f"解码错误: {decode_error}, 原始chunk: {chunk}")
                                    
                        elif event_type == "end":
                            if thinking and thinking_step:
                                thought_for = round(time.time() - start)
                                thinking_step.name = f"Thought for {thought_for}s"
                                await thinking_step.update()
                                await thinking_step.__aexit__(None, None, None)
                                thinking = False
                            
                            # 设置最终答案
                            final_answer.content = buffer
                            await final_answer.send()
                            
                    except json.JSONDecodeError as e:
                        print(f"解析失败的行: {line_text}, 错误: {str(e)}")
                        continue
                        
        except Exception as e:
            print(f"请求错误: {str(e)}")
            await cl.Message(content=f"抱歉,发生错误: {str(e)}").send()

详细代码参考 https://github.com/luckfu/chat

#Langflow# #Chainlit#

声明:Chainlit 使用 Langflow 后端

链接:http://www.luckfu.com/post/2025-04-24_langflow_and_chainlit/

作者:luckfu

声明: 本博客文章除特别声明外,均采用 CC BY-NC-SA 3.0许可协议,转载请注明出处!

创作实属不易,如有帮助,那就打赏博主些许茶钱吧 ^_^
WeChat Pay

微信打赏

Alipay

支付宝打赏

paypal Pay

PayPal打赏

自定义 LangChain OpenAI 聊天模型(CustomChatOpenAI)重构实践
  • 文章目录
  • 站点概览
luckfu

luckfu

一瞬间,过去的一切都离你而去,剩下的只有回忆!

31 日志
29 分类
41 标签
GitHub
标签云
  • 生活
  • Python
  • Oracle cloud
  • Free vps
  • Gpt
  • K8s
  • Vm.standard.a1.flex
  • Chatgpt
  • Github
  • Langchain
© 2010 - 2025 luckfu的潦草笔记
Powered by - Hugo v0.127.0 / Theme by - NexT
0%