前言
Kubernetes 现在已经成为了事实的云原生分布式操作系统,其最大的优势在于扩展性,比如在计算,存储,网络都可以根据使用者的需求进行扩展。另外一个重要扩展就是 Custom Resource 特性,通过 Custom Resource 开发者可以定义自己的资源,然后实现对应的 Operator 来调和实现自身的控制逻辑。
以前开发 Operator 需要开发者实现资源的监听,资源事件的队列化,以及后面的整套控制逻辑,比较繁琐,正因为如此,市场上出现了多款开发 Operator 的脚手架,比较常用的有 Operator-SDK 和 Kubebuilder,他们两者其实都是对 Controller Runtime(Kubernetes SIG 官方封装和抽象的开发 Operator 的公共库) 的封装,Operator-SDK 是 CoreOS 出品,Kubebuilder 则是 Kubernetes-SIG 官方团队原生打造,因此我们的教程是用 Kubebuilder 来示范开发自己的 Operator。
原理篇
Kubebuilder 脚手架生成 Operator 的代码后,开发者只需要在 Reconciler 里面实现自己的控制逻辑,下图中除 Reconciler 外,其它部分的都是 Kubebuilder 自动生成的。生成的代码底层直接依赖 Controller Runtime 这个 Kubernetes SIG 维护的核心库,但是这个库大家了解的不多,这就让它成为了黑盒,开发者在开发自己的 Operator 的时候往往会心里没底,因此我画出了完整的原理图,然后根据该图做详细的解释,下图就是整体的原理图:
我们先把每个核心概念在下面介绍一下:
GVK & GVR
-
GVK = Group + Version + Kind 组合而来的,资源种类描述术语,例如 deployment kind 的 GVK 是 apps/v1/deployments,用来唯一标识某个种类资源
- GVR = Group + Version + Resource 组合而来的,资源实例描述术语,例如某个 deployment 的 name 是 sample,那么它的 GVR 则是 apps/v1/sample,用来唯一标识某个类型资源的具体对象
- Group 是相关 API 功能集合,每个 Group 拥有一个或多个 Version,用于接口的演进,Kind 关联着一个 Package 中定义的 Go Type,比如 apps/v1/deployment 就关联着 Kubernetes 源码里面的 k8s.io/api/apps/v1 package 中的 Deployment 的 struct,自然 GVK 实例化出来的资源对象就是 GVR
CRD & CR
CRD 即 Custom Resource Definition,是 Kubernetes 提供给开发者自定义类型资源的功能,开发者自定义 CRD 然后实现该 CRD 对应的 Operator 来完成对应的控制逻辑,CRD 是通知 Kubernetes 平台存在一种新的资源,CR 则是该 CRD 定义的具体的实例对象,CRD 就是某个类型的 GVK,而 CR 则对应 GVR 表示某个具体资源类型的对象。
Scheme
这里存储了 GVK 对应的 Go Type 的映射关系,相反也存储了 Go Type 对应 GVK 的映射关系,也就是说给定 Go Type 就知道他的 GVK,给定 GVK 就知道他的 Go Type,上图中 Kubebuilder 生成的代码里就自动生成了 Scheme,该Scheme里面存储了Kubernetes 原生资源和自定义的 CRD 的 GVK 和 Go Type的映射关系,例如我们收到 Kubernetes APIServer 的 GVR 的 JSON 数据之后如下:
{
"kind": "MyJob",
"apiVersion": "myjob.github.com/v1beta1",
...
}
根据 JSON 数据里面的 kind 和 apiVersion 字段即获得了 GVK,然后就能根据 GVK 获得 Go Type 来反序列化出对应的 GVR。
Manager
Controller Runtime 抽象的最外层管理对象,负责管理内部的 Controller,Cache,Client 等对象。
Cache
负责管理 GVK 对应的 Share Informer,GVK 和 Share Informer 是一一对应的,一个 GVK 只会存在对应的一个 Share Informer,里面管理的 Share Informer 只有 Controller Watch 才会创建出 GVK 对应的 Share Informer,然后上层所有的 Controller 根据 GVK 共享该 Share Informer,Share Informer 会负责监听对应 GVK 的 GVR 的创建/删除/更新操作,然后通知所有 Watch 该 GVK 的 Controller,Controller 将对应的资源名称添加到 Queue里面,最终触发开发者的 Reconciler 的调和。
Client
Reconciler 对资源的创建/删除/更新操作都是通过该对象去操作,里面分为两种 Client:
-
Read Client 则是对应资源的读操作,该操作不会去访问 Kubernetes APIServer,而是去访问 GVK 对应的 Share Informer 对应的本地缓存
-
Write Client 则是对应资源的写操作,该操作则会直接去访问 Kubernetes APIServer
开发者不用去选择使用哪种 Client,而是直接去使用从 Manager 对象获取到的 Client 然后使用 Create/Update/Delete 接口去操作对应的 GVR,Client 里面会自动帮你完成对应的操作。
Controller
该对象跟开发者要实现的逻辑 Reconciler 是一一对应的关系,里面有创建的带限速功能的 Queue,以及该 Controller 关注 GVK 的 Watcher,一个 Controller 可以关注很多 GVK,该关注会根据 GVK 到 Cache 里面找到对应的 Share Informer 去 Watch 资源,Watch 到的事件会加入到 Queue里面,Queue 最终触发开发者的 Reconciler 的调和。
Reconciler
接收 Controller 发送给自己的 GVR 事件,然后从 Cache 中读取出 GVR 的当前状态,经过自己的控制逻辑,通过 Client 向 Kubernetes APIServer 更新 GVR 资源,开发者只需要在 Reconciler 实现自己的控制逻辑,示意图如下:
我们以 MyJob CRD 这个 Operator 示例来说明整个流程:
-
初始化 Scheme,将 Kubernetes 的原生资源以及 MyJob 资源的 GVK 和 Go Type 注册进去
-
初始化 Manager,会将上面初始完毕的 Scheme 传入进去,Manager 内部会初始化 Cache 和 Client
-
初始化 Reconciler,同时将该 Reconciler 注册到 Manager,同时会在 Manager 里面初始化一个 Controller 与该 Reconciler对应
-
Reconciler Watch MyJob 和 Pod 资源
-
Watch MyJob 资源,Controller 会从 Cache 里面去获取 MyJob 的 Share Informer,如果没有则创建,然后对该 Share Informer 进行 Watch,将得到的 MyJob 的名字和 Namespace 扔进 Queue
-
Watch Pod 资源,Controller 会从 Cache 里面去获取 Pod 的 Share Informer,如果没有则创建,然后对该 Share Informer 进行 Watch,将得到的 Pod 资源的 Owner 是 MyJob 的名字和 Namespace 扔进 Queue
5. 最终 Controller 将所有 Watch 的资源事件扔到 Queue后,Controller 会将 Queue 里的 MyJob 的名字和 Namespace 去触发 Reconciler 的 Reconcile 接口进行调和
6. 开发者只需要在 Reconciler 里面接收到对应 GVR 的事件去完成对应的控制逻辑,上面的步骤则直接由 Kubebuilder 生成的代码自动完成
最后有了以上的核心概念之后,我们可以总结出一个完整的 Operator 概念层级图:
实践篇
Kubebuilder 的安装请参考官方教程
1. 初始化项目
kubebuilder init --domain github.com
2. 创建CRD
kubebuilder create api --group myjob --version v1beta1 --kind MyJob
上面命令执行完毕后项目结构如下:
├── Dockerfile
├── Makefile
├── PROJECT // Kubebuilder 自动生成的项目元数据
├── README.md
├── api
│ └── v1beta1
│ ├── groupversion_info.go // GV(GroupVersion) 定义以及 CRD 向 Scheme 注册的方法
│ ├── myjob_types.go // 自定义 CRD 对应的 struct 的地方
│ └── zz_generated.deepcopy.go // Kubebuilder 工具自动生成的 GVR DeepCopy的方法
├── bin
│ └── manager
├── config
│ ├── certmanager
│ │ ├── certificate.yaml
│ │ ├── kustomization.yaml
│ │ └── kustomizeconfig.yaml
│ ├── crd // 部署 CRD 的相关 Yaml 集合
│ │ ├── bases
│ │ │ └── myjob.github.com_myjobs.yaml
│ │ ├── kustomization.yaml
│ │ ├── kustomizeconfig.yaml
│ │ └── patches
│ │ ├── cainjection_in_myjobs.yaml
│ │ └── webhook_in_myjobs.yaml
│ ├── default // 使用 Kustomize 部署该 Operator 的一个默认 Yaml 集合,它以 crd,rbac,manager 为 base,具体的可以去学习 Kustomize 相关的用法
│ │ ├── kustomization.yaml
│ │ ├── manager_auth_proxy_patch.yaml
│ │ ├── manager_webhook_patch.yaml
│ │ └── webhookcainjection_patch.yaml
│ ├── manager // 部署 Operator 的相关 Yaml 集合
│ │ ├── kustomization.yaml
│ │ └── manager.yaml
│ ├── prometheus // Operator 运行监控相关的 Yaml 集合
│ │ ├── kustomization.yaml
│ │ └── monitor.yaml
│ ├── rbac // Operator 部署需要的 RBAC 权限相关的 Yaml 集合
│ │ ├── auth_proxy_client_clusterrole.yaml
│ │ ├── auth_proxy_role.yaml
│ │ ├── auth_proxy_role_binding.yaml
│ │ ├── auth_proxy_service.yaml
│ │ ├── kustomization.yaml
│ │ ├── leader_election_role.yaml
│ │ ├── leader_election_role_binding.yaml
│ │ ├── myjob_editor_role.yaml
│ │ ├── myjob_viewer_role.yaml
│ │ ├── role.yaml
│ │ └── role_binding.yaml
│ ├── samples // 部署一个 CR 示例的 Yaml
│ │ └── myjob_v1beta1_myjob.yaml
│ └── webhook
│ ├── kustomization.yaml
│ ├── kustomizeconfig.yaml
│ └── service.yaml
├── controllers
│ ├── myjob_controller.go // 开发者实现 Reconciler,完成控制逻辑的文件,对应上面原理图的 Reconciler
│ ├── myjob_controller_test.go
│ └── suite_test.go
├── cover.out
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── main.go // 该文件是 Kubebuilder 自动生成的,该文件里面会会对应上面原理图中的 Scheme 初始化,以及 Manager 的初始化,然后将 Reconciler 添加到 Manager 中
3. 定义 CRD
对应 api/v1beta1/myjob_types.go 文件:
package v1beta1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
// myjob 刚创建的时候默认状态
MyJobPending = "pending"
// myjob 管理的 pod 创建后对应的状态
MyJobRunning = "running"
// myjob 管理的 pod 执行完成后对应的状态
MyJobCompleted = "completed"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// MyJobSpec defines the desired state of MyJob
type MyJobSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Template v1.PodTemplateSpec `json:"template" protobuf:"bytes,6,opt,name=template"`
}
// MyJobStatus defines the observed state of MyJob
type MyJobStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
// +optional
Phase string `json:"phase,omitempty"`
}
func (j *MyJobStatus) SetDefault(job *MyJob) bool {
changed := false
if job.Status.Phase == "" {
job.Status.Phase = MyJobPending
changed = true
}
return changed
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// MyJob is the Schema for the myjobs API
type MyJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyJobSpec `json:"spec,omitempty"`
Status MyJobStatus `json:"status,omitempty"`
}
func (j *MyJob) StatusSetDefault() bool {
return j.Status.SetDefault(j)
}
// +kubebuilder:object:root=true
// MyJobList contains a list of MyJob
type MyJobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyJob `json:"items"`
}
func init() {
SchemeBuilder.Register(&MyJob{}, &MyJobList{})
}
MyJob 的逻辑我们以简单为主,主要说明整个的开发流程,每个 MyJob 都会只创建一个与自己名字和 Namespace 一模一样的 Pod,MyJob 初始状态为 Pending,当对应的 Pod 创建出来,则 MyJob 的状态变成 Running,当 Pod 执行完毕变成 Succeeded 或者 Failed 或者 正在被删除后,则 MyJob 的状态变成 Completed 状态。
4. 开发控制器逻辑
该 Operator 需要创建 Pod,因此需要给该 Operator 创建 Pod 的权限,Kubebuilder 支持自动生成 Operator 的 RBAC,但是需要开发者在控制逻辑加上标识,此处我们加上对 Pod 有读写的权限的标识:
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
标识的具体参考 Kubebuilder标记教程,这样最后在部署的时候会根据开发者添加的这些标识由工具自动生成对应的 RBAC Yaml 文件。
对应 controllers/myjob_controller.go 文件:
package controllers
import (
"context"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
myjobv1beta1 "github.com/sky-big/myjob-operator/api/v1beta1"
)
// MyJobReconciler reconciles a MyJob object
type MyJobReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=myjob.github.com,resources=myjobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=myjob.github.com,resources=myjobs/status,verbs=get;update;patch
// 打上该控制器需要 Pod 所有权限标识,Kubebuilder 在生成 RBAC 的时候会读取该标识然后生成对 Pod 的权限
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
func (r *MyJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
logger := r.Log.WithValues("myjob", req.NamespacedName)
// your logic here
j := &myjobv1beta1.MyJob{}
if err := r.Get(ctx, req.NamespacedName, j); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// 设置 MyJob 的 Status 的默认值
if j.StatusSetDefault() {
if err := r.Status().Update(ctx, j); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
// Pod 不存在则创建 Pod,如果存在检查 Pod 的状态
p := &corev1.Pod{}
err := r.Get(ctx, req.NamespacedName, p)
if err == nil {
// MyJob 的状态还是 Pending,但是对应的 Pod 已经创建,则将 MyJob 的状态置为 Running
if !isPodCompleted(p) && myjobv1beta1.MyJobRunning != j.Status.Phase {
j.Status.Phase = myjobv1beta1.MyJobRunning
if err := r.Status().Update(ctx, j); err != nil {
return ctrl.Result{}, err
}
logger.Info("myjob phase changed", "Phase", myjobv1beta1.MyJobRunning)
}
// MyJob 对应的 Pod 已经执行完毕,则将 MyJob 的状态置为 Completed
if isPodCompleted(p) && myjobv1beta1.MyJobRunning == j.Status.Phase {
j.Status.Phase = myjobv1beta1.MyJobCompleted
if err := r.Status().Update(ctx, j); err != nil {
return ctrl.Result{}, err
}
logger.Info("myjob phase changed", "Phase", myjobv1beta1.MyJobCompleted)
}
} else if err != nil && errors.IsNotFound(err) {
// 创建 MyJob 对应的 Pod
pod := makePodByMyJob(j)
if err := controllerutil.SetControllerReference(j, pod, r.Scheme); err != nil {
return ctrl.Result{}, err
}
if err := r.Create(ctx, pod); err != nil && !errors.IsAlreadyExists(err) {
return ctrl.Result{}, err
}
logger.Info("myjob create pod success")
} else {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
func (r *MyJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
c := ctrl.NewControllerManagedBy(mgr)
// 监视拥有者是 MyJob 类型的 Pod,同时将 Pod 的拥有者 MyJob 扔进处理队列中,对 MyJob 进行调和
c.Watches(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &myjobv1beta1.MyJob{},
})
return c.For(&myjobv1beta1.MyJob{}).
Complete(r)
}
func isPodCompleted(pod *corev1.Pod) bool {
if corev1.PodSucceeded == pod.Status.Phase ||
corev1.PodFailed == pod.Status.Phase ||
pod.DeletionTimestamp != nil {
return true
}
return false
}
func makePodByMyJob(j *myjobv1beta1.MyJob) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: j.Name,
Namespace: j.Namespace,
},
Spec: *j.Spec.Template.Spec.DeepCopy(),
}
}
5. 编写集成测试
Kubebuilder 使用了 Controller-Runtime 提供的 envtest 来帮助开发者来写集成测试,这个包会帮助你单独启动 Kubernetes APIServer 以及 Etcd 服务(注意 Kubebuilder 官方安装包里面会包含这两个服务的可执行文件,如果开发者是自己编译部署安装的 Kubebuilder,则开发者需要单独安装这两个服务的可执行文件),这两个进程专门用来帮助你进行集成测试,请注意这两个服务是真实启动在你的开发机器上的,因此要注意 APIServer 以及 Etcd 对应的端口不要被占用,同时你要启动上面原理图的 Manager 以及你的 Reconciler,Manager 的监控服务会占用 8080 端口也需要特别注意不被占用或者自己指定其它端口,我们按照以下步骤来完成集成测试的开发:
1. Kubebuilder 在生成的代码里面会在 controllers 目录下生成文件 suite_test.go 文件,里面已经帮助你启动了 envtest,但是我们还要在此文件里添加启动我们自己的 Manager 以及 Reconciler,代码如下:
package controllers
import (
"path/filepath"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
myjobv1beta1 "github.com/sky-big/myjob-operator/api/v1beta1"
// +kubebuilder:scaffold:imports
)
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t,
"Controller Suite",
[]Reporter{printer.NewlineReporter{}})
}
var _ = BeforeSuite(func(done Done) {
logf.SetLogger(zap.LoggerTo(GinkgoWriter, true))
By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
}
var err error
cfg, err = testEnv.Start()
Expect(err).ToNot(HaveOccurred())
Expect(cfg).ToNot(BeNil())
err = myjobv1beta1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
// +kubebuilder:scaffold:scheme
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).ToNot(HaveOccurred())
Expect(k8sClient).ToNot(BeNil())
// *号中间这块代码是我们在 Kubebuilder 生成的代码上添加的代码,添加的代码逻辑主要是启动 Manager 以及我们自己的 Reconciler 控制器
// ******************************************
// 创建 manager
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
MetricsBindAddress: ":8082",
})
Expect(err).ToNot(HaveOccurred())
// 启动 myjob reconciler
err = (&MyJobReconciler{
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("MyJob"),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
// 启动 manager
go func() {
err = k8sManager.Start(ctrl.SetupSignalHandler())
Expect(err).ToNot(HaveOccurred())
}()
k8sClient = k8sManager.GetClient()
Expect(k8sClient).ToNot(BeNil())
// ******************************************
close(done)
}, 60)
var _ = AfterSuite(func() {
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).ToNot(HaveOccurred())
})
2. 上面的步骤完成后,我们其实可以知道,目前 Kubernetes APIServer,Etcd 已经启动完毕,同时我们自己的 Manager,Reconciler 启动完成,这样我们就可以编写对应的测试用例来测试我们的 Operator 了,我们在 controllers 目录下创建 myjob_controller_test.go 文件来编写具体的测试用例的文件,下面的测试用例流程是 『创建 myjob → 验证 myjob 创建成功 → 验证 myjob 对应的 pod 创建成功 → 验证 myjob 的状态是否 running → Mock 对应的 pod 执行完毕 → 验证 myjob 的状态变成 completed 状态』,代码如下:
package controllers
import (
"context"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
myjobv1beta1 "github.com/sky-big/myjob-operator/api/v1beta1"
)
var _ = Describe("MyJob controller", func() {
const (
MyjobName = "test-myjob"
MyjobNamespace = "default"
timeout = time.Second * 10
duration = time.Second * 10
interval = time.Millisecond * 250
)
Context("When creating MyJob", func() {
It("Should be success", func() {
By("By creating a new MyJob")
ctx := context.Background()
// 0. 创建 myjob
cronJob := &myjobv1beta1.MyJob{
TypeMeta: metav1.TypeMeta{
APIVersion: "myjob.github.com/v1beta1",
Kind: "MyJob",
},
ObjectMeta: metav1.ObjectMeta{
Name: MyjobName,
Namespace: MyjobNamespace,
},
Spec: myjobv1beta1.MyJobSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "pi",
Image: "perl",
Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"},
},
},
},
},
},
}
Expect(k8sClient.Create(ctx, cronJob)).Should(Succeed())
myjobKey := types.NamespacedName{Name: MyjobName, Namespace: MyjobNamespace}
createdMyjob := &myjobv1beta1.MyJob{}
// 1. 验证 myjob 创建成功
Eventually(func() bool {
err := k8sClient.Get(ctx, myjobKey, createdMyjob)
if err != nil {
return false
}
return true
}, timeout, interval).Should(BeTrue())
Expect(createdMyjob.Name).Should(Equal(MyjobName))
// 2. 验证 myjob 创建 pod
myPodKey := types.NamespacedName{Name: MyjobName, Namespace: MyjobNamespace}
myPod := &v1.Pod{}
Consistently(func() (string, error) {
err := k8sClient.Get(ctx, myPodKey, myPod)
if err != nil {
return "", err
}
return myPod.Name, nil
}, duration, interval).Should(Equal(MyjobName))
// 3. 验证 myjob 状态变为 Running
runningMyjob := &myjobv1beta1.MyJob{}
Consistently(func() bool {
err := k8sClient.Get(ctx, myjobKey, runningMyjob)
if err != nil {
return false
}
return runningMyjob.Status.Phase == myjobv1beta1.MyJobRunning
}, duration, interval).Should(BeTrue())
// 4. Mock Pod 工作完成
mockPod := &v1.Pod{}
Consistently(func() bool {
err := k8sClient.Get(ctx, myPodKey, mockPod)
if err != nil {
return false
}
copy := mockPod.DeepCopy()
copy.Status.Phase = v1.PodSucceeded
err = k8sClient.Status().Update(context.TODO(), copy)
if err != nil {
return false
}
return true
}, duration, interval).Should(BeTrue())
// 5. 验证 myjob 状态变为 Completed
completedMyjob := &myjobv1beta1.MyJob{}
Consistently(func() bool {
err := k8sClient.Get(ctx, myjobKey, completedMyjob)
if err != nil {
return false
}
return completedMyjob.Status.Phase == myjobv1beta1.MyJobCompleted
}, duration, interval).Should(BeTrue())
})
})
})
上述的测试用例中使用了 Ginkgo 以及 Gomega 测试相关的辅助包,详细使用方法参见官方文档。
6. 执行集成测试 & 编译
$ make test
go test ./... -coverprofile cover.out
? github.com/sky-big/myjob-operator [no test files]
? github.com/sky-big/myjob-operator/api/v1beta1 [no test files]
ok github.com/sky-big/myjob-operator/controllers 46.381s coverage: 81.6% of statements
最终结果测试成功,控制器逻辑代码覆盖率达到81.6%
# 编译
$ make
go fmt ./...
go vet ./...
go build -o bin/manager main.go
编译成功,可执行文件存在 bin 目录下面
编译过程会自动安装 controller-gen,然后使用它根据 api/v1beta1/myjob_types.go 来生成深度拷贝等通用代码。
7. 打包上传 Docker 镜像
打包镜像对应根目录下的 Dockerfile 文件由于国内网络的问题,需要修改两处:
-
在 Run go mod download 前面添加一行设置 GOPROXY:RUN go env -w GOPROXY=https://goproxy.cn
FROM golang:1.13 as builder
WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go env -w GOPROXY=https://goproxy.cn
RUN go mod download
-
将 FROM gcr.io/distroless/static:nonroot 换成国内的镜像源,我在 dockerhub 上找了一个下载量较多的源:kubeimages/distroless-static:latest
# FROM gcr.io/distroless/static:nonroot
FROM kubeimages/distroless-static:latest
WORKDIR /
COPY --from=builder /workspace/manager .
USER nonroot:nonroot
ENTRYPOINT ["/manager"]
上传镜像的时候需要修改根目录下的 Makefile 文件的第一行,指定镜像的存储的仓库地址以及镜像名称和 Tag,下面填的是我自己的 dockerhub 的仓库地址,在 Push 之前开发者需要登录自己的仓库。
# Image URL to use all building/pushing image targets
IMG ?= skybig/myjob-operator:latest
修改完毕后,执行以下命令即可:
# 打包镜像
$ make docker-build
# 上传镜像
$ make docker-push
Operator 打包成镜像后,通过该镜像启动容器后,会启动我们的控制器。
8. 部署
目前 Kubebuilder 最新版本不支持 Kubernetes 1.18版本,对应的 BUG 已经修复,但是还没有发布到最新版本的 Kubebuilder,这是对应的 PR(https://github.com/kubernetes-sigs/controller-tools/pull/440),Kubernetes 1.18 以下版本使用没有问题,这是需要注意的点。
我添加了一个卸载的 Makefile Target 在 Makefile 文件中,方便测试。
# Deploy controller in the configured Kubernetes cluster in ~/.kube/config
deploy: manifests kustomize
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | kubectl apply -f -
undeploy: manifests kustomize
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | kubectl delete -f -
Kubebuilder yaml 的管理都是通过 kustomize 进行管理的,该工具在这里就就不细说了,它是 Kubernetes 原生概念帮助用户创作并复用声明式配置,在 kustomize 出现之前,Kubernetes 管理应用的方式主要是通过 Helm 或者上层 Paas 来完成。
修改完毕后,执行以下命令即可:
# 将 Operator 部署在 Kubernetes 集群中
$ make deploy
# 查看 Operator 对应的 Pod 的状态
$ kubectl get pods -A
NAMESPACE NAME READY STATUS RESTARTS AGE
myjob-operator-system myjob-operator-controller-manager-65489c68c8-md2w7 2/2 Running 0 143m
9. 测试
测试的 yaml 对应在 config/sample/myjob_v1beta1_myjob.yaml 文件中,我根据最新定义的 CRD 修改了一下:
apiVersion: myjob.github.com/v1beta1
kind: MyJob
metadata:
name: myjob-sample
spec:
# Add fields here
template:
metadata:
name: pi
spec:
containers:
- name: pi
image: perl
command: [ "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)" ]
restartPolicy: Never
然后在项目根目录下执行命令进行测试:
# 部署测试 MyJob
$ kubectl apply -f config/sample/myjob_v1beta1_myjob.yaml
# 查看 MyJob 对应的 Pod 执行状态
$ kubectl get pods -A
NAMESPACE NAME READY STATUS RESTARTS AGE
default myjob-sample 0/1 Completed 0 143m
myjob-operator-system myjob-operator-controller-manager-65489c68c8-md2w7 2/2 Running 0 143m
# 查看 MyJob 的状态
$ kubectl get myjobs -A
NAMESPACE NAME AGE
default myjob-sample 146m
$ kubectl describe myjob myjob-sample
Name: myjob-sample
Namespace: default
Labels: <none>
Annotations: API Version: myjob.github.com/v1beta1
Kind: MyJob
Metadata:
Creation Timestamp: 2020-11-16T08:58:31Z
Generation: 1
Resource Version: 652546
Self Link: /apis/myjob.github.com/v1beta1/namespaces/default/myjobs/myjob-sample
UID: 1ae6e8b0-931b-4630-b3ea-bf60e94bf2d0
Spec:
Template:
Metadata:
Name: pi
Spec:
Containers:
Command:
perl
-Mbignum=bpi
-wle
print bpi(2000)
Image: perl
Name: pi
Restart Policy: Never
Status:
Phase: completed
Events: <none
可以看到 MyJob 的状态变成了 Completed 完成状态,咱们开发的 MyJob Operator 也就从零开始到现在完美结束了。
文章里的代码存放在我的Github仓库,大家可以按需使用和查看。
参考文档
-
Custom Resource 特性:https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/
-
Kubebuilder 源代码:https://github.com/kubernetes-sigs/kubebuilder
-
Kubebuilder 安装文档:https://book.kubebuilder.io/quick-start.html#installation
-
Kubebuilder 标记文档:https://book.kubebuilder.io/reference/markers.html
-
Kubebuilder envtest 集成测试工具使用文档:https://book.kubebuilder.io/reference/envtest.html
-
Operator-SDK:https://github.com/operator-framework/operator-sdk
-
Controller Runtime:https://github.com/kubernetes-sigs/controller-runtime
-
Kustomize:https://kustomize.io/
-
Ginkgo:https://onsi.github.io/ginkgo/
-
Gomega:https://onsi.github.io/gomega/
作者简介:徐同学,百度资深研发工程师,现就职于百度基础架构部云原生团队,对云原生相关的 Kubernetes,Serverless 等方向有深入的研究和实践经验。
了解更多微服务、云原生技术的相关信息,请关注我们的微信公众号【云原生计算】!