golang实现REST API通用操作日志与复制系统插件库oplog的使用
Golang实现REST API通用操作日志与复制系统插件库oplog的使用
概述
OpLog是一个用于生产者和消费者之间实时数据同步的通用数据库复制系统,适用于微服务架构或为公共API添加流式功能。
典型用例
- 组件处理权威数据库,多个独立组件需要保持数据的本地只读视图(如搜索引擎索引、推荐引擎、多区域架构等)
- 实现公共流式API来监控服务模型中对象的变更
安装
go get -u github.com/dailymotion/oplog
go build -a -o /usr/local/bin/oplogd github.com/dailymotion/oplog/cmd/oplogd
go build -a -o /usr/local/bin/oplog-sync github.com/dailymotion/oplog/cmd/oplog-sync
go build -a -o /usr/local/bin/oplog-tail github.com/dailymotion/oplog/cmd/oplog-tail
启动代理
oplogd --mongo-url mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/database[?options]
可用选项:
--capped-collection-size=10485760
:MongoDB capped集合大小(默认10MB)--debug=false
:显示调试日志--listen=":8042"
:监听地址--max-queued-events=100000
:队列最大事件数
生产者API:UDP和HTTP
可以通过UDP数据报或HTTP POST请求发送操作到代理。
HTTP请求示例:
package main
import (
"bytes"
"net/http"
)
func main() {
// 示例:通过HTTP发送操作
url := "http://localhost:8042/"
data := []byte(`{
"event": "insert",
"parents": ["video/xk32jd", "user/xkjdi"],
"type": "video",
"id": "xk32jd"
}`)
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(data))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
}
消费者API:Server Sent Event
SSE API运行在与UDP API相同的端口上,但使用TCP。
package main
import (
"fmt"
"io"
"net/http"
)
func main() {
// 示例:消费SSE流
req, _ := http.NewRequest("GET", "http://localhost:8042/", nil)
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
buf := make([]byte, 1024)
for {
n, err := resp.Body.Read(buf)
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
fmt.Println(string(buf[:n]))
}
}
完整复制
要进行完整复制,将0
作为Last-Event-ID
HTTP头的值传递。
定期源同步
使用oplog-sync
命令执行同步:
oplog-sync --mongo-url mongodb://localhost/oplog --dump-file dump.json
状态端点
代理暴露了一个/status
HTTP端点来显示统计信息:
{
"clients": 0,
"connections": 0,
"events_discarded": 0,
"events_error": 0,
"events_ingested": 0,
"events_received": 0,
"events_sent": 0,
"queue_max_size": 100000,
"queue_size": 0,
"status": "OK"
}
架构示例
许可证
所有源代码均根据MIT许可证授权。
更多关于golang实现REST API通用操作日志与复制系统插件库oplog的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang实现REST API通用操作日志与复制系统插件库oplog的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang实现REST API通用操作日志与复制系统插件库oplog的使用
操作日志(OpLog)是记录系统变更的重要组件,在REST API开发中尤为关键。下面我将介绍如何使用Golang实现通用的操作日志系统,并介绍MongoDB oplog的集成使用。
一、基础操作日志实现
1. 定义操作日志模型
package models
import (
"time"
)
type OperationLog struct {
ID string `json:"id" bson:"_id"`
UserID string `json:"user_id" bson:"user_id"`
Username string `json:"username" bson:"username"`
Operation string `json:"operation" bson:"operation"` // create/update/delete
Resource string `json:"resource" bson:"resource"` // 资源类型,如user/product
ResourceID string `json:"resource_id" bson:"resource_id"`
OldData interface{} `json:"old_data,omitempty" bson:"old_data,omitempty"`
NewData interface{} `json:"new_data,omitempty" bson:"new_data,omitempty"`
RequestData interface{} `json:"request_data,omitempty" bson:"request_data,omitempty"`
IP string `json:"ip" bson:"ip"`
CreatedAt time.Time `json:"created_at" bson:"created_at"`
}
2. 中间件记录操作日志
package middleware
import (
"context"
"net/http"
"time"
"github.com/yourproject/models"
)
func OpLogMiddleware(next http.Handler, logRepo models.OpLogRepository) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 获取请求信息
userID := r.Header.Get("X-User-ID")
username := r.Header.Get("X-Username")
operation := getOperationFromMethod(r.Method)
resource := getResourceFromPath(r.URL.Path)
// 复制请求体以便后续使用
var requestBody interface{}
if r.Body != nil {
// 这里简化处理,实际需要根据Content-Type解析
// 可以使用io.TeeReader同时读取和复制请求体
}
// 包装ResponseWriter以捕获响应状态码和数据
lrw := NewLoggingResponseWriter(w)
// 调用下一个处理器
next.ServeHTTP(lrw, r)
// 记录日志
log := models.OperationLog{
UserID: userID,
Username: username,
Operation: operation,
Resource: resource,
RequestData: requestBody,
IP: r.RemoteAddr,
CreatedAt: time.Now(),
}
// 如果是修改操作,可以在这里获取变更前后的数据对比
if operation == "update" || operation == "delete" {
// 从上下文中获取旧数据(需要在业务处理中设置)
if oldData := r.Context().Value("old_data"); oldData != nil {
log.OldData = oldData
}
}
// 保存日志
go func() {
_ = logRepo.Create(context.Background(), &log)
}()
})
}
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
return &loggingResponseWriter{w, http.StatusOK}
}
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
二、集成MongoDB Oplog
MongoDB的oplog(操作日志)是一个特殊的集合,记录了所有对数据库的修改操作。
1. 监听MongoDB Oplog
package oplog
import (
"context"
"log"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type OplogWatcher struct {
client *mongo.Client
dbName string
collection string
callback func(bson.Raw)
}
func NewOplogWatcher(mongoURI, dbName, collection string, callback func(bson.Raw)) (*OplogWatcher, error) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI))
if err != nil {
return nil, err
}
return &OplogWatcher{
client: client,
dbName: dbName,
collection: collection,
callback: callback,
}, nil
}
func (w *OplogWatcher) Watch() {
// 获取oplog集合
oplog := w.client.Database("local").Collection("oplog.rs")
// 获取最后一条oplog的时间戳
var lastEntry struct {
Timestamp bson.MongoTimestamp `bson:"ts"`
}
opts := options.FindOne().SetSort(bson.D{{"$natural", -1}})
err := oplog.FindOne(context.Background(), bson.D{}, opts).Decode(&lastEntry)
if err != nil {
log.Fatal(err)
}
// 创建监听管道
pipeline := mongo.Pipeline{
{{"$match", bson.D{
{"ns", w.dbName + "." + w.collection},
{"ts", bson.D{{"$gt", lastEntry.Timestamp}}},
}}},
}
streamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
stream, err := oplog.Watch(context.Background(), pipeline, streamOpts)
if err != nil {
log.Fatal(err)
}
// 处理变更流
for stream.Next(context.Background()) {
var changeEvent bson.Raw
if err := stream.Decode(&changeEvent); err != nil {
log.Println("Error decoding change event:", err)
continue
}
w.callback(changeEvent)
}
if err := stream.Err(); err != nil {
log.Println("Change stream error:", err)
}
}
func (w *OplogWatcher) Close() {
_ = w.client.Disconnect(context.Background())
}
2. 使用示例
func main() {
// 初始化oplog监听器
watcher, err := oplog.NewOplogWatcher(
"mongodb://localhost:27017",
"mydb",
"users",
func(change bson.Raw) {
// 处理变更事件
var event struct {
Op string `bson:"op"` // i=insert, u=update, d=delete
Ns string `bson:"ns"`
O bson.Raw
O2 bson.Raw `bson:"o2,omitempty"`
}
if err := bson.Unmarshal(change, &event); err != nil {
log.Println("Error unmarshalling change event:", err)
return
}
log.Printf("Operation: %s on %s", event.Op, event.Ns)
// 这里可以将变更转发到其他系统或进行其他处理
},
)
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
// 启动监听
go watcher.Watch()
// 启动HTTP服务器
// ...
}
三、操作日志的应用场景
- 审计追踪:记录谁在什么时候做了什么操作
- 数据恢复:通过操作日志可以恢复误删或错误修改的数据
- 数据同步:将变更同步到其他系统或数据库
- 实时分析:分析用户行为模式
四、最佳实践
- 异步记录:操作日志记录应该异步进行,不影响主业务流程
- 批量写入:对于高并发系统,可以考虑批量写入日志
- 日志轮转:定期归档旧日志,避免日志表过大
- 敏感信息过滤:记录日志时过滤掉密码等敏感信息
- 索引优化:为常用查询字段建立索引
通过以上实现,你可以构建一个完整的操作日志系统,既包含应用层的操作记录,又能利用数据库本身的变更日志机制,为你的REST API提供全面的操作追踪能力。