golang Elasticsearch客户端操作插件库elastic的使用
Golang Elasticsearch客户端操作插件库elastic的使用
Elastic是一个用于Go编程语言的Elasticsearch客户端库。
版本对应关系
以下是Elasticsearch版本与Elastic客户端的对应关系:
Elasticsearch版本 | Elastic版本 | 包路径 | 备注 |
---|---|---|---|
7.x | 7.0 | github.com/olivere/elastic/v7 | 使用Go modules |
6.x | 6.0 | github.com/olivere/elastic | 使用依赖管理工具 |
5.x | 5.0 | gopkg.in/olivere/elastic.v5 | 积极维护 |
2.x | 3.0 | gopkg.in/olivere/elastic.v3 | 已弃用 |
1.x | 2.0 | gopkg.in/olivere/elastic.v2 | 已弃用 |
0.9-1.3 | 1.0 | gopkg.in/olivere/elastic.v1 | 已弃用 |
基本使用示例
以下是一个完整的示例,展示如何创建客户端、创建索引、添加文档和执行搜索:
package main
import (
"context"
"fmt"
"log"
"github.com/olivere/elastic/v7"
)
// 定义一个简单的文档结构
type Tweet struct {
User string `json:"user"`
Message string `json:"message"`
}
func main() {
// 创建客户端
client, err := elastic.NewClient(
elastic.SetURL("http://127.0.0.1:9200"), // 默认连接本地9200端口
elastic.SetSniff(false), // 在生产环境中建议关闭嗅探
)
if err != nil {
log.Fatal(err)
}
// 检查Elasticsearch是否可用
info, code, err := client.Ping("http://127.0.0.1:9200").Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
// 检查索引是否存在
exists, err := client.IndexExists("twitter").Do(context.Background())
if err != nil {
log.Fatal(err)
}
if !exists {
// 创建索引
createIndex, err := client.CreateIndex("twitter").BodyString(`{
"settings":{
"number_of_shards":1,
"number_of_replicas":0
}
}`).Do(context.Background())
if err != nil {
log.Fatal(err)
}
if !createIndex.Acknowledged {
log.Println("Create index not acknowledged")
}
}
// 添加文档
tweet := Tweet{User: "olivere", Message: "Take Five"}
put1, err := client.Index().
Index("twitter").
Id("1").
BodyJson(tweet).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
// 执行搜索
termQuery := elastic.NewTermQuery("user", "olivere")
searchResult, err := client.Search().
Index("twitter").
Query(termQuery).
Sort("user", true).
From(0).Size(10).
Pretty(true).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
// 打印搜索结果
fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
fmt.Printf("Found a total of %d tweets\n", searchResult.TotalHits())
// 遍历结果
for _, hit := range searchResult.Hits.Hits {
var t Tweet
err := json.Unmarshal(hit.Source, &t)
if err != nil {
log.Printf("Failed to unmarshal source: %v", err)
continue
}
fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
}
}
主要功能API
文档API
- 索引API
- 获取API
- 删除API
- 按查询删除API
- 更新API
- 按查询更新API
- 批量获取API
- 批量API
- 重新索引API
搜索API
- 搜索
- 搜索模板
- 多搜索API
- 计数API
- 验证API
- 解释API
- 性能分析API
聚合
- 指标聚合(平均值、基数、最大值等)
- 桶聚合(日期直方图、范围等)
- 管道聚合(移动平均、导数等)
索引API
- 创建索引
- 删除索引
- 获取索引
- 索引是否存在
- 打开/关闭索引
- 索引模板
高级功能
滚动查询
// 初始化滚动查询
scroll := client.Scroll("twitter").
Size(100).
KeepAlive("1m")
for {
results, err := scroll.Do(context.Background())
if err == io.EOF {
break // 所有结果已返回
}
if err != nil {
log.Fatal(err)
}
// 处理结果
for _, hit := range results.Hits.Hits {
var t Tweet
err := json.Unmarshal(hit.Source, &t)
if err != nil {
log.Printf("Failed to unmarshal source: %v", err)
continue
}
fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
}
}
// 清除滚动ID
_, err = scroll.Clear(context.Background())
if err != nil {
log.Printf("Failed to clear scroll: %v", err)
}
批量操作
// 创建批量处理器
bulkRequest := client.Bulk()
// 添加多个操作
tweet1 := Tweet{User: "user1", Message: "Message 1"}
bulkRequest.Add(elastic.NewBulkIndexRequest().
Index("twitter").
Id("1").
Doc(tweet1))
tweet2 := Tweet{User: "user2", Message: "Message 2"}
bulkRequest.Add(elastic.NewBulkIndexRequest().
Index("twitter").
Id("2").
Doc(tweet2))
// 执行批量操作
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
log.Fatal(err)
}
// 检查失败的操作
if bulkResponse.Errors {
for _, item := range bulkResponse.Items {
for _, result := range item {
if result.Error != nil {
log.Printf("Failed to index document %s: %v", result.Id, result.Error)
}
}
}
}
注意事项
- 这是一个正在积极开发的分支,不要在生产环境中使用。
- 如果要使用稳定版本的Elastic,请使用Go modules管理依赖。
- 对于7.x版本,导入路径应为:
import "github.com/olivere/elastic/v7"
更多详细信息和主题(如如何添加中间件或如何连接到AWS Elasticsearch服务)请参考Wiki。
更多关于golang Elasticsearch客户端操作插件库elastic的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang Elasticsearch客户端操作插件库elastic的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang Elasticsearch客户端操作:elastic库使用指南
elastic是一个强大的Go语言Elasticsearch客户端库,提供了丰富的API来与Elasticsearch交互。下面我将详细介绍其使用方法。
1. 安装与初始化
首先安装elastic库:
go get github.com/olivere/elastic/v7
初始化客户端:
package main
import (
"context"
"fmt"
"log"
"github.com/olivere/elastic/v7"
)
func main() {
// 创建客户端
client, err := elastic.NewClient(
elastic.SetURL("http://localhost:9200"), // 设置ES地址
elastic.SetSniff(false), // 禁用嗅探器(适用于Docker环境)
)
if err != nil {
log.Fatal(err)
}
defer client.Stop()
// 检查ES是否运行
info, code, err := client.Ping("http://localhost:9200").Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
}
2. 索引操作
创建索引
// 创建索引
createIndex, err := client.CreateIndex("my_index").Do(context.Background())
if err != nil {
log.Fatal(err)
}
if !createIndex.Acknowledged {
fmt.Println("创建索引未被确认")
}
检查索引是否存在
exists, err := client.IndexExists("my_index").Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("索引存在: %v\n", exists)
删除索引
deleteIndex, err := client.DeleteIndex("my_index").Do(context.Background())
if err != nil {
log.Fatal(err)
}
if !deleteIndex.Acknowledged {
fmt.Println("删除索引未被确认")
}
3. 文档操作
添加文档
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
// 添加文档
user := User{Name: "张三", Age: 30}
put, err := client.Index().
Index("my_index").
Id("1"). // 设置文档ID
BodyJson(user).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("文档ID %s 添加到索引 %s\n", put.Id, put.Index)
获取文档
// 获取文档
get, err := client.Get().
Index("my_index").
Id("1").
Do(context.Background())
if err != nil {
log.Fatal(err)
}
if get.Found {
var user User
err := json.Unmarshal(get.Source, &user)
if err != nil {
log.Fatal(err)
}
fmt.Printf("获取到文档: %+v\n", user)
}
更新文档
// 更新文档
update, err := client.Update().
Index("my_index").
Id("1").
Doc(map[string]interface{}{"age": 31}).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("文档版本变为 %d\n", update.Version)
删除文档
// 删除文档
delete, err := client.Delete().
Index("my_index").
Id("1").
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("文档删除: %s\n", delete.Result)
4. 搜索操作
简单搜索
// 简单搜索
termQuery := elastic.NewTermQuery("name", "张三")
searchResult, err := client.Search().
Index("my_index").
Query(termQuery).
From(0).Size(10).
Pretty(true).
Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("查询耗时 %d 毫秒, 结果总数: %d\n",
searchResult.TookInMillis,
searchResult.TotalHits())
for _, hit := range searchResult.Hits.Hits {
var user User
err := json.Unmarshal(hit.Source, &user)
if err != nil {
log.Fatal(err)
}
fmt.Printf("用户 %s 年龄 %d\n", user.Name, user.Age)
}
组合查询
// 组合查询
boolQuery := elastic.NewBoolQuery()
boolQuery.Must(elastic.NewMatchQuery("name", "张三"))
boolQuery.Filter(elastic.NewRangeQuery("age").Gt(25))
searchResult, err := client.Search().
Index("my_index").
Query(boolQuery).
Sort("age", true). // 按年龄升序
From(0).Size(10).
Do(context.Background())
// 处理结果同上
聚合查询
// 聚合查询
aggs := elastic.NewTermsAggregation().Field("age")
searchResult, err := client.Search().
Index("my_index").
Query(elastic.NewMatchAllQuery()).
Aggregation("ages", aggs).
Size(0). // 不返回具体文档
Do(context.Background())
if agg, found := searchResult.Aggregations.Terms("ages"); found {
for _, bucket := range agg.Buckets {
fmt.Printf("年龄 %s 有 %d 人\n", bucket.Key, bucket.DocCount)
}
}
5. 批量操作
// 批量操作
bulkRequest := client.Bulk()
users := []User{
{Name: "李四", Age: 25},
{Name: "王五", Age: 28},
{Name: "赵六", Age: 32},
}
for i, user := range users {
req := elastic.NewBulkIndexRequest().
Index("my_index").
Id(fmt.Sprintf("%d", i+2)).
Doc(user)
bulkRequest = bulkRequest.Add(req)
}
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("批量操作耗时 %d 毫秒\n", bulkResponse.Took)
6. 使用技巧与注意事项
-
连接池管理:elastic客户端内置连接池,建议复用客户端实例
-
上下文控制:所有操作都需要传入context,便于超时和取消控制
-
错误处理:Elasticsearch可能返回HTTP错误,需要检查
err
和响应状态 -
版本兼容:确保客户端版本与Elasticsearch服务器版本兼容
-
性能优化:
- 批量操作代替单条操作
- 合理设置分页大小
- 使用过滤器(filter)代替查询(query)提高性能
elastic库功能强大,以上只是基本用法。实际使用时,建议参考官方文档了解更多高级特性。