插件窝 干货文章 利用FetchEventSource在大模型流式输出的应用方式

利用FetchEventSource在大模型流式输出的应用方式

code class strong output 833    来源:    2024-10-30

FetchEventSource在大模型流式输出应用

先讲讲这个微软开发的可以使用POST的SSE的api,github链接:GitHub - Azure/fetch-event-source: A better API for making Event Source requests, with all the features of fetch()

FetchEventSource 是微软在 ASP.NET Core 中引入的一个功能,它允许开发者以一种更简单和高效的方式处理 HTTP 请求和响应。

这个功能是作为 ASP.NET Core 的一部分提供的,它利用了 System.Net.Http.Desktop 命名空间中的 FetchResult 类型。

在 ASP.NET Core 中,FetchEventSource 通常用于处理服务器发送的事件(Server-Sent Events,SSE),这是一种允许服务器向客户端异步推送实时数据的技术。

使用 FetchEventSource,开发者可以更容易地创建和消费这些实时数据流。

使用 FetchEventSource 的一些关键点

  • 创建 EventSource 客户端:开发者可以通过 FetchEventSource 创建一个 EventSource 对象,该对象用于连接到服务器上的特定端点。
  • 监听事件:一旦 EventSource 对象被创建,就可以通过注册事件监听器来监听服务器发送的事件。
  • 处理连接EventSource 对象可以处理连接的建立、重连和关闭,以及可能出现的错误。
  • 接收数据:当服务器向客户端推送数据时,可以通过注册的事件监听器接收这些数据。
  • 断线重连:如果连接丢失,EventSource 对象可以自动尝试重新连接到服务器。
  • 取消订阅:开发者可以取消对特定事件的订阅,或者完全关闭 EventSource 连接。

以调用Qwen大模型为例

import { fetchEventSource } from '@microsoft/fetch-event-source';

export default {
  data() {
    return {
      output: '',
      apiKey: '$your-dashscope-api-key', // 替换为你的 DashScope API-KEY
      url: 'https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation',
      body: {
        model: 'qwen-turbo',
        input: {
          messages: [
            {
              role: 'system',
              content: 'You are a helpful assistant.'
            },
            {
              role: 'user',
              content: '你好'
            }
          ]
        },
        parameters: {
          incremental_output: true
        }
      }
    };
  },
  methods: {
    async startSSE() {
      const headers = {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${this.apiKey}`,
        'X-DashScope-SSE': 'enable'
      };

      try {
        this.eventSource = await fetchEventSource(this.url, {
          method: 'POST',
          headers: headers,
          body: JSON.stringify(this.body),
          onopen: (response) => {
            if (!response.ok) {
              throw new Error('Server returned an error');
            }
          },
          onmessage: (event) => {
            const data = JSON.parse(event.data);
            if (data.output && data.output.choices) {
              const content = data.output.choices[0].message.content;
              this.output += content; // 将内容添加到输出中
            }
          },
          onerror: (err) => {
            console.error('EventSource failed:', err);
            this.stopSSE();
          }
        });
      } catch (error) {
        console.error('Failed to start SSE:', error);
      }
    },
    stopSSE() {
      if (this.eventSource) {
        this.eventSource.close();
        this.eventSource = null;
      }
    }
  },
  unmounted() {
    this.stopSSE();
  }
};

这样就可以建立SSE的链接了。

那么有小伙伴就要问了,那前端怎么实时显示接收到的输出呢?

onmessage: (event) => {
            const data = JSON.parse(event.data);
            if (data.output && data.output.choices) {
              const content = data.output.choices[0].message.content;
              this.output += content; // 将内容添加到输出中
            }
          }
  • 事件处理器声明
onmessage: (event) => { // ... },

这里定义了一个 onmessage 事件处理器。

当通过 SSE 连接接收到消息时,会触发这个处理器。

  • 解析接收到的数据
const data = JSON.parse(event.data);

event.data 包含了服务器发送的消息内容,通常是以 JSON 格式的字符串。

JSON.parse 函数用于将这个 JSON 字符串解析为 JavaScript 对象。

  • 检查输出数据
if (data.output && data.output.choices) { // ... }

这里使用 if 语句来确保 data 对象中存在 output 属性,并且 output 属性中存在 choices 数组。

这是一种防御性编程的做法,用来避免在数据结构不完整时出现错误。

  • 获取消息内容
const content = data.output.choices[0].message.content;

这行代码进一步从 choices 数组中的第一个元素(通常是最相关的或者默认的消息)中提取 message.content

这通常是服务器推送的有用信息或数据。

  • 累加内容
this.output += content;

this.output 是 Vue 组件实例的一个数据属性,用于累积从服务器接收到的所有消息内容。

这里使用 += 操作符将新接收到的 content 追加到 this.output 的当前值上。

整个 onmessage 事件处理器的作用是:当通过 SSE 接收到消息时,它将解析消息内容,从中提取有用的信息,并将其追加到 Vue 组件的 output 数据属性中。

这样,组件的模板中的 <pre>{{ output }}</pre> 就可以显示所有接收到的消息内容,保持其原始的格式。

总结

以上为个人经验,希望对您有所帮助。