Golang实现OpenAI流式处理的技术探讨

Golang实现OpenAI流式处理的技术探讨 大家好,请帮帮忙。 我正在使用OpenAI构建一个聊天机器人,并且想使用流式传输。问题是当OpenAI响应并添加一个单词时,Go会添加一行。结果在我的终端中,我得到了这个:

Stream response: 
Hello
Hello!
Hello! How
Hello! How can
Hello! How can I
Hello! How can I assist
Hello! How can I assist you
Hello! How can I assist you today
Hello! How can I assist you today

Stream finished

这是我的代码

// With Streaming Response --- SSE
func ChatWithBot() gin.HandlerFunc {

	return func(c *gin.Context) {

		var message MessageRequest

		// Getting the response from user
		err := c.BindJSON(&message)

		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return

		}

		userMessage := message.Message

		// Setting the modal

		OpenAI_API_Key := os.Getenv("OPENAI_API_KEY")

		openAiClient := openai.NewClient(OpenAI_API_Key)

		ctx := context.Background()

		reqToOpenAI := openai.ChatCompletionRequest{
			Model:     openai.GPT3Dot5Turbo,
			MaxTokens: 8,
			Messages: []openai.ChatCompletionMessage{
				{
					Role:    openai.ChatMessageRoleUser,
					Content: userMessage,
				},
			},
			Stream: true,
		}

		stream, err := openAiClient.CreateChatCompletionStream(ctx, reqToOpenAI)

		if err != nil {
			fmt.Printf("Error creating completion")
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}

		defer stream.Close()

		// w := c.Writer
		// w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		// c.Header("Cache-Control", "no-cache")
		// c.Header("Connection", "keep-alive")

		fmt.Printf("Stream response: ")

		full := ""

		for {
			response, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				fmt.Println("\nStream finished")
				break

			}

			if err != nil {
				fmt.Printf("\nStream error: %v\n", err)
				return
			}

			openAIReply := response.Choices[0].Delta.Content

			// Send each new reply as a separate SSE message

			// Remove any newline characters from the response
			openAIReply = strings.ReplaceAll(openAIReply, "\n", "")

			full += openAIReply

			fmt.Println(full)

			c.Writer.Write([]byte(full))
			c.Writer.(http.Flusher).Flush()
		}



}

而在前端我得到了这个 image

作为参考,这是我试图重写的JS代码

async function main() {
    let full = "";

    const completion = await openai.chat.completions.create({
        message: [{ role: "user", content: "Recipes for chicken fry" }],
        model: "gpt-3.5-turbo",
        stream: true,
    });

    for await (const part of completion) {
        let text = part.choices[0].delta.content;
        full += text;

        console.clear();

        console.log(full);
    }
}

谢谢


更多关于Golang实现OpenAI流式处理的技术探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

在得到 DrewTim(在 Slack 上)的帮助后,我终于解决了这个问题。谢谢。

这是最新的更新代码:

	for {
			response, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				fmt.Println("\nStream finished")
				return
			}

			if err != nil {
				fmt.Printf("\nStream error: %v\n", err)
				return
			}

			openAIReply := response.Choices[0].Delta.Content

			fmt.Printf(openAIReply)

			// fmt.Fprint(openAIReply)
			c.Writer.WriteString(openAIReply)

			c.Writer.Flush()
			// c.JSON(http.StatusOK, gin.H{"success": true, "message": openAIReply})
		}

更多关于Golang实现OpenAI流式处理的技术探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


问题出在每次循环迭代时都使用fmt.Println(full),这会在每次输出后自动换行。要解决这个问题,应该使用fmt.Print而不是fmt.Println,并在需要时手动控制换行。

以下是修改后的代码:

fmt.Printf("Stream response: ")

full := ""

for {
    response, err := stream.Recv()
    if errors.Is(err, io.EOF) {
        fmt.Println("\nStream finished")
        break
    }

    if err != nil {
        fmt.Printf("\nStream error: %v\n", err)
        return
    }

    openAIReply := response.Choices[0].Delta.Content
    
    // 移除换行符
    openAIReply = strings.ReplaceAll(openAIReply, "\n", "")
    
    full += openAIReply
    
    // 使用Print代替Println,并清除当前行
    fmt.Print("\r" + full)
    
    // 向前端发送数据
    c.Writer.Write([]byte(full))
    c.Writer.(http.Flusher).Flush()
}

或者,如果你想完全模拟JavaScript示例中的console.clear()行为,可以这样修改:

fmt.Printf("Stream response: ")

full := ""

for {
    response, err := stream.Recv()
    if errors.Is(err, io.EOF) {
        fmt.Println("\nStream finished")
        break
    }

    if err != nil {
        fmt.Printf("\nStream error: %v\n", err)
        return
    }

    openAIReply := response.Choices[0].Delta.Content
    
    // 移除换行符
    openAIReply = strings.ReplaceAll(openAIReply, "\n", "")
    
    full += openAIReply
    
    // 清除终端并重新输出
    fmt.Print("\033[H\033[2J")
    fmt.Print("Stream response: " + full)
    
    // 向前端发送数据
    c.Writer.Write([]byte(full))
    c.Writer.(http.Flusher).Flush()
}

对于前端显示问题,你需要确保正确设置SSE(Server-Sent Events)的响应头。取消注释你代码中的相关部分:

w := c.Writer
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")

fmt.Printf("Stream response: ")

full := ""

for {
    response, err := stream.Recv()
    if errors.Is(err, io.EOF) {
        fmt.Println("\nStream finished")
        w.Write([]byte("data: [DONE]\n\n"))
        w.(http.Flusher).Flush()
        break
    }

    if err != nil {
        fmt.Printf("\nStream error: %v\n", err)
        return
    }

    openAIReply := response.Choices[0].Delta.Content
    
    // 移除换行符
    openAIReply = strings.ReplaceAll(openAIReply, "\n", "")
    
    full += openAIReply
    
    // 终端输出
    fmt.Print("\r" + full)
    
    // SSE格式发送到前端
    fmt.Fprintf(w, "data: %s\n\n", full)
    w.(http.Flusher).Flush()
}

这样修改后,终端输出会在一行中更新,前端也能正确接收流式数据。

回到顶部