开发 Operator 调度 GPU 实例资源池


!! 大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。


作者:乔克
公众号:运维开发故事
博客:https://jokerbai.com


✍ 道路千万条,安全第一条。操作不规范,运维两行泪。

最近在学习《AIOps》相关的知识课程,为了让学习有一定的收获,所以将其进行了总结分享,如果你恰好也需要,很荣幸能帮到你。

前面我们介绍了《开发K8s Chat 命令行工具》和《开发 K8s GPT 故障诊断工具》两篇和 K8s 相关的文章,本篇文章我们将把 K8s、AI、云 三者结合起来,开发一个 AI 工具。

本章节将引入一个新的概念——K8s Operator,它是 K8s 的一种扩展形式,可以帮助用户以 K8s 声明式 API 的方式管理应用及服务,Operator 定义了一组在 Kubernetes 集群中打包和部署复杂业务应用的方法,主要是为解决特定应用或服务关于如何运行、部署及出现问题时如何处理提供的一种特定的自定义方式。比如:

  • 按需部署应用服务

  • 实现应用状态的备份和还原,完成版本升级

  • 数据库 schema 或额外的配置设置的改动

在 K8s 中我们使用的 Deployment、Daemonset、Statefulset 等这些都是 K8s 的资源,这些资源的创建、删除、更新等动作都会被称为事件,K8s 的 Controller Manager 负责事件的监听,并触发对应的动作来满足期望,这种方式就是声明式,即用户只需要关心应用程序的最终状态。当我们在使用中发现有些资源并不能满足日常的需求,对于这类需求可以使用 K8s 的自定义资源和 Operator 为应用程序提供基于 K8s 的扩展。

在这其中,CRD 就是对自定义资源的描述,如果要自定义资源,就需要先定义好 CRD,也就是介绍这个资源有什么属性,这些属性的类型、结构是怎样的。

比如 PG 的 Operator 如下:

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: postgresqls.acid.zalan.do
  labels:
    app.kubernetes.io/name: postgres-operator
  annotations:
    "helm.sh/hook": crd-install
spec:
  group: acid.zalan.do
  names:
    kind: postgresql
    listKind: postgresqlList
    plural: postgresqls
    singular: postgresql
    shortNames:
    - pg  additionalPrinterColumns:
  - name: Team
    typestring
    description: Team responsible for Postgres CLuster
    JSONPath: .spec.teamId
  - name: Version
    typestring
    description: PostgreSQL version
    JSONPath: .spec.postgresql.version
  - name: Pods
    type: integer
    description: Number of Pods per Postgres cluster
    JSONPath: .spec.numberOfInstances
  - name: Volume
    typestring
    description: Size of the bound volume
    JSONPath: .spec.volume.size

CRD 主要包括 apiVersion、kind、metadata 和 spec 四个部分。其中最关键的是 apiVersion 和 kind,apiVersion 表示资源所属组织和版本,apiVersion 一般由 APIGourp 和 Version 组成,这里的 APIGourp 是http://apiextensions.k8s.io,Version 是 v1beta1,相关信息可以通过kubectl api-resoures查看。kind 表示资源类型,这里是CustomResourceDefinition,表示是一个自定义的资源描述。

本文我们将自己开发一个 Operator 来维护 GPU 资源池的稳定,解决 AI 模型训练的基础平台的稳定性。其架构如下:

ee11ee9bb3ba2f232c0f78573956823f MD5

ee11ee9bb3ba2f232c0f78573956823f MD5

其中:

  • GPU 资源池采用的是腾讯云的竞价 GPU 实例

  • Operator 运行在 K8s 中,通过 SpootPool 控制 GPU 资源池的数量

  • 若云平台释放了某台 GPU 实例,当 Operator 监听到资源池数量和期望的不匹配,会自动补充到期望数量

Operator 的开发有多种脚手架,常用的有 operator-sdk、kubebuilder 等,这里我们将使用 kubebuilder 来完成 Operator 的开发。

前置条件

  • 准备一个可用的 K8s 集群,可以使用 kind、kubeadm、二进制等各种形式安装,如果使用 kubeadm 安装集群,可以参考 Kubernetes集群管理。

  • 安装好 kubebuilder,可以参考 kubebuild快速安装。

  • 准备好云平台的 AK,这里是采用腾讯云,其他云类似。

