golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用

Golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用

go-mediatr

这个包是Golang中的中介者模式(Mediator Pattern)实现,灵感来源于.NET中优秀的jbogard/mediatr库。

安装

go get github.com/mehdihadeli/go-mediatr

特性

✅ 处理Request/Response消息,将消息传递给单个处理程序(命令、查询)

✅ 处理Notification消息,将消息传递给多个处理程序(事件)

管道行为(Pipeline Behaviours)用于在执行处理程序前后处理一些横切关注点

策略

MediatR有两种消息分发策略:

  1. Request/Response消息,分发给单个处理程序
  2. Notification消息,分发给所有(多个)处理程序,它们没有任何响应

Request/Response策略

request/response消息只有一个处理程序,可以在CQRS模式中处理命令和查询场景。

创建Request/Response消息

创建一个请求(命令或查询),它只有一个处理程序,我们可以创建一个命令消息或查询消息作为request

// Command (Request)
type CreateProductCommand struct {
    ProductID   uuid.UUID `validate:"required"`
    Name        string    `validate:"required,gte=0,lte=255"`
    Description string    `validate:"required,gte=0,lte=5000"`
    Price       float64   `validate:"required,gte=0"`
    CreatedAt   time.Time `validate:"required"`
}

// Query (Request)
type GetProdctByIdQuery struct {
    ProductID uuid.UUID `validate:"required"`
}

对于这些请求的响应,我们可以创建响应消息作为response

// Command (Response)
type CreateProductCommandResponse struct {
    ProductID uuid.UUID `json:"productId"`
}

// Query (Response)
type GetProductByIdQueryResponse struct {
    ProductID   uuid.UUID `json:"productId"`
    Name        string    `json:"name"`
    Description string    `json:"description"`
    Price       float64   `json:"price"`
    CreatedAt   time.Time `json:"createdAt"`
}

创建Request处理程序

为了处理我们的请求,我们应该为每个请求创建一个单一请求处理程序。每个处理程序应该实现RequestHandler接口。

type RequestHandler[TRequest any, TResponse any] interface {
    Handle(ctx context.Context, request TRequest) (TResponse, error)
}

这里我们为请求创建请求处理程序(命令处理程序和查询处理程序),实现上述接口:

// Command Handler
type CreateProductCommandHandler struct {
    productRepository *repository.InMemoryProductRepository
}

func NewCreateProductCommandHandler(productRepository *repository.InMemoryProductRepository) *CreateProductCommandHandler {
    return &CreateProductCommandHandler{productRepository: productRepository}
}

func (c *CreateProductCommandHandler) Handle(ctx context.Context, command *CreateProductCommand) (*creatingProductDtos.CreateProductCommandResponse, error) {
    product := &models.Product{
        ProductID:   command.ProductID,
        Name:        command.Name,
        Description: command.Description,
        Price:       command.Price,
        CreatedAt:   command.CreatedAt,
    }

    createdProduct, err := c.productRepository.CreateProduct(ctx, product)
    if err != nil {
        return nil, err
    }

    response := &creatingProductDtos.CreateProductCommandResponse{ProductID: createdProduct.ProductID}

    return response, nil
}
// Query Handler
type GetProductByIdQueryHandler struct {
    productRepository *repository.InMemoryProductRepository
}

func NewGetProductByIdQueryHandler(productRepository *repository.InMemoryProductRepository) *GetProductByIdQueryHandler {
    return &GetProductByIdQueryHandler{productRepository: productRepository}
}

func (c *GetProductByIdQueryHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*gettingProductDtos.GetProdctByIdQueryResponse, error) {
    product, err := c.productRepository.GetProductById(ctx, query.ProductID)
    if err != nil {
        return nil, err
    }

    response := &gettingProductDtos.GetProdctByIdQueryResponse{
        ProductID:   product.ProductID,
        Name:        product.Name,
        Description: product.Description,
        Price:       product.Price,
        CreatedAt:   product.CreatedAt,
    }

    return response, nil
}

注意:在我们不需要从请求处理程序获得响应的情况下,可以使用Unit类型,实际上是一个空结构体。

将Request处理程序注册到MediatR

发送分发我们的请求之前,我们应该将请求处理程序注册到MediatR。

