golang Elasticsearch客户端操作插件库elastic的使用

Golang Elasticsearch客户端操作插件库elastic的使用

Elastic是一个用于Go编程语言的Elasticsearch客户端库。

版本对应关系

以下是Elasticsearch版本与Elastic客户端的对应关系:

Elasticsearch版本 Elastic版本 包路径 备注
7.x 7.0 github.com/olivere/elastic/v7 使用Go modules
6.x 6.0 github.com/olivere/elastic 使用依赖管理工具
5.x 5.0 gopkg.in/olivere/elastic.v5 积极维护
2.x 3.0 gopkg.in/olivere/elastic.v3 已弃用
1.x 2.0 gopkg.in/olivere/elastic.v2 已弃用
0.9-1.3 1.0 gopkg.in/olivere/elastic.v1 已弃用

基本使用示例

以下是一个完整的示例,展示如何创建客户端、创建索引、添加文档和执行搜索:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/olivere/elastic/v7"
)

// 定义一个简单的文档结构
type Tweet struct {
	User    string `json:"user"`
	Message string `json:"message"`
}

func main() {
	// 创建客户端
	client, err := elastic.NewClient(
		elastic.SetURL("http://127.0.0.1:9200"), // 默认连接本地9200端口
		elastic.SetSniff(false),                 // 在生产环境中建议关闭嗅探
	)
	if err != nil {
		log.Fatal(err)
	}

	// 检查Elasticsearch是否可用
	info, code, err := client.Ping("http://127.0.0.1:9200").Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)

	// 检查索引是否存在
	exists, err := client.IndexExists("twitter").Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	if !exists {
		// 创建索引
		createIndex, err := client.CreateIndex("twitter").BodyString(`{
			"settings":{
				"number_of_shards":1,
				"number_of_replicas":0
			}
		}`).Do(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		if !createIndex.Acknowledged {
			log.Println("Create index not acknowledged")
		}
	}

	// 添加文档
	tweet := Tweet{User: "olivere", Message: "Take Five"}
	put1, err := client.Index().
		Index("twitter").
		Id("1").
		BodyJson(tweet).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)

	// 执行搜索
	termQuery := elastic.NewTermQuery("user", "olivere")
	searchResult, err := client.Search().
		Index("twitter").
		Query(termQuery).
		Sort("user", true).
		From(0).Size(10).
		Pretty(true).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// 打印搜索结果
	fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
	fmt.Printf("Found a total of %d tweets\n", searchResult.TotalHits())

	// 遍历结果
	for _, hit := range searchResult.Hits.Hits {
		var t Tweet
		err := json.Unmarshal(hit.Source, &t)
		if err != nil {
			log.Printf("Failed to unmarshal source: %v", err)
			continue
		}
		fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
	}
}

主要功能API

文档API

  • 索引API
  • 获取API
  • 删除API
  • 按查询删除API
  • 更新API
  • 按查询更新API
  • 批量获取API
  • 批量API
  • 重新索引API

搜索API

  • 搜索
  • 搜索模板
  • 多搜索API
  • 计数API
  • 验证API
  • 解释API
  • 性能分析API

聚合

  • 指标聚合(平均值、基数、最大值等)
  • 桶聚合(日期直方图、范围等)
  • 管道聚合(移动平均、导数等)

索引API

  • 创建索引
  • 删除索引
  • 获取索引
  • 索引是否存在
  • 打开/关闭索引
  • 索引模板

高级功能

滚动查询

// 初始化滚动查询
scroll := client.Scroll("twitter").
    Size(100).
    KeepAlive("1m")

for {
    results, err := scroll.Do(context.Background())
    if err == io.EOF {
        break // 所有结果已返回
    }
    if err != nil {
        log.Fatal(err)
    }

    // 处理结果
    for _, hit := range results.Hits.Hits {
        var t Tweet
        err := json.Unmarshal(hit.Source, &t)
        if err != nil {
            log.Printf("Failed to unmarshal source: %v", err)
            continue
        }
        fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
    }
}

// 清除滚动ID
_, err = scroll.Clear(context.Background())
if err != nil {
    log.Printf("Failed to clear scroll: %v", err)
}

批量操作

// 创建批量处理器
bulkRequest := client.Bulk()

// 添加多个操作
tweet1 := Tweet{User: "user1", Message: "Message 1"}
bulkRequest.Add(elastic.NewBulkIndexRequest().
    Index("twitter").
    Id("1").
    Doc(tweet1))

tweet2 := Tweet{User: "user2", Message: "Message 2"}
bulkRequest.Add(elastic.NewBulkIndexRequest().
    Index("twitter").
    Id("2").
    Doc(tweet2))

// 执行批量操作
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

// 检查失败的操作
if bulkResponse.Errors {
    for _, item := range bulkResponse.Items {
        for _, result := range item {
            if result.Error != nil {
                log.Printf("Failed to index document %s: %v", result.Id, result.Error)
            }
        }
    }
}

