golang高性能分布式map/reduce系统插件gleam的使用
Golang高性能分布式Map/Reduce系统插件Gleam的使用
Gleam是一个高性能、高效的分布式执行系统,它简单、通用、灵活且易于定制。Gleam是用Go语言构建的,用户定义的可以用Go、Unix管道工具或任何流式程序来编写计算逻辑。
主要特性
高性能
- 纯Go的mapper和reducer具有高性能和并发性
- 数据通过内存流动,可选择性地写入磁盘
- 多个map reduce步骤会合并在一起以获得更好的性能
内存高效
- 每个执行器运行在独立的OS进程中,由OS管理内存
- Gleam master和agent服务器内存高效,消耗约10MB内存
- 根据数据大小提示自动调整所需内存大小
灵活
- Gleam流可以独立运行或分布式运行
- 可调整为内存模式或磁盘模式
易于定制
- Go代码比Scala、Java、C++更简单易读
使用示例
单词计数示例
package main
import (
"flag"
"strings"
"github.com/chrislusf/gleam/distributed"
"github.com/chrislusf/gleam/flow"
"github.com/chrislusf/gleam/gio"
"github.com/chrislusf/gleam/plugins/file"
)
var (
isDistributed = flag.Bool("distributed", false, "run in distributed or not")
Tokenize = gio.RegisterMapper(tokenize)
AppendOne = gio.RegisterMapper(appendOne)
Sum = gio.RegisterReducer(sum)
)
func main() {
gio.Init() // 如果命令行调用mapper或reducer,执行并退出
flag.Parse() // 可选,因为gio.Init()也会调用这个
f := flow.New("top5 words in passwd").
Read(file.Txt("/etc/passwd", 2)). // 读取txt文件并分成2个分片
Map("tokenize", Tokenize). // 调用注册的"tokenize" mapper函数
Map("appendOne", AppendOne). // 调用注册的"appendOne" mapper函数
ReduceByKey("sum", Sum). // 调用注册的"sum" reducer函数
Sort("sortBySum", flow.OrderBy(2, true)).
Top("top5", 5, flow.OrderBy(2, false)).
Printlnf("%s\t%d")
if *isDistributed {
f.Run(distributed.Option())
} else {
f.Run()
}
}
func tokenize(row []interface{}) error {
line := gio.ToString(row[0])
for _, s := range strings.FieldsFunc(line, func(r rune) bool {
return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
}) {
gio.Emit(s)
}
return nil
}
func appendOne(row []interface{}) error {
row = append(row, 1)
gio.Emit(row...)
return nil
}
func sum(x, y interface{}) (interface{}, error) {
return gio.ToInt64(x) + gio.ToInt64(y), nil
}
使用Unix管道工具的单词计数
package main
import (
"fmt"
"github.com/chrislusf/gleam/flow"
"github.com/chrislusf/gleam/gio"
"github.com/chrislusf/gleam/gio/mapper"
"github.com/chrislusf/gleam/plugins/file"
"github.com/chrislusf/gleam/util"
)
func main() {
gio.Init()
flow.New("word count by unix pipes").
Read(file.Txt("/etc/passwd", 2)).
Map("tokenize", mapper.Tokenize).
Pipe("lowercase", "tr 'A-Z' 'a-z'").
Pipe("sort", "sort").
Pipe("uniq", "uniq -c").
OutputRow(func(row *util.Row) error {
fmt.Printf("%s\n", gio.ToString(row.K[0]))
return nil
}).Run()
}
连接两个CSV文件
假设文件"a.csv"有字段"a1, a2, a3, a4, a5",文件"b.csv"有字段"b1, b2, b3"。我们想连接a1 = b2的行,输出格式为"a1, a4, b3"。
package main
import (
. "github.com/chrislusf/gleam/flow"
"github.com/chrislusf/gleam/gio"
"github.com/chrislusf/gleam/plugins/file"
)
func main() {
gio.Init()
f := New("join a.csv and b.csv by a1=b2")
a := f.Read(file.Csv("a.csv", 1)).Select("select", Field(1,4)) // a1, a4
b := f.Read(file.Csv("b.csv", 1)).Select("select", Field(2,3)) // b2, b3
a.Join("joinByKey", b).Printlnf("%s,%s,%s").Run() // a1, a4, b3
}
分布式计算
本地设置Gleam集群
# 在服务器上启动"gleam master"
> go get github.com/chrislusf/gleam/distributed/gleam
> gleam master --address=":45326"
# 在不同的服务器或端口上启动"gleam agent"
> gleam agent --dir=2 --port 45327 --host=127.0.0.1
> gleam agent --dir=3 --port 45328 --host=127.0.0.1
在Kubernetes上设置Gleam集群
安装Kubernetes工具后,可以使用Skaffold运行最新版本:
cd ./k8s
skaffold run --profile base
或者构建和运行本地版本:
cd ./k8s
skaffold dev --profile dev
更改执行模式
定义流后,Run()函数可以在本地模式或分布式模式下执行:
f := flow.New("")
...
// 1. 本地模式
f.Run()
// 2. 分布式模式
import "github.com/chrislusf/gleam/distributed"
f.Run(distributed.Option())
f.Run(distributed.Option().SetMaster("master_ip:45326"))
重要特性
- 容错的OnDisk()
- 从本地、HDFS或S3读取数据
- 多种数据源支持:Cassandra、Kafka、Parquet文件、ORC文件、CSV文件、TSV文件、TXT文件等
状态
Gleam仍在发展中,欢迎任何帮助!特别是现在需要帮助实现Go读取Parquet文件的功能。
更多关于golang高性能分布式map/reduce系统插件gleam的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高性能分布式map/reduce系统插件gleam的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang高性能分布式Map/Reduce系统插件Gleam使用指南
Gleam是一个用Go语言编写的高性能分布式Map/Reduce系统,它提供了简单易用的API来实现分布式计算任务。下面我将详细介绍Gleam的基本概念、安装方法和使用示例。
1. Gleam简介
Gleam的主要特点包括:
- 轻量级设计,易于部署
- 支持本地和分布式模式运行
- 提供类似Unix管道的API风格
- 自动处理数据分区和并行计算
- 内置支持文本、JSON等常见数据格式
2. 安装Gleam
使用go get命令安装Gleam:
go get -u github.com/chrislusf/gleam
3. 基本使用示例
3.1 简单Map/Reduce示例
package main
import (
"fmt"
"strings"
"github.com/chrislusf/gleam/flow"
"github.com/chrislusf/gleam/gio"
)
func main() {
gio.Init() // 初始化Gleam环境
// 创建数据流
f := flow.New("Word Count").
// 读取文本数据
Read("input.txt").
// 按行分割
Map("split lines", func(line []byte, emit gio.Emitter) {
for _, word := range strings.Fields(string(line)) {
emit([]byte(word))
}
}).
// 单词计数
ReduceByKey("count words", func(x, y []byte) []byte {
return []byte(fmt.Sprintf("%d", gio.ToInt(x)+gio.ToInt(y)))
}).
// 输出结果
Write("output.txt")
// 运行数据流
f.Run()
}
3.2 分布式模式运行
要运行分布式模式,需要先启动master和agent:
# 启动master
gleam master start
# 启动agent (可以在多台机器上运行)
gleam agent start --master=master_host:master_port
然后修改代码使用分布式执行:
f := flow.New("Distributed Word Count").Read("hdfs://input.txt")...
f.Distributed = true
f.Run()
4. 高级功能示例
4.1 使用自定义分区函数
f := flow.New("Custom Partitioning").
Read("input.txt").
// 自定义分区函数,按单词首字母分区
Partition("partition by first letter", func(line []byte) int {
if len(line) == 0 {
return 0
}
return int(line[0] % 3) // 分成3个分区
}).
Map("process", func(line []byte, emit gio.Emitter) {
// 处理逻辑
emit(line)
})
4.2 处理JSON数据
type Person struct {
Name string `json:"name"`
Age int `json:"age"`
}
f := flow.New("JSON Processing").
Read("people.json").
Map("parse json", func(line []byte, emit gio.Emitter) {
var p Person
if err := json.Unmarshal(line, &p); err == nil {
// 提取年龄大于30的人
if p.Age > 30 {
emit([]byte(p.Name))
}
}
}).
Write("output.txt")
5. 性能优化技巧
- 合理设置分区数:根据数据量和集群规模选择适当的分区数
- 减少数据序列化:尽量在Map阶段过滤掉不需要的数据
- 使用组合操作:将多个Map操作合并减少中间数据传输
- 本地模式测试:先在本地小数据集测试,再扩展到分布式环境
6. 常见问题解决
- 数据倾斜:通过自定义分区函数平衡负载
- 内存不足:增加agent内存或减少分区大小
- 网络问题:确保master和agent之间的网络通畅
Gleam提供了强大的分布式计算能力,同时保持了Go语言的简洁性。通过合理设计数据流和分区策略,可以构建高效的分布式数据处理管道。