Golang高规模并行处理的最佳解决方案探讨

Golang高规模并行处理的最佳解决方案探讨 我有一个用例,需要以每批N行的方式处理大型SQL表/xls/txt文件,以生成其节点和关系。

Golang 计划逻辑:

输入 是如下格式的行数组:

       ColumnA     ColumnB          ColumnC          ColumnD
        C1           C2               C31               C41
        C1           C2               C31               C42
        C1           C2               C32               C43

逻辑: 为每一行生成以RDF三元组形式表示的节点和关系。每个单元格将是一个节点。哪些列之间存在关系将由一个输入JSON决定。

以第一行为例,可能的三元组如下:

id/c1   <name>        "C1"
id/c2   <name>        "C2"
id/c31  <name>        "C31"
id/c41  <name>        "C41"
id/c1   <Relation1>    id/c2
id/c2   <Relation2>    id/c41
id/c31  <Relation3>    id/c41

为了提升速度,将启动K个goroutine来并行处理行。

K、N变量希望根据运行我的进程的硬件(10k、50k或更多)进行配置。硬件通常是32GB到64GB的机器。

问题:

处理行数可能达到数百万的巨大表格时,由于数据的性质,大量重复的三元组会在不同行中重复出现,这将生成巨大的输出文件。

避免重复的最简单方法可能是在批次之间维护一个全局的 map[string]struct{},以三元组字符串作为键,并检查键是否已存在,如果存在则不添加。使用多个goroutine时,映射的同步将成为瓶颈,并且由于数据量巨大,这个全局映射可能无法装入内存。

我是Golang新手,请帮我为这个问题找到更好的解决方案。在版本1中,我希望尽可能通过限制支持的输入大小来坚持使用单机设置。 任何匹配此用例的GitHub代码参考都会很有帮助。

谢谢。


更多关于Golang高规模并行处理的最佳解决方案探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

感谢您花时间处理我的问题。

我所说的 RDF 三元组是指: 起始节点ID 关系名称 结束节点ID 关系将是单向的。 我将向逻辑提供明确的输入,以决定是否在两列之间建立关系,包括列之间关系的确切名称。 逻辑不需要推断任何关系,它只需遵循给定的定义。

变量 K 将控制在部署的硬件上遇到问题时创建的 goroutine 数量。

变量 N 将控制输入到处理逻辑的行数(批次)。 输入行的批次大小将根据硬件和源表大小进行规划。 源表可以是任意大小。几百万行将是标准情况。 例如,假设源表有 100 万行。那么我们将以 N = 50K 或 100K 的批次读取它们,并仅将这些行提供给逻辑。这样考虑的原因是,一次性读取所有行会耗尽内存,因为处理逻辑也需要内存。

由于每个批次的行都是独立处理的,所以排序、MapReduce 的逻辑对于一个批次是有效的,但跨批次重复仍然是主要问题。如何避免创建节点/关系三元组,如果它已经在迄今为止的任何批次中被创建过? 我并不追求零重复,但希望尽可能将其最小化,同时也要保持处理稳定性,因为在这种情况下内存将是瓶颈。

我很乐意回答任何进一步的问题。我正在寻找这方面的想法,因为我缺乏 Golang 经验。

谢谢。

更多关于Golang高规模并行处理的最佳解决方案探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你好,Naresh,

你能澄清一下“RDF三元组”是什么意思吗?

你还能澄清一下 <RelationshipN> 节点是什么吗? 我不明白如何仅通过查看A到D列,就能推断出 id/c1 与 id/c2 相关,然后 id/c2 与 id/c41 相关,但与 id/c31 无关。 这些关系是双向的还是单向的?

当你说:

K,N 变量希望根据运行我进程的硬件(10k 或 50k 或更多)进行配置。

你的意思是你的批次大约有 10-50k 行,还是你计划运行 10-50k 个 goroutine?

我还不完全理解你想做什么,但这里有一些想法,可能对你有帮助,也可能没有:

  • 这个问题听起来不像是 I/O 密集型的(我可能错了,在获得更多信息后我会更理解),所以在测量到性能提升之前,我不建议为每个硬件核心设置大量的 goroutine 因子。
  • 你能在你正在处理的程序获取数据之前对其进行排序吗? 如果你能按 A 列排序,你可能不需要将所有三元组都保存在内存中。当你遍历行并到达一个新的 A 列值时,你可以丢弃前一个 A 列值的数据。 如果你能先按 A 列排序,再按 B 列排序,你或许能让内存占用更小。
  • 你可能可以将批次分发给各个工作器,这些工作器创建自己的映射,在完成其批次后,再汇集到一个负责连接这些批次的消费者那里。

希望这能有所帮助。 如果不能,你能提供更多关于你在做什么以及你尝试过什么的信息吗?

对于高规模并行处理重复三元组的问题,我推荐使用分片映射(sharded map)配合通道(channel)的模式。以下是具体实现方案:

package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"sync"
)

// 定义三元组结构
type Triple struct {
	Subject   string `json:"subject"`
	Predicate string `json:"predicate"`
	Object    string `json:"object"`
}

// 分片映射结构
type ShardedMap struct {
	shards []*shard
	shardCount int
}

type shard struct {
	sync.RWMutex
	data map[string]struct{}
}

// 创建分片映射
func NewShardedMap(shardCount int) *ShardedMap {
	shards := make([]*shard, shardCount)
	for i := range shards {
		shards[i] = &shard{
			data: make(map[string]struct{}),
		}
	}
	return &ShardedMap{
		shards:     shards,
		shardCount: shardCount,
	}
}

