基于gRPC和REST接口的Golang分布式任务调度器

基于gRPC和REST接口的Golang分布式任务调度器 分布式调度器类似于cron,但提供了通过gRPC和REST/JSON接口进行控制的功能。同时还提供了Python和Go命令行管理工具。该调度器非常简单,但对于那些学习如何使用gRPC并自动为其生成REST网关的人来说可能很有用。源代码和简要文档请访问:https://gitlab.com/andreynech/dsched

目前缺少的是服务发现系统。我正在考虑添加Zeroconf(也称为Bonjour/Avahi)支持,或者类似CORBA命名服务的集中式服务存储库。非常欢迎就这些主题提出任何想法和反馈。


更多关于基于gRPC和REST接口的Golang分布式任务调度器的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于基于gRPC和REST接口的Golang分布式任务调度器的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


基于你描述的分布式任务调度器项目,这是一个很好的学习gRPC和REST网关集成的示例。以下是对你提到的服务发现需求的反馈,以及可能的实现方式。

对于服务发现,Zeroconf 是一个轻量级的方案,适合小型或本地网络环境,而集中式服务存储库(如使用 etcd 或 Consul)更适合生产级分布式系统。我建议优先考虑 etcd,因为它与 Go 生态集成良好,且广泛用于服务发现。

以下是一个简单的示例代码,展示如何在你的调度器中集成 etcd 进行服务注册和发现。假设你的调度器使用 gRPC 服务,并在启动时注册自己。

首先,添加 etcd 客户端依赖。在 go.mod 中添加:

require go.etcd.io/etcd/client/v3 v3.5.0

然后,在调度器的主代码中,实现服务注册功能。以下是一个基本示例:

package main

import (
    "context"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

const (
    etcdEndpoint = "localhost:2379" // 假设 etcd 运行在本地
    serviceKey   = "/dsched/services" // 服务注册的键前缀
)

// registerService 向 etcd 注册服务实例
func registerService(serviceID, serviceAddr string, ttl int64) error {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{etcdEndpoint},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return err
    }
    defer cli.Close()

    // 创建租约,设置 TTL(生存时间)
    resp, err := cli.Grant(context.Background(), ttl)
    if err != nil {
        return err
    }

    // 将服务地址注册到 etcd,并关联租约
    key := serviceKey + "/" + serviceID
    _, err = cli.Put(context.Background(), key, serviceAddr, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }

    // 保持租约活跃
    keepAlive, err := cli.KeepAlive(context.Background(), resp.ID)
    if err != nil {
        return err
    }
    go func() {
        for range keepAlive {
            // 保持租约更新
        }
    }()
    log.Printf("Service registered: %s -> %s\n", key, serviceAddr)
    return nil
}

func main() {
    // 示例:在调度器启动时注册
    serviceID := "scheduler-1" // 唯一服务标识
    serviceAddr := "localhost:8080" // gRPC 服务地址
    ttl := int64(10) // 租约 TTL 为 10 秒
    if err := registerService(serviceID, serviceAddr, ttl); err != nil {
        log.Fatalf("Failed to register service: %v", err)
    }

    // 这里启动你的 gRPC 服务器和 REST 网关
    // ... 剩余代码
}

对于服务发现,可以添加一个函数来从 etcd 查询可用服务:

// discoverServices 从 etcd 发现所有注册的服务
func discoverServices() (map[string]string, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{etcdEndpoint},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    defer cli.Close()

    resp, err := cli.Get(context.Background(), serviceKey, clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }

    services := make(map[string]string)
    for _, kv := range resp.Kvs {
        services[string(kv.Key)] = string(kv.Value)
    }
    return services, nil
}

如果你选择 Zeroconf,可以使用 Go 的 github.com/grandcat/zeroconf 库。以下是简单示例:

package main

import (
    "log"
    "time"

    "github.com/grandcat/zeroconf"
)

func registerZeroconf(serviceName, serviceType string, port int) {
    server, err := zeroconf.Register(serviceName, serviceType, "local.", port, nil, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer server.Shutdown()

    // 保持服务注册
    select {}
}

func main() {
    // 示例注册
    go registerZeroconf("dsched", "_http._tcp", 8080)
    // 这里启动你的服务
    time.Sleep(time.Hour) // 模拟长时间运行
}

在实际项目中,你需要根据网络环境和可扩展性需求选择方案。etcd 提供更强的一致性和可靠性,而 Zeroconf 更适合零配置的本地网络。你可以参考你的项目代码结构,将这些片段集成到现有 gRPC 服务器初始化部分。

回到顶部