Golang实现kubectl包封装工具

Golang实现kubectl包封装工具 大家好

请问有没有人见过可以封装 Kubernetes CLI 工具 kubectl 的包?

我想要一个能执行 kubectl 命令,并将输出转换为结构体的包。 我知道 client-go 的存在(这是一个用于与 Kubernetes 交互的 Go 包)。但我想要一个封装 kubectl 的包装器。

如果不行的话,有没有人能指点我找到适用于 Go 的 Kubernetes 类型(Pod、容器等)?

// Micke

3 回复

感谢您的建议。

更多关于Golang实现kubectl包封装工具的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你好,@mickesommar

我对 kubectl 的内部实现并不熟悉,但你可能想看看 kubectl 模块参考文档,尤其是该页面底部的 pkg 目录。也许这些包能满足你的需求。不足之处在于,这些包的 API 似乎没有任何文档说明。

在Go中封装kubectl命令并解析输出,可以使用k8s.io/client-go配合k8s.io/kubectl包。以下是实现示例:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "os/exec"
    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
)

// KubectlWrapper 封装kubectl执行器
type KubectlWrapper struct {
    kubeconfig string
    namespace  string
}

// NewKubectlWrapper 创建wrapper实例
func NewKubectlWrapper(kubeconfig, namespace string) *KubectlWrapper {
    return &KubectlWrapper{
        kubeconfig: kubeconfig,
        namespace:  namespace,
    }
}

// Execute 执行kubectl命令
func (k *KubectlWrapper) Execute(args ...string) ([]byte, error) {
    cmdArgs := []string{}
    if k.kubeconfig != "" {
        cmdArgs = append(cmdArgs, "--kubeconfig", k.kubeconfig)
    }
    if k.namespace != "" {
        cmdArgs = append(cmdArgs, "-n", k.namespace)
    }
    cmdArgs = append(cmdArgs, args...)
    
    cmd := exec.Command("kubectl", cmdArgs...)
    var stdout, stderr bytes.Buffer
    cmd.Stdout = &stdout
    cmd.Stderr = &stderr
    
    if err := cmd.Run(); err != nil {
        return nil, fmt.Errorf("kubectl执行失败: %v, stderr: %s", err, stderr.String())
    }
    
    return stdout.Bytes(), nil
}

// GetPods 获取Pod列表并解析为结构体
func (k *KubectlWrapper) GetPods() (*v1.PodList, error) {
    output, err := k.Execute("get", "pods", "-o", "json")
    if err != nil {
        return nil, err
    }
    
    podList := &v1.PodList{}
    decoder := scheme.Codecs.UniversalDeserializer()
    _, _, err = decoder.Decode(output, nil, podList)
    if err != nil {
        // 尝试使用json解码作为备选方案
        if jsonErr := json.Unmarshal(output, podList); jsonErr != nil {
            return nil, fmt.Errorf("解析Pod列表失败: %v, json错误: %v", err, jsonErr)
        }
    }
    
    return podList, nil
}

// GetPodDetails 获取特定Pod详情
func (k *KubectlWrapper) GetPodDetails(podName string) (*v1.Pod, error) {
    output, err := k.Execute("get", "pod", podName, "-o", "json")
    if err != nil {
        return nil, err
    }
    
    pod := &v1.Pod{}
    decoder := scheme.Codecs.UniversalDeserializer()
    _, _, err = decoder.Decode(output, nil, pod)
    if err != nil {
        if jsonErr := json.Unmarshal(output, pod); jsonErr != nil {
            return nil, fmt.Errorf("解析Pod详情失败: %v, json错误: %v", err, jsonErr)
        }
    }
    
    return pod, nil
}

// 使用示例
func main() {
    wrapper := NewKubectlWrapper("", "default")
    
    // 获取Pod列表
    pods, err := wrapper.GetPods()
    if err != nil {
        fmt.Printf("获取Pod列表失败: %v\n", err)
        return
    }
    
    fmt.Printf("找到 %d 个Pod:\n", len(pods.Items))
    for _, pod := range pods.Items {
        fmt.Printf("- %s (状态: %s)\n", pod.Name, pod.Status.Phase)
    }
    
    // 获取特定Pod详情
    if len(pods.Items) > 0 {
        podDetail, err := wrapper.GetPodDetails(pods.Items[0].Name)
        if err != nil {
            fmt.Printf("获取Pod详情失败: %v\n", err)
            return
        }
        
        fmt.Printf("\nPod %s 的容器:\n", podDetail.Name)
        for _, container := range podDetail.Spec.Containers {
            fmt.Printf("- %s (镜像: %s)\n", container.Name, container.Image)
        }
    }
}

对于Kubernetes类型定义,可以直接使用client-go中的类型:

import (
    corev1 "k8s.io/api/core/v1"
    appsv1 "k8s.io/api/apps/v1"
    batchv1 "k8s.io/api/batch/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// 常用Kubernetes类型示例
var (
    // Pod相关
    podSpec = corev1.PodSpec{
        Containers: []corev1.Container{
            {
                Name:  "nginx",
                Image: "nginx:latest",
                Ports: []corev1.ContainerPort{
                    {ContainerPort: 80},
                },
            },
        },
    }
    
    // Deployment相关
    deployment = appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name: "example-deployment",
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: int32Ptr(3),
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{"app": "example"},
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{"app": "example"},
                },
                Spec: podSpec,
            },
        },
    }
    
    // Service相关
    service = corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name: "example-service",
        },
        Spec: corev1.ServiceSpec{
            Ports: []corev1.ServicePort{
                {Port: 80, TargetPort: intstr.FromInt(80)},
            },
            Selector: map[string]string{"app": "example"},
        },
    }
)

func int32Ptr(i int32) *int32 {
    return &i
}

如果需要更完整的封装,可以考虑使用k8s.io/cli-runtime包,它提供了kubectl的运行时功能:

import (
    "k8s.io/cli-runtime/pkg/genericclioptions"
    "k8s.io/kubectl/pkg/cmd/get"
    cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

func GetPodsUsingKubectlPackage() error {
    configFlags := genericclioptions.NewConfigFlags(true)
    factory := cmdutil.NewFactory(configFlags)
    
    ioStreams := genericclioptions.IOStreams{
        In:     os.Stdin,
        Out:    os.Stdout,
        ErrOut: os.Stderr,
    }
    
    cmd := get.NewCmdGet("kubectl", factory, ioStreams)
    cmd.SetArgs([]string{"pods", "-o", "json"})
    
    return cmd.Execute()
}

这些方法提供了从简单命令执行到完整kubectl功能集成的不同封装级别。

回到顶部