Golang中如何在同一个结构体中使用多个channel

Golang中如何在同一个结构体中使用多个channel 在我的代码中,我想实现以下功能:

  1. 从输入接收数据作为 eventmessage
  2. 根据 event 格式化接收到的数据

我考虑使用类似于面向对象编程中的方法,但看起来我把事情搞砸了。

我写的是:

// Define the structs that contains the channels
type sseData struct {
	event, message string
}
type DataPasser struct {
	data       chan sseData
	logs       chan string
	connection chan struct{} // To control maximum allowed clients connections
}

// DEfine the struct's reciever that do the formating based on the input date
func (p *DataPasser) Format() {
	data := <-p.data
	switch {
	case len(data.event) > 0:
		p.logs <- fmt.Sprintf("event: %v\ndata: %v\n\n", data.event, data.message)
	case len(data.event) == 0:
		p.logs <- fmt.Sprintf("data: %v\n\n", data.message)
	}

}

然后我有以下代码:

func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	setupCORS(&w, r)

	fmt.Println("Client connected from IP:", r.RemoteAddr)

	p.connection <- struct{}{}
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Internal error", 500)
		return
	}

	fmt.Fprint(w, "event: notification\ndata: Connection to WhatsApp server ...\n\n")
	flusher.Flush()

	// Connect to the WhatsApp client
	go Connect()

	// Prepare dataParser `p` to recieve data through its sseData channel
	go p.Format()


	for {
		select {
		case c := <-p.logs:
			fmt.Fprint(w, c)
			flusher.Flush()
		case <-r.Context().Done():
			<-p.connection
			fmt.Println("Connection closed")
			return
		}
	}
}

func setupCORS(w *http.ResponseWriter, req *http.Request) {
	(*w).Header().Set("Cache-Control", "no-cache")
	(*w).Header().Set("Access-Control-Allow-Origin", "*")
	(*w).Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
	(*w).Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
}

在连接函数中,我有:

package main

import (
	"context"
	"fmt"
)

var err error

func Connect() {
	fmt.Println("Connected")
	if client.IsConnected() {
		client.Disconnect()
		passer.data <- sseData{
			event:   "notification",
			message: "Reconnecting to WhatsApp server ...",
		}
	}

	if client.Store.ID == nil {
		// No ID stored, new login
	GetQR:
		qrChan, _ := client.GetQRChannel(context.Background())
		err = client.Connect()
		if err != nil {
			//	panic(err)
			//passer.logs <- "Can not connect with WhatApp server, try again later"
			passer.data <- sseData{
				event:   "notification",
				message: "Can not connect with WhatApp server, try again later",
			}
			fmt.Println("Sorry", err)
		}

		for evt := range qrChan {
			switch evt.Event {
			case "success":
				{
					//passer.logs <- "success"
					passer.data <- sseData{
						event:   "notification",
						message: "success",
					}
					fmt.Println("Login event: success")
				}
			case "timeout":
				{
					//passer.logs <- "timeout/Refreshing"
					passer.data <- sseData{
						event:   "notification",
						message: "timeout/Refreshing",
					}
					fmt.Println("Login event: timeout")
					goto GetQR
				}
			case "code":
				{
					fmt.Println("new code recieved")
					fmt.Println(evt.Code)
					//passer.logs <- evt.Code
					passer.data <- sseData{
						event:   "qrCode",
						message: evt.Code,
					}
				}
			}
		}
	} else {
		// Already logged in, just connect
		//passer.logs <- "Already logged"
		passer.data <- sseData{
			event:   "notification",
			message: "Already logged in",
		}
		fmt.Println("Already logged")
		err = client.Connect()
		if err != nil {
			panic(err)
		}
	}
	/*
		c := make(chan os.Signal, 1)
		signal.Notify(c, os.Interrupt, syscall.SIGTERM)

		<-c
		passer.data <- sseData{
			event:   "notification",
			message: "Server got shut down",
		}
	*/
}

在主文件中,我有:

var passer *DataPasser

const maxClients = 1

func init() {
	passer = &DataPasser{
		data:       make(chan sseData),
		logs:       make(chan string),
		connection: make(chan struct{}, maxClients),
	}
}

