golang官方Elasticsearch客户端插件库go-elasticsearch的使用

Golang官方Elasticsearch客户端插件库go-elasticsearch的使用

兼容性

Go语言

从版本8.12.0开始,该库遵循Go语言的发布策略。每个主要的Go版本会一直支持到有两个更新的主要版本发布为止。

Elasticsearch

语言客户端是向前兼容的,这意味着客户端支持与Elasticsearch的更高或相等的次要版本通信。Elasticsearch语言客户端仅与默认发行版向后兼容,且不提供保证。

使用Go模块时,在导入路径中包含版本号,并指定显式版本或分支:

require github.com/elastic/go-elasticsearch/v9 v9.x.x
require github.com/elastic/go-elasticsearch/v8 v8.x.x

可以在单个项目中使用多个版本的客户端:

// go.mod
github.com/elastic/go-elasticsearch/v8 v8.18.0
github.com/elastic/go-elasticsearch/v9 v9.0.0

// main.go
import (
  elasticsearch7 "github.com/elastic/go-elasticsearch/v8"
  elasticsearch8 "github.com/elastic/go-elasticsearch/v9"
)
// ...
es8, _ := elasticsearch7.NewDefaultClient()
es9, _ := elasticsearch8.NewDefaultClient()

安装

使用go get命令安装:

go get github.com/elastic/go-elasticsearch/v9

连接示例

package main

import (
	"log"

	"github.com/elastic/go-elasticsearch/v9"
)

func main() {
	// 创建默认客户端
	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
		},
	}
	
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// 检查集群健康状态
	res, err := es.Cluster.Health()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()
	
	log.Println(res)
}

基本操作示例

创建索引

// 创建索引
res, err := es.Indices.Create("test-index")
if err != nil {
	log.Fatalf("Cannot create index: %s", err)
}
defer res.Body.Close()
log.Println(res)

索引文档

// 索引文档
doc := `{"title":"Test document","content":"This is a test document"}`

res, err := es.Index(
	"test-index",                      // 索引名
	strings.NewReader(doc),            // 文档内容
	es.Index.WithDocumentID("1"),      // 文档ID
	es.Index.WithRefresh("true"),      // 刷新索引
)
if err != nil {
	log.Fatalf("Error indexing document: %s", err)
}
defer res.Body.Close()
log.Println(res)

搜索文档

// 搜索文档
query := `{
  "query": {
    "match": {
      "title": "test"
    }
  }
}`

res, err := es.Search(
	es.Search.WithIndex("test-index"),
	es.Search.WithBody(strings.NewReader(query)),
	es.Search.WithPretty(),
)
if err != nil {
	log.Fatalf("Error searching documents: %s", err)
}
defer res.Body.Close()
log.Println(res)

删除文档

// 删除文档
res, err := es.Delete(
	"test-index",  // 索引名
	"1",           // 文档ID
)
if err != nil {
	log.Fatalf("Error deleting document: %s", err)
}
defer res.Body.Close()
log.Println(res)

删除索引

// 删除索引
res, err := es.Indices.Delete([]string{"test-index"})
if err != nil {
	log.Fatalf("Cannot delete index: %s", err)
}
defer res.Body.Close()
log.Println(res)

辅助工具

esutil包提供了方便的辅助函数:

import "github.com/elastic/go-elasticsearch/v9/esutil"

// 使用JSONReader创建请求体
body := esutil.JSONReader(map[string]interface{}{
	"query": map[string]interface{}{
		"match": map[string]interface{}{
			"title": "test",
		},
	},
})

res, err := es.Search(
	es.Search.WithIndex("test-index"),
	es.Search.WithBody(body),
)

完整示例

package main

import (
	"context"
	"encoding/json"
	"log"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v9"
	"github.com/elastic/go-elasticsearch/v9/esapi"
)

func main() {
	// 创建客户端
	cfg := elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	}
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// 1. 创建索引
	createIndex(es, "test-index")

	// 2. 索引文档
	indexDocument(es, "test-index", "1", `{"title":"Test document","content":"This is a test document"}`)

	// 3. 获取文档
	getDocument(es, "test-index", "1")

	// 4. 搜索文档
	searchDocuments(es, "test-index", "test")

	// 5. 删除文档
	deleteDocument(es, "test-index", "1")

	// 6. 删除索引
	deleteIndex(es, "test-index")
}

