uni-app通过SSE支持流式效果

发布于 1周前 作者 sinazl 来自 Uni-App

uni-app通过SSE支持流式效果

uni-app支持SSE

H5

因为Web端运行在浏览器内核上,SSE的支持是比较完备的,可以使用axios、[@microsoft/fetch-event-source](https://github.com/Azure/fetch-event-source) 等实现,各种案例也比较完善因此不再赘述。

微信小程序

微信小程序的SSE方案参考的是《微信小程序除了WebSocket其他思路实现流传输文字(打字机)效果》

TypeScript代码

import type { UnCancelTokenListener, UnGenericAbortSignal, UnHeaders } from '@uni-helper/uni-network'  

/**  
 * 二进制解析成文本  
 * @param data 二进制数据  
 * @returns 文本  
 */  
export function decodeArrayBuffer(data: ArrayBuffer | undefined) {  
  if (!data) {  
    return ''  
  }  
  return decodeUsingURIComponent(data)  
}  

/**  
 * URIComponent解码二进制流(不用引入额外包)  
 * @param data 二进制流  
 * @returns 文本  
 */  
function decodeUsingURIComponent(data: ArrayBuffer) {  
  const uint8Array = new Uint8Array(data)  
  let text = String.fromCharCode(...uint8Array)  
  try {  
    text = decodeURIComponent(escape(text))  
  } catch (e) {  
    console.error('decodeUsingURIComponent: Can not decodeURI ', text)  
  }  
  return text  
}  

type onStreamReceivedListener = (text: string) => void  

export function fetchStreamChat(  
  params: { prompt: string; uuid: string },  
  signal?: UnGenericAbortSignal,  
  listener?: onStreamReceivedListener  
) {  
  const onHeadersReceived = (response?: { headers?: UnHeaders }) => {  
    console.log('fetchStreamChat.onHeadersReceived: ', response?.headers)  
  }  
  const onChunkReceived = (response?: { data?: ArrayBuffer }) => {  
    const text = decodeArrayBuffer(response?.data)  
    listener?.(text)  
  }  
  return post<string>({  
    url: '/openai/completions/stream',  
    headers: {  
      Accept: 'text/event-stream',  
      'Content-Type': 'application/json',  
      token: 'your-token'  
    },  
    data: {  
      content: params.prompt,  
      scene: params.uuid,  
      source: 'gpt4',  
    },  
    responseType: 'arraybuffer',  
    enableChunked: true,  
    onHeadersReceived,  
    onChunkReceived,  
    signal: signal  
  })  
}

特殊注意以上代码使用时对abort-controller的引入方式

import AbortController from 'abort-controller/dist/abort-controller'  

let controller = new AbortController()  
const onResponseListener = async (responseText: string) => {  
console.log('==response==\n', responseText)  
}  
await fetchStreamChat({ prompt, uuid }, controller.signal, onResponseListener)

后端NGINX配置

# 注意这里只配置代理发送接口,不然其他接口也会受影响  

location /openai/completions/stream {  
    # ...more config  

    proxy_set_header Transfer-Encoding "";  
    chunked_transfer_encoding on;  
    proxy_buffering off;  
}

APP

App目前看到的方案最多,但是目前为止没有找到很合适的方案。有更好的方案请各位大佬补充Thanks♪(・ω・)ノ

1. plus.net.XMLHttpRequest

参考方案 《XMLHttpRequest模块管理网络请求》,具体代码如下

import type { UnCancelTokenListener, UnGenericAbortSignal, UnHeaders } from '@uni-helper/uni-network'  

type onStreamReceivedListener = (text: string) => void  

export class CanceledError extends Error {  
  constructor(message?: string) {  
    super(message ?? 'canceled')  
  }  
}  

export function fetchStreamChatForApp(  
  params: { prompt: string; uuid: string },  
  signal?: UnGenericAbortSignal,  
  listener?: onStreamReceivedListener  
) {  
  return new Promise((resolve, reject) => {  
    // 梳理好请求数据  
    const token =  'your-token'  
    const data = JSON.stringify({  
      content: params.prompt,  
      scene: params.uuid,  
      source: 'gpt3.5',  
    })  
    // 处理资源释放  
    let onCanceled: UnCancelTokenListener  
    const done = () => {  
      signal?.removeEventListener?.('abort', onCanceled)  
    }  

    // 封装请求  
    // @ts-ignore  
    let xhr: plus.net.XMLHttpRequest | undefined  
    // @ts-ignore  
    xhr = new plus.net.XMLHttpRequest()  
    xhr.withCredentials = true  
    // 配置终止逻辑  
    if (signal) {  
      signal.addEventListener?.('abort', () => {  
        console.log('fetchStreamChatForApp signal abort')  
        xhr.abort()  
      })  
    }  
    let nLastIndex = 0  
    xhr.onreadystatechange = function () {  
      console.log(`onreadystatechange(${xhr.readyState}) → `)  
      if (xhr.readyState === 4) {  
        if (nLastIndex < xhr.responseText.length) {  
          const responseText = xhr.responseText as string  
          // 处理 HTTP 数据块  
          if (responseText) {  
            const textLen = responseText.length  
            const chunk = responseText.substring(nLastIndex)  
            nLastIndex = textLen  
            listener?.(chunk)  
          }  
        }  
        if (xhr.status === 200) {  
          resolve({ code: ResultCode.SUCCESS, msg: 'end' })  
          done()  
        } else {  
          reject(new Error(xhr.statusText))  
          done()  
        }  
      }  
    }  
    xhr.onprogress = function (event: any) {  
      const responseText = xhr.responseText  
      if (responseText) {  
        const textLen = responseText.length  
        const chunk = responseText.substring(nLastIndex)  
        nLastIndex = textLen  
        listener?.(chunk)  
        console.log('onprogress ', chunk)  
      }  
    }  

    xhr.onerror = function (error: any) {  
      console.error('Network Error:', error)  
      reject(error)  
      done()  
    }  
    // 配置请求  
    xhr.open('POST', 'https://your-site/api/openai/completions/stream')  
    xhr.setRequestHeader('Accept', 'text/event-stream')  
    xhr.setRequestHeader('token', token)  
    xhr.setRequestHeader('User-Agent', 'Mobile')  
    xhr.setRequestHeader('Content-Type', 'application/json')  
    xhr.setRequestHeader('Host', 'mapi.lawvector.cn')  
    xhr.setRequestHeader('Connection', 'keep-alive')  

    // 处理终止逻辑  
    if (signal) {  
      onCanceled = cancel => {  
        console.log('fetchStreamChatForApp onCanceled ', cancel)  
        if (!xhr) {  
          return  
        }  
        reject(new CanceledError('canceled'))  
        xhr.abort()  
        xhr = undefined  
      }  
      // @ts-expect-error no types  
      signal?.aborted ? onCanceled() : signal?.addEventListener('abort', onCanceled)  
    }  
    xhr.send(data)  
  })  
}

当前方案经验证,可以从流式接口获取到数据,但是流式效果不太好,而且从网上汇总来的信息来看,plus.net存在较多问题,比如《plus.net.XMLHttpRequest()在苹果端移动网络环境下不能使用》等。因此不推荐 plus.net方案

2. event-source-polyfill

参考方案 《OpenAI流式请求实现方案》《react + ts + event-source-polyfill 实现方案》《Vue中使用eventSource处理ChatGPT聊天SSE长连接获取数据》

实际App上运行发现报错 TypeError: XMLHttpRequest is not a constructor

哪位大佬可以解决上述问题请补充,不胜感激!

3. fetch-event-source

参考方案 《js调用SSE客户端》《fetch-event-source源码解析》《ChatGPT-SSE流式响应》

经过验证发现单独引入 [@microsoft/fetch-event-source](https://github.com/Azure/fetch-event-source) 会抛出异常

《ChatGPT-SSE流式响应》 分析应该是需要结合 renderjs 进行使用。

目前推荐使用该方案~

4. App原生语言插件

参考 《EventSource (sse)等自定义网络请求》

因为该插件目前仅支持Android,不推荐。

理论上,原生插件是一定能够解决这个问题,期待大佬们开发更完善的原生插件。


1 回复

在uni-app中,你可以通过Server-Sent Events (SSE) 实现流式效果,以实时接收服务器推送的更新。SSE 是一种允许服务器向客户端推送更新的技术,这些更新以文本流的形式发送到客户端,并且客户端可以使用 JavaScript 来处理这些更新。

以下是一个简单的示例,展示如何在uni-app中使用SSE:

服务器端代码(Node.js)

首先,我们需要在服务器端设置一个SSE连接。这里我们使用Node.js和Express框架作为示例:

const express = require('express');
const app = express();
const port = 3000;

app.get('/sse', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const intervalId = setInterval(() => {
    res.write(`data: ${JSON.stringify({ message: 'Hello, World!', time: new Date() })}\n\n`);
  }, 1000);

  req.on('close', () => {
    clearInterval(intervalId);
  });
});

app.listen(port, () => {
  console.log(`SSE server is running at http://localhost:${port}/sse`);
});

客户端代码(uni-app)

在uni-app中,你可以通过uni.request来初始化SSE连接,并使用WebSocket API的类似方式来处理消息。不过需要注意的是,uni.request本身不支持SSE,但你可以通过创建隐藏的iframe来绕过这个限制。

以下是一个简单的uni-app页面示例,展示如何接收SSE消息:

<template>
  <view>
    <text>{{ message }}</text>
    <text>{{ time }}</text>
  </view>
</template>

<script>
export default {
  data() {
    return {
      message: '',
      time: ''
    };
  },
  mounted() {
    this.initSSE();
  },
  methods: {
    initSSE() {
      const iframe = document.createElement('iframe');
      iframe.style.display = 'none';
      iframe.src = 'http://localhost:3000/sse';
      document.body.appendChild(iframe);

      const eventSource = new EventSource(iframe.src);
      eventSource.onmessage = (event) => {
        const data = JSON.parse(event.data);
        this.message = data.message;
        this.time = data.time;
      };

      eventSource.onerror = (error) => {
        console.error('SSE error:', error);
        eventSource.close();
      };
    }
  }
};
</script>

在这个示例中,我们创建了一个隐藏的iframe来加载SSE端点,并通过EventSource对象来监听来自服务器的消息。每当收到新消息时,我们更新页面的数据。

请确保在实际部署时,处理跨域问题,并根据需要调整服务器和客户端代码。

回到顶部