func main() {

	http.HandleFunc("/sse", passer.HandleSignal)
	go http.ListenAndServe(":1234", nil)

	// Listen to Ctrl+C (you can also do something else that prevents the program from exiting)
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	<-c
	if client.IsConnected() {
		client.Disconnect()
	}
}

发生的情况是,服务器仅正确发送了第一个SSE,看起来它在通道通信的某个地方卡住了。

我通过编写以下代码解决了这个问题:

	// Connect to the WhatsApp client
	go Connect()

	for {
		select {
		case data := <-p.data:
			fmt.Println("recieved")

			switch {
			case len(data.event) > 0:
				fmt.Fprintf(w, "event: %v\ndata: %v\n\n", data.event, data.message)
			case len(data.event) == 0:
				fmt.Fprintf(w, "data: %v\n\n", data.message)
			}
			flusher.Flush()
		case <-r.Context().Done():
			<-p.connection
			fmt.Println("Connection closed")
			return
		}
	}

但我仍然对拆分操作和使用接收器感兴趣,有什么想法吗?


更多关于Golang中如何在同一个结构体中使用多个channel的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何在同一个结构体中使用多个channel的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在同一个结构体中使用多个channel是Go语言中的常见模式。你的实现基本正确,但需要调整goroutine的生命周期管理。以下是修改后的版本:

// 修改DataPasser结构体,添加一个停止信号
type DataPasser struct {
    data       chan sseData
    logs       chan string
    connection chan struct{}
    done       chan struct{} // 用于控制Format goroutine的停止
}

// 修改Format方法,使其持续处理数据
func (p *DataPasser) Format() {
    defer close(p.logs)
    
    for {
        select {
        case data := <-p.data:
            switch {
            case len(data.event) > 0:
                p.logs <- fmt.Sprintf("event: %v\ndata: %v\n\n", data.event, data.message)
            case len(data.event) == 0:
                p.logs <- fmt.Sprintf("data: %v\n\n", data.message)
            }
        case <-p.done:
            return
        }
    }
}

// 修改HandleSignal方法
func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    setupCORS(&w, r)

    fmt.Println("Client connected from IP:", r.RemoteAddr)

    p.connection <- struct{}{}
    defer func() { <-p.connection }()

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Internal error", 500)
        return
    }

    fmt.Fprint(w, "event: notification\ndata: Connection to WhatsApp server ...\n\n")
    flusher.Flush()

    // 为每个连接创建独立的Format处理器
    p.done = make(chan struct{})
    go p.Format()

    // Connect to the WhatsApp client
    go Connect()

    defer close(p.done)
    
    for {
        select {
        case c := <-p.logs:
            fmt.Fprint(w, c)
            flusher.Flush()
        case <-r.Context().Done():
            fmt.Println("Connection closed")
            return
        }
    }
}

// 修改init函数
func init() {
    passer = &DataPasser{
        data:       make(chan sseData),
        logs:       make(chan string),
        connection: make(chan struct{}, maxClients),
    }
}

或者,如果你想保持Format方法更通用,可以这样实现:

// 使用context控制goroutine生命周期
func (p *DataPasser) Format(ctx context.Context) {
    for {
        select {
        case data := <-p.data:
            switch {
            case len(data.event) > 0:
                p.logs <- fmt.Sprintf("event: %v\ndata: %v\n\n", data.event, data.message)
            case len(data.event) == 0:
                p.logs <- fmt.Sprintf("data: %v\n\n", data.message)
            }
        case <-ctx.Done():
            return
        }
    }
}

// 在HandleSignal中调用
func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
    // ... 其他代码保持不变
    
    // 创建context用于控制Format goroutine
    ctx, cancel := context.WithCancel(r.Context())
    defer cancel()
    
    go p.Format(ctx)
    
    // ... 其他代码保持不变
}

对于Connect函数中的通道使用,确保使用正确的DataPasser实例:

func Connect() {
    // 使用全局passer变量或通过参数传递
    if passer == nil {
        return
    }
    
    // ... 原有代码,但确保使用passer.data而不是直接访问通道
    passer.data <- sseData{
        event:   "notification",
        message: "Reconnecting to WhatsApp server ...",
    }
    // ... 其他代码
}

这种模式允许你在同一个结构体中管理多个channel,并通过接收器方法处理业务逻辑。关键点是确保goroutine有正确的生命周期管理,避免goroutine泄漏。

回到顶部