这里我们将请求处理程序(命令处理程序和查询处理程序)注册到MediatR:

// Registering `createProductCommandHandler` request handler for `CreateProductCommand` request to the MediatR
mediatr.RegisterHandler[*creatingProduct.CreateProductCommand, *creatingProductsDtos.CreateProductCommandResponse](createProductCommandHandler)

// Registering `getProductByIdQueryHandler` request handler for `GetProductByIdQuery` request to the MediatR
mediatr.RegisterHandler[*gettingProduct.GetProductByIdQuery, *gettingProductDtos.GetProdctByIdQueryResponse](getProductByIdQueryHandler)

发送Request到MediatR

最后,通过中介发送消息。

这里我们将请求发送到MediatR,将它们分发给请求处理程序(命令处理程序和查询处理程序):

// Sending `CreateProductCommand` request to mediatr for dispatching to the `CreateProductCommandHandler` request handler
command := &CreateProductCommand{
    ProductID:   uuid.NewV4(),
    Name:        request.name,
    Description: request.description,
    Price:       request.price,
    CreatedAt:   time.Now(),
}

mediatr.Send[*CreateProductCommand, *creatingProductsDtos.CreateProductCommandResponse](ctx, command)
// Sending `GetProductByIdQuery` request to mediatr for dispatching to the `GetProductByIdQueryHandler` request handler
query := &GetProdctByIdQuery{
    ProductID:   uuid.NewV4()
}

mediatr.Send[*GetProductByIdQuery, *gettingProductsDtos.GetProductByIdQueryResponse](ctx, query)

Notification策略

notification消息可以有多个处理程序,并且没有任何响应,它可以处理事件驱动架构中的事件通知或通知。

创建Notification消息

创建一个通知(事件),它有多个处理程序并且没有任何响应,我们可以创建一个事件通知作为notification

// Event (Notification)
type ProductCreatedEvent struct {
    ProductID uuid.UUID   `json:"productId"`
    Name        string    `json:"name"`
    Description string    `json:"description"`
    Price       float64   `json:"price"`
    CreatedAt   time.Time `json:"createdAt"`
}

这个事件没有任何响应。

创建Notification处理程序

为了处理我们的通知,我们可以为每个通知事件创建多个通知处理程序。每个处理程序应该实现NotificationHandler接口。

type NotificationHandler[TNotification any] interface {
    Handle(ctx context.Context, notification TNotification) error
}

这里我们为通知创建多个通知事件处理程序,实现上述接口:

// Notification Event Handler1
type ProductCreatedEventHandler1 struct {
}

func (c *ProductCreatedEventHandler1) Handle(ctx context.Context, event *ProductCreatedEvent) error {
//Do something with the event here !
    return nil
}
// Notification Event Handler2
type ProductCreatedEventHandler2 struct {
}

func (c *ProductCreatedEventHandler2) Handle(ctx context.Context, event *ProductCreatedEvent) error {
//Do something with the event here !
    return nil
}

将Notification处理程序注册到MediatR

发布我们的通知之前,我们应该将通知处理程序注册到MediatR。

这里我们将通知处理程序注册到MediatR:

// Registering `notificationHandler1`, `notificationHandler2` notification handler for `ProductCreatedEvent` notification event to the MediatR
notificationHandler1 := &ProductCreatedEventHandler1{}
notificationHandler2 := &ProductCreatedEventHandler2{}

mediatr.RegisterNotificationHandlers[*events.ProductCreatedEvent](notificationHandler1, notificationHandler2)

发布Notification到MediatR

最后,通过中介发布通知事件。

这里我们将通知发布到MediatR,将它们分发给通知处理程序:

// Publishing `ProductCreatedEvent` notification to mediatr for dispatching to the `ProductCreatedEventHandler1`, `ProductCreatedEventHandler2` notification handlers
productCreatedEvent := 	&ProductCreatedEvent {
    ProductID:   createdProduct.ProductID,
    Name:        createdProduct.Name,
    Price:       createdProduct.Price,
    CreatedAt:   createdProduct.CreatedAt,
    Description: createdProduct.Description,
}

mediatr.Publish[*events.ProductCreatedEvent](ctx, productCreatedEvent)

使用管道行为(Pipeline Behaviors)

