Golang中如何实现通道间的通信

Golang中如何实现通道间的通信 我正在学习使用Go通道,并尝试编写以下代码。我希望通过一个goroutine从给定的CSV文件中读取数据,然后将读取到的记录发送到另一个通道,该通道负责将相同的记录添加到另一个CSV文件中:

package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"log"
	"os"
)

func failOnError(err error) {
	if err != nil {
		log.Fatal("Error:", err)
		panic(err)
	}
}
func main() {
	read := make(chan Data)
	go func(input_file string) {
		var data Data

		fr, err := os.Open(input_file)
		failOnError(err)
		defer fr.Close()
		r := csv.NewReader(fr)
		rows, err := r.ReadAll()
		failOnError(err)
		data.header = rows[0]
		for _, row := range rows[1:] {
			data.lines = append(data.lines, Person{
				Firstname: row[0],
				Lastname:  row[1],
				Address: &Address{
					City:  row[2],
					State: row[3],
				},
			})
		}

		peopleJson, _ := json.Marshal(data.lines)
		fmt.Println(string(peopleJson))   // This is working smoothly

		read <- data
	}("people.csv")

	csvOut, err := os.Create("resultsfile.csv")
	if err != nil {
		log.Fatal("Unable to open output")
	}

	out := make(chan int)
	select {
	case data := <-read:
		go func(data Data) {
			println("data received")     // <-- Not show up
			w := csv.NewWriter(csvOut)
			defer csvOut.Close()
			// handle header
			data.header = append(data.header, "score")
			if err = w.Write(data.header); err != nil {
				log.Fatal(err)
			}
			/*
				hanlde data
			*/
			w.Flush()
			out <- 0
		}(data)
	case _ = <-out:
		println("done")
	}

}

type Person struct {
	Firstname string   `json:"firstname"` // JSON annotation will allow for easy printing to JSON after it had been loaded
	Lastname  string   `json:"lastname"`
	Address   *Address `json:"address,omitempty"`
}

type Address struct {
	City  string `json:"city"`
	State string `json:"state"`
}

type Data struct {
	header []string
	lines  []Person
}

我的代码运行失败了,没有报错,也没有显示 data received 的提示。

注意:当我删除第二个goroutine,并用以下代码替换时,代码可以正常工作:

	data := <-read
	w := csv.NewWriter(csvOut)
	defer csvOut.Close()
	// handle header
	data.header = append(data.header, "score")
	if err = w.Write(data.header); err != nil {
		log.Fatal(err)
	}
	w.Flush()

但我希望通过使用goroutine来实现,以便更好地理解goroutine。


更多关于Golang中如何实现通道间的通信的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

我找到了如下解决方案:

package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"log"
	"os"
)

func failOnError(err error) {
	if err != nil {
		log.Fatal("Error:", err)
		panic(err)
	}
}
func main() {
	read := make(chan Data)
	go func(input_file string) {
		var data Data

		fr, err := os.Open(input_file)
		failOnError(err)
		defer fr.Close()
		r := csv.NewReader(fr)
		rows, err := r.ReadAll()
		failOnError(err)
		data.header = rows[0]
		for _, row := range rows[1:] {
			data.lines = append(data.lines, Person{
				Firstname: row[0],
				Lastname:  row[1],
				Address: &Address{
					City:  row[2],
					State: row[3],
				},
			})
		}

		peopleJson, _ := json.Marshal(data.lines)
		fmt.Println(string(peopleJson))   // This is working smoothly

		read <- data
	}("people.csv")

	csvOut, err := os.Create("resultsfile.csv")
	if err != nil {
		log.Fatal("Unable to open output")
	}

	out := make(chan int)

	data := <-read
	go func(data Data, outputfile *os.File) {
		w := csv.NewWriter(outputfile)
		defer outputfile.Close()
		// handle header
		data.header = append(data.header, "score")
		if err = w.Write(data.header); err != nil {
			log.Fatal(err)
		}
		/*
			hanlde data
		*/
		w.Flush()
		out <- 0
	}(data, csvOut)

	<-out
}

type Person struct {
	Firstname string   `json:"firstname"` // JSON annotation will allow for easy printing to JSON after it had been loaded
	Lastname  string   `json:"lastname"`
	Address   *Address `json:"address,omitempty"`
}

type Address struct {
	City  string `json:"city"`
	State string `json:"state"`
}

type Data struct {
	header []string
	lines  []Person
}

更多关于Golang中如何实现通道间的通信的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你的代码存在两个主要问题:通道通信的同步逻辑和goroutine的生命周期管理。以下是修正后的实现:

package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
)

func failOnError(err error) {
	if err != nil {
		log.Fatal("Error:", err)
	}
}

func main() {
	read := make(chan Data)
	done := make(chan bool)
	var wg sync.WaitGroup

	// 读取goroutine
	wg.Add(1)
	go func(input_file string) {
		defer wg.Done()
		var data Data

		fr, err := os.Open(input_file)
		failOnError(err)
		defer fr.Close()
		
		r := csv.NewReader(fr)
		rows, err := r.ReadAll()
		failOnError(err)
		
		data.header = rows[0]
		for _, row := range rows[1:] {
			data.lines = append(data.lines, Person{
				Firstname: row[0],
				Lastname:  row[1],
				Address: &Address{
					City:  row[2],
					State: row[3],
				},
			})
		}

		peopleJson, _ := json.Marshal(data.lines)
		fmt.Println(string(peopleJson))

		read <- data
	}("people.csv")

	// 写入goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		
		data := <-read
		fmt.Println("data received")  // 现在会显示
		
		csvOut, err := os.Create("resultsfile.csv")
		failOnError(err)
		defer csvOut.Close()
		
		w := csv.NewWriter(csvOut)
		defer w.Flush()
		
		// 处理header
		data.header = append(data.header, "score")
		if err = w.Write(data.header); err != nil {
			log.Fatal(err)
		}
		
		// 处理数据行
		for _, person := range data.lines {
			record := []string{
				person.Firstname,
				person.Lastname,
				person.Address.City,
				person.Address.State,
				"100", // 示例score值
			}
			if err = w.Write(record); err != nil {
				log.Fatal(err)
			}
		}
		
		done <- true
	}()

	// 等待完成
	go func() {
		wg.Wait()
		close(read)
		close(done)
	}()

	<-done
	fmt.Println("done")
}

type Person struct {
	Firstname string   `json:"firstname"`
	Lastname  string   `json:"lastname"`
	Address   *Address `json:"address,omitempty"`
}

type Address struct {
	City  string `json:"city"`
	State string `json:"state"`
}

type Data struct {
	header []string
	lines  []Person
}

关键修改点:

  1. 使用WaitGroup确保goroutine完成:原代码中第二个goroutine可能没有执行,因为main函数提前退出。

  2. 修复通道通信逻辑:原代码的select语句存在问题,out通道在写入前就被读取。

  3. 添加done通道作为完成信号:确保所有操作完成后再退出main函数。

  4. 完善数据写入逻辑:添加了实际的数据行写入代码。

运行这个版本,你会看到"data received"和"done"的输出,并且resultsfile.csv文件会被正确创建。

回到顶部