使用 fetch 请求 openai stream 响应时,Nodejs环境下内容偶尔会被“切断”

发布于 1周前 作者 yibo5220 来自 nodejs/Nestjs
const response = await fetch(...);
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
while (!done) {
    const { value, done: readerDone } = await reader.read();
    if (value) {
    	const char = decoder.decode(value);
        console.log(char);
    }
}

代码如上,有时候打印出来的 char 为:

data: {"id":"chatcmpl-7Y79egENb17GOU20IaW5KgJJhbf4M","object":"chat.completion.chunk","created":1688365010,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

data: {“id”:"chatcmpl-7Y79egEN

图示: https://i.imgur.com/P1YQs4q.png

也就是从"id"中间被切断了,导致内容少 1 到 2 个字。

请问有啥可改进的方法吗?


使用 fetch 请求 openai stream 响应时,Nodejs环境下内容偶尔会被“切断”

11 回复

为啥不直接用 EventSource 读,要自己手写这玩意?非要手写的话可以去参考下 EventSource 的 polyfill 怎么实现的。


#1 不会。现在都是还是网上东拼西凑来的。。

当服务器端向客户端发送一段 HTTP 流( HTTP Streaming )时,数据是以块( chunks )的形式发送的,而不是一次性发送全部。在浏览器环境中,我们可以使用 Fetch API 的流( stream )读取器读取到这些数据。

这是一个基本的例子:

javascript<br>fetch('/your-http-streaming-url')<br> .then(response =&gt; {<br> const reader = response.body.getReader();<br> const stream = new ReadableStream({<br> start(controller) {<br> function push() {<br> reader.read().then(({ done, value }) =&gt; {<br> if (done) {<br> controller.close();<br> return;<br> }<br> controller.enqueue(value);<br> push();<br> })<br> .catch(error =&gt; {<br> console.error(error);<br> controller.error(error);<br> })<br> }<br> push();<br> }<br> });<br><br> return new Response(stream, { headers: { "Content-Type": "text/html" } });<br> })<br> .then(response =&gt; response.text())<br> .then(result =&gt; {<br> console.log(result);<br> })<br> .catch(err =&gt; {<br> console.error(err);<br> });<br>

这个示例做了以下事情:

1. 使用 fetch API 获取数据流。
2. 创建一个流读取器( stream reader )读取响应主体。
3. 创建一个新的 ReadableStream,在它的 start 函数中读取数据,并通过 controller.enqueue 方法将数据加入队列中。
4. 如果读取过程中出现错误,使用 controller.error 将错误信息发送出去。
5. 当数据全部读取完毕,关闭控制器 controller.close
6. 最后,获取到的数据通过 Response.text() 转化为文本格式,并输出。

注意,上述示例仅适用于文本数据流,如果你需要处理的是二进制数据流,可能需要进行适当的调整。例如,你可能需要使用 Response.blob() 代替 Response.text()

chatGPT 的回答

你好像没有正确处理 done

#4 在循环体中处理的

<br>if (choices.finish_reason === 'stop' || choices.finish_reason === 'function_call') {<br> done = true;<br> break;<br>}<br>


EventSource 只能 url 吧,我看 openAi 接口都是 POST 有 request body 的,EventSource 没法用。

curl https://api.openai.com/v1/completions <br> -H “Content-Type: application/json” <br> -H “Authorization: Bearer $OPENAI_API_KEY” <br> -d ‘{
“model”: “gpt-3.5-turbo”,
“prompt”: “Say this is a test”,
“max_tokens”: 7,
“steam”: true,
“temperature”: 0
}’

我的解决方法是,先判断一个 chunk 里最后的 data: 是否为一个合法的 json ,如果不是,则将下一次最开始接收到的字符串与前一次的非法 json 拼接,可以完美解决

附上我做的 ws api 的源码

js<br>wss.on('connection', ws =&gt; {<br> let isConnected = true<br><br> ws.on('message', async e =&gt; {<br> let message = JSON.parse(e.toString())<br> if(message.type == 'conversation') {<br> let es = await fetch('<a target="_blank" href="https://api.openai.com/v1/chat/completions'" rel="nofollow noopener">https://api.openai.com/v1/chat/completions'</a>, {<br> headers: {<br> 'Content-Type': 'application/json',<br> 'Authorization': 'Bearer ' + 'YOUR_OPENAI_API_KEY'<br> },<br> method: 'POST',<br> body: JSON.stringify({<br> model: message.data.model,<br> messages: message.data.messages,<br> stream: true<br> })<br> })<br><br> const reader = es.body.pipeThrough(new TextDecoderStream()).getReader()<br><br> let errObj = ''<br><br> while(true) {<br> if(!isConnected) {<br> process.stdout.write('\n')<br> break<br> }<br> const res = await reader.read()<br> if(res.done) {<br> break<br> }<br> let chunk = res.value<br> chunk = chunk.replace(/data: /g, '').split('\n')<br><br> chunk.map(item =&gt; {<br> if(item != '[DONE]' &amp;&amp; item != '' &amp;&amp; item != undefined) {<br> let json<br> <br> try {<br> if(errObj != '') {<br> item = errObj + item<br> errObj = ''<br> }<br><br> json = JSON.parse(item)<br><br> if(json.choices[0].delta.content == undefined) return<br> ws.send(JSON.stringify({<br> type: 'conversation',<br> data: {<br> type: 'continue',<br> text: json.choices[0].delta.content<br> }<br> }))<br> process.stdout.write(json.choices[0].delta.content)<br> }catch {<br> errObj = item<br> return<br> }<br><br> }else if(item == '[DONE]') {<br> ws.send(JSON.stringify({<br> type: 'conversation',<br> data: {<br> type: 'done',<br> text: null<br> }<br> }))<br> process.stdout.write('\n')<br> }<br> })<br> }<br> }<br> })<br><br> ws.onclose = () =&gt; {<br> isConnected = false<br> }<br>})<br>

我发现原生 fetch 在手机端直连 gpt 的接口时一点数据都出不来,但在 pc 端就没问题,这是为什么?

#8 这倒是一个解决方法。不过我改用 ‘fortaine/fetch-event-source’ 库了,效果比手写好些。。

在 Node.js 环境下使用 fetch 请求 OpenAI 的 Stream 响应时,遇到内容偶尔被“切断”的问题,通常是由于流处理不当或网络延迟导致的。以下是一些可能的解决方案和优化建议:

  1. 确保流正确读取: 使用 fetch 后,应正确设置 Response 对象的 body 为可读流,并使用循环读取数据直到流结束。

    const { fetch } = require('node-fetch');
    
    async function fetchStream() {
      const response = await fetch('YOUR_OPENAI_STREAM_URL');
      const reader = response.body.getReader();
      let stream = '';
      let done, value;
    
      while ({ done, value } = await reader.read(), !done) {
        stream += new TextDecoder().decode(value);
      }
    
      console.log(stream);
    }
    
    fetchStream();
    
  2. 处理网络延迟: 网络延迟可能导致数据未完全接收就被处理。可以增加重试逻辑或使用更健壮的流处理库(如 axios 支持的取消令牌和重试机制)。

  3. 检查 OpenAI API 限制: 确认是否达到 OpenAI API 的速率限制或请求超时。根据 API 文档调整请求频率或增加超时时间。

  4. 使用合适的库: 考虑使用 axiosgot 等更强大的 HTTP 客户端库,它们提供了更丰富的流处理选项和错误处理机制。

通过上述方法,可以有效减少内容被“切断”的情况,确保数据的完整性和可靠性。

回到顶部