AIOps系列 | 开发一个 K8s Chat 命令行工具


!! 大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。


作者:乔克
公众号:运维开发故事
博客:https://jokerbai.com


✍ 道路千万条,安全第一条。操作不规范,运维两行泪。

在前面我们介绍了[[03.大模型入门实战]]和 [[04.Agent入门实战]],了解了 AI 开发的基本流程,本章节我们将使用讨论如何将 Kubernetes 和 AI 结合起来。

本文主要从以下几个方面进行介绍:

  • 怎么和 Kubernetes 进行交互

  • Cobra 工具库介绍

  • 开发 Chat K8s 命令行工具

Tips:文章只是起到抛砖引玉的作用,做大做强还需各位自己努力。

怎么和 Kubernetes 进行交互

Kubernetes 集群是云原生时代中最重要的基础设施,它里面运行着成千上万的 Pods,在日常的运维工作中,SRE 常用 kubectl 命令与它进行交互。

kubectl 是官方开发的客户端,已经将所有常用的操作集成到里面了。但是, 如果我们要通过接口与 Kubernetes 进行交互,就需要使用官方的一个核心库 Client-go

client-go 是 Kubernetes 官方提供的 Go 语言客户端库,用于与 Kubernetes API Server 进行交互。它让你可以通过编程的方式创建、更新、删除和查询 Kubernetes 资源(如 Pod、Deployment、Service 等),是开发 Kubernetes 控制器、Operator、自定义调度器或运维工具的核心依赖。

client-go 的核心组件主要有:

  • Clientset:最常用的客户端集合,包含对 core、apps、networking 等 API 访问。

  • Informer:监听资源变化,如 add、update、delete,避免频繁轮询。

  • Lister:快速从本地缓存读取资源,主要是配合 Informer 使用。

  • Workqueue:处理异步任务队列,常用于控制器中

  • ...

下面我们举一个简单的例子获取所有 Pod 列表。

(1)初始化一个项目

mkdir k8s-pod-list && cd k8s-pod-list
go mod init k8s-pod-list

(2)安装 client-go

go get k8s.io/client-go/kubernetes
go get k8s.io/client-go/tools/clientcmd
go get k8s.io/client-go/rest

(3)编写代码

package main

import (
    "context"
    "fmt"
    "log"
    "path/filepath"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    // 1. 构建 kubeconfig 路径(默认 ~/.kube/config)
    var kubeconfig string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = filepath.Join(home, ".kube""config")
    }

    // 2. 加载 kubeconfig 文件,生成配置
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        log.Fatal(err)
    }

    // 3. 创建 Kubernetes 客户端
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }

    // 4. 获取所有 Pod
    pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        log.Fatal(err)
    }

    // 5. 打印每个 Pod 的命名空间和名称
    fmt.Printf("Found %d pods:\n"len(pods.Items))
    for _, pod := range pods.Items {
        fmt.Printf("Namespace: %s, Pod: %s\n", pod.Namespace, pod.Name)
    }
}

(4)运行 go run main.go,输出如下:

> go run main.go
Found 713 pods:
Namespace: apo, Pod: apo-alertmanager-554fd4bbfc-mjp6q
Namespace: apo, Pod: apo-altinity-clickhouse-operator-cc87bb785-j9t9q
Namespace: apo, Pod: apo-backend-594c764b94-nd4zw
Namespace: apo, Pod: apo-collector-76979647fd-cmgg2
Namespace: apo, Pod: apo-front-5fbf5f4566-4rdh9
Namespace: apo, Pod: apo-grafana-57b98b9d59-f9k4h
Namespace: apo, Pod: apo-jaeger-collector-6d7bbc945d-6p95x
Namespace: apo, Pod: apo-odigos-instrumentor-8689cd45c6-xgvns

client-go 是与 Kubernetes 交互的 Go 语言标准工具,它是构建云原生工具的基础,适合用于开发自动化运维系统、Kubernetes 插件、监控工具等。

