golang基于Server-Sent Events实现实时更新推送的插件库Mercure的使用

Golang基于Server-Sent Events实现实时更新推送的插件库Mercure的使用

Mercure是一种协议,用于以方便、快速、可靠且省电的方式向Web浏览器和其他HTTP客户端推送数据更新。它特别适用于通过Web API提供的资源的异步和实时更新,以及响应式Web和移动应用程序。

Mercure核心概念

Mercure协议基于Server-Sent Events (SSE)技术,包含以下主要组件:

  1. Hub:中心服务器,负责接收和分发更新
  2. Publisher:发布更新的客户端
  3. Subscriber:订阅更新的客户端

Golang中使用Mercure的完整示例

1. 安装Mercure库

go get github.com/dunglas/mercure

2. 创建Mercure Hub服务

package main

import (
	"log"
	"net/http"

	"github.com/dunglas/mercure"
	"github.com/dunglas/mercure/hub"
)

func main() {
	// 配置Mercure Hub
	config := &hub.Config{
		PublisherJWTKey: []byte("your-publisher-jwt-key"), // 发布者JWT密钥
		SubscriberJWTKey: []byte("your-subscriber-jwt-key"), // 订阅者JWT密钥
		AllowAnonymous: true, // 允许匿名订阅
	}

	// 创建Hub实例
	h := hub.New(config)

	// 设置HTTP路由
	http.HandleFunc("/.well-known/mercure", h.ServeHTTP)

	// 启动HTTP服务器
	log.Println("Mercure Hub started on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

3. 发布更新示例

package main

import (
	"context"
	"log"
	"net/http"

	"github.com/dunglas/mercure"
)

func main() {
	// 创建Publisher客户端
	publisher := mercure.NewPublisher(
		mercure.WithPublisherJWT([]byte("your-publisher-jwt-key"), // 发布者JWT密钥
		mercure.WithHubURL("http://localhost:8080/.well-known/mercure"), // Hub地址
	)

	// 发布更新
	err := publisher.Publish(context.Background(), &mercure.Update{
		Topics: []string{"https://example.com/books/1"}, // 主题URL
		Data:   []byte(`{"status": "updated"}`), // 更新数据
	})
	if err != nil {
		log.Fatal(err)
	}

	log.Println("Update published successfully")
}

4. 订阅更新示例

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	"github.com/dunglas/mercure"
)

func main() {
	// 创建Subscriber客户端
	subscriber := mercure.NewSubscriber(
		mercure.WithSubscriberJWT([]byte("your-subscriber-jwt-key")), // 订阅者JWT密钥
		mercure.WithHubURL("http://localhost:8080/.well-known/mercure"), // Hub地址
	)

	// 订阅主题
	ctx := context.Background()
	subscription, err := subscriber.Subscribe(ctx, []string{"https://example.com/books/1"})
	if err != nil {
		log.Fatal(err)
	}

	// 处理接收到的更新
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case update := <-subscription.Updates():
				fmt.Printf("Received update: %s\n", update.Data)
			case err := <-subscription.Errors():
				fmt.Printf("Error: %v\n", err)
			}
		}
	}()

	// 保持程序运行
	select {}
}

高级配置选项

Mercure Hub支持多种配置选项:

config := &hub.Config{
	PublisherJWTKey: []byte("your-publisher-jwt-key"),
	SubscriberJWTKey: []byte("your-subscriber-jwt-key"),
	AllowAnonymous: true,
	CORSOrigins: []string{"*"}, // 允许所有来源的CORS请求
	PublishOrigins: []string{"*"}, // 允许所有来源的发布请求
	Transport: mercure.NewLocalTransport(), // 使用本地传输
	TopicSelectorStore: mercure.NewTopicSelectorStore(), // 主题选择器存储
}

安全考虑

  1. 始终使用JWT密钥保护发布和订阅端点
  2. 限制允许的来源(CORS)
  3. 考虑使用HTTPS加密通信
  4. 实现适当的认证和授权机制

Mercure提供了一种简单而强大的方式来实现实时更新功能,特别适合现代Web和移动应用程序。通过Golang实现,您可以轻松地将实时功能集成到您的应用中。


更多关于golang基于Server-Sent Events实现实时更新推送的插件库Mercure的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于Server-Sent Events实现实时更新推送的插件库Mercure的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Mercure: 基于Server-Sent Events的实时更新推送库

Mercure是一个基于Server-Sent Events (SSE)协议的实时通信协议和库,它提供了一种简单高效的方式来实现服务器到客户端的实时数据推送。下面我将介绍如何在Go中使用Mercure实现实时更新推送。

Mercure核心概念

  1. 发布者(Publisher): 负责向Mercure hub发送更新
  2. 订阅者(Subscriber): 通过SSE接收更新
  3. Hub: 中心服务器,负责接收发布者的消息并分发给订阅者

安装Mercure Hub

首先需要运行Mercure Hub服务,可以使用Docker快速启动:

docker run -e MERCURE_PUBLISHER_JWT_KEY='your-publisher-key' \
           -e MERCURE_SUBSCRIBER_JWT_KEY='your-subscriber-key' \
           -e MERCURE_EXTRA_DIRECTIVES="cors_origins http://localhost:3000" \
           -p 3000:80 \
           dunglas/mercure

Go客户端实现

1. 发布消息

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