func createIndex(es *elasticsearch.Client, indexName string) {
	res, err := es.Indices.Create(indexName)
	if err != nil {
		log.Fatalf("Cannot create index: %s", err)
	}
	defer res.Body.Close()
	log.Printf("Create index response: %s", res.String())
}

func indexDocument(es *elasticsearch.Client, index, id, doc string) {
	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: id,
		Body:       strings.NewReader(doc),
		Refresh:    "true",
	}

	res, err := req.Do(context.Background(), es)
	if err != nil {
		log.Fatalf("Error indexing document: %s", err)
	}
	defer res.Body.Close()
	log.Printf("Index document response: %s", res.String())
}

func getDocument(es *elasticsearch.Client, index, id string) {
	res, err := es.Get(index, id)
	if err != nil {
		log.Fatalf("Error getting document: %s", err)
	}
	defer res.Body.Close()
	
	var result map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}
	log.Printf("Document: %+v", result)
}

func searchDocuments(es *elasticsearch.Client, index, query string) {
	searchBody := `{
		"query": {
			"match": {
				"title": "` + query + `"
			}
		}
	}`

	res, err := es.Search(
		es.Search.WithIndex(index),
		es.Search.WithBody(strings.NewReader(searchBody)),
		es.Search.WithPretty(),
	)
	if err != nil {
		log.Fatalf("Error searching documents: %s", err)
	}
	defer res.Body.Close()
	
	var result map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}
	log.Printf("Search results: %+v", result)
}

func deleteDocument(es *elasticsearch.Client, index, id string) {
	res, err := es.Delete(index, id)
	if err != nil {
		log.Fatalf("Error deleting document: %s", err)
	}
	defer res.Body.Close()
	log.Printf("Delete document response: %s", res.String())
}

func deleteIndex(es *elasticsearch.Client, index string) {
	res, err := es.Indices.Delete([]string{index})
	if err != nil {
		log.Fatalf("Cannot delete index: %s", err)
	}
	defer res.Body.Close()
	log.Printf("Delete index response: %s", res.String())
}

更多示例

可以参考项目中的_examples文件夹,其中包含了许多完整的示例代码,包括客户端配置和自定义、使用自定义CA进行安全(TLS)连接、为单元测试模拟传输、将客户端嵌入自定义类型、构建查询、单独和批量执行请求以及解析响应等。


更多关于golang官方Elasticsearch客户端插件库go-elasticsearch的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang官方Elasticsearch客户端插件库go-elasticsearch的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang官方Elasticsearch客户端go-elasticsearch使用指南

go-elasticsearch是Elastic官方提供的Golang客户端库,用于与Elasticsearch服务进行交互。下面我将详细介绍其基本使用方法。

安装

首先安装go-elasticsearch库:

go get github.com/elastic/go-elasticsearch/v8

基本使用

1. 初始化客户端

package main

import (
	"log"
	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	// 默认配置连接本地9200端口
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	
	// 检查连接
	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()
	
	log.Println(res)
}

2. 自定义配置客户端

cfg := elasticsearch.Config{
	Addresses: []string{
		"http://localhost:9200",
		"http://localhost:9201",
	},
	Username: "foo",
	Password: "bar",
	Transport: &http.Transport{
		MaxIdleConnsPerHost:   10,
		ResponseHeaderTimeout: time.Second,
	},
}

es, err := elasticsearch.NewClient(cfg)
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

常用操作示例

1. 索引文档

// 索引一个文档
doc := map[string]interface{}{
	"title":   "Test document",
	"content": "This is a test document for go-elasticsearch",
}

data, _ := json.Marshal(doc)

res, err := es.Index(
	"test-index",                  // 索引名称
	strings.NewReader(string(data)), // 文档内容
	es.Index.WithDocumentID("1"),  // 文档ID
	es.Index.WithRefresh("true"),  // 立即刷新使文档可搜索
)
if err != nil {
	log.Fatalf("Error indexing document: %s", err)
}
defer res.Body.Close()

log.Println(res)

2. 搜索文档

var buf bytes.Buffer
query := map[string]interface{}{
	"query": map[string]interface{}{
		"match": map[string]interface{}{
			"title": "test",
		},
	},
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
	log.Fatalf("Error encoding query: %s", err)
}

res, err := es.Search(
	es.Search.WithContext(context.Background()),
	es.Search.WithIndex("test-index"),
	es.Search.WithBody(&buf),
	es.Search.WithTrackTotalHits(true),
	es.Search.WithPretty(),
)
if err != nil {
	log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()

// 解析结果
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
	log.Fatalf("Error parsing the response body: %s", err)
}