快速开始

1、设计 CRD

在开发之前需要先设计好 CRD(就像业务开发前先设计好表结构一样),本文的 CRD 主要包含云平台虚拟机的开通,包括最小和最大实例数,以及腾讯云 SDK 所需要的各种参数,比如地域、可用区、VPC、子网、安全组、镜像等。

最后 CRD 设计如下:

apiVersion: devops.jokerbai.com/v1
kind: Spotpool
metadata:
  labels:
    app.kubernetes.io/name: spotpool
    app.kubernetes.io/managed-by: kustomize
  name: spotpool-sample
spec:
  secretId: 密钥ID
  secretKey: 密钥Key
  region: 区域
  availabilityZone: 可用区
  instanceType: 实例类型
  minimum: 最小实例数
  maximum: 最大实例数
  subnetId: 子网ID
  vpcId: VPC ID
  securityGroupIds:
    - 安全组
  imageId: 镜像ID
  instanceChargeType: 实例付费类型

2、初始化项目

定义好 CRD 字段之后,我们先使用 kubebuilder 初始化一个 Operator 项目,命令如下:

(1)初始化项目

mkdir spotpool && cd spotpool
kubebuilder init \
  --domain jokerbai.com \
  --repo github.com/joker-bai/spotpool \
  --project-name spotpool \
  --plugins go/v4 \
  --owner "Joker Bai"

(2)创建 API

kubebuilder create api --group devops.jokerbai.com --version v1 --kind Spotpool

(3)生成后的目录结构大致如下

.
├── api
│   └── v1
│       ├── groupversion_info.go
│       ├── spotpool_types.go
│       └── zz_generated.deepcopy.go
├── bin
│   ├── controller-gen -> /root/workspace/godev/src/github.com/joker-bai/spotpool/bin/controller-gen-v0.18.0
│   └── controller-gen-v0.18.0
├── cmd
│   └── main.go
├── config
│   ├── crd
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── default
│   │   ├── cert_metrics_manager_patch.yaml
│   │   ├── kustomization.yaml
│   │   ├── manager_metrics_patch.yaml
│   │   └── metrics_service.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── network-policy
│   │   ├── allow-metrics-traffic.yaml
│   │   └── kustomization.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   ├── monitor_tls_patch.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── metrics_auth_role_binding.yaml
│   │   ├── metrics_auth_role.yaml
│   │   ├── metrics_reader_role.yaml
│   │   ├── role_binding.yaml
│   │   ├── role.yaml
│   │   ├── service_account.yaml
│   │   ├── spotpool_admin_role.yaml
│   │   ├── spotpool_editor_role.yaml
│   │   └── spotpool_viewer_role.yaml
│   └── samples
│       ├── devops.jokerbai.com_v1_spotpool.yaml
│       └── kustomization.yaml
├── Dockerfile
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
├── internal
│   └── controller
│       ├── spotpool_controller.go
│       ├── spotpool_controller_test.go
│       └── suite_test.go
├── Makefile
├── PROJECT
├── README.md
└── test
    ├── e2e
    │   ├── e2e_suite_test.go
    │   └── e2e_test.go
    └── utils
        └── utils.go

3、CRD 开发

(1)定义 API

api/v1alpha1/spotpool_types.go中定义 CRD 的结构体,如下:

package v1

import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// SpotpoolSpec defines the desired state of Spotpool
type SpotpoolSpec struct {
 // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 // Important: Run "make" to regenerate code after modifying this file
 SecretId           string   json:"secretId,omitempty"
 SecretKey          string   json:"secretKey,omitempty"
 Region             string   json:"region,omitempty"
 AvaliableZone      string   json:"availabilityZone,omitempty"
 InstanceType       string   json:"instanceType,omitempty"
 SubnetId           string   json:"subnetId,omitempty"
 VpcId              string   json:"vpcId,omitempty"
 SecurityGroupId    []string json:"securityGroupIds,omitempty"
 ImageId            string   json:"imageId,omitempty"
 InstanceChargeType string   json:"instanceChargeType,omitempty"
 Minimum            int32    json:"minimum,omitempty"
 Maximum            int32    json:"maximum,omitempty"
}

