Golang实现MapReduce的详细教程

Golang实现MapReduce的详细教程 如何解决这个面试问题。

有一个包含大量单词的文本文件,文件大小为100GB,你需要找到第一个不重复的单词。
限制:使用少于16GB的内存

       示例,文件内容如下:
       foo bar chaos hello hi foo bar

这个程序应该输出“chaos”,因为它是第一个不重复的单词。

如何使用MapReduce解决这个问题。

4 回复

实际上,我在将100GB文件分割成更小的块时遇到了问题。

更多关于Golang实现MapReduce的详细教程的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


它们是如何分隔的?

在从大小合理的缓冲区读取文件时,可以将其用作自然边界。

Go语言中没有 MapReduce,你可以使用 for 循环和一个“外部”变量。

或者,你指的是编程模型,而不是函数式编程语言中常见的那种函数吗?

这是一个典型的分布式计算问题,适合用MapReduce解决。以下是详细的Golang实现方案:

核心思路

将大文件分片处理,通过两阶段MapReduce:

  1. 第一阶段:统计每个单词在所有分片中的出现次数
  2. 第二阶段:找出第一个全局不重复的单词

完整实现

package main

import (
	"bufio"
	"fmt"
	"os"
	"sort"
	"strings"
	"sync"
)

// WordPosition 记录单词位置信息
type WordPosition struct {
	Word     string
	FileID   int
	Position int64
}

// Map阶段结果
type MapResult struct {
	Word      string
	Positions []WordPosition
}

// Reduce阶段结果
type ReduceResult struct {
	Word      string
	Count     int
	FirstPos  WordPosition
}