// 根据键获取分片
func (sm *ShardedMap) getShard(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return sm.shards[uint(h.Sum32())%uint32(sm.shardCount)]
}

// 检查并添加三元组
func (sm *ShardedMap) CheckAndAdd(t Triple) bool {
	key := fmt.Sprintf("%s|%s|%s", t.Subject, t.Predicate, t.Object)
	shard := sm.getShard(key)
	
	shard.Lock()
	defer shard.Unlock()
	
	if _, exists := shard.data[key]; exists {
		return false
	}
	shard.data[key] = struct{}{}
	return true
}

// 工作池处理结构
type Processor struct {
	workers      int
	batchSize    int
	tripleMap    *ShardedMap
	resultChan   chan Triple
	errorChan    chan error
	wg           sync.WaitGroup
}

func NewProcessor(workers, batchSize, shardCount int) *Processor {
	return &Processor{
		workers:    workers,
		batchSize:  batchSize,
		tripleMap:  NewShardedMap(shardCount),
		resultChan: make(chan Triple, 10000),
		errorChan:  make(chan error, 100),
	}
}

// 处理批次数据
func (p *Processor) ProcessBatch(batch [][]string, config map[string][]string) {
	rowChan := make(chan []string, len(batch))
	
	// 启动工作goroutine
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for row := range rowChan {
				p.processRow(row, config)
			}
		}()
	}
	
	// 发送数据到工作池
	for _, row := range batch {
		rowChan <- row
	}
	close(rowChan)
	p.wg.Wait()
}

// 处理单行数据
func (p *Processor) processRow(row []string, config map[string][]string) {
	// 生成节点三元组
	for colIdx, cell := range row {
		nodeID := fmt.Sprintf("id/%s", cell)
		triple := Triple{
			Subject:   nodeID,
			Predicate: "<name>",
			Object:    fmt.Sprintf("\"%s\"", cell),
		}
		
		if p.tripleMap.CheckAndAdd(triple) {
			p.resultChan <- triple
		}
	}
	
	// 根据配置生成关系三元组
	for _, relation := range config["relations"] {
		var relConfig map[string]string
		json.Unmarshal([]byte(relation), &relConfig)
		
		sourceIdx := // 从配置获取源列索引
		targetIdx := // 从配置获取目标列索引
		
		triple := Triple{
			Subject:   fmt.Sprintf("id/%s", row[sourceIdx]),
			Predicate: fmt.Sprintf("<%s>", relConfig["name"]),
			Object:    fmt.Sprintf("id/%s", row[targetIdx]),
		}
		
		if p.tripleMap.CheckAndAdd(triple) {
			p.resultChan <- triple
		}
	}
}

// 结果收集器
func (p *Processor) CollectResults() <-chan Triple {
	return p.resultChan
}

// 主处理流程
func main() {
	// 配置参数
	workers := 100      // 根据CPU核心数调整
	batchSize := 10000  // 每批处理行数
	shardCount := 256   // 分片数,建议2的幂次
	
	processor := NewProcessor(workers, batchSize, shardCount)
	
	// 启动结果收集器
	go func() {
		outputFile, _ := os.Create("output.ttl")
		defer outputFile.Close()
		
		for triple := range processor.CollectResults() {
			line := fmt.Sprintf("%s %s %s .\n", 
				triple.Subject, triple.Predicate, triple.Object)
			outputFile.WriteString(line)
		}
	}()
	
	// 分批读取和处理数据
	for {
		batch := readBatchFromSource(batchSize)
		if len(batch) == 0 {
			break
		}
		
		config := loadConfig("relations.json")
		processor.ProcessBatch(batch, config)
	}
	
	close(processor.resultChan)
}

// 辅助函数
func readBatchFromSource(batchSize int) [][]string {
	// 实现从SQL/文件读取批次数据
	return nil
}

func loadConfig(path string) map[string][]string {
	// 加载关系配置
	return nil
}

对于内存优化,可以添加持久化存储支持:

// 磁盘辅助存储
type DiskBackedSet struct {
	memoryMap *ShardedMap
	diskStore *bolt.DB
	threshold int
}

func (d *DiskBackedSet) CheckAndAdd(t Triple) bool {
	key := fmt.Sprintf("%s|%s|%s", t.Subject, t.Predicate, t.Object)
	
	// 先检查内存
	if d.memoryMap.CheckAndAdd(t) {
		return true
	}
	
	// 内存未找到,检查磁盘
	var exists bool
	d.diskStore.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte("triples"))
		if bucket != nil {
			exists = bucket.Get([]byte(key)) != nil
		}
		return nil
	})
	
	if !exists {
		d.diskStore.Update(func(tx *bolt.Tx) error {
			bucket, _ := tx.CreateBucketIfNotExists([]byte("triples"))
			return bucket.Put([]byte(key), []byte{1})
		})
		return true
	}
	
	return false
}

对于GitHub参考,可以查看以下项目:

  1. https://github.com/dgraph-io/ristretto - 高性能并发缓存
  2. https://github.com/cornelk/hashmap - 优化的并发哈希映射
  3. https://github.com/alphadose/haxmap - 更快的并发哈希映射

这个方案通过分片映射减少锁竞争,使用工作池模式控制并发度,支持批次处理,并提供了磁盘持久化选项来处理超大规模数据集。

回到顶部