// SpotpoolStatus defines the observed state of Spotpool
type SpotpoolStatus struct {
 // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
 // Important: Run "make" to regenerate code after modifying this file
 Size       int32              json:"size,omitempty"
 Conditions []metav1.Condition json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,rep,name=conditions"
 Instances  []Instances        json:"instances,omitempty"
}

type Instances struct {
 InstanceId string json:"instanceId,omitempty"
 PublicIp   string json:"publicIp,omitempty"
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// Spotpool is the Schema for the spotpools API
type Spotpool struct {
 metav1.TypeMeta   json:",inline"
 metav1.ObjectMeta json:"metadata,omitempty"

 Spec   SpotpoolSpec   json:"spec,omitempty"
 Status SpotpoolStatus json:"status,omitempty"
}

//+kubebuilder:object:root=true

// SpotpoolList contains a list of Spotpool
type SpotpoolList struct {
 metav1.TypeMeta json:",inline"
 metav1.ListMeta json:"metadata,omitempty"
 Items           []Spotpool json:"items"
}

func init() {
 SchemeBuilder.Register(&Spotpool{}, &SpotpoolList{})
}

在 SpotpoolSpec 中定义设计的 CRD 结构体,这些字段都是创建虚拟机的必要字段。另外,在 SpotpoolStatus 中定义返回状态里的信息,这里只需要 Instance 相关的信息。

(2)生成代码

API 相关的代码开发完后,执行以下命令生成代码:

make generate
make manifests

4、Controller 开发

(1)开发控制器逻辑

控制器的主逻辑是:

  • 从云平台获取运行的实例数

  • 判断实例数和期望的实例数是否相等

  • 如果小于期望值,则创建实例

  • 如果大于期望值,则删除实例

所以主逻辑的代码如下,修改internal/controller/spotpool_controller.go

func (r *SpotpoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 log := logf.FromContext(ctx)

 // 获取用户期望
 spotpool := &devopsjokerbaicomv1.Spotpool{}
 if err := r.Get(ctx, req.NamespacedName, spotpool); err != nil {
  log.Error(err, "unable to fetch spotspool")
 }

 // 从云平台获取获取运行的实例
 runningVmList, err := r.getRunningInstanceIds(spotpool)
 if err != nil {
  log.Error(err, "get running vm instance failed")
  // 十秒后重试
  return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
 }

 runningCount := len(runningVmList)

 switch {
 case runningCount < int(spotpool.Spec.Minimum):
  // 创建实例扩容
  delta := spotpool.Spec.Minimum - int32(runningCount)
  log.Info("creating instances""delta", delta)
  err = r.runInstances(spotpool, delta)
  if err != nil {
   log.Error(err, "unable to create instances")
   return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
  }
 case runningCount > int(spotpool.Spec.Maximum):
  // 删除实例缩容
  delta := int32(runningCount) - spotpool.Spec.Maximum
  log.Info("terminating instances""delta", delta)
  err = r.terminateInstances(spotpool, delta)
  if err != nil {
   log.Error(err, "unable to terminate instances")
   return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
  }
 }

 return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}

其中:

  • r.getRunningInstanceIds(spotpool) 用户获取云平台运行的实例数

  • r.runInstances(spotpool, delta) 用于调用云平台进行扩容

  • r.terminateInstances(spotpool, delta) 用于调用云平台进行缩容

接下来分别实现上面的三个方法。

(1)首先,实现 getRunningInstanceIds 方法

func (r *SpotpoolReconciler) getRunningInstanceIds(spotpool *devopsjokerbaicomv1.Spotpool) ([]string, error) {
 client, err := r.createCVMClient(spotpool.Spec)
 if err != nil {
  return nil, err
 }

 request := cvm.NewDescribeInstancesRequest()
 response, err := client.DescribeInstances(request)
 if err != nil {
  return nil, err
 }
 var instances []devopsjokerbaicomv1.Instances
 var runningInstanceIDs []string
 for _, instance := range response.Response.InstanceSet {
  if *instance.InstanceState == "RUNNING" || *instance.InstanceState == "PENDING" || *instance.InstanceState == "STARTING" {
   runningInstanceIDs = append(runningInstanceIDs, *instance.InstanceId)
  }
  // 检查实例的公网 IP,如果不存在公网 IP,则继续重试
  if len(instance.PublicIpAddresses) == 0 {
   return nil, fmt.Errorf("instance %s does not have public ip", *instance.InstanceId)
  }
  instances = append(instances, devopsjokerbaicomv1.Instances{
   InstanceId: *instance.InstanceId,
   PublicIp:   *instance.PublicIpAddresses[0],
  })
 }
 // 更新 status
 spotpool.Status.Instances = instances
 err = r.Status().Update(context.Background(), spotpool)
 if err != nil {
  return nil, err
 }
 return runningInstanceIDs, nil
}

// 获取腾讯云 SDK client
func (r *SpotpoolReconciler) createCVMClient(spec devopsjokerbaicomv1.SpotpoolSpec) (*cvm.Client, error) {
 credential := common.NewCredential(spec.SecretId, spec.SecretKey)
 cpf := profile.NewClientProfile()
 cpf.HttpProfile.ReqMethod = "POST"
 cpf.HttpProfile.ReqTimeout = 30
 cpf.SignMethod = "HmacSHA1"

 client, err := cvm.NewClient(credential, spec.Region, cpf)
 if err != nil {
  return nil, err
 }
 return client, nil
}

其中:

  • 调用 r.createCVMClient(spotpool.Spec) 获取腾讯云SDK client

  • 然后调用 client.DescribeInstances(request) 获取实例详细信息

  • 最后通过判断 instance.InstanceStat 和 instance.PublicIpAddresses 的状态信息决定是否是需要的实例

  • 最后返回实例列表信息

(2)实现 r.runInstances(spotpool, delta) 用于调用云平台进行扩容

func (r *SpotpoolReconciler) runInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32error {
 client, err := r.createCVMClient(spotpool.Spec)
 if err != nil {
  return err
 }
 request := cvm.NewRunInstancesRequest()
 request.ImageId = common.StringPtr(spotpool.Spec.ImageId)
 request.Placement = &cvm.Placement{
  Zone: common.StringPtr(spotpool.Spec.AvaliableZone),
 }
 request.InstanceChargeType = common.StringPtr(spotpool.Spec.InstanceChargeType)
 request.InstanceCount = common.Int64Ptr(int64(count))
 request.InstanceName = common.StringPtr("spotpool" + time.Now().Format("20060102150405"))
 request.InstanceType = common.StringPtr(spotpool.Spec.InstanceType)
 request.InternetAccessible = &cvm.InternetAccessible{
  InternetChargeType:      common.StringPtr("BANDWIDTH_POSTPAID_BY_HOUR"),
  InternetMaxBandwidthOut: common.Int64Ptr(1),
  PublicIpAssigned:        common.BoolPtr(true),
 }
 request.LoginSettings = &cvm.LoginSettings{
  Password: common.StringPtr("Password123"),
 }
 request.SecurityGroupIds = common.StringPtrs(spotpool.Spec.SecurityGroupId)
 request.SystemDisk = &cvm.SystemDisk{
  DiskType: common.StringPtr("CLOUD_BSSD"),
  DiskSize: common.Int64Ptr(100),
 }
 request.VirtualPrivateCloud = &cvm.VirtualPrivateCloud{
  SubnetId: common.StringPtr(spotpool.Spec.SubnetId),
  VpcId:    common.StringPtr(spotpool.Spec.VpcId),
 }

 // print request
 fmt.Println(request.ToJsonString())

 // 创建实例
 response, err := client.RunInstances(request)
 if _, ok := err.(*errors.TencentCloudSDKError); ok {
  return err
 }
 // other errors
 if err != nil {
  return err
 }

 // 获取到返回的 instancesid
 instanceIds := make([]string0len(response.Response.InstanceIdSet))
 for _, instanceId := range response.Response.InstanceIdSet {
  instanceIds = append(instanceIds, *instanceId)
 }

 fmt.Println("run instances success", instanceIds)
 // 更新 status
 _, err = r.getRunningInstanceIds(spotpool)
 if err != nil {
  return err
 }
 return nil
}

这个方法主要是调用 client.RunInstances(request) 进行实例创建,然后调用 r.getRunningInstanceIds(spotpool) 更新 status 的状态信息。

(3)开发r.terminateInstances(spotpool, delta) 用于调用云平台进行缩容

func (r *SpotpoolReconciler) terminateInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32error {
 client, err := r.createCVMClient(spotpool.Spec)
 if err != nil {
  return err
 }

 runningInstances, err := r.getRunningInstanceIds(spotpool)
 if err != nil {
  return err
 }

 instancesIds := runningInstances[:count]
 request := cvm.NewTerminateInstancesRequest()
 request.InstanceIds = common.StringPtrs(instancesIds)

 // 获取返回
 response, err := client.TerminateInstances(request)
 if _, ok := err.(*errors.TencentCloudSDKError); ok {
  return err
 }
 // other errors
 if err != nil {
  return err
 }

 fmt.Println("Terminate response: ", response)
 fmt.Println("terminate instances success", instancesIds)

 // 更新 status
 _, err = r.getRunningInstanceIds(spotpool)
 if err != nil {
  return err
 }
 return nil
}

删除实例和创建实例的实现逻辑类似,先调用 client.TerminateInstances(request) 进行删除,然后调用 r.getRunningInstanceIds(spotpool) 更新状态。

上面三个步骤完成了主要逻辑开发,可以初步实现具体的效果,如果希望功能更健全,则需要对其进行开发优化。

部署和测试

1、本地测试

# 安装 CRD
make install
# 运行 controller
make run

2、创建 Spotpool 实例测试

(1)创建 Spotpool 资源清单,编辑 config/samples/devops.jokerbai.com_v1_spotpool.yaml

apiVersion: devops.jokerbai.com.jokerbai.com/v1
kind: Spotpool
metadata:
  labels:
    app.kubernetes.io/name: spotpool
    app.kubernetes.io/managed-by: kustomize
  name: spotpool-sample
spec:
  secretId: xxx
  secretKey: xxx
  region: ap-singapore
  availabilityZone: ap-singapore-2
  instanceType: "GN7.2XLARGE32"
  minimum: 2
  maximum: 2
  subnetId: DEFAULT
  vpcId: DEFAULT
  securityGroupIds:
    - sg-xxx
  imageId: img-xxx
  instanceChargeType: SPOTPAID

(2)运行资源清单

# 创建实例
kubectl apply -f config/samples/devops.jokerbai.com_v1_spotpool.yaml

# 查看状态
kubectl get spotpool

(3)构建并部署到集群

# 构建镜像
make docker-build docker-push IMG=<your-registry>/spotpool:v1

# 部署到集群
make deploy IMG=<your-registry>/spotpool:v1

(4)清理

# 删除 operator
make undeploy

# 删除 CRD
make uninstall

最后

本文通过结合 Kubernetes、AI 和云平台,深入探讨了如何利用 K8s Operator 实现对 GPU 资源池的自动化管理。我们从 Operator 的核心概念出发,介绍了 CRD(自定义资源定义)和控制器的设计原理,并基于 kubebuilder 开发了一个名为 Spotpool 的 Operator,用于在腾讯云上维护竞价实例的稳定运行。

整个开发过程遵循“声明式 API”的思想,用户只需定义期望的状态(如最小/最大实例数),Operator 便会在后台持续监控并自动调整实际状态,确保资源池始终符合预期。这不仅极大地简化了运维操作,也提升了 AI 模型训练平台的稳定性和弹性。

Operator 是云原生时代自动化运维的重要利器。掌握其开发方法,意味着我们不仅能“用好” Kubernetes,更能“扩展” Kubernetes,为复杂业务场景提供定制化的解决方案。

最后,求关注。如果你还想看更多优质原创文章,欢迎关注我们的公众号「运维开发故事」。

如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

你还可以把我的公众号设为「星标」,这样当公众号文章更新时,你会在第一时间收到推送消息,避免错过我的文章更新。


我是 乔克,《运维开发故事》公众号团队中的一员,一线运维农民工,云原生实践者,这里不仅有硬核的技术干货,还有我们对技术的思考和感悟,欢迎关注我们的公众号,期待和你一起成长!

------本页内容已结束,喜欢请分享------

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发
乔克叔叔的头像-运维开发故事

昵称

取消
昵称表情代码图片