Golang中如何编写并发程序并同步结果

Golang中如何编写并发程序并同步结果 编写一个由多个网络节点执行的并发程序 g,在执行结束时,每个节点广播其结果。

2 回复

请先深呼吸,然后尝试更清晰地重新表述你的问题,因为目前无法理解。

更多关于Golang中如何编写并发程序并同步结果的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现并发程序并同步结果,可以通过goroutine和channel结合sync.WaitGroup来完成。以下是一个示例,展示如何创建多个并发任务,等待它们完成,并收集结果:

package main

import (
    "fmt"
    "sync"
    "time"
)

// 模拟网络节点执行的任务
func nodeTask(nodeID int, wg *sync.WaitGroup, resultChan chan<- string) {
    defer wg.Done()
    
    // 模拟网络操作耗时
    time.Sleep(time.Duration(nodeID*100) * time.Millisecond)
    
    // 生成结果
    result := fmt.Sprintf("节点%d的结果: 处理完成", nodeID)
    
    // 将结果发送到通道
    resultChan <- result
}

func main() {
    const numNodes = 5
    
    var wg sync.WaitGroup
    resultChan := make(chan string, numNodes)
    
    // 启动所有节点任务
    for i := 1; i <= numNodes; i++ {
        wg.Add(1)
        go nodeTask(i, &wg, resultChan)
    }
    
    // 等待所有goroutine完成
    wg.Wait()
    close(resultChan)
    
    // 收集并处理所有结果
    fmt.Println("所有节点执行完成,开始收集结果:")
    for result := range resultChan {
        fmt.Println(result)
    }
}

如果需要实现节点间的结果广播,可以使用sync.Map或额外的channel:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Node struct {
    ID     int
    Result string
}

func executeNode(nodeID int, wg *sync.WaitGroup, broadcastChan chan<- Node) {
    defer wg.Done()
    
    // 模拟处理时间
    time.Sleep(time.Duration(nodeID*100) * time.Millisecond)
    
    // 生成节点结果
    result := Node{
        ID:     nodeID,
        Result: fmt.Sprintf("数据块_%d", nodeID*1000),
    }
    
    // 广播结果到所有节点(通过共享channel)
    broadcastChan <- result
}

func main() {
    const numNodes = 5
    
    var wg sync.WaitGroup
    broadcastChan := make(chan Node, numNodes)
    results := make(map[int]string)
    var mu sync.Mutex
    
    // 启动结果收集器
    go func() {
        for node := range broadcastChan {
            mu.Lock()
            results[node.ID] = node.Result
            mu.Unlock()
            fmt.Printf("节点%d广播结果: %s\n", node.ID, node.Result)
        }
    }()
    
    // 启动所有节点
    for i := 1; i <= numNodes; i++ {
        wg.Add(1)
        go executeNode(i, &wg, broadcastChan)
    }
    
    // 等待所有节点完成
    wg.Wait()
    close(broadcastChan)
    
    // 等待结果收集完成
    time.Sleep(100 * time.Millisecond)
    
    // 输出最终结果集
    fmt.Println("\n最终收集的所有结果:")
    mu.Lock()
    for nodeID, result := range results {
        fmt.Printf("节点%d: %s\n", nodeID, result)
    }
    mu.Unlock()
}

对于更复杂的同步需求,可以使用sync.Cond或atomic包:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    const numNodes = 5
    
    var wg sync.WaitGroup
    var completed int32
    results := make([]string, numNodes)
    
    for i := 0; i < numNodes; i++ {
        wg.Add(1)
        go func(nodeID int) {
            defer wg.Done()
            
            // 模拟工作负载
            time.Sleep(time.Duration(nodeID*100) * time.Millisecond)
            
            // 存储结果
            results[nodeID] = fmt.Sprintf("节点%d的结果数据", nodeID)
            
            // 原子操作更新完成计数
            atomic.AddInt32(&completed, 1)
            
            // 广播完成状态
            fmt.Printf("节点%d完成,当前完成数: %d\n", 
                nodeID, atomic.LoadInt32(&completed))
        }(i)
    }
    
    wg.Wait()
    
    fmt.Println("\n所有节点执行完成,最终结果:")
    for i, result := range results {
        fmt.Printf("节点%d: %s\n", i, result)
    }
}

这些示例展示了Go中并发执行和结果同步的基本模式,可以根据具体需求调整同步机制和数据结构。

回到顶部