Golang中如何实现通道间的通信
Golang中如何实现通道间的通信 我正在学习使用Go通道,并尝试编写以下代码。我希望通过一个goroutine从给定的CSV文件中读取数据,然后将读取到的记录发送到另一个通道,该通道负责将相同的记录添加到另一个CSV文件中:
package main
import (
"encoding/csv"
"encoding/json"
"fmt"
"log"
"os"
)
func failOnError(err error) {
if err != nil {
log.Fatal("Error:", err)
panic(err)
}
}
func main() {
read := make(chan Data)
go func(input_file string) {
var data Data
fr, err := os.Open(input_file)
failOnError(err)
defer fr.Close()
r := csv.NewReader(fr)
rows, err := r.ReadAll()
failOnError(err)
data.header = rows[0]
for _, row := range rows[1:] {
data.lines = append(data.lines, Person{
Firstname: row[0],
Lastname: row[1],
Address: &Address{
City: row[2],
State: row[3],
},
})
}
peopleJson, _ := json.Marshal(data.lines)
fmt.Println(string(peopleJson)) // This is working smoothly
read <- data
}("people.csv")
csvOut, err := os.Create("resultsfile.csv")
if err != nil {
log.Fatal("Unable to open output")
}
out := make(chan int)
select {
case data := <-read:
go func(data Data) {
println("data received") // <-- Not show up
w := csv.NewWriter(csvOut)
defer csvOut.Close()
// handle header
data.header = append(data.header, "score")
if err = w.Write(data.header); err != nil {
log.Fatal(err)
}
/*
hanlde data
*/
w.Flush()
out <- 0
}(data)
case _ = <-out:
println("done")
}
}
type Person struct {
Firstname string `json:"firstname"` // JSON annotation will allow for easy printing to JSON after it had been loaded
Lastname string `json:"lastname"`
Address *Address `json:"address,omitempty"`
}
type Address struct {
City string `json:"city"`
State string `json:"state"`
}
type Data struct {
header []string
lines []Person
}
我的代码运行失败了,没有报错,也没有显示 data received 的提示。
注意:当我删除第二个goroutine,并用以下代码替换时,代码可以正常工作:
data := <-read
w := csv.NewWriter(csvOut)
defer csvOut.Close()
// handle header
data.header = append(data.header, "score")
if err = w.Write(data.header); err != nil {
log.Fatal(err)
}
w.Flush()
但我希望通过使用goroutine来实现,以便更好地理解goroutine。
更多关于Golang中如何实现通道间的通信的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我找到了如下解决方案:
package main
import (
"encoding/csv"
"encoding/json"
"fmt"
"log"
"os"
)
func failOnError(err error) {
if err != nil {
log.Fatal("Error:", err)
panic(err)
}
}
func main() {
read := make(chan Data)
go func(input_file string) {
var data Data
fr, err := os.Open(input_file)
failOnError(err)
defer fr.Close()
r := csv.NewReader(fr)
rows, err := r.ReadAll()
failOnError(err)
data.header = rows[0]
for _, row := range rows[1:] {
data.lines = append(data.lines, Person{
Firstname: row[0],
Lastname: row[1],
Address: &Address{
City: row[2],
State: row[3],
},
})
}
peopleJson, _ := json.Marshal(data.lines)
fmt.Println(string(peopleJson)) // This is working smoothly
read <- data
}("people.csv")
csvOut, err := os.Create("resultsfile.csv")
if err != nil {
log.Fatal("Unable to open output")
}
out := make(chan int)
data := <-read
go func(data Data, outputfile *os.File) {
w := csv.NewWriter(outputfile)
defer outputfile.Close()
// handle header
data.header = append(data.header, "score")
if err = w.Write(data.header); err != nil {
log.Fatal(err)
}
/*
hanlde data
*/
w.Flush()
out <- 0
}(data, csvOut)
<-out
}
type Person struct {
Firstname string `json:"firstname"` // JSON annotation will allow for easy printing to JSON after it had been loaded
Lastname string `json:"lastname"`
Address *Address `json:"address,omitempty"`
}
type Address struct {
City string `json:"city"`
State string `json:"state"`
}
type Data struct {
header []string
lines []Person
}
更多关于Golang中如何实现通道间的通信的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
你的代码存在两个主要问题:通道通信的同步逻辑和goroutine的生命周期管理。以下是修正后的实现:
package main
import (
"encoding/csv"
"encoding/json"
"fmt"
"log"
"os"
"sync"
)
func failOnError(err error) {
if err != nil {
log.Fatal("Error:", err)
}
}
func main() {
read := make(chan Data)
done := make(chan bool)
var wg sync.WaitGroup
// 读取goroutine
wg.Add(1)
go func(input_file string) {
defer wg.Done()
var data Data
fr, err := os.Open(input_file)
failOnError(err)
defer fr.Close()
r := csv.NewReader(fr)
rows, err := r.ReadAll()
failOnError(err)
data.header = rows[0]
for _, row := range rows[1:] {
data.lines = append(data.lines, Person{
Firstname: row[0],
Lastname: row[1],
Address: &Address{
City: row[2],
State: row[3],
},
})
}
peopleJson, _ := json.Marshal(data.lines)
fmt.Println(string(peopleJson))
read <- data
}("people.csv")
// 写入goroutine
wg.Add(1)
go func() {
defer wg.Done()
data := <-read
fmt.Println("data received") // 现在会显示
csvOut, err := os.Create("resultsfile.csv")
failOnError(err)
defer csvOut.Close()
w := csv.NewWriter(csvOut)
defer w.Flush()
// 处理header
data.header = append(data.header, "score")
if err = w.Write(data.header); err != nil {
log.Fatal(err)
}
// 处理数据行
for _, person := range data.lines {
record := []string{
person.Firstname,
person.Lastname,
person.Address.City,
person.Address.State,
"100", // 示例score值
}
if err = w.Write(record); err != nil {
log.Fatal(err)
}
}
done <- true
}()
// 等待完成
go func() {
wg.Wait()
close(read)
close(done)
}()
<-done
fmt.Println("done")
}
type Person struct {
Firstname string `json:"firstname"`
Lastname string `json:"lastname"`
Address *Address `json:"address,omitempty"`
}
type Address struct {
City string `json:"city"`
State string `json:"state"`
}
type Data struct {
header []string
lines []Person
}
关键修改点:
-
使用WaitGroup确保goroutine完成:原代码中第二个goroutine可能没有执行,因为main函数提前退出。
-
修复通道通信逻辑:原代码的select语句存在问题,
out通道在写入前就被读取。 -
添加done通道作为完成信号:确保所有操作完成后再退出main函数。
-
完善数据写入逻辑:添加了实际的数据行写入代码。
运行这个版本,你会看到"data received"和"done"的输出,并且resultsfile.csv文件会被正确创建。

