Golang实现并行执行多个并发协程的最佳实践

Golang实现并行执行多个并发协程的最佳实践 我正在尝试通过编写一个类似于 LevelDB 的键值存储来学习 Go 中的并发编程。在这个过程中,我遇到了一个处理多个 goroutine 的问题。

我内存(RAM)中有一些数据,需要将其刷新到磁盘。为了异步执行此操作,我启动了一个 goroutine 来将数据写入磁盘,这样主 goroutine 仍然可以执行其他任务,例如处理读写查询。但当我执行时,我观察到新生成的 goroutine 开始执行,并在文件写入行处停止,然后切换回主 goroutine。根据我的理解,我认为文件写入是一个阻塞语句,这就是 goroutine 之间切换的原因。但我在一台 12 核的本地机器上运行它,所以我期望两个 goroutine 能够并行运行。我是否遗漏了什么?为了使其正常工作,我不得不使用 sync.WaitGroup 来让主 goroutine 实际等待,直到此文件写入完成。如果有人能告诉我是否有可能让这两个 goroutine 并行运行,那将非常有帮助。

我相对来说是编写并发程序的新手,Go 是我开始编写并发程序的第一门语言。 任何帮助都将不胜感激。谢谢!


更多关于Golang实现并行执行多个并发协程的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html

13 回复

这真是太棒了!感谢你的帮助。非常感谢!

更多关于Golang实现并行执行多个并发协程的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我猜它可能是并行发生的,但我们仅通过查看日志无法推断出那么多信息。

但这难道不是因为每个上下文中的阻塞语句导致协程来回切换吗? 假设这里的 time.sleep 是阻塞的

在处理协程时,总是使用等待组会更好吗?我是在假设运行在单核机器上的情况下,才想到使用等待组这个主意的。

我观察到,新创建的 goroutine 开始执行,但在文件写入行处停止,并切换回了主 goroutine。

你是否观察到主 goroutine 在创建另一个 goroutine 后就停止了?

是的,新创建的goroutine运行得非常快,但只持续到某个特定点(即发生文件读取操作时)。在那里,执行会从新创建的goroutine切换到主goroutine。

在两个goroutine中都添加一个 time.Sleep(...) 调用。我打赌你随后会看到来自两个goroutine的日志同时出现。

func main() {
    fmt.Println("hello world")
}

新创建的协程产生的日志全部来自该协程,而不是主协程

尝试在两个协程中都添加一个微小的 time.Sleep 间隔。可能是因为创建的协程运行得太快,在主协程有机会在中间写入自己的日志输出之前,它就输出了所有的日志语句。

我不太确定,但当我查看日志时,我观察到新创建的 goroutine 之后的日志都来自这个新创建的 goroutine,而不是主 goroutine。然后它(新创建的 goroutine)在文件写入之前停止,接着切换回主 goroutine。

Abeshek:

在处理协程时,总是使用等待组会更好吗?

是的。因为当主协程退出时(即 func main 结束时),所有协程都会停止。为了避免这种情况,主协程必须等待其他协程完成。

哦,而且这段代码示例甚至没有像Print调用之类的阻塞I/O操作。

在本地运行这段代码(不确定Playground是否提供超过一个核心,说实话),你就能看到goroutine之间的一些切换。(在我的机器上,我需要多次运行代码才能看到切换。大多数时候,看起来真的像是两个goroutine一个接一个地运行。)

不,调度器是抢占式的,至少部分是。在 Go 的某些旧版本中,它曾经是协作式的,但这一点已经改变。

这个游乐场片段展示了两个 goroutine 在有和没有睡眠延迟时的行为。将 delay 设置为 0 并运行几次,你会看到运行时在 goroutine 之间切换。(当 delay = 1ns 时,切换当然很明显。)

package main

import (
    "fmt"
    "time"
)

