Golang中如何在同一个结构体中使用多个channel
Golang中如何在同一个结构体中使用多个channel 在我的代码中,我想实现以下功能:
- 从输入接收数据作为
event和message - 根据
event格式化接收到的数据
我考虑使用类似于面向对象编程中的方法,但看起来我把事情搞砸了。
我写的是:
// Define the structs that contains the channels
type sseData struct {
event, message string
}
type DataPasser struct {
data chan sseData
logs chan string
connection chan struct{} // To control maximum allowed clients connections
}
// DEfine the struct's reciever that do the formating based on the input date
func (p *DataPasser) Format() {
data := <-p.data
switch {
case len(data.event) > 0:
p.logs <- fmt.Sprintf("event: %v\ndata: %v\n\n", data.event, data.message)
case len(data.event) == 0:
p.logs <- fmt.Sprintf("data: %v\n\n", data.message)
}
}
然后我有以下代码:
func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
setupCORS(&w, r)
fmt.Println("Client connected from IP:", r.RemoteAddr)
p.connection <- struct{}{}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Internal error", 500)
return
}
fmt.Fprint(w, "event: notification\ndata: Connection to WhatsApp server ...\n\n")
flusher.Flush()
// Connect to the WhatsApp client
go Connect()
// Prepare dataParser `p` to recieve data through its sseData channel
go p.Format()
for {
select {
case c := <-p.logs:
fmt.Fprint(w, c)
flusher.Flush()
case <-r.Context().Done():
<-p.connection
fmt.Println("Connection closed")
return
}
}
}
func setupCORS(w *http.ResponseWriter, req *http.Request) {
(*w).Header().Set("Cache-Control", "no-cache")
(*w).Header().Set("Access-Control-Allow-Origin", "*")
(*w).Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
(*w).Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
}
在连接函数中,我有:
package main
import (
"context"
"fmt"
)
var err error
func Connect() {
fmt.Println("Connected")
if client.IsConnected() {
client.Disconnect()
passer.data <- sseData{
event: "notification",
message: "Reconnecting to WhatsApp server ...",
}
}
if client.Store.ID == nil {
// No ID stored, new login
GetQR:
qrChan, _ := client.GetQRChannel(context.Background())
err = client.Connect()
if err != nil {
// panic(err)
//passer.logs <- "Can not connect with WhatApp server, try again later"
passer.data <- sseData{
event: "notification",
message: "Can not connect with WhatApp server, try again later",
}
fmt.Println("Sorry", err)
}
for evt := range qrChan {
switch evt.Event {
case "success":
{
//passer.logs <- "success"
passer.data <- sseData{
event: "notification",
message: "success",
}
fmt.Println("Login event: success")
}
case "timeout":
{
//passer.logs <- "timeout/Refreshing"
passer.data <- sseData{
event: "notification",
message: "timeout/Refreshing",
}
fmt.Println("Login event: timeout")
goto GetQR
}
case "code":
{
fmt.Println("new code recieved")
fmt.Println(evt.Code)
//passer.logs <- evt.Code
passer.data <- sseData{
event: "qrCode",
message: evt.Code,
}
}
}
}
} else {
// Already logged in, just connect
//passer.logs <- "Already logged"
passer.data <- sseData{
event: "notification",
message: "Already logged in",
}
fmt.Println("Already logged")
err = client.Connect()
if err != nil {
panic(err)
}
}
/*
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
passer.data <- sseData{
event: "notification",
message: "Server got shut down",
}
*/
}
在主文件中,我有:
var passer *DataPasser
const maxClients = 1
func init() {
passer = &DataPasser{
data: make(chan sseData),
logs: make(chan string),
connection: make(chan struct{}, maxClients),
}
}
func main() {
http.HandleFunc("/sse", passer.HandleSignal)
go http.ListenAndServe(":1234", nil)
// Listen to Ctrl+C (you can also do something else that prevents the program from exiting)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
if client.IsConnected() {
client.Disconnect()
}
}
发生的情况是,服务器仅正确发送了第一个SSE,看起来它在通道通信的某个地方卡住了。
我通过编写以下代码解决了这个问题:
// Connect to the WhatsApp client
go Connect()
for {
select {
case data := <-p.data:
fmt.Println("recieved")
switch {
case len(data.event) > 0:
fmt.Fprintf(w, "event: %v\ndata: %v\n\n", data.event, data.message)
case len(data.event) == 0:
fmt.Fprintf(w, "data: %v\n\n", data.message)
}
flusher.Flush()
case <-r.Context().Done():
<-p.connection
fmt.Println("Connection closed")
return
}
}
但我仍然对拆分操作和使用接收器感兴趣,有什么想法吗?
更多关于Golang中如何在同一个结构体中使用多个channel的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang中如何在同一个结构体中使用多个channel的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在同一个结构体中使用多个channel是Go语言中的常见模式。你的实现基本正确,但需要调整goroutine的生命周期管理。以下是修改后的版本:
// 修改DataPasser结构体,添加一个停止信号
type DataPasser struct {
data chan sseData
logs chan string
connection chan struct{}
done chan struct{} // 用于控制Format goroutine的停止
}
// 修改Format方法,使其持续处理数据
func (p *DataPasser) Format() {
defer close(p.logs)
for {
select {
case data := <-p.data:
switch {
case len(data.event) > 0:
p.logs <- fmt.Sprintf("event: %v\ndata: %v\n\n", data.event, data.message)
case len(data.event) == 0:
p.logs <- fmt.Sprintf("data: %v\n\n", data.message)
}
case <-p.done:
return
}
}
}
// 修改HandleSignal方法
func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
setupCORS(&w, r)
fmt.Println("Client connected from IP:", r.RemoteAddr)
p.connection <- struct{}{}
defer func() { <-p.connection }()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Internal error", 500)
return
}
fmt.Fprint(w, "event: notification\ndata: Connection to WhatsApp server ...\n\n")
flusher.Flush()
// 为每个连接创建独立的Format处理器
p.done = make(chan struct{})
go p.Format()
// Connect to the WhatsApp client
go Connect()
defer close(p.done)
for {
select {
case c := <-p.logs:
fmt.Fprint(w, c)
flusher.Flush()
case <-r.Context().Done():
fmt.Println("Connection closed")
return
}
}
}
// 修改init函数
func init() {
passer = &DataPasser{
data: make(chan sseData),
logs: make(chan string),
connection: make(chan struct{}, maxClients),
}
}
或者,如果你想保持Format方法更通用,可以这样实现:
// 使用context控制goroutine生命周期
func (p *DataPasser) Format(ctx context.Context) {
for {
select {
case data := <-p.data:
switch {
case len(data.event) > 0:
p.logs <- fmt.Sprintf("event: %v\ndata: %v\n\n", data.event, data.message)
case len(data.event) == 0:
p.logs <- fmt.Sprintf("data: %v\n\n", data.message)
}
case <-ctx.Done():
return
}
}
}
// 在HandleSignal中调用
func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
// ... 其他代码保持不变
// 创建context用于控制Format goroutine
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
go p.Format(ctx)
// ... 其他代码保持不变
}
对于Connect函数中的通道使用,确保使用正确的DataPasser实例:
func Connect() {
// 使用全局passer变量或通过参数传递
if passer == nil {
return
}
// ... 原有代码,但确保使用passer.data而不是直接访问通道
passer.data <- sseData{
event: "notification",
message: "Reconnecting to WhatsApp server ...",
}
// ... 其他代码
}
这种模式允许你在同一个结构体中管理多个channel,并通过接收器方法处理业务逻辑。关键点是确保goroutine有正确的生命周期管理,避免goroutine泄漏。