有时我们需要在运行请求处理程序之前或之后添加一些横切关注点,如日志记录、指标、断路器、重试等。在这种情况下,我们可以使用PipelineBehavior。它实际上就像一个中间件或装饰器模式

这些行为将在调用mediatr上的Send方法发送请求时,在执行请求处理程序之前或之后执行。

创建管道行为

为了创建一个管道行为,我们应该实现PipelineBehavior接口:

type PipelineBehavior interface {
    Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error)
}

request参数是通过mediatr的Send方法传入的请求对象,而next参数是行为链中下一个动作的延续,其类型为RequestHandlerFunc

这是一个管道行为的例子:

type RequestLoggerBehaviour struct {
}

func (r *RequestLoggerBehaviour) Handle(ctx context.Context, request interface{}, next mediatr.RequestHandlerFunc) (interface{}, error) {
    log.Printf("logging some stuff before handling the request")

    response, err := next()
    if err != nil {
        return nil, err
    }

    log.Println("logging some stuff after handling the request")

    return response, nil
}

在我们定义的行为中,我们需要调用next参数,它会调用行为链中的下一个动作,如果没有其他行为,next将调用我们的实际请求处理程序并返回响应。我们可以在调用行为链中的下一个动作之前或之后做一些事情。

将管道行为注册到MediatR

为了将我们的管道行为注册到MediatR,我们应该使用RegisterPipelineBehaviors方法:

loggerPipeline := &behaviours.RequestLoggerBehaviour{}
err = mediatr.RegisterRequestPipelineBehaviors(loggerPipeline)

更多关于golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang事件驱动架构中的中介者模式和简化CQRS模式处理插件库Go-MediatR的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Go-MediatR:Golang中的中介者模式与简化CQRS实现

中介者模式与CQRS简介

中介者模式(Mediator Pattern)是一种行为设计模式,它通过引入一个中介者对象来封装一组对象之间的交互,从而减少对象间的直接依赖关系。在事件驱动架构中,这种模式特别有用。

CQRS(Command Query Responsibility Segregation)是一种架构模式,将读写操作分离为不同的模型:

  • 命令(Command):修改状态的操作
  • 查询(Query):读取状态的操作

Go-MediatR库概述

Go-MediatR是一个受.NET MediatR启发的Golang库,它简化了中介者模式的实现,并提供了对CQRS模式的支持。主要特点包括:

  • 请求/响应处理
  • 命令/查询分离
  • 通知/事件处理
  • 中间件支持

安装

go get github.com/Ozeer/gomediatr

基本使用示例

1. 定义命令和处理器

package main

import (
	"context"
	"fmt"
	"github.com/Ozeer/gomediatr"
)

// 定义命令
type CreateUserCommand struct {
	Name  string
	Email string
}

// 定义命令处理器
type CreateUserCommandHandler struct{}

func (h *CreateUserCommandHandler) Handle(ctx context.Context, command CreateUserCommand) error {
	fmt.Printf("Creating user: %s (%s)\n", command.Name, command.Email)
	// 实际创建用户的逻辑
	return nil
}

func main() {
	// 初始化MediatR
	mediatr := gomediatr.NewMediator()
	
	// 注册命令处理器
	err := mediatr.RegisterRequestHandler[CreateUserCommand, error](
		func() (CreateUserCommand, error) {
			return CreateUserCommand{}, nil
		},
		&CreateUserCommandHandler{},
	)
	
	if err != nil {
		panic(err)
	}
	
	// 发送命令
	err = mediatr.Send(context.Background(), CreateUserCommand{
		Name:  "John Doe",
		Email: "john@example.com",
	})
	
	if err != nil {
		fmt.Println("Error:", err)
	}
}

2. 定义查询和处理器

// 定义查询
type GetUserQuery struct {
	ID int
}

type User struct {
	ID    int
	Name  string
	Email string
}

// 定义查询处理器
type GetUserQueryHandler struct{}

func (h *GetUserQueryHandler) Handle(ctx context.Context, query GetUserQuery) (User, error) {
	// 模拟从数据库获取用户
	return User{
		ID:    query.ID,
		Name:  "John Doe",
		Email: "john@example.com",
	}, nil
}