func publishUpdate() {
	// Mercure Hub URL
	hubURL := "http://localhost:3000/.well-known/mercure"

	// 创建JWT令牌 (实际应用中应该安全生成)
	// 这里使用简单的硬编码演示
	jwtToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdfX0.obDjwCgqtPuIvwBlTxUEmibbBf0zypKCNzNKP7Op2UM"

	// 准备消息数据
	update := `{
		"@id": "https://example.com/updates/1",
		"@type": "Update",
		"title": "New update available!",
		"content": "This is a real-time update from the server.",
		"date": "` + time.Now().Format(time.RFC3339) + `"
	}`

	// 创建请求
	req, err := http.NewRequest("POST", hubURL, bytes.NewBufferString(update))
	if err != nil {
		fmt.Println("Error creating request:", err)
		return
	}

	// 设置请求头
	req.Header.Set("Authorization", "Bearer "+jwtToken)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Link", `<http://localhost:3000/updates>; rel="mercure"`)

	// 发送请求
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("Error sending request:", err)
		return
	}
	defer resp.Body.Close()

	fmt.Println("Update published, status:", resp.Status)
}

func main() {
	publishUpdate()
}

2. 订阅消息 (客户端实现)

package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
)

func subscribeToUpdates() {
	// Mercure Hub的订阅URL
	hubURL := "http://localhost:3000/.well-known/mercure"
	
	// 创建请求
	req, err := http.NewRequest("GET", hubURL, nil)
	if err != nil {
		log.Fatal("Error creating request:", err)
	}

	// 设置查询参数
	q := req.URL.Query()
	q.Add("topic", "http://localhost:3000/updates")
	req.URL.RawQuery = q.Encode()

	// 设置Accept头为text/event-stream
	req.Header.Set("Accept", "text/event-stream")

	// 发送请求
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Fatal("Error connecting to Mercure hub:", err)
	}
	defer resp.Body.Close()

	// 检查响应状态
	if resp.StatusCode != http.StatusOK {
		log.Fatal("Unexpected status code:", resp.StatusCode)
	}

	// 读取事件流
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if line != "" {
			fmt.Println("Received update:", line)
		}
	}

	if err := scanner.Err(); err != nil {
		log.Fatal("Error reading event stream:", err)
	}
}

func main() {
	fmt.Println("Subscribing to updates...")
	subscribeToUpdates()
}

集成到Web应用

下面是一个完整的Go Web应用示例,集成了Mercure发布和订阅功能:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"html/template"
	"log"
	"net/http"
	"time"
)

const (
	hubURL     = "http://localhost:3000/.well-known/mercure"
	publisherJWT = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdfX0.obDjwCgqtPuIvwBlTxUEmibbBf0zypKCNzNKP7Op2UM"
)

type Update struct {
	ID      string `json:"@id"`
	Type    string `json:"@type"`
	Title   string `json:"title"`
	Content string `json:"content"`
	Date    string `json:"date"`
}

func publishUpdate(update Update) error {
	data, err := json.Marshal(update)
	if err != nil {
		return err
	}

	req, err := http.NewRequest("POST", hubURL, bytes.NewBuffer(data))
	if err != nil {
		return err
	}

	req.Header.Set("Authorization", "Bearer "+publisherJWT)
	req.Header.Set("Content-Type", "application/ld+json")
	req.Header.Set("Link", `<http://localhost:3000/updates>; rel="mercure"`)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	return nil
}

func homeHandler(w http.ResponseWriter, r *http.Request) {
	tmpl := template.Must(template.New("index").Parse(`
	<!DOCTYPE html>
	<html>
	<head>
		<title>Mercure Demo</title>
	</head>
	<body>
		<h1>Real-time Updates</h1>
		<button onclick="sendUpdate()">Send Update</button>
		<div id="updates"></div>
		
		<script>
			const eventSource = new EventSource('http://localhost:3000/.well-known/mercure?topic=http://localhost:3000/updates');
			
			eventSource.onmessage = function(event) {
				const update = JSON.parse(event.data);
				const updatesDiv = document.getElementById('updates');
				updatesDiv.innerHTML += `<p><strong>${update.title}</strong>: ${update.content} (${update.date})</p>`;
			};
			
			function sendUpdate() {
				fetch('/publish', { method: 'POST' });
			}
		</script>
	</body>
	</html>
	`))
	tmpl.Execute(w, nil)
}

func publishHandler(w http.ResponseWriter, r *http.Request) {
	update := Update{
		ID:      fmt.Sprintf("http://localhost:3000/updates/%d", time.Now().Unix()),
		Type:    "Update",
		Title:   "Server Update",
		Content: "This is a real-time update sent from the server!",
		Date:    time.Now().Format(time.RFC3339),
	}

	if err := publishUpdate(update); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
	fmt.Fprintln(w, "Update published successfully")
}

func main() {
	http.HandleFunc("/", homeHandler)
	http.HandleFunc("/publish", publishHandler)

	fmt.Println("Server started at http://localhost:8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

安全考虑

  1. JWT安全: 在实际应用中,应该安全地生成和存储JWT令牌
  2. CORS配置: 确保正确配置CORS以限制来源
  3. HTTPS: 生产环境应该使用HTTPS
  4. 认证授权: 实现适当的认证和授权机制

优势

  1. 简单: 比WebSocket更简单的协议
  2. HTTP兼容: 基于HTTP,不需要特殊协议
  3. 自动重连: SSE内置自动重连机制
  4. 文本友好: 非常适合文本数据的实时推送

Mercure是一个强大的工具,特别适合需要实时更新但不需要全双工通信的场景,如新闻推送、实时通知、股票行情等。

回到顶部