golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用
Golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用
这个包是Golang中的中介者模式(Mediator Pattern)
实现,灵感来源于.NET中优秀的jbogard/mediatr库。
安装
go get github.com/mehdihadeli/go-mediatr
特性
✅ 处理Request/Response
消息,将消息传递给单个处理程序(命令、查询)
✅ 处理Notification
消息,将消息传递给多个处理程序(事件)
✅ 管道行为(Pipeline Behaviours)
用于在执行处理程序前后处理一些横切关注点
策略
MediatR有两种消息分发策略:
Request/Response
消息,分发给单个处理程序
Notification
消息,分发给所有(多个)处理程序
,它们没有任何响应
Request/Response策略
request/response
消息只有一个处理程序
,可以在CQRS模式中处理命令和查询场景。
创建Request/Response消息
创建一个请求(命令或查询),它只有一个处理程序,我们可以创建一个命令消息或查询消息作为request
:
// Command (Request)
type CreateProductCommand struct {
ProductID uuid.UUID `validate:"required"`
Name string `validate:"required,gte=0,lte=255"`
Description string `validate:"required,gte=0,lte=5000"`
Price float64 `validate:"required,gte=0"`
CreatedAt time.Time `validate:"required"`
}
// Query (Request)
type GetProdctByIdQuery struct {
ProductID uuid.UUID `validate:"required"`
}
对于这些请求的响应,我们可以创建响应消息作为response
:
// Command (Response)
type CreateProductCommandResponse struct {
ProductID uuid.UUID `json:"productId"`
}
// Query (Response)
type GetProductByIdQueryResponse struct {
ProductID uuid.UUID `json:"productId"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
CreatedAt time.Time `json:"createdAt"`
}
创建Request处理程序
为了处理我们的请求,我们应该为每个请求创建一个单一请求处理程序
。每个处理程序应该实现RequestHandler
接口。
type RequestHandler[TRequest any, TResponse any] interface {
Handle(ctx context.Context, request TRequest) (TResponse, error)
}
这里我们为请求创建请求处理程序
(命令处理程序和查询处理程序),实现上述接口:
// Command Handler
type CreateProductCommandHandler struct {
productRepository *repository.InMemoryProductRepository
}
func NewCreateProductCommandHandler(productRepository *repository.InMemoryProductRepository) *CreateProductCommandHandler {
return &CreateProductCommandHandler{productRepository: productRepository}
}
func (c *CreateProductCommandHandler) Handle(ctx context.Context, command *CreateProductCommand) (*creatingProductDtos.CreateProductCommandResponse, error) {
product := &models.Product{
ProductID: command.ProductID,
Name: command.Name,
Description: command.Description,
Price: command.Price,
CreatedAt: command.CreatedAt,
}
createdProduct, err := c.productRepository.CreateProduct(ctx, product)
if err != nil {
return nil, err
}
response := &creatingProductDtos.CreateProductCommandResponse{ProductID: createdProduct.ProductID}
return response, nil
}
// Query Handler
type GetProductByIdQueryHandler struct {
productRepository *repository.InMemoryProductRepository
}
func NewGetProductByIdQueryHandler(productRepository *repository.InMemoryProductRepository) *GetProductByIdQueryHandler {
return &GetProductByIdQueryHandler{productRepository: productRepository}
}
func (c *GetProductByIdQueryHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*gettingProductDtos.GetProdctByIdQueryResponse, error) {
product, err := c.productRepository.GetProductById(ctx, query.ProductID)
if err != nil {
return nil, err
}
response := &gettingProductDtos.GetProdctByIdQueryResponse{
ProductID: product.ProductID,
Name: product.Name,
Description: product.Description,
Price: product.Price,
CreatedAt: product.CreatedAt,
}
return response, nil
}
注意:在我们不需要从请求处理程序获得响应的情况下,可以使用
Unit
类型,实际上是一个空结构体。
将Request处理程序注册到MediatR
在发送
或分发
我们的请求之前,我们应该将请求处理程序注册
到MediatR。
这里我们将请求处理程序(命令处理程序和查询处理程序)注册到MediatR:
// Registering `createProductCommandHandler` request handler for `CreateProductCommand` request to the MediatR
mediatr.RegisterHandler[*creatingProduct.CreateProductCommand, *creatingProductsDtos.CreateProductCommandResponse](createProductCommandHandler)
// Registering `getProductByIdQueryHandler` request handler for `GetProductByIdQuery` request to the MediatR
mediatr.RegisterHandler[*gettingProduct.GetProductByIdQuery, *gettingProductDtos.GetProdctByIdQueryResponse](getProductByIdQueryHandler)
发送Request到MediatR
最后,通过中介发送消息。
这里我们将请求发送到MediatR,将它们分发给请求处理程序(命令处理程序和查询处理程序):
// Sending `CreateProductCommand` request to mediatr for dispatching to the `CreateProductCommandHandler` request handler
command := &CreateProductCommand{
ProductID: uuid.NewV4(),
Name: request.name,
Description: request.description,
Price: request.price,
CreatedAt: time.Now(),
}
mediatr.Send[*CreateProductCommand, *creatingProductsDtos.CreateProductCommandResponse](ctx, command)
// Sending `GetProductByIdQuery` request to mediatr for dispatching to the `GetProductByIdQueryHandler` request handler
query := &GetProdctByIdQuery{
ProductID: uuid.NewV4()
}
mediatr.Send[*GetProductByIdQuery, *gettingProductsDtos.GetProductByIdQueryResponse](ctx, query)
Notification策略
notification
消息可以有多个处理程序
,并且没有任何响应,它可以处理事件驱动架构中的事件通知或通知。
创建Notification消息
创建一个通知(事件),它有多个处理程序
并且没有任何响应,我们可以创建一个事件通知作为notification
:
// Event (Notification)
type ProductCreatedEvent struct {
ProductID uuid.UUID `json:"productId"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
CreatedAt time.Time `json:"createdAt"`
}
这个事件没有任何响应。
创建Notification处理程序
为了处理我们的通知,我们可以为每个通知事件创建多个通知处理程序
。每个处理程序应该实现NotificationHandler
接口。
type NotificationHandler[TNotification any] interface {
Handle(ctx context.Context, notification TNotification) error
}
这里我们为通知创建多个通知事件处理程序
,实现上述接口:
// Notification Event Handler1
type ProductCreatedEventHandler1 struct {
}
func (c *ProductCreatedEventHandler1) Handle(ctx context.Context, event *ProductCreatedEvent) error {
//Do something with the event here !
return nil
}
// Notification Event Handler2
type ProductCreatedEventHandler2 struct {
}
func (c *ProductCreatedEventHandler2) Handle(ctx context.Context, event *ProductCreatedEvent) error {
//Do something with the event here !
return nil
}
将Notification处理程序注册到MediatR
在发布
我们的通知之前,我们应该将通知处理程序注册
到MediatR。
这里我们将通知处理程序注册到MediatR:
// Registering `notificationHandler1`, `notificationHandler2` notification handler for `ProductCreatedEvent` notification event to the MediatR
notificationHandler1 := &ProductCreatedEventHandler1{}
notificationHandler2 := &ProductCreatedEventHandler2{}
mediatr.RegisterNotificationHandlers[*events.ProductCreatedEvent](notificationHandler1, notificationHandler2)
发布Notification到MediatR
最后,通过中介发布通知事件。
这里我们将通知发布到MediatR,将它们分发给通知处理程序:
// Publishing `ProductCreatedEvent` notification to mediatr for dispatching to the `ProductCreatedEventHandler1`, `ProductCreatedEventHandler2` notification handlers
productCreatedEvent := &ProductCreatedEvent {
ProductID: createdProduct.ProductID,
Name: createdProduct.Name,
Price: createdProduct.Price,
CreatedAt: createdProduct.CreatedAt,
Description: createdProduct.Description,
}
mediatr.Publish[*events.ProductCreatedEvent](ctx, productCreatedEvent)
使用管道行为(Pipeline Behaviors)
有时我们需要在运行请求处理程序之前或之后添加一些横切关注点,如日志记录、指标、断路器、重试等。在这种情况下,我们可以使用PipelineBehavior
。它实际上就像一个中间件或装饰器模式。
这些行为将在调用mediatr上的Send
方法发送请求时,在执行请求处理程序之前或之后执行。
创建管道行为
为了创建一个管道行为,我们应该实现PipelineBehavior
接口:
type PipelineBehavior interface {
Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error)
}
request
参数是通过mediatr的Send
方法传入的请求对象,而next
参数是行为链中下一个动作的延续,其类型为RequestHandlerFunc
。
这是一个管道行为的例子:
type RequestLoggerBehaviour struct {
}
func (r *RequestLoggerBehaviour) Handle(ctx context.Context, request interface{}, next mediatr.RequestHandlerFunc) (interface{}, error) {
log.Printf("logging some stuff before handling the request")
response, err := next()
if err != nil {
return nil, err
}
log.Println("logging some stuff after handling the request")
return response, nil
}
在我们定义的行为中,我们需要调用next
参数,它会调用行为链中的下一个动作,如果没有其他行为,next
将调用我们的实际请求处理程序
并返回响应。我们可以在调用行为链中的下一个动作之前或之后做一些事情。
将管道行为注册到MediatR
为了将我们的管道行为注册到MediatR,我们应该使用RegisterPipelineBehaviors
方法:
loggerPipeline := &behaviours.RequestLoggerBehaviour{}
err = mediatr.RegisterRequestPipelineBehaviors(loggerPipeline)
更多关于golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Go-MediatR:Golang中的中介者模式与简化CQRS实现
中介者模式与CQRS简介
中介者模式(Mediator Pattern)是一种行为设计模式,它通过引入一个中介者对象来封装一组对象之间的交互,从而减少对象间的直接依赖关系。在事件驱动架构中,这种模式特别有用。
CQRS(Command Query Responsibility Segregation)是一种架构模式,将读写操作分离为不同的模型:
- 命令(Command):修改状态的操作
- 查询(Query):读取状态的操作
Go-MediatR库概述
Go-MediatR是一个受.NET MediatR启发的Golang库,它简化了中介者模式的实现,并提供了对CQRS模式的支持。主要特点包括:
- 请求/响应处理
- 命令/查询分离
- 通知/事件处理
- 中间件支持
安装
go get github.com/Ozeer/gomediatr
基本使用示例
1. 定义命令和处理器
package main
import (
"context"
"fmt"
"github.com/Ozeer/gomediatr"
)
// 定义命令
type CreateUserCommand struct {
Name string
Email string
}
// 定义命令处理器
type CreateUserCommandHandler struct{}
func (h *CreateUserCommandHandler) Handle(ctx context.Context, command CreateUserCommand) error {
fmt.Printf("Creating user: %s (%s)\n", command.Name, command.Email)
// 实际创建用户的逻辑
return nil
}
func main() {
// 初始化MediatR
mediatr := gomediatr.NewMediator()
// 注册命令处理器
err := mediatr.RegisterRequestHandler[CreateUserCommand, error](
func() (CreateUserCommand, error) {
return CreateUserCommand{}, nil
},
&CreateUserCommandHandler{},
)
if err != nil {
panic(err)
}
// 发送命令
err = mediatr.Send(context.Background(), CreateUserCommand{
Name: "John Doe",
Email: "john@example.com",
})
if err != nil {
fmt.Println("Error:", err)
}
}
2. 定义查询和处理器
// 定义查询
type GetUserQuery struct {
ID int
}
type User struct {
ID int
Name string
Email string
}
// 定义查询处理器
type GetUserQueryHandler struct{}
func (h *GetUserQueryHandler) Handle(ctx context.Context, query GetUserQuery) (User, error) {
// 模拟从数据库获取用户
return User{
ID: query.ID,
Name: "John Doe",
Email: "john@example.com",
}, nil
}
func main() {
mediatr := gomediatr.NewMediator()
// 注册查询处理器
err := mediatr.RegisterRequestHandler[GetUserQuery, User](
func() (GetUserQuery, error) {
return GetUserQuery{}, nil
},
&GetUserQueryHandler{},
)
if err != nil {
panic(err)
}
// 发送查询
user, err := mediatr.Send[GetUserQuery, User](context.Background(), GetUserQuery{ID: 1})
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Printf("User: %+v\n", user)
}
}
3. 事件处理
// 定义事件
type UserCreatedEvent struct {
UserID int
Name string
}
// 定义事件处理器
type UserCreatedEventHandler struct{}
func (h *UserCreatedEventHandler) Handle(ctx context.Context, event UserCreatedEvent) error {
fmt.Printf("User created event handled: %d - %s\n", event.UserID, event.Name)
return nil
}
func main() {
mediatr := gomediatr.NewMediator()
// 注册事件处理器
err := mediatr.RegisterNotificationHandler[UserCreatedEvent](
func() (UserCreatedEvent, error) {
return UserCreatedEvent{}, nil
},
&UserCreatedEventHandler{},
)
if err != nil {
panic(err)
}
// 发布事件
err = mediatr.Publish(context.Background(), UserCreatedEvent{
UserID: 1,
Name: "John Doe",
})
if err != nil {
fmt.Println("Error:", err)
}
}
高级特性
中间件支持
type LoggingMiddleware struct{}
func (m *LoggingMiddleware) Handle(
ctx context.Context,
request interface{},
next gomediatr.RequestHandlerFunc,
) (interface{}, error) {
fmt.Printf("Handling request: %T\n", request)
// 调用下一个中间件或处理器
response, err := next(ctx, request)
if err != nil {
fmt.Printf("Request failed: %v\n", err)
} else {
fmt.Printf("Request succeeded: %v\n", response)
}
return response, err
}
func main() {
mediatr := gomediatr.NewMediator()
mediatr.Use(&LoggingMiddleware{})
// 注册处理器和发送请求的代码...
}
多个事件处理器
type EmailNotificationHandler struct{}
func (h *EmailNotificationHandler) Handle(ctx context.Context, event UserCreatedEvent) error {
fmt.Printf("Sending welcome email to: %s\n", event.Name)
return nil
}
func main() {
mediatr := gomediatr.NewMediator()
// 注册多个事件处理器
mediatr.RegisterNotificationHandler[UserCreatedEvent](
func() (UserCreatedEvent, error) {
return UserCreatedEvent{}, nil
},
&UserCreatedEventHandler{},
)
mediatr.RegisterNotificationHandler[UserCreatedEvent](
func() (UserCreatedEvent, error) {
return UserCreatedEvent{}, nil
},
&EmailNotificationHandler{},
)
// 发布事件会触发所有处理器
mediatr.Publish(context.Background(), UserCreatedEvent{
UserID: 1,
Name: "John Doe",
})
}
最佳实践
-
命令设计:
- 命令应该表示业务意图,而不仅仅是CRUD操作
- 命令应该是不可变的
-
查询设计:
- 查询不应该有副作用
- 为每个特定的UI需求创建专门的查询
-
事件设计:
- 事件应该用过去时命名(如UserCreated)
- 事件应该包含足够的信息供处理器使用
-
依赖注入:
- 处理器可以通过构造函数注入依赖
- 避免在处理器中使用全局状态
总结
Go-MediatR为Golang提供了一种简洁的方式来实现中介者模式和CQRS模式。它的主要优势包括:
- 减少组件间的直接依赖
- 清晰的职责分离
- 易于测试(每个处理器可以独立测试)
- 灵活的扩展性(通过中间件)
对于需要处理复杂业务逻辑的应用程序,特别是微服务架构中,Go-MediatR可以帮助保持代码的组织性和可维护性。