本文我们希望的是开发一个 Kubernetes 工具,单纯使用 client-go 略显麻烦,我们还需要使用另外一个 CLI 工具库——Cobra,它可以为我们开发提升很大的效率。

Cobra 工具库介绍

Cobra 是 Go 语言中最流行的命令行工具库,用于构建功能强大、结构清晰的现代 CLI(Command-Line Interface)应用程序。它被广泛用于许多知名开源项目中,例如:

  • Kubernetes (kubectl)

  • Helm

  • Docker (docker CLI)

它的核心特性有:

  • Command:支持命令及子命令,如 app serverapp config set

  • Flags:支持全局和局部 flag,如 --config-v

  • Args:命令的参数,一般用来表示操作的对象,比如 kubectl get pod,其中 pod 就是操作的对象。

  • 可以自动生成 help,输入 app --help 自动输出帮助信息。

  • 可以 shell 环境下的自动补全。

对于 Command,每个命令都是一个 &cobra.Command{},比如:

&cobra.Command{
    Use:   "hello",              // 命令用法
    Short: "简短描述",
    Long:  "长描述",
    Run: func(cmd *cobra.Command, args []string) {
        // 执行逻辑
    },
}

而 Flags 分为 全局 Flag 和 局部 Flag,取决你的 Flag 是假如的 rootCmd 还是 其他命令。

// 局部 flag
helloCmd.Flags().StringVarP(&name, "name""n""default""姓名")

// 全局 flag(加到 rootCmd)
rootCmd.PersistentFlags().Bool("verbose"false"启用详细日志")

另外,Flags 支持 StringBoolIntFloat 类型,如下:

  • String 类型

  • StringVar(&variable, "flag", "default", "description"),不带缩写

  • StringVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写

  • StringSliceVar(&variable, "flag", []string{}, "description"),可以接手数组,cli可以传入多个flag

  • StringVarP(&variable, "flag", ”shorthand“, []string{}, "description")

  • Bool 类型

  • BoolVar(&variable, "flag", "default", "description"),不带缩写

  • BoolVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写

  • Int 类型

  • IntVar(&variable, "flag", "default", "description"),不带缩写

  • IntVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写

  • Float 类型

  • FloatVar(&variable, "flag", "default", "description"),不带缩写

  • FloatVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写

下面我们开发一个最简单的 hello命令。

(1)安装 Cobra

go get -u github.com/spf13/cobra/cobra

(2)初始化项目

mkdir hello && cd hello
go mod init hello
cobra-cli init

这会自动生成:

  • cmd/root.go —— 主命令

  • main.go —— 入口

  • 自动支持 myapp --help

(3)添加子命令

cobra-cli add hello

(4)编辑 cmd/hello.go,添加逻辑

package cmd

import (
    "fmt"
    "github.com/spf13/cobra"
)

var name string

var helloCmd = &cobra.Command{
    Use:   "hello",
    Short: "打印问候语",
    Long:  一个简单的命令,用于向某人问好,
    Run: func(cmd *cobra.Command, args []string) {
        if name == "" {
            name = "World"
        }
        fmt.Printf("Hello, %s!\n", name)
    },
}