func main() {
	mediatr := gomediatr.NewMediator()
	
	// 注册查询处理器
	err := mediatr.RegisterRequestHandler[GetUserQuery, User](
		func() (GetUserQuery, error) {
			return GetUserQuery{}, nil
		},
		&GetUserQueryHandler{},
	)
	
	if err != nil {
		panic(err)
	}
	
	// 发送查询
	user, err := mediatr.Send[GetUserQuery, User](context.Background(), GetUserQuery{ID: 1})
	
	if err != nil {
		fmt.Println("Error:", err)
	} else {
		fmt.Printf("User: %+v\n", user)
	}
}

3. 事件处理

// 定义事件
type UserCreatedEvent struct {
	UserID int
	Name   string
}

// 定义事件处理器
type UserCreatedEventHandler struct{}

func (h *UserCreatedEventHandler) Handle(ctx context.Context, event UserCreatedEvent) error {
	fmt.Printf("User created event handled: %d - %s\n", event.UserID, event.Name)
	return nil
}

func main() {
	mediatr := gomediatr.NewMediator()
	
	// 注册事件处理器
	err := mediatr.RegisterNotificationHandler[UserCreatedEvent](
		func() (UserCreatedEvent, error) {
			return UserCreatedEvent{}, nil
		},
		&UserCreatedEventHandler{},
	)
	
	if err != nil {
		panic(err)
	}
	
	// 发布事件
	err = mediatr.Publish(context.Background(), UserCreatedEvent{
		UserID: 1,
		Name:   "John Doe",
	})
	
	if err != nil {
		fmt.Println("Error:", err)
	}
}

高级特性

中间件支持

type LoggingMiddleware struct{}

func (m *LoggingMiddleware) Handle(
	ctx context.Context,
	request interface{},
	next gomediatr.RequestHandlerFunc,
) (interface{}, error) {
	fmt.Printf("Handling request: %T\n", request)
	
	// 调用下一个中间件或处理器
	response, err := next(ctx, request)
	
	if err != nil {
		fmt.Printf("Request failed: %v\n", err)
	} else {
		fmt.Printf("Request succeeded: %v\n", response)
	}
	
	return response, err
}

func main() {
	mediatr := gomediatr.NewMediator()
	mediatr.Use(&LoggingMiddleware{})
	
	// 注册处理器和发送请求的代码...
}

多个事件处理器

type EmailNotificationHandler struct{}

func (h *EmailNotificationHandler) Handle(ctx context.Context, event UserCreatedEvent) error {
	fmt.Printf("Sending welcome email to: %s\n", event.Name)
	return nil
}

func main() {
	mediatr := gomediatr.NewMediator()
	
	// 注册多个事件处理器
	mediatr.RegisterNotificationHandler[UserCreatedEvent](
		func() (UserCreatedEvent, error) {
			return UserCreatedEvent{}, nil
		},
		&UserCreatedEventHandler{},
	)
	
	mediatr.RegisterNotificationHandler[UserCreatedEvent](
		func() (UserCreatedEvent, error) {
			return UserCreatedEvent{}, nil
		},
		&EmailNotificationHandler{},
	)
	
	// 发布事件会触发所有处理器
	mediatr.Publish(context.Background(), UserCreatedEvent{
		UserID: 1,
		Name:   "John Doe",
	})
}

最佳实践

  1. 命令设计

    • 命令应该表示业务意图,而不仅仅是CRUD操作
    • 命令应该是不可变的
  2. 查询设计

    • 查询不应该有副作用
    • 为每个特定的UI需求创建专门的查询
  3. 事件设计

    • 事件应该用过去时命名(如UserCreated)
    • 事件应该包含足够的信息供处理器使用
  4. 依赖注入

    • 处理器可以通过构造函数注入依赖
    • 避免在处理器中使用全局状态

总结

Go-MediatR为Golang提供了一种简洁的方式来实现中介者模式和CQRS模式。它的主要优势包括:

  1. 减少组件间的直接依赖
  2. 清晰的职责分离
  3. 易于测试(每个处理器可以独立测试)
  4. 灵活的扩展性(通过中间件)

对于需要处理复杂业务逻辑的应用程序,特别是微服务架构中,Go-MediatR可以帮助保持代码的组织性和可维护性。

回到顶部