Golang实现MapReduce的详细教程
Golang实现MapReduce的详细教程 如何解决这个面试问题。
有一个包含大量单词的文本文件,文件大小为100GB,你需要找到第一个不重复的单词。
限制:使用少于16GB的内存
示例,文件内容如下:
foo bar chaos hello hi foo bar
这个程序应该输出“chaos”,因为它是第一个不重复的单词。
如何使用MapReduce解决这个问题。
4 回复
实际上,我在将100GB文件分割成更小的块时遇到了问题。
更多关于Golang实现MapReduce的详细教程的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
它们是如何分隔的?
在从大小合理的缓冲区读取文件时,可以将其用作自然边界。
Go语言中没有 MapReduce,你可以使用 for 循环和一个“外部”变量。
或者,你指的是编程模型,而不是函数式编程语言中常见的那种函数吗?
这是一个典型的分布式计算问题,适合用MapReduce解决。以下是详细的Golang实现方案:
核心思路
将大文件分片处理,通过两阶段MapReduce:
- 第一阶段:统计每个单词在所有分片中的出现次数
- 第二阶段:找出第一个全局不重复的单词
完整实现
package main
import (
"bufio"
"fmt"
"os"
"sort"
"strings"
"sync"
)
// WordPosition 记录单词位置信息
type WordPosition struct {
Word string
FileID int
Position int64
}
// Map阶段结果
type MapResult struct {
Word string
Positions []WordPosition
}
// Reduce阶段结果
type ReduceResult struct {
Word string
Count int
FirstPos WordPosition
}
// Map函数:处理单个分片
func MapTask(filePath string, fileID int, results chan<- MapResult) {
file, err := os.Open(filePath)
if err != nil {
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanWords)
var position int64 = 0
wordPositions := make(map[string][]WordPosition)
for scanner.Scan() {
word := strings.ToLower(scanner.Text())
wordPositions[word] = append(wordPositions[word], WordPosition{
Word: word,
FileID: fileID,
Position: position,
})
position++
}
// 发送Map结果
for word, positions := range wordPositions {
results <- MapResult{
Word: word,
Positions: positions,
}
}
}
// Shuffle:按单词分组
func Shuffle(mapResults <-chan MapResult) map[string][]WordPosition {
grouped := make(map[string][]WordPosition)
for result := range mapResults {
grouped[result.Word] = append(grouped[result.Word], result.Positions...)
}
return grouped
}
// Reduce函数:处理单个单词
func ReduceTask(word string, positions []WordPosition, results chan<- ReduceResult) {
// 按文件ID和位置排序
sort.Slice(positions, func(i, j int) bool {
if positions[i].FileID == positions[j].FileID {
return positions[i].Position < positions[j].Position
}
return positions[i].FileID < positions[j].FileID
})
// 找出全局第一个位置
var firstPos WordPosition
if len(positions) > 0 {
firstPos = positions[0]
}
results <- ReduceResult{
Word: word,
Count: len(positions),
FirstPos: firstPos,
}
}
// 主函数:协调MapReduce流程
func FindFirstUniqueWord(filePaths []string) string {
// 阶段1: Map
mapResults := make(chan MapResult, 1000)
var wg sync.WaitGroup
// 启动Map任务
for i, filePath := range filePaths {
wg.Add(1)
go func(id int, path string) {
defer wg.Done()
MapTask(path, id, mapResults)
}(i, filePath)
}
// 等待所有Map任务完成
go func() {
wg.Wait()
close(mapResults)
}()
// 阶段2: Shuffle
groupedData := Shuffle(mapResults)
// 阶段3: Reduce
reduceResults := make(chan ReduceResult, len(groupedData))
var reduceWg sync.WaitGroup
for word, positions := range groupedData {
reduceWg.Add(1)
go func(w string, pos []WordPosition) {
defer reduceWg.Done()
ReduceTask(w, pos, reduceResults)
}(word, positions)
}
go func() {
reduceWg.Wait()
close(reduceResults)
}()
// 阶段4: 收集结果并找出第一个不重复的单词
uniqueWords := make([]ReduceResult, 0)
for result := range reduceResults {
if result.Count == 1 {
uniqueWords = append(uniqueWords, result)
}
}
// 按出现位置排序
sort.Slice(uniqueWords, func(i, j int) bool {
if uniqueWords[i].FirstPos.FileID == uniqueWords[j].FirstPos.FileID {
return uniqueWords[i].FirstPos.Position < uniqueWords[j].FirstPos.Position
}
return uniqueWords[i].FirstPos.FileID < uniqueWords[j].FirstPos.FileID
})
if len(uniqueWords) > 0 {
return uniqueWords[0].Word
}
return ""
}
// 文件分片函数(模拟大文件分片)
func SplitLargeFile(inputFile string, chunkSize int64) []string {
file, err := os.Open(inputFile)
if err != nil {
panic(err)
}
defer file.Close()
fileInfo, _ := file.Stat()
fileSize := fileInfo.Size()
var chunks []string
var offset int64 = 0
for offset < fileSize {
chunkFile := fmt.Sprintf("chunk_%d.txt", offset)
chunks = append(chunks, chunkFile)
// 实际实现中这里需要按单词边界切割
// 简化处理:按固定大小切割
offset += chunkSize
}
return chunks
}
func main() {
// 示例:假设已经将100GB文件分割成多个小文件
chunkFiles := []string{
"chunk_0.txt",
"chunk_1.txt",
"chunk_2.txt",
// ... 更多分片
}
// 在实际场景中,需要先分割大文件
// chunkFiles := SplitLargeFile("100gb_file.txt", 1*1024*1024*1024) // 1GB分片
result := FindFirstUniqueWord(chunkFiles)
fmt.Printf("第一个不重复的单词是: %s\n", result)
}
内存优化版本(处理100GB文件)
// 流式处理版本,内存占用更小
func StreamMapReduce(filePaths []string) string {
// 第一遍:统计词频
wordCounts := make(map[string]int)
wordFirstPos := make(map[string]WordPosition)
for fileID, filePath := range filePaths {
file, _ := os.Open(filePath)
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanWords)
var pos int64 = 0
for scanner.Scan() {
word := strings.ToLower(scanner.Text())
wordCounts[word]++
// 记录第一次出现的位置
if _, exists := wordFirstPos[word]; !exists {
wordFirstPos[word] = WordPosition{
Word: word,
FileID: fileID,
Position: pos,
}
}
pos++
}
file.Close()
}
// 找出所有只出现一次的单词
uniqueWords := make([]WordPosition, 0)
for word, count := range wordCounts {
if count == 1 {
uniqueWords = append(uniqueWords, wordFirstPos[word])
}
}
// 按位置排序
sort.Slice(uniqueWords, func(i, j int) bool {
if uniqueWords[i].FileID == uniqueWords[j].FileID {
return uniqueWords[i].Position < uniqueWords[j].Position
}
return uniqueWords[i].FileID < uniqueWords[j].FileID
})
if len(uniqueWords) > 0 {
return uniqueWords[0].Word
}
return ""
}
分布式版本架构
// Master节点协调
type Master struct {
MapTasks []string
ReduceTasks []string
Workers []string
}
// Worker节点
type Worker struct {
ID string
Status string
Task interface{}
}
// 分布式执行流程
func DistributedMapReduce(master *Master) string {
// 1. 分配Map任务给Worker
// 2. 收集Map结果
// 3. Shuffle数据到Reduce节点
// 4. 执行Reduce任务
// 5. 收集最终结果
return ""
}
关键点说明
- 分片策略:按1GB大小分片,共100个分片,每个分片独立处理
- 内存控制:每个Map任务只处理一个分片,内存使用在1GB以内
- 位置记录:需要记录单词的全局位置信息
- 排序机制:通过FileID和Position确定全局顺序
这个方案可以在16GB内存限制下处理100GB文件,通过分而治之的策略解决内存限制问题。

