使用 fetch 请求 openai stream 响应时,Nodejs环境下内容偶尔会被“切断”
const response = await fetch(...);
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
while (!done) {
const { value, done: readerDone } = await reader.read();
if (value) {
const char = decoder.decode(value);
console.log(char);
}
}
代码如上,有时候打印出来的 char 为:
data: {"id":"chatcmpl-7Y79egENb17GOU20IaW5KgJJhbf4M","object":"chat.completion.chunk","created":1688365010,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
data: {“id”:"chatcmpl-7Y79egEN
图示: https://i.imgur.com/P1YQs4q.png
也就是从"id"中间被切断了,导致内容少 1 到 2 个字。
请问有啥可改进的方法吗?
使用 fetch 请求 openai stream 响应时,Nodejs环境下内容偶尔会被“切断”
为啥不直接用 EventSource 读,要自己手写这玩意?非要手写的话可以去参考下 EventSource 的 polyfill 怎么实现的。
#1 不会。现在都是还是网上东拼西凑来的。。
当服务器端向客户端发送一段 HTTP 流( HTTP Streaming )时,数据是以块( chunks )的形式发送的,而不是一次性发送全部。在浏览器环境中,我们可以使用 Fetch API 的流( stream )读取器读取到这些数据。
这是一个基本的例子:javascript<br>fetch('/your-http-streaming-url')<br> .then(response => {<br> const reader = response.body.getReader();<br> const stream = new ReadableStream({<br> start(controller) {<br> function push() {<br> reader.read().then(({ done, value }) => {<br> if (done) {<br> controller.close();<br> return;<br> }<br> controller.enqueue(value);<br> push();<br> })<br> .catch(error => {<br> console.error(error);<br> controller.error(error);<br> })<br> }<br> push();<br> }<br> });<br><br> return new Response(stream, { headers: { "Content-Type": "text/html" } });<br> })<br> .then(response => response.text())<br> .then(result => {<br> console.log(result);<br> })<br> .catch(err => {<br> console.error(err);<br> });<br>
这个示例做了以下事情:
1. 使用 fetch
API 获取数据流。
2. 创建一个流读取器( stream reader )读取响应主体。
3. 创建一个新的 ReadableStream
,在它的 start
函数中读取数据,并通过 controller.enqueue
方法将数据加入队列中。
4. 如果读取过程中出现错误,使用 controller.error
将错误信息发送出去。
5. 当数据全部读取完毕,关闭控制器 controller.close
。
6. 最后,获取到的数据通过 Response.text()
转化为文本格式,并输出。
注意,上述示例仅适用于文本数据流,如果你需要处理的是二进制数据流,可能需要进行适当的调整。例如,你可能需要使用 Response.blob()
代替 Response.text()
。
chatGPT 的回答
你好像没有正确处理 done
#4 在循环体中处理的<br>if (choices.finish_reason === 'stop' || choices.finish_reason === 'function_call') {<br> done = true;<br> break;<br>}<br>
EventSource 只能 url 吧,我看 openAi 接口都是 POST 有 request body 的,EventSource 没法用。
curl https://api.openai.com/v1/completions <br> -H “Content-Type: application/json” <br> -H “Authorization: Bearer $OPENAI_API_KEY” <br> -d ‘{
“model”: “gpt-3.5-turbo”,
“prompt”: “Say this is a test”,
“max_tokens”: 7,
“steam”: true,
“temperature”: 0
}’
我的解决方法是,先判断一个 chunk 里最后的 data: 是否为一个合法的 json ,如果不是,则将下一次最开始接收到的字符串与前一次的非法 json 拼接,可以完美解决
附上我做的 ws api 的源码js<br>wss.on('connection', ws => {<br> let isConnected = true<br><br> ws.on('message', async e => {<br> let message = JSON.parse(e.toString())<br> if(message.type == 'conversation') {<br> let es = await fetch('<a target="_blank" href="https://api.openai.com/v1/chat/completions'" rel="nofollow noopener">https://api.openai.com/v1/chat/completions'</a>, {<br> headers: {<br> 'Content-Type': 'application/json',<br> 'Authorization': 'Bearer ' + 'YOUR_OPENAI_API_KEY'<br> },<br> method: 'POST',<br> body: JSON.stringify({<br> model: message.data.model,<br> messages: message.data.messages,<br> stream: true<br> })<br> })<br><br> const reader = es.body.pipeThrough(new TextDecoderStream()).getReader()<br><br> let errObj = ''<br><br> while(true) {<br> if(!isConnected) {<br> process.stdout.write('\n')<br> break<br> }<br> const res = await reader.read()<br> if(res.done) {<br> break<br> }<br> let chunk = res.value<br> chunk = chunk.replace(/data: /g, '').split('\n')<br><br> chunk.map(item => {<br> if(item != '[DONE]' && item != '' && item != undefined) {<br> let json<br> <br> try {<br> if(errObj != '') {<br> item = errObj + item<br> errObj = ''<br> }<br><br> json = JSON.parse(item)<br><br> if(json.choices[0].delta.content == undefined) return<br> ws.send(JSON.stringify({<br> type: 'conversation',<br> data: {<br> type: 'continue',<br> text: json.choices[0].delta.content<br> }<br> }))<br> process.stdout.write(json.choices[0].delta.content)<br> }catch {<br> errObj = item<br> return<br> }<br><br> }else if(item == '[DONE]') {<br> ws.send(JSON.stringify({<br> type: 'conversation',<br> data: {<br> type: 'done',<br> text: null<br> }<br> }))<br> process.stdout.write('\n')<br> }<br> })<br> }<br> }<br> })<br><br> ws.onclose = () => {<br> isConnected = false<br> }<br>})<br>
我发现原生 fetch 在手机端直连 gpt 的接口时一点数据都出不来,但在 pc 端就没问题,这是为什么?
#8 这倒是一个解决方法。不过我改用 ‘fortaine/fetch-event-source’ 库了,效果比手写好些。。
在 Node.js 环境下使用 fetch
请求 OpenAI 的 Stream 响应时,遇到内容偶尔被“切断”的问题,通常是由于流处理不当或网络延迟导致的。以下是一些可能的解决方案和优化建议:
-
确保流正确读取: 使用
fetch
后,应正确设置Response
对象的body
为可读流,并使用循环读取数据直到流结束。const { fetch } = require('node-fetch'); async function fetchStream() { const response = await fetch('YOUR_OPENAI_STREAM_URL'); const reader = response.body.getReader(); let stream = ''; let done, value; while ({ done, value } = await reader.read(), !done) { stream += new TextDecoder().decode(value); } console.log(stream); } fetchStream();
-
处理网络延迟: 网络延迟可能导致数据未完全接收就被处理。可以增加重试逻辑或使用更健壮的流处理库(如
axios
支持的取消令牌和重试机制)。 -
检查 OpenAI API 限制: 确认是否达到 OpenAI API 的速率限制或请求超时。根据 API 文档调整请求频率或增加超时时间。
-
使用合适的库: 考虑使用
axios
或got
等更强大的 HTTP 客户端库,它们提供了更丰富的流处理选项和错误处理机制。
通过上述方法,可以有效减少内容被“切断”的情况,确保数据的完整性和可靠性。