func main() {
    const delay = 0 // 尝试 0 和 time.Nanosecond
    var counter int
    go func() {
        for {
            counter++
        }
    }()
    go func() {
        for {
            counter++
        }
    }()
    time.Sleep(delay)
    fmt.Println(counter)
}

在Go中实现并行执行多个goroutine时,理解并发(concurrency)与并行(parallelism)的区别很重要。你观察到的行为是正常的:Go的调度器会在阻塞操作(如文件I/O)时切换goroutine,但这不意味着它们不能并行运行。在12核机器上,goroutine确实可以并行执行,但调度器会根据情况管理它们的执行。

以下是一个示例,展示如何并行执行多个goroutine,特别是处理文件写入等阻塞操作,并使用sync.WaitGroup来等待所有goroutine完成:

package main

import (
    "fmt"
    "os"
    "sync"
    "time"
)

// 模拟数据写入磁盘的函数
func writeDataToFile(filename string, data []byte, wg *sync.WaitGroup) {
    defer wg.Done() // 确保在函数退出时通知WaitGroup

    // 模拟一些处理时间
    time.Sleep(100 * time.Millisecond)

    // 写入文件(阻塞操作)
    err := os.WriteFile(filename, data, 0644)
    if err != nil {
        fmt.Printf("Error writing to %s: %v\n", filename, err)
        return
    }
    fmt.Printf("Data written to %s\n", filename)
}

func main() {
    var wg sync.WaitGroup
    files := []string{"file1.txt", "file2.txt", "file3.txt"}
    data := []byte("Sample data for writing to disk.")

    // 启动多个goroutine并行写入文件
    for _, file := range files {
        wg.Add(1) // 为每个goroutine增加计数
        go writeDataToFile(file, data, &wg)
    }

    // 主goroutine可以继续执行其他任务
    fmt.Println("Main goroutine is free to handle other tasks...")

    // 等待所有写入goroutine完成
    wg.Wait()
    fmt.Println("All file writes completed.")
}

在这个示例中,多个goroutine并行执行文件写入操作。sync.WaitGroup用于确保主goroutine等待所有写入完成,但在此期间主goroutine可以处理其他任务(如处理查询)。Go的调度器会自动在可用的CPU核心上并行运行这些goroutine,特别是在有阻塞I/O时,调度器会切换到其他goroutine以最大化资源利用率。

如果你的目标是实现类似LevelDB的键值存储,考虑使用通道(channels)或更高级的同步原语来管理并发。例如,可以使用带缓冲的通道来限制同时进行的磁盘写入数量,避免资源耗尽:

package main

import (
    "fmt"
    "os"
    "sync"
)

func writeDataToFileWithLimit(filename string, data []byte, sem chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    sem <- struct{}{}        // 获取信号量,限制并发数
    defer func() { <-sem }() // 释放信号量

    err := os.WriteFile(filename, data, 0644)
    if err != nil {
        fmt.Printf("Error writing to %s: %v\n", filename, err)
        return
    }
    fmt.Printf("Data written to %s\n", filename)
}

func main() {
    var wg sync.WaitGroup
    files := []string{"file1.txt", "file2.txt", "file3.txt", "file4.txt", "file5.txt"}
    data := []byte("Sample data.")
    sem := make(chan struct{}, 2) // 限制最多2个并发写入

    for _, file := range files {
        wg.Add(1)
        go writeDataToFileWithLimit(file, data, sem, &wg)
    }

    wg.Wait()
    fmt.Println("All writes done with concurrency limit.")
}

这个示例使用带缓冲的通道作为信号量,限制同时执行文件写入的goroutine数量为2,从而控制资源使用。这在实际应用中很重要,因为过多的并行磁盘I/O可能导致性能下降。

总之,你的观察是正常的,Go的调度器会处理goroutine的切换,而并行性在有多核时自动发生。使用sync.WaitGroup是正确的做法,以确保主goroutine在需要时等待其他goroutine完成。

回到顶部