注意事项

  1. 这是一个正在积极开发的分支,不要在生产环境中使用
  2. 如果要使用稳定版本的Elastic,请使用Go modules管理依赖。
  3. 对于7.x版本,导入路径应为:import "github.com/olivere/elastic/v7"

更多详细信息和主题(如如何添加中间件或如何连接到AWS Elasticsearch服务)请参考Wiki。


更多关于golang Elasticsearch客户端操作插件库elastic的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang Elasticsearch客户端操作插件库elastic的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang Elasticsearch客户端操作:elastic库使用指南

elastic是一个强大的Go语言Elasticsearch客户端库,提供了丰富的API来与Elasticsearch交互。下面我将详细介绍其使用方法。

1. 安装与初始化

首先安装elastic库:

go get github.com/olivere/elastic/v7

初始化客户端:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/olivere/elastic/v7"
)

func main() {
	// 创建客户端
	client, err := elastic.NewClient(
		elastic.SetURL("http://localhost:9200"), // 设置ES地址
		elastic.SetSniff(false),                 // 禁用嗅探器(适用于Docker环境)
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Stop()

	// 检查ES是否运行
	info, code, err := client.Ping("http://localhost:9200").Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
}

2. 索引操作

创建索引

// 创建索引
createIndex, err := client.CreateIndex("my_index").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
if !createIndex.Acknowledged {
    fmt.Println("创建索引未被确认")
}

检查索引是否存在

exists, err := client.IndexExists("my_index").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引存在: %v\n", exists)

删除索引

deleteIndex, err := client.DeleteIndex("my_index").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
if !deleteIndex.Acknowledged {
    fmt.Println("删除索引未被确认")
}

3. 文档操作

添加文档

type User struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// 添加文档
user := User{Name: "张三", Age: 30}
put, err := client.Index().
    Index("my_index").
    Id("1").          // 设置文档ID
    BodyJson(user).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("文档ID %s 添加到索引 %s\n", put.Id, put.Index)

获取文档

// 获取文档
get, err := client.Get().
    Index("my_index").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
if get.Found {
    var user User
    err := json.Unmarshal(get.Source, &user)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("获取到文档: %+v\n", user)
}

更新文档

// 更新文档
update, err := client.Update().
    Index("my_index").
    Id("1").
    Doc(map[string]interface{}{"age": 31}).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("文档版本变为 %d\n", update.Version)

删除文档

// 删除文档
delete, err := client.Delete().
    Index("my_index").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("文档删除: %s\n", delete.Result)

4. 搜索操作

简单搜索

// 简单搜索
termQuery := elastic.NewTermQuery("name", "张三")
searchResult, err := client.Search().
    Index("my_index").
    Query(termQuery).
    From(0).Size(10).
    Pretty(true).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("查询耗时 %d 毫秒, 结果总数: %d\n", 
    searchResult.TookInMillis, 
    searchResult.TotalHits())

for _, hit := range searchResult.Hits.Hits {
    var user User
    err := json.Unmarshal(hit.Source, &user)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("用户 %s 年龄 %d\n", user.Name, user.Age)
}

组合查询

// 组合查询
boolQuery := elastic.NewBoolQuery()
boolQuery.Must(elastic.NewMatchQuery("name", "张三"))
boolQuery.Filter(elastic.NewRangeQuery("age").Gt(25))

searchResult, err := client.Search().
    Index("my_index").
    Query(boolQuery).
    Sort("age", true). // 按年龄升序
    From(0).Size(10).
    Do(context.Background())
// 处理结果同上

聚合查询

// 聚合查询
aggs := elastic.NewTermsAggregation().Field("age")

searchResult, err := client.Search().
    Index("my_index").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("ages", aggs).
    Size(0). // 不返回具体文档
    Do(context.Background())

if agg, found := searchResult.Aggregations.Terms("ages"); found {
    for _, bucket := range agg.Buckets {
        fmt.Printf("年龄 %s 有 %d 人\n", bucket.Key, bucket.DocCount)
    }
}

5. 批量操作

// 批量操作
bulkRequest := client.Bulk()

users := []User{
    {Name: "李四", Age: 25},
    {Name: "王五", Age: 28},
    {Name: "赵六", Age: 32},
}

for i, user := range users {
    req := elastic.NewBulkIndexRequest().
        Index("my_index").
        Id(fmt.Sprintf("%d", i+2)).
        Doc(user)
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("批量操作耗时 %d 毫秒\n", bulkResponse.Took)

6. 使用技巧与注意事项

  1. 连接池管理:elastic客户端内置连接池,建议复用客户端实例

  2. 上下文控制:所有操作都需要传入context,便于超时和取消控制

  3. 错误处理:Elasticsearch可能返回HTTP错误,需要检查err和响应状态

  4. 版本兼容:确保客户端版本与Elasticsearch服务器版本兼容

  5. 性能优化

    • 批量操作代替单条操作
    • 合理设置分页大小
    • 使用过滤器(filter)代替查询(query)提高性能

elastic库功能强大,以上只是基本用法。实际使用时,建议参考官方文档了解更多高级特性。

回到顶部