func init() {
    rootCmd.AddCommand(helloCmd)
    helloCmd.Flags().StringVarP(&name, "name""n""""输入你的名字")
}

(5)运行程序,查看输出。

go run main.go hello
# 输出: Hello, World!

go run main.go hello --name=Jokerbai
# 输出: Hello, Jokerbai!

go run main.go hello -n Bob
# 输出: Hello, Bob!

go run main.go hello --help
# 一个简单的命令,用于向某人问好
#
# Usage:
#  cobra-hello hello [flags]
#
# Flags:
#   -h, --help          help for hello
#   -n, --name string   输入你的名字

通过上面简单的示例,我们不难发现通过 Cobra,我们可以快速搭建起命令工具的架子,后续只需要结合业务对这个架子进行装修,省去不少麻烦。

开发 Chat K8s 命令行工具

上面我们对本文需要的基础工具进行了简单介绍,接下来我们将进入第一个主题:开发 Chat K8s 命令行工具。

我们这个工具需要实现的效果是:

  • Command:k8scopilot ask chatgpt 进入交互模式

  • 可以进行资源的部署,比如:

  • 帮我部署一个 Deploy,镜像是 nginx

  • 部署一个 Pod,镜像是 nginx

  • 可以查询资源信息,比如:

  • 查一下 default ns 的 Pod、SVC、Deploy 资源

  • 可以删除资源,比如:

  • 删除 default ns 下的 nginx deploy

下面我们进入正式的开发。

1.创建一个 k8scopilot目录,使用 cobra-cli 初始化项目

mkdir k8scopilot && cd k8scopilot
go mod init
cobra-cli initr

2.添加第一个 ask 命令

cobra-cli add ask

3.接着,添加一个 chatgpt 命令,它是 ask 的子命令

cobra-cli add chatgpt -p "askCmd"

4.在 chatgpt 命令中实现一个简单的功能:

  • 当用户执行 k8scopilot ask chatgpt 进入控制台输入

  • 然后用户可以在控制台中进行对话

  • 当用户输入 exit 退出

(1)增加一个 startChat 方法,用于处理生成对话回复

func startChat() {
 scanner := bufio.NewScanner(os.Stdin)
 fmt.Println("我是 K8S Copilot,你可以向我咨询 K8S 相关问题")

 for {
  fmt.Print("> ")
  if scanner.Scan() {
   input := scanner.Text()
   if input == "exit" {
    fmt.Println("退出对话")
    break
   }
   if input == "" {
    continue
   }
   fmt.Println("你输入的是:", input)
  }
 }
}

效果如下:

> .\k8scopilot.exe ask chatgpt
我是 K8S Copilot,你可以向我咨询 K8S 相关问题
> 部署资源
你输入的是: 部署资源
> exit
退出对话

(2)封装 OpenAI 客户端,方便后续调用大模型。创建一个 utils 目录,然后创建 openai.go 完成客户端封装。

package utils

import (
 "context"
 "errors"
 "net/http"
 "os"
 "time"

 "github.com/sashabaranov/go-openai"
)

const defaultBaseURL = "https://vip.apiyi.com/v1"
const defaultApiKey = "sk-xxxx"

type OpenAI struct {
 Client *openai.Client
 ctx    context.Context
}

func NewOpenAI() *OpenAI {
 apiKey := os.Getenv("OPENAI_API_KEY")
 if apiKey == "" {
  apiKey = defaultApiKey
 }
 apiBase := os.Getenv("OPENAI_API_BASE")
 if apiBase == "" {
  apiBase = defaultBaseURL
 }
 config := openai.DefaultConfig(apiKey)
 config.BaseURL = apiBase
 config.HTTPClient = &http.Client{
  Timeout: 30 * time.Second,
 }
 client := openai.NewClientWithConfig(config)

 return &OpenAI{
  Client: client,
  ctx:    context.Background(),
 }
}

func (op *OpenAI) SendMessage(prompt string, content string) (string, error) {
 req := openai.ChatCompletionRequest{
  Model: openai.GPT4oMini,
  Messages: []openai.ChatCompletionMessage{
   {
    Role:    "system",
    Content: prompt,
   },
   {
    Role:    "user",
    Content: content,
   },
  },
 }
 resp, err := op.Client.CreateChatCompletion(op.ctx, req)
 if err != nil {
  return "", err
 }
 if len(resp.Choices) == 0 {
  return "", errors.New("no response")
 }
 return resp.Choices[0].Message.Content, nil
}

我们定义了 NewOpenAI 方法用来初始化 openai,然后再定义了一个 SendMessage 方法用来和 GPT 进行交互。

(3)实现 FunctionCall,在需要根据用户输入执行不同的操作的时候,我们在《大模型入门实战》章节有讲过 FunctionCall,该项目我们将使用其实现。

这里只实现三个功能:

  • 生成 YAML 清单并部署

  • 查询 K8s 资源

  • 删除 K8s 资源

在 chatgpt.go 中实现。

首先,定义一个工具函数 createTools,用于定义 openai 的工具集。

func createTools() []openai.Tool {
 // 用来生成 K8s YAML 并部署资源
 f1 := openai.FunctionDefinition{
  Name:        "generateAndDeployResource",
  Description: "生成 K8s YAML 并部署资源",
  Parameters: jsonschema.Definition{
   Type: jsonschema.Object,
   Properties: map[string]jsonschema.Definition{
    "user_input": {
     Type:        jsonschema.String,
     Description: "用户输出的文本内容,要求包含资源类型和镜像",
    },
   },
   Required: []string{"user_input"}, // 修复:应该是 user_input 而不是 location
  },
 }

 // 用来查询 K8s 资源
 f2 := openai.FunctionDefinition{
  Name:        "queryResource",
  Description: "查询 K8s 资源",
  Parameters: jsonschema.Definition{
   Type: jsonschema.Object,
   Properties: map[string]jsonschema.Definition{
    "namespace": {
     Type:        jsonschema.String,
     Description: "资源所在的命名空间",
    },
    "resource_type": {
     Type:        jsonschema.String,
     Description: "K8s 资源标准类型,例如 Pod、Deployment、Service 等",
    },
   },
   Required: []string{"namespace""resource_type"}, // 添加必需参数
  },
 }

 // 用来删除 K8s 资源
 f3 := openai.FunctionDefinition{
  Name:        "deleteResource",
  Description: "删除 K8s 资源",
  Parameters: jsonschema.Definition{
   Type: jsonschema.Object,
   Properties: map[string]jsonschema.Definition{
    "namespace": {
     Type:        jsonschema.String,
     Description: "资源所在的命名空间",
    },
    "resource_type": {
     Type:        jsonschema.String,
     Description: "K8s 资源标准类型,例如 Pod、Deployment、Service 等",
    },
    "resource_name": {
     Type:        jsonschema.String,
     Description: "资源名称",
    },
   },
   Required: []string{"namespace""resource_type""resource_name"}, // 添加必需参数
  },
 }

 return []openai.Tool{
  {Type: openai.ToolTypeFunction, Function: &f1},
  {Type: openai.ToolTypeFunction, Function: &f2},
  {Type: openai.ToolTypeFunction, Function: &f3},
 }
}

接着,我们定义 functionCalling 方法,用于将工具注册到大模型以及执行工具调用。

func functionCalling(input string, client *utils.OpenAI) string {
 tools := createTools()

 dialogue := []openai.ChatCompletionMessage{
  {Role: openai.ChatMessageRoleUser, Content: input},
 }

 // 调用 OpenAI API
 resp, err := client.Client.CreateChatCompletion(context.TODO(),
  openai.ChatCompletionRequest{
   Model:    openai.GPT4TurboPreview,
   Messages: dialogue,
   Tools:    tools,
  },
 )
 if err != nil {
  return fmt.Sprintf("OpenAI API 调用失败: %v", err)
 }

 // 验证响应
 if len(resp.Choices) == 0 {
  return "OpenAI 返回了空的响应"
 }

 msg := resp.Choices[0].Message

 // 如果没有工具调用,直接返回消息内容
 if len(msg.ToolCalls) == 0 {
  if msg.Content != "" {
   return msg.Content
  }
  return "抱歉,我无法理解您的请求,请提供更具体的信息。"
 }

 // 处理多个工具调用(虽然当前逻辑假设只有一个)
 if len(msg.ToolCalls) > 1 {
  return "抱歉,当前不支持同时执行多个操作,请一次只执行一个操作。"
 }

 // 执行工具调用
 toolCall := msg.ToolCalls[0]
 result, err := callFunction(client, toolCall.Function.Name, toolCall.Function.Arguments)
 if err != nil {
  return fmt.Sprintf("执行操作失败: %v", err)
 }
 return result
}

在 functionCalling 中,我们调用了 callFunction 方法执行工具调用,下面实现 callFuntion ,它将根据 OpenAI 返回的消息,调用对应的函数。

// 根据 OpenAI 返回的消息,调用对应的函数
func callFunction(client *utils.OpenAI, name, arguments string) (string, error) {
 switch name {
 case "generateAndDeployResource":
  params := struct {
   UserInput string json:"user_input"
  }{}
  if err := json.Unmarshal([]byte(arguments), &params); err != nil {
   return "", fmt.Errorf("解析生成部署资源参数失败: %v", err)
  }
  return generateAndDeployResource(client, params.UserInput)

 case "queryResource":
  params := struct {
   Namespace    string json:"namespace"
   ResourceType string json:"resource_type"
  }{}
  if err := json.Unmarshal([]byte(arguments), &params); err != nil {
   return "", fmt.Errorf("解析查询资源参数失败: %v", err)
  }
  return queryResource(params.Namespace, params.ResourceType)

 case "deleteResource":
  params := struct {
   Namespace    string json:"namespace"
   ResourceType string json:"resource_type"
   ResourceName string json:"resource_name"
  }{}
  if err := json.Unmarshal([]byte(arguments), &params); err != nil {
   return "", fmt.Errorf("解析删除资源参数失败: %v", err)
  }
  return deleteResource(params.Namespace, params.ResourceType, params.ResourceName)

 default:
  return "", fmt.Errorf("未知的函数: %s", name)
 }
}

接下来,我们需要实现具体的操作方法,它们分别是:

  • generateAndDeployResource

  • queryResource

  • deleteResource

在这之前,我们需要在 utils 中封装 client-go,方便后续调用。

// utils/client_go.go
package utils

import (
 "fmt"
 "path/filepath"
 "strings"

 "k8s.io/client-go/discovery"
 "k8s.io/client-go/dynamic"
 "k8s.io/client-go/kubernetes"
 "k8s.io/client-go/tools/clientcmd"
 "k8s.io/client-go/util/homedir"
)

// ClientGo encapsulates both clientset and dynamicClient for Kubernetes operations
type ClientGo struct {
 Clientset       *kubernetes.Clientset
 DynamicClient   dynamic.Interface
 DiscoveryClient discovery.DiscoveryInterface
}

// NewClientGo initializes a new ClientGo instance with the provided kubeconfig path
func NewClientGo(kubeconfig string) (*ClientGo, error) {
 // Handle ~ in the kubeconfig path
 if strings.HasPrefix(kubeconfig, "~") {
  homeDir := homedir.HomeDir()
  kubeconfig = filepath.Join(homeDir, kubeconfig[1:])
 }

 // Build the configuration from the kubeconfig file
 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
 if err != nil {
  return nil, fmt.Errorf("failed to build kubeconfig: %w", err)
 }

 // Create the clientset
 clientset, err := kubernetes.NewForConfig(config)
 if err != nil {
  return nil, fmt.Errorf("failed to create clientset: %w", err)
 }

 // Create the dynamic client
 dynamicClient, err := dynamic.NewForConfig(config)
 if err != nil {
  return nil, fmt.Errorf("failed to create dynamic client: %w", err)
 }

 // Create DiscoveryClient
 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
 if err != nil {
  return nil, fmt.Errorf("failed to create discovery client: %w", err)
 }

 return &ClientGo{
  Clientset:       clientset,
  DynamicClient:   dynamicClient,
  DiscoveryClient: discoveryClient,
 }, nil
}

然后,我们再来实现具体的方法。

其中,generateAndDeployResource 是用来生成 YAML 清单以及部署资源的,其逻辑就是调用大模型生成纯净的 K8s YAML 资源,然后使用 Client-go 完成资源部署。

// 生成 K8s YAML 并部署资源
func generateAndDeployResource(client *utils.OpenAI, userInput string) (string, error) {
 // 生成 YAML 内容
 yamlContent, err := client.SendMessage("你现在是一个 K8s 资源生成器,根据用户输入生成 K8s YAML,注意除了 YAML 内容以外不要输出任何内容,此外不要把 YAML 放在 ``` 代码快里", userInput)
 if err != nil {
  return "", fmt.Errorf("生成 YAML 失败: %v", err)
 }

 // 创建 Kubernetes 客户端
 clientGo, err := utils.NewClientGo(kubeconfig) // kubeconfig 是一个全局 Flag
 if err != nil {
  return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
 }

 // 获取 API 资源
 resources, err := restmapper.GetAPIGroupResources(clientGo.DiscoveryClient)
 if err != nil {
  return "", fmt.Errorf("获取 API 资源失败: %v", err)
 }

 // 将 YAML 转成 unstructured 对象
 unstructuredObj := &unstructured.Unstructured{}
 _, _, err = scheme.Codecs.UniversalDeserializer().Decode([]byte(yamlContent), nil, unstructuredObj)
 if err != nil {
  return "", fmt.Errorf("解析 YAML 失败: %v", err)
 }

 // 创建 REST mapper
 mapper := restmapper.NewDiscoveryRESTMapper(resources)
 // 从 unstructuredObj 中提取 GVK
 gvk := unstructuredObj.GroupVersionKind()
 // 用 GVK 转 GVR
 mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 if err != nil {
  return "", fmt.Errorf("映射资源类型失败: %v", err)
 }

 // 设置默认命名空间
 namespace := unstructuredObj.GetNamespace()
 if namespace == "" {
  namespace = "default"
 }

 // 部署资源
 _, err = clientGo.DynamicClient.Resource(mapping.Resource).Namespace(namespace).Create(context.TODO(), unstructuredObj, metav1.CreateOptions{})
 if err != nil {
  return "", fmt.Errorf("部署资源失败: %v", err)
 }

 resourceName := unstructuredObj.GetName()
 resourceKind := unstructuredObj.GetKind()
 return fmt.Sprintf("✅ 成功部署 %s/%s 到命名空间 %s\n\n生成的 YAML:\n%s", resourceKind, resourceName, namespace, yamlContent), nil
}

对于 queryResource 是用来查询资源信息的,不同的资源信息有不同的 GVR,如下:

// 查询 K8s 资源
func queryResource(namespace, resourceType string) (string, error) {
 // 创建 Kubernetes 客户端
 clientGo, err := utils.NewClientGo(kubeconfig)
 if err != nil {
  return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
 }

 // 标准化资源类型
 resourceType = strings.ToLower(resourceType)
 var gvr schema.GroupVersionResource
 var displayName string

 switch resourceType {
 case "deployment""deployments":
  gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
  displayName = "Deployment"
 case "service""services""svc":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
  displayName = "Service"
 case "pod""pods":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
  displayName = "Pod"
 case "configmap""configmaps""cm":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
  displayName = "ConfigMap"
 case "secret""secrets":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
  displayName = "Secret"
 default:
  return "", fmt.Errorf("不支持的资源类型: %s。支持的类型: deployment, service, pod, configmap, secret", resourceType)
 }

 // 查询资源
 resourceList, err := clientGo.DynamicClient.Resource(gvr).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
 if err != nil {
  return "", fmt.Errorf("查询资源失败: %v", err)
 }

 // 格式化输出结果
 if len(resourceList.Items) == 0 {
  return fmt.Sprintf("在命名空间 '%s' 中未找到任何 %s 资源", namespace, displayName), nil
 }

 result := fmt.Sprintf("在命名空间 '%s' 中找到 %d 个 %s 资源:\n\n", namespace, len(resourceList.Items), displayName)
 for i, item := range resourceList.Items {
  result += fmt.Sprintf("%d. %s\n", i+1, item.GetName())

  // 添加一些额外信息
  if creationTime := item.GetCreationTimestamp(); !creationTime.IsZero() {
   result += fmt.Sprintf("   创建时间: %s\n", creationTime.Format("2006-01-02 15:04:05"))
  }
  result += "\n"
 }

 return result, nil
}

最后,deleteResource 是用来删除资源信息的,和 queryResource 逻辑差不多,如下:

// 删除 K8s 资源
func deleteResource(namespace, resourceType, resourceName string) (string, error) {
 // 创建 Kubernetes 客户端
 clientGo, err := utils.NewClientGo(kubeconfig)
 if err != nil {
  return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
 }

 // 标准化资源类型
 resourceType = strings.ToLower(resourceType)
 var gvr schema.GroupVersionResource
 var displayName string

 switch resourceType {
 case "deployment""deployments":
  gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
  displayName = "Deployment"
 case "service""services""svc":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
  displayName = "Service"
 case "pod""pods":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
  displayName = "Pod"
 case "configmap""configmaps""cm":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
  displayName = "ConfigMap"
 case "secret""secrets":
  gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
  displayName = "Secret"
 default:
  return "", fmt.Errorf("不支持的资源类型: %s。支持的类型: deployment, service, pod, configmap, secret", resourceType)
 }

 // 检查资源是否存在
 _, err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), resourceName, metav1.GetOptions{})
 if err != nil {
  return "", fmt.Errorf("资源 %s/%s 在命名空间 '%s' 中不存在: %v", displayName, resourceName, namespace, err)
 }

 // 删除资源
 err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Delete(context.TODO(), resourceName, metav1.DeleteOptions{})
 if err != nil {
  return "", fmt.Errorf("删除资源失败: %v", err)
 }

 return fmt.Sprintf("成功删除 %s/%s 从命名空间 '%s'", displayName, resourceName, namespace), nil
}

(4)我们将 functionCall 和 startChat 结合起来。

首先,增加一个 processInput 方法,主要用来初始化 openai,并将用户输入传递给大模型。

func processInput(input string) string {
 // 1. 初始化 openai
 client, err := utils.NewOpenAIClient()
 if err != nil {
  return err.Error()
 }
 // 2. 封装 utils/openai.go,调用 OpenAI API 得到回复
 // response, err := client.SendMessage("你好", input)

 // 3. 调用 OpenAI Function calling
 response := functionCalling(input, client)
 return response
}

最后,在 startChat 中调用 processInput 即可。

func startChat() {
 scanner := bufio.NewScanner(os.Stdin)
 fmt.Println("我是 K8S Copilot,你可以向我咨询 K8S 相关问题")

 for {
  fmt.Print("> ")
  if scanner.Scan() {
   input := scanner.Text()
   if input == "exit" {
    fmt.Println("退出对话")
    break
   }
   if input == "" {
    continue
   }
   response := processInput(input)
   fmt.Println(response)
  }
 }
}

(5)执行效果如下

afeac18b0fefb8fd12f032d165061797 MD5

总结

本文探讨了如何将人工智能技术与 Kubernetes 运维实践相结合,成功构建了一个名为 k8scopilot 的智能命令行助手。通过整合 client-goCobra 和 OpenAI 的 Function Calling 等关键技术,我们实现了一个能够理解自然语言指令并执行相应 Kubernetes 操作的 AI Agent。

我们首先介绍了与 Kubernetes 集群交互的核心工具 client-go,它是所有自动化操作的基石。接着,我们利用 Cobra 库高效地构建了结构清晰、易于扩展的命令行界面。最后,通过 OpenAI 的大语言模型,特别是其 Function Calling 能力,我们将模糊的自然语言转换为精确的、可编程的 API 调用,实现了从“人理解机器命令”到“机器理解人”的范式转变。

最后,求关注。如果你还想看更多优质原创文章,欢迎关注我们的公众号「运维开发故事」。

如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

你还可以把我的公众号设为「星标」,这样当公众号文章更新时,你会在第一时间收到推送消息,避免错过我的文章更新。


我是 乔克,《运维开发故事》公众号团队中的一员,一线运维农民工,云原生实践者,这里不仅有硬核的技术干货,还有我们对技术的思考和感悟,欢迎关注我们的公众号,期待和你一起成长!

------本页内容已结束,喜欢请分享------

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发
运维开发故事的头像-运维开发故事

昵称

取消
昵称表情代码图片