// Map函数:处理单个分片
func MapTask(filePath string, fileID int, results chan<- MapResult) {
	file, err := os.Open(filePath)
	if err != nil {
		return
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Split(bufio.ScanWords)
	
	var position int64 = 0
	wordPositions := make(map[string][]WordPosition)

	for scanner.Scan() {
		word := strings.ToLower(scanner.Text())
		wordPositions[word] = append(wordPositions[word], WordPosition{
			Word:     word,
			FileID:   fileID,
			Position: position,
		})
		position++
	}

	// 发送Map结果
	for word, positions := range wordPositions {
		results <- MapResult{
			Word:      word,
			Positions: positions,
		}
	}
}

// Shuffle:按单词分组
func Shuffle(mapResults <-chan MapResult) map[string][]WordPosition {
	grouped := make(map[string][]WordPosition)
	
	for result := range mapResults {
		grouped[result.Word] = append(grouped[result.Word], result.Positions...)
	}
	
	return grouped
}

// Reduce函数:处理单个单词
func ReduceTask(word string, positions []WordPosition, results chan<- ReduceResult) {
	// 按文件ID和位置排序
	sort.Slice(positions, func(i, j int) bool {
		if positions[i].FileID == positions[j].FileID {
			return positions[i].Position < positions[j].Position
		}
		return positions[i].FileID < positions[j].FileID
	})

	// 找出全局第一个位置
	var firstPos WordPosition
	if len(positions) > 0 {
		firstPos = positions[0]
	}

	results <- ReduceResult{
		Word:      word,
		Count:     len(positions),
		FirstPos:  firstPos,
	}
}

// 主函数:协调MapReduce流程
func FindFirstUniqueWord(filePaths []string) string {
	// 阶段1: Map
	mapResults := make(chan MapResult, 1000)
	var wg sync.WaitGroup
	
	// 启动Map任务
	for i, filePath := range filePaths {
		wg.Add(1)
		go func(id int, path string) {
			defer wg.Done()
			MapTask(path, id, mapResults)
		}(i, filePath)
	}
	
	// 等待所有Map任务完成
	go func() {
		wg.Wait()
		close(mapResults)
	}()
	
	// 阶段2: Shuffle
	groupedData := Shuffle(mapResults)
	
	// 阶段3: Reduce
	reduceResults := make(chan ReduceResult, len(groupedData))
	var reduceWg sync.WaitGroup
	
	for word, positions := range groupedData {
		reduceWg.Add(1)
		go func(w string, pos []WordPosition) {
			defer reduceWg.Done()
			ReduceTask(w, pos, reduceResults)
		}(word, positions)
	}
	
	go func() {
		reduceWg.Wait()
		close(reduceResults)
	}()
	
	// 阶段4: 收集结果并找出第一个不重复的单词
	uniqueWords := make([]ReduceResult, 0)
	for result := range reduceResults {
		if result.Count == 1 {
			uniqueWords = append(uniqueWords, result)
		}
	}
	
	// 按出现位置排序
	sort.Slice(uniqueWords, func(i, j int) bool {
		if uniqueWords[i].FirstPos.FileID == uniqueWords[j].FirstPos.FileID {
			return uniqueWords[i].FirstPos.Position < uniqueWords[j].FirstPos.Position
		}
		return uniqueWords[i].FirstPos.FileID < uniqueWords[j].FirstPos.FileID
	})
	
	if len(uniqueWords) > 0 {
		return uniqueWords[0].Word
	}
	return ""
}

// 文件分片函数(模拟大文件分片)
func SplitLargeFile(inputFile string, chunkSize int64) []string {
	file, err := os.Open(inputFile)
	if err != nil {
		panic(err)
	}
	defer file.Close()
	
	fileInfo, _ := file.Stat()
	fileSize := fileInfo.Size()
	
	var chunks []string
	var offset int64 = 0
	
	for offset < fileSize {
		chunkFile := fmt.Sprintf("chunk_%d.txt", offset)
		chunks = append(chunks, chunkFile)
		
		// 实际实现中这里需要按单词边界切割
		// 简化处理:按固定大小切割
		offset += chunkSize
	}
	
	return chunks
}

func main() {
	// 示例:假设已经将100GB文件分割成多个小文件
	chunkFiles := []string{
		"chunk_0.txt",
		"chunk_1.txt",
		"chunk_2.txt",
		// ... 更多分片
	}
	
	// 在实际场景中,需要先分割大文件
	// chunkFiles := SplitLargeFile("100gb_file.txt", 1*1024*1024*1024) // 1GB分片
	
	result := FindFirstUniqueWord(chunkFiles)
	fmt.Printf("第一个不重复的单词是: %s\n", result)
}

内存优化版本(处理100GB文件)

// 流式处理版本,内存占用更小
func StreamMapReduce(filePaths []string) string {
	// 第一遍:统计词频
	wordCounts := make(map[string]int)
	wordFirstPos := make(map[string]WordPosition)
	
	for fileID, filePath := range filePaths {
		file, _ := os.Open(filePath)
		scanner := bufio.NewScanner(file)
		scanner.Split(bufio.ScanWords)
		
		var pos int64 = 0
		for scanner.Scan() {
			word := strings.ToLower(scanner.Text())
			wordCounts[word]++
			
			// 记录第一次出现的位置
			if _, exists := wordFirstPos[word]; !exists {
				wordFirstPos[word] = WordPosition{
					Word:     word,
					FileID:   fileID,
					Position: pos,
				}
			}
			pos++
		}
		file.Close()
	}
	
	// 找出所有只出现一次的单词
	uniqueWords := make([]WordPosition, 0)
	for word, count := range wordCounts {
		if count == 1 {
			uniqueWords = append(uniqueWords, wordFirstPos[word])
		}
	}
	
	// 按位置排序
	sort.Slice(uniqueWords, func(i, j int) bool {
		if uniqueWords[i].FileID == uniqueWords[j].FileID {
			return uniqueWords[i].Position < uniqueWords[j].Position
		}
		return uniqueWords[i].FileID < uniqueWords[j].FileID
	})
	
	if len(uniqueWords) > 0 {
		return uniqueWords[0].Word
	}
	return ""
}

分布式版本架构

// Master节点协调
type Master struct {
	MapTasks    []string
	ReduceTasks []string
	Workers     []string
}

// Worker节点
type Worker struct {
	ID      string
	Status  string
	Task    interface{}
}

// 分布式执行流程
func DistributedMapReduce(master *Master) string {
	// 1. 分配Map任务给Worker
	// 2. 收集Map结果
	// 3. Shuffle数据到Reduce节点
	// 4. 执行Reduce任务
	// 5. 收集最终结果
	return ""
}

关键点说明

  1. 分片策略:按1GB大小分片,共100个分片,每个分片独立处理
  2. 内存控制:每个Map任务只处理一个分片,内存使用在1GB以内
  3. 位置记录:需要记录单词的全局位置信息
  4. 排序机制:通过FileID和Position确定全局顺序

这个方案可以在16GB内存限制下处理100GB文件,通过分而治之的策略解决内存限制问题。

回到顶部