golang官方Elasticsearch客户端插件库go-elasticsearch的使用
Golang官方Elasticsearch客户端插件库go-elasticsearch的使用
兼容性
Go语言
从版本8.12.0
开始,该库遵循Go语言的发布策略。每个主要的Go版本会一直支持到有两个更新的主要版本发布为止。
Elasticsearch
语言客户端是向前兼容的,这意味着客户端支持与Elasticsearch的更高或相等的次要版本通信。Elasticsearch语言客户端仅与默认发行版向后兼容,且不提供保证。
使用Go模块时,在导入路径中包含版本号,并指定显式版本或分支:
require github.com/elastic/go-elasticsearch/v9 v9.x.x
require github.com/elastic/go-elasticsearch/v8 v8.x.x
可以在单个项目中使用多个版本的客户端:
// go.mod
github.com/elastic/go-elasticsearch/v8 v8.18.0
github.com/elastic/go-elasticsearch/v9 v9.0.0
// main.go
import (
elasticsearch7 "github.com/elastic/go-elasticsearch/v8"
elasticsearch8 "github.com/elastic/go-elasticsearch/v9"
)
// ...
es8, _ := elasticsearch7.NewDefaultClient()
es9, _ := elasticsearch8.NewDefaultClient()
安装
使用go get命令安装:
go get github.com/elastic/go-elasticsearch/v9
连接示例
package main
import (
"log"
"github.com/elastic/go-elasticsearch/v9"
)
func main() {
// 创建默认客户端
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// 检查集群健康状态
res, err := es.Cluster.Health()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
log.Println(res)
}
基本操作示例
创建索引
// 创建索引
res, err := es.Indices.Create("test-index")
if err != nil {
log.Fatalf("Cannot create index: %s", err)
}
defer res.Body.Close()
log.Println(res)
索引文档
// 索引文档
doc := `{"title":"Test document","content":"This is a test document"}`
res, err := es.Index(
"test-index", // 索引名
strings.NewReader(doc), // 文档内容
es.Index.WithDocumentID("1"), // 文档ID
es.Index.WithRefresh("true"), // 刷新索引
)
if err != nil {
log.Fatalf("Error indexing document: %s", err)
}
defer res.Body.Close()
log.Println(res)
搜索文档
// 搜索文档
query := `{
"query": {
"match": {
"title": "test"
}
}
}`
res, err := es.Search(
es.Search.WithIndex("test-index"),
es.Search.WithBody(strings.NewReader(query)),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error searching documents: %s", err)
}
defer res.Body.Close()
log.Println(res)
删除文档
// 删除文档
res, err := es.Delete(
"test-index", // 索引名
"1", // 文档ID
)
if err != nil {
log.Fatalf("Error deleting document: %s", err)
}
defer res.Body.Close()
log.Println(res)
删除索引
// 删除索引
res, err := es.Indices.Delete([]string{"test-index"})
if err != nil {
log.Fatalf("Cannot delete index: %s", err)
}
defer res.Body.Close()
log.Println(res)
辅助工具
esutil
包提供了方便的辅助函数:
import "github.com/elastic/go-elasticsearch/v9/esutil"
// 使用JSONReader创建请求体
body := esutil.JSONReader(map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"title": "test",
},
},
})
res, err := es.Search(
es.Search.WithIndex("test-index"),
es.Search.WithBody(body),
)
完整示例
package main
import (
"context"
"encoding/json"
"log"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v9"
"github.com/elastic/go-elasticsearch/v9/esapi"
)
func main() {
// 创建客户端
cfg := elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// 1. 创建索引
createIndex(es, "test-index")
// 2. 索引文档
indexDocument(es, "test-index", "1", `{"title":"Test document","content":"This is a test document"}`)
// 3. 获取文档
getDocument(es, "test-index", "1")
// 4. 搜索文档
searchDocuments(es, "test-index", "test")
// 5. 删除文档
deleteDocument(es, "test-index", "1")
// 6. 删除索引
deleteIndex(es, "test-index")
}
func createIndex(es *elasticsearch.Client, indexName string) {
res, err := es.Indices.Create(indexName)
if err != nil {
log.Fatalf("Cannot create index: %s", err)
}
defer res.Body.Close()
log.Printf("Create index response: %s", res.String())
}
func indexDocument(es *elasticsearch.Client, index, id, doc string) {
req := esapi.IndexRequest{
Index: index,
DocumentID: id,
Body: strings.NewReader(doc),
Refresh: "true",
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error indexing document: %s", err)
}
defer res.Body.Close()
log.Printf("Index document response: %s", res.String())
}
func getDocument(es *elasticsearch.Client, index, id string) {
res, err := es.Get(index, id)
if err != nil {
log.Fatalf("Error getting document: %s", err)
}
defer res.Body.Close()
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
log.Printf("Document: %+v", result)
}
func searchDocuments(es *elasticsearch.Client, index, query string) {
searchBody := `{
"query": {
"match": {
"title": "` + query + `"
}
}
}`
res, err := es.Search(
es.Search.WithIndex(index),
es.Search.WithBody(strings.NewReader(searchBody)),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error searching documents: %s", err)
}
defer res.Body.Close()
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
log.Printf("Search results: %+v", result)
}
func deleteDocument(es *elasticsearch.Client, index, id string) {
res, err := es.Delete(index, id)
if err != nil {
log.Fatalf("Error deleting document: %s", err)
}
defer res.Body.Close()
log.Printf("Delete document response: %s", res.String())
}
func deleteIndex(es *elasticsearch.Client, index string) {
res, err := es.Indices.Delete([]string{index})
if err != nil {
log.Fatalf("Cannot delete index: %s", err)
}
defer res.Body.Close()
log.Printf("Delete index response: %s", res.String())
}
更多示例
可以参考项目中的_examples
文件夹,其中包含了许多完整的示例代码,包括客户端配置和自定义、使用自定义CA进行安全(TLS)连接、为单元测试模拟传输、将客户端嵌入自定义类型、构建查询、单独和批量执行请求以及解析响应等。
更多关于golang官方Elasticsearch客户端插件库go-elasticsearch的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang官方Elasticsearch客户端插件库go-elasticsearch的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang官方Elasticsearch客户端go-elasticsearch使用指南
go-elasticsearch是Elastic官方提供的Golang客户端库,用于与Elasticsearch服务进行交互。下面我将详细介绍其基本使用方法。
安装
首先安装go-elasticsearch库:
go get github.com/elastic/go-elasticsearch/v8
基本使用
1. 初始化客户端
package main
import (
"log"
"github.com/elastic/go-elasticsearch/v8"
)
func main() {
// 默认配置连接本地9200端口
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// 检查连接
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
log.Println(res)
}
2. 自定义配置客户端
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
"http://localhost:9201",
},
Username: "foo",
Password: "bar",
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: time.Second,
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
常用操作示例
1. 索引文档
// 索引一个文档
doc := map[string]interface{}{
"title": "Test document",
"content": "This is a test document for go-elasticsearch",
}
data, _ := json.Marshal(doc)
res, err := es.Index(
"test-index", // 索引名称
strings.NewReader(string(data)), // 文档内容
es.Index.WithDocumentID("1"), // 文档ID
es.Index.WithRefresh("true"), // 立即刷新使文档可搜索
)
if err != nil {
log.Fatalf("Error indexing document: %s", err)
}
defer res.Body.Close()
log.Println(res)
2. 搜索文档
var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"title": "test",
},
},
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}
res, err := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex("test-index"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
// 解析结果
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
log.Printf(
"[%s] %d hits; took: %dms",
res.Status(),
int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),
int(r["took"].(float64)),
)
3. 批量操作
// 准备批量请求
var buf bytes.Buffer
// 第一个文档
doc1 := map[string]interface{}{
"title": "Batch document 1",
}
meta1 := map[string]interface{}{
"index": map[string]interface{}{
"_index": "test-index",
"_id": "2",
},
}
// 第二个文档
doc2 := map[string]interface{}{
"title": "Batch document 2",
}
meta2 := map[string]interface{}{
"index": map[string]interface{}{
"_index": "test-index",
"_id": "3",
},
}
// 编码元数据和文档
if err := json.NewEncoder(&buf).Encode(meta1); err != nil {
log.Fatal(err)
}
if err := json.NewEncoder(&buf).Encode(doc1); err != nil {
log.Fatal(err)
}
if err := json.NewEncoder(&buf).Encode(meta2); err != nil {
log.Fatal(err)
}
if err := json.NewEncoder(&buf).Encode(doc2); err != nil {
log.Fatal(err)
}
// 执行批量请求
res, err := es.Bulk(
strings.NewReader(buf.String()),
es.Bulk.WithIndex("test-index"),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
log.Println(res)
高级功能
1. 使用Scroll API处理大量数据
// 初始搜索请求
res, err := es.Search(
es.Search.WithIndex("test-index"),
es.Search.WithScroll(10*time.Minute),
es.Search.WithSize(100),
es.Search.WithQuery("match_all"),
)
if err != nil {
log.Fatal(err)
}
var r map[string]interface{}
json.NewDecoder(res.Body).Decode(&r)
res.Body.Close()
scrollID := r["_scroll_id"].(string)
hits := r["hits"].(map[string]interface{})["hits"].([]interface{})
// 处理第一批结果
for _, hit := range hits {
log.Printf("Document: %s", hit.(map[string]interface{})["_id"])
}
// 使用Scroll API获取剩余结果
for len(hits) > 0 {
res, err := es.Scroll(
es.Scroll.WithScrollID(scrollID),
es.Scroll.WithScroll(10*time.Minute),
)
if err != nil {
log.Fatal(err)
}
json.NewDecoder(res.Body).Decode(&r)
res.Body.Close()
scrollID = r["_scroll_id"].(string)
hits = r["hits"].(map[string]interface{})["hits"].([]interface{})
for _, hit := range hits {
log.Printf("Document: %s", hit.(map[string]interface{})["_id"])
}
}
// 清除scroll
es.ClearScroll(es.ClearScroll.WithScrollID(scrollID))
2. 使用异步Bulk处理器
// 创建Bulk处理器
bulk, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "test-index", // 默认索引名
Client: es, // Elasticsearch客户端
NumWorkers: 4, // worker数量
FlushBytes: 5e+6, // 5MB刷新阈值
})
// 添加文档到处理器
doc := map[string]interface{}{
"title": "Async bulk document",
}
data, _ := json.Marshal(doc)
err = bulk.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Document: strings.NewReader(string(data)),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
log.Printf("Indexed document ID: %s", res.Id)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
log.Printf("Error indexing document: %s", err)
},
},
)
if err != nil {
log.Fatal(err)
}
// 关闭处理器并等待所有文档处理完成
if err := bulk.Close(context.Background()); err != nil {
log.Fatalf("Unexpected error: %s", err)
}
stats := bulk.Stats()
log.Printf("Indexed %d documents", stats.NumAdded)
最佳实践
- 客户端重用:创建一次客户端并重用,而不是为每个请求创建新客户端
- 连接池:适当配置Transport参数以优化连接池
- 错误处理:始终检查错误并处理响应体
- 批量操作:对于大量文档操作,使用Bulk API
- 上下文:使用context.Context来管理请求超时和取消
go-elasticsearch提供了丰富的API来满足各种Elasticsearch操作需求,更多高级用法可以参考官方文档。