log.Printf(
	"[%s] %d hits; took: %dms",
	res.Status(),
	int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),
	int(r["took"].(float64)),
)

3. 批量操作

// 准备批量请求
var buf bytes.Buffer

// 第一个文档
doc1 := map[string]interface{}{
	"title": "Batch document 1",
}
meta1 := map[string]interface{}{
	"index": map[string]interface{}{
		"_index": "test-index",
		"_id":    "2",
	},
}

// 第二个文档
doc2 := map[string]interface{}{
	"title": "Batch document 2",
}
meta2 := map[string]interface{}{
	"index": map[string]interface{}{
		"_index": "test-index",
		"_id":    "3",
	},
}

// 编码元数据和文档
if err := json.NewEncoder(&buf).Encode(meta1); err != nil {
	log.Fatal(err)
}
if err := json.NewEncoder(&buf).Encode(doc1); err != nil {
	log.Fatal(err)
}
if err := json.NewEncoder(&buf).Encode(meta2); err != nil {
	log.Fatal(err)
}
if err := json.NewEncoder(&buf).Encode(doc2); err != nil {
	log.Fatal(err)
}

// 执行批量请求
res, err := es.Bulk(
	strings.NewReader(buf.String()),
	es.Bulk.WithIndex("test-index"),
)
if err != nil {
	log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()

log.Println(res)

高级功能

1. 使用Scroll API处理大量数据

// 初始搜索请求
res, err := es.Search(
	es.Search.WithIndex("test-index"),
	es.Search.WithScroll(10*time.Minute),
	es.Search.WithSize(100),
	es.Search.WithQuery("match_all"),
)
if err != nil {
	log.Fatal(err)
}

var r map[string]interface{}
json.NewDecoder(res.Body).Decode(&r)
res.Body.Close()

scrollID := r["_scroll_id"].(string)
hits := r["hits"].(map[string]interface{})["hits"].([]interface{})

// 处理第一批结果
for _, hit := range hits {
	log.Printf("Document: %s", hit.(map[string]interface{})["_id"])
}

// 使用Scroll API获取剩余结果
for len(hits) > 0 {
	res, err := es.Scroll(
		es.Scroll.WithScrollID(scrollID),
		es.Scroll.WithScroll(10*time.Minute),
	)
	if err != nil {
		log.Fatal(err)
	}
	
	json.NewDecoder(res.Body).Decode(&r)
	res.Body.Close()
	
	scrollID = r["_scroll_id"].(string)
	hits = r["hits"].(map[string]interface{})["hits"].([]interface{})
	
	for _, hit := range hits {
		log.Printf("Document: %s", hit.(map[string]interface{})["_id"])
	}
}

// 清除scroll
es.ClearScroll(es.ClearScroll.WithScrollID(scrollID))

2. 使用异步Bulk处理器

// 创建Bulk处理器
bulk, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Index:      "test-index",       // 默认索引名
	Client:     es,                // Elasticsearch客户端
	NumWorkers: 4,                 // worker数量
	FlushBytes: 5e+6,              // 5MB刷新阈值
})

// 添加文档到处理器
doc := map[string]interface{}{
	"title": "Async bulk document",
}
data, _ := json.Marshal(doc)

err = bulk.Add(
	context.Background(),
	esutil.BulkIndexerItem{
		Action:    "index",
		Document:  strings.NewReader(string(data)),
		OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
			log.Printf("Indexed document ID: %s", res.Id)
		},
		OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
			log.Printf("Error indexing document: %s", err)
		},
	},
)
if err != nil {
	log.Fatal(err)
}

// 关闭处理器并等待所有文档处理完成
if err := bulk.Close(context.Background()); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

stats := bulk.Stats()
log.Printf("Indexed %d documents", stats.NumAdded)

最佳实践

  1. 客户端重用:创建一次客户端并重用,而不是为每个请求创建新客户端
  2. 连接池:适当配置Transport参数以优化连接池
  3. 错误处理:始终检查错误并处理响应体
  4. 批量操作:对于大量文档操作,使用Bulk API
  5. 上下文:使用context.Context来管理请求超时和取消

go-elasticsearch提供了丰富的API来满足各种Elasticsearch操作需求,更多高级用法可以参考官方文档。

回到顶部