golang高性能分布式map/reduce系统插件gleam的使用

Golang高性能分布式Map/Reduce系统插件Gleam的使用

Gleam是一个高性能、高效的分布式执行系统,它简单、通用、灵活且易于定制。Gleam是用Go语言构建的,用户定义的可以用Go、Unix管道工具或任何流式程序来编写计算逻辑。

主要特性

高性能

  • 纯Go的mapper和reducer具有高性能和并发性
  • 数据通过内存流动,可选择性地写入磁盘
  • 多个map reduce步骤会合并在一起以获得更好的性能

内存高效

  • 每个执行器运行在独立的OS进程中,由OS管理内存
  • Gleam master和agent服务器内存高效,消耗约10MB内存
  • 根据数据大小提示自动调整所需内存大小

灵活

  • Gleam流可以独立运行或分布式运行
  • 可调整为内存模式或磁盘模式

易于定制

  • Go代码比Scala、Java、C++更简单易读

使用示例

单词计数示例

package main

import (
	"flag"
	"strings"

	"github.com/chrislusf/gleam/distributed"
	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/file"
)

var (
	isDistributed   = flag.Bool("distributed", false, "run in distributed or not")
	Tokenize  = gio.RegisterMapper(tokenize)
	AppendOne = gio.RegisterMapper(appendOne)
	Sum = gio.RegisterReducer(sum)
)

func main() {

	gio.Init()   // 如果命令行调用mapper或reducer,执行并退出
	flag.Parse() // 可选,因为gio.Init()也会调用这个

	f := flow.New("top5 words in passwd").
		Read(file.Txt("/etc/passwd", 2)).  // 读取txt文件并分成2个分片
		Map("tokenize", Tokenize).    // 调用注册的"tokenize" mapper函数
		Map("appendOne", AppendOne).  // 调用注册的"appendOne" mapper函数
		ReduceByKey("sum", Sum).         // 调用注册的"sum" reducer函数
		Sort("sortBySum", flow.OrderBy(2, true)).
		Top("top5", 5, flow.OrderBy(2, false)).
		Printlnf("%s\t%d")

	if *isDistributed {
		f.Run(distributed.Option())
	} else {
		f.Run()
	}
}

func tokenize(row []interface{}) error {
	line := gio.ToString(row[0])
	for _, s := range strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	}) {
		gio.Emit(s)
	}
	return nil
}

func appendOne(row []interface{}) error {
	row = append(row, 1)
	gio.Emit(row...)
	return nil
}

func sum(x, y interface{}) (interface{}, error) {
	return gio.ToInt64(x) + gio.ToInt64(y), nil
}

使用Unix管道工具的单词计数

package main

import (
	"fmt"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/gio/mapper"
	"github.com/chrislusf/gleam/plugins/file"
	"github.com/chrislusf/gleam/util"
)

func main() {

	gio.Init()

	flow.New("word count by unix pipes").
		Read(file.Txt("/etc/passwd", 2)).
		Map("tokenize", mapper.Tokenize).
		Pipe("lowercase", "tr 'A-Z' 'a-z'").
		Pipe("sort", "sort").
		Pipe("uniq", "uniq -c").
		OutputRow(func(row *util.Row) error {
			fmt.Printf("%s\n", gio.ToString(row.K[0]))
			return nil
		}).Run()
}

连接两个CSV文件

假设文件"a.csv"有字段"a1, a2, a3, a4, a5",文件"b.csv"有字段"b1, b2, b3"。我们想连接a1 = b2的行,输出格式为"a1, a4, b3"。

package main

import (
	. "github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/file"
)

func main() {

	gio.Init()

	f := New("join a.csv and b.csv by a1=b2")
	a := f.Read(file.Csv("a.csv", 1)).Select("select", Field(1,4)) // a1, a4
	b := f.Read(file.Csv("b.csv", 1)).Select("select", Field(2,3)) // b2, b3

	a.Join("joinByKey", b).Printlnf("%s,%s,%s").Run()  // a1, a4, b3
}

分布式计算

本地设置Gleam集群

# 在服务器上启动"gleam master"
> go get github.com/chrislusf/gleam/distributed/gleam
> gleam master --address=":45326"

# 在不同的服务器或端口上启动"gleam agent"
> gleam agent --dir=2 --port 45327 --host=127.0.0.1
> gleam agent --dir=3 --port 45328 --host=127.0.0.1

