golang实现Oracle Coherence缓存API的gRPC网络传输插件库coherence-go-client的使用
Golang实现Oracle Coherence缓存API的gRPC网络传输插件库coherence-go-client的使用
Coherence Go Client简介
Coherence Go Client允许Go应用程序作为缓存客户端连接到Coherence集群,使用gRPC作为网络传输协议。
Coherence是一个可扩展、容错、云就绪的分布式平台,用于构建基于网格的应用程序和可靠存储数据。该产品广泛应用于关键金融交易系统、高性能电信产品和电子商务应用等领域。
主要特性
- 提供类似Map的接口操作缓存条目,包括:
Put
,PutWithExpiry
,PutIfAbsent
,PutAll
,Get
,GetAll
,Remove
,Clear
,GetOrDefault
,Replace
,ReplaceMapping
,Size
,IsEmpty
,ContainsKey
,ContainsValue
,ContainsEntry
等操作
- 支持集群端查询、聚合和过滤Map条目
- 支持使用EntryProcessors在集群端操作Map条目
- 支持注册监听器以接收以下通知:
- Map的插入、更新和删除等变更事件
- Map生命周期事件,如截断、释放或销毁
- 会话生命周期事件,如连接、断开、重新连接和关闭
- 支持将Go结构体存储为JSON,并能够序列化为Java对象以便其他Coherence语言API访问
- 支持近缓存(Near Cache),在Go客户端缓存频繁访问的数据以避免跨网络请求
- 在Coherence Community Edition 25.03+和商业版14.1.2.0+中支持简单和双端队列
- 所有Coherence API全面支持Go泛型
系统要求
- Coherence CE 22.06.4+, 25.03+ 或 Coherence 14.1.1.2206.4+ 商业版,并配置了gRPCProxy
- Go 1.23+
注意:如果要使用最新版本中的队列API,必须使用CE 24.09或商业版14.1.2.0.x。
注意:如果使用Go版本<1.23,可以使用客户端的v2.0.0版本。
启动支持gRPC的Coherence集群
在测试Go客户端之前,必须确保有可用的Coherence集群。对于本地开发,推荐使用Coherence CE Docker镜像,它包含了客户端正常运行所需的一切。
docker run -d -p 1408:1408 -p 30000:30000 ghcr.io/oracle/coherence-ce:25.03.1
安装
go get github.com/oracle/coherence-go-client/v2@latest
使用示例
以下示例连接到运行在默认端口1408上的gRPC Proxy的Coherence集群,创建一个键为int
、值为string
的新NamedMap
,并执行Put()
、Get()
和Size()
操作。
注意:键和值也可以是Go结构体。
package main
import (
"context"
"fmt"
"github.com/oracle/coherence-go-client/v2/coherence"
)
func main() {
var (
value *string
ctx = context.Background()
)
// 创建一个新的Session,连接到默认gRPC端口1408
session, err := coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
panic(err)
}
defer session.Close()
// 获取一个键为int、值为string的NamedMap
namedMap, err := coherence.GetNamedMap[int, string](session, "my-map")
if err != nil {
panic(err)
}
// 放入一个新的键/值对
if _, err = namedMap.Put(ctx, 1, "one"); err != nil {
panic(err)
}
// 获取键1的值
if value, err = namedMap.Get(ctx, 1); err != nil {
panic(err)
}
fmt.Println("Value for key 1 is", *value)
// 更新键1的值
if _, err = namedMap.Put(ctx, 1, "ONE"); err != nil {
panic(err)
}
// 获取更新后的值
if value, err = namedMap.Get(ctx, 1); err != nil {
panic(err)
}
fmt.Println("Updated value is", *value)
// 移除该条目
if _, err = namedMap.Remove(ctx, 1); err != nil {
panic(err)
}
}
更复杂的示例(使用结构体)
package main
import (
"context"
"fmt"
"github.com/oracle/coherence-go-client/v2/coherence"
)
// 定义一个Person结构体
type Person struct {
ID int `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
Email string `json:"email"`
}
func main() {
ctx := context.Background()
// 创建Session
session, err := coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
panic(err)
}
defer session.Close()
// 获取一个键为string、值为Person的NamedMap
namedMap, err := coherence.GetNamedMap[string, Person](session, "people")
if err != nil {
panic(err)
}
// 创建一个Person实例
person := Person{
ID: 1,
Name: "John Doe",
Age: 30,
Email: "john.doe@example.com",
}
// 将Person存入缓存
if _, err = namedMap.Put(ctx, person.Email, person); err != nil {
panic(err)
}
// 从缓存中获取Person
value, err := namedMap.Get(ctx, person.Email)
if err != nil {
panic(err)
}
if value != nil {
fmt.Printf("Retrieved person: %+v\n", *value)
}
// 使用PutIfAbsent方法
newPerson := Person{
ID: 2,
Name: "Jane Smith",
Age: 28,
Email: "jane.smith@example.com",
}
if previous, err := namedMap.PutIfAbsent(ctx, newPerson.Email, newPerson); err != nil {
panic(err)
} else if previous != nil {
fmt.Println("Entry already existed")
} else {
fmt.Println("New entry added")
}
// 获取Map大小
size, err := namedMap.Size(ctx)
if err != nil {
panic(err)
}
fmt.Printf("Map size: %d\n", size)
}
监听器示例
package main
import (
"context"
"fmt"
"github.com/oracle/coherence-go-client/v2/coherence"
)
func main() {
ctx := context.Background()
// 创建Session
session, err := coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
panic(err)
}
defer session.Close()
// 获取NamedMap
namedMap, err := coherence.GetNamedMap[string, string](session, "events")
if err != nil {
panic(err)
}
// 创建监听器
listener := coherence.MapListener[string, string]{
OnInserted: func(e coherence.MapEvent[string, string]) {
fmt.Printf("Entry inserted - key: %v, value: %v\n", e.Key(), e.NewValue())
},
OnUpdated: func(e coherence.MapEvent[string, string]) {
fmt.Printf("Entry updated - key: %v, old value: %v, new value: %v\n",
e.Key(), e.OldValue(), e.NewValue())
},
OnDeleted: func(e coherence.MapEvent[string, string]) {
fmt.Printf("Entry deleted - key: %v, old value: %v\n", e.Key(), e.OldValue())
},
}
// 添加监听器
if err = namedMap.AddListener(ctx, listener); err != nil {
panic(err)
}
// 执行一些操作触发事件
if _, err = namedMap.Put(ctx, "key1", "value1"); err != nil {
panic(err)
}
if _, err = namedMap.Put(ctx, "key1", "value2"); err != nil {
panic(err)
}
if _, err = namedMap.Remove(ctx, "key1"); err != nil {
panic(err)
}
// 移除监听器
if err = namedMap.RemoveListener(ctx, listener); err != nil {
panic(err)
}
}
注意事项
- 确保Coherence集群已正确配置gRPC Proxy
- 对于生产环境,应考虑使用安全连接而非明文连接
- 合理处理错误和资源释放(如使用defer关闭session)
- 根据实际需求选择合适的缓存策略和数据结构
以上示例展示了coherence-go-client的基本用法,更多高级功能可以参考官方文档和示例代码。
更多关于golang实现Oracle Coherence缓存API的gRPC网络传输插件库coherence-go-client的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现Oracle Coherence缓存API的gRPC网络传输插件库coherence-go-client的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用coherence-go-client实现Oracle Coherence缓存API的gRPC网络传输
Oracle Coherence是一个分布式缓存和数据网格产品,而coherence-go-client是一个Go语言客户端库,允许Go应用程序通过gRPC与Coherence集群交互。下面我将介绍如何使用这个库。
安装
首先安装coherence-go-client库:
go get github.com/oracle/coherence-go-client
基本使用示例
1. 初始化客户端
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/oracle/coherence-go-client/coherence"
)
func main() {
// 创建Session配置
config := coherence.SessionConfig{
Address: "localhost:1408", // Coherence gRPC代理地址
}
// 创建Session
session, err := coherence.NewSession(context.Background(), config)
if err != nil {
log.Fatalf("无法创建Session: %v", err)
}
defer session.Close()
// 获取NamedCache
cache, err := coherence.GetNamedCache[int, string](session, "my-cache")
if err != nil {
log.Fatalf("无法获取缓存: %v", err)
}
// 使用缓存...
}
2. 基本缓存操作
// 放入值
err = cache.Put(context.Background(), 1, "value1")
if err != nil {
log.Printf("Put操作失败: %v", err)
}
// 获取值
value, err := cache.Get(context.Background(), 1)
if err != nil {
log.Printf("Get操作失败: %v", err)
} else {
fmt.Printf("获取到的值: %v\n", value)
}
// 检查是否存在
exists, err := cache.ContainsKey(context.Background(), 1)
if err != nil {
log.Printf("ContainsKey操作失败: %v", err)
} else {
fmt.Printf("键1存在: %v\n", exists)
}
// 删除值
removed, err := cache.Remove(context.Background(), 1)
if err != nil {
log.Printf("Remove操作失败: %v", err)
} else {
fmt.Printf("删除的值: %v\n", removed)
}
3. 批量操作
// 批量放入
items := map[int]string{
1: "value1",
2: "value2",
3: "value3",
}
err = cache.PutAll(context.Background(), items)
if err != nil {
log.Printf("PutAll操作失败: %v", err)
}
// 批量获取
keys := []int{1, 2, 3}
result, err := cache.GetAll(context.Background(), keys)
if err != nil {
log.Printf("GetAll操作失败: %v", err)
} else {
for k, v := range result {
fmt.Printf("键: %d, 值: %s\n", k, v)
}
}
4. 使用选项配置
// 带有选项的Put操作
err = cache.Put(context.Background(), 1, "value1",
coherence.WithExpiry(10*time.Minute), // 10分钟后过期
coherence.WithUnits(coherence.UnitDays), // 时间单位
coherence.WithFormat("json")) // 值格式
if err != nil {
log.Printf("带选项的Put操作失败: %v", err)
}
5. 监听缓存事件
// 创建监听器
listener := coherence.NewMapListener[int, string]().
OnInserted(func(e coherence.MapEvent[int, string]) {
fmt.Printf("插入事件 - 键: %d, 新值: %s\n", e.Key(), e.NewValue())
}).
OnUpdated(func(e coherence.MapEvent[int, string]) {
fmt.Printf("更新事件 - 键: %d, 旧值: %s, 新值: %s\n",
e.Key(), e.OldValue(), e.NewValue())
}).
OnDeleted(func(e coherence.MapEvent[int, string]) {
fmt.Printf("删除事件 - 键: %d, 旧值: %s\n", e.Key(), e.OldValue())
})
// 添加监听器到缓存
_, err = cache.AddListener(context.Background(), listener)
if err != nil {
log.Printf("添加监听器失败: %v", err)
}
// 后续操作会触发事件...
高级功能
1. 使用过滤器
import "github.com/oracle/coherence-go-client/filters"
// 创建过滤器
filter := filters.Greater(10) // 值大于10的条目
// 使用过滤器查询
keys, err := cache.KeySet(context.Background(), coherence.WithFilter(filter))
if err != nil {
log.Printf("KeySet操作失败: %v", err)
} else {
fmt.Println("符合条件的键:", keys)
}
2. 聚合操作
import "github.com/oracle/coherence-go-client/aggregators"
// 创建聚合器
agg := aggregators.Count() // 计数聚合器
// 执行聚合
result, err := cache.Aggregate(context.Background(), agg, coherence.WithFilter(filter))
if err != nil {
log.Printf("Aggregate操作失败: %v", err)
} else {
fmt.Printf("聚合结果: %v\n", result)
}
3. 事务支持
// 开始事务
tx, err := session.BeginTransaction()
if err != nil {
log.Printf("开始事务失败: %v", err)
return
}
// 在事务中操作缓存
txCache := cache.WithTransaction(tx)
err = txCache.Put(context.Background(), 1, "transaction-value")
if err != nil {
log.Printf("事务中Put操作失败: %v", err)
tx.Rollback()
return
}
// 提交事务
err = tx.Commit()
if err != nil {
log.Printf("提交事务失败: %v", err)
}
最佳实践
- 连接池管理:重用Session而不是为每个请求创建新Session
- 错误处理:始终检查gRPC操作返回的错误
- 上下文使用:使用context.Context来传递截止时间和取消信号
- 资源清理:确保在使用后关闭Session和监听器
- 性能考虑:批量操作比单个操作更高效
总结
coherence-go-client提供了完整的API来与Oracle Coherence集群交互,包括基本的CRUD操作、批量操作、事件监听、过滤器和聚合器等高级功能。通过gRPC协议,Go应用程序可以高效地访问分布式缓存,实现数据共享和高速访问。
在实际应用中,建议根据具体需求选择合适的API,并注意资源管理和错误处理,以确保应用程序的健壮性和性能。