在Kubernetes上设置Gleam集群

安装Kubernetes工具后,可以使用Skaffold运行最新版本:

cd ./k8s
skaffold run --profile base

或者构建和运行本地版本:

cd ./k8s
skaffold dev --profile dev

更改执行模式

定义流后,Run()函数可以在本地模式或分布式模式下执行:

f := flow.New("")
...
// 1. 本地模式
f.Run()

// 2. 分布式模式
import "github.com/chrislusf/gleam/distributed"
f.Run(distributed.Option())
f.Run(distributed.Option().SetMaster("master_ip:45326"))

重要特性

  • 容错的OnDisk()
  • 从本地、HDFS或S3读取数据
  • 多种数据源支持:Cassandra、Kafka、Parquet文件、ORC文件、CSV文件、TSV文件、TXT文件等

状态

Gleam仍在发展中,欢迎任何帮助!特别是现在需要帮助实现Go读取Parquet文件的功能。


更多关于golang高性能分布式map/reduce系统插件gleam的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能分布式map/reduce系统插件gleam的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang高性能分布式Map/Reduce系统插件Gleam使用指南

Gleam是一个用Go语言编写的高性能分布式Map/Reduce系统,它提供了简单易用的API来实现分布式计算任务。下面我将详细介绍Gleam的基本概念、安装方法和使用示例。

1. Gleam简介

Gleam的主要特点包括:

  • 轻量级设计,易于部署
  • 支持本地和分布式模式运行
  • 提供类似Unix管道的API风格
  • 自动处理数据分区和并行计算
  • 内置支持文本、JSON等常见数据格式

2. 安装Gleam

使用go get命令安装Gleam:

go get -u github.com/chrislusf/gleam

3. 基本使用示例

3.1 简单Map/Reduce示例

package main

import (
	"fmt"
	"strings"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
)

func main() {
	gio.Init() // 初始化Gleam环境

	// 创建数据流
	f := flow.New("Word Count").
		// 读取文本数据
		Read("input.txt").
		// 按行分割
		Map("split lines", func(line []byte, emit gio.Emitter) {
			for _, word := range strings.Fields(string(line)) {
				emit([]byte(word))
			}
		}).
		// 单词计数
		ReduceByKey("count words", func(x, y []byte) []byte {
			return []byte(fmt.Sprintf("%d", gio.ToInt(x)+gio.ToInt(y)))
		}).
		// 输出结果
		Write("output.txt")

	// 运行数据流
	f.Run()
}

3.2 分布式模式运行

要运行分布式模式,需要先启动master和agent:

# 启动master
gleam master start

# 启动agent (可以在多台机器上运行)
gleam agent start --master=master_host:master_port

然后修改代码使用分布式执行:

f := flow.New("Distributed Word Count").Read("hdfs://input.txt")...
f.Distributed = true
f.Run()

4. 高级功能示例

4.1 使用自定义分区函数

f := flow.New("Custom Partitioning").
	Read("input.txt").
	// 自定义分区函数,按单词首字母分区
	Partition("partition by first letter", func(line []byte) int {
		if len(line) == 0 {
			return 0
		}
		return int(line[0] % 3) // 分成3个分区
	}).
	Map("process", func(line []byte, emit gio.Emitter) {
		// 处理逻辑
		emit(line)
	})

4.2 处理JSON数据

type Person struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

f := flow.New("JSON Processing").
	Read("people.json").
	Map("parse json", func(line []byte, emit gio.Emitter) {
		var p Person
		if err := json.Unmarshal(line, &p); err == nil {
			// 提取年龄大于30的人
			if p.Age > 30 {
				emit([]byte(p.Name))
			}
		}
	}).
	Write("output.txt")

5. 性能优化技巧

  1. 合理设置分区数:根据数据量和集群规模选择适当的分区数
  2. 减少数据序列化:尽量在Map阶段过滤掉不需要的数据
  3. 使用组合操作:将多个Map操作合并减少中间数据传输
  4. 本地模式测试:先在本地小数据集测试,再扩展到分布式环境

6. 常见问题解决

  1. 数据倾斜:通过自定义分区函数平衡负载
  2. 内存不足:增加agent内存或减少分区大小
  3. 网络问题:确保master和agent之间的网络通畅

Gleam提供了强大的分布式计算能力,同时保持了Go语言的简洁性。通过合理设计数据流和分区策略,可以构建高效的分布式数据处理管道。

回到顶部