总述
自2021年6月21号由玩云原生的运维转玩云原生的开发至今,已有5月有余,除去中间的一些其他工作任务,实际参与(实际是一个人负责开发)多厂商容器平台开发应有3月有余。个人开发并规划的多厂商容器平台是根据此张由我个人设计的规划图进行的(ps:部门没有架构师级别的,能提供可行的架构图或者一点指导,所以只能根据4年的kubernetes使用经验和逛各大网站和大厂的相关文档)。
依赖项
- golang 1.16.6
- goland 2021.2.3
- gin 1.7.4
- gorm 1.9.16
- kubernetes 1.16 -- 1.19
- client-go 0.19.0
规划图浅解
采取模块分层开发:以适配层为分界线
各容器厂商会根据实际需求,对容器平台进行适当的整改,如:
根据以上的考虑:
部分核心代码
目录结构:
.
├── Makefile
├── README.md
├── cmd
│ └── container
│ └── container.go
├── config
│ └── container.yaml
├── deploy
│ ├── docker
│ │ └── Dockerfile
│ └── vmware
├── docs
├── go.mod
├── go.sum
├── internal
│ └── container
│ ├── bootstrap
│ ├── controller
│ ├── dao
│ ├── dto
│ ├── ecode
│ ├── initialize
│ ├── job
│ ├── middleware
│ ├── models
│ ├── pkg
│ │ ├── kubernetes
│ │ │ ├── dto
│ │ │ └── service
│ │ ├── registry
│ │ │ ├── registry.go
│ │ │ └── tke
│ │ │ └── acr
│ │ │ └── harbor
│ │ └── rancher
│ │ ├── dto
│ │ └── service
│ │ └── ack
│ │ ├── dto
│ │ └── service
│ │ └── tke
│ │ ├── dto
│ │ └── service
│ ├── routers
│ └── service
├── pkg
├── scripts
│ ├── db.sh
│ └── deploy.sh
└── test
封装httpClient
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
var (
Client HTTPClient
)
func init() {
Client = &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DisableKeepAlives: true,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, // tcp连接超时时间
KeepAlive: 60 * time.Second, // 保持长连接的时间
DualStack: true,
}).DialContext, // 设置连接的参数
MaxIdleConns: 50, // 最大空闲连接
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100, // 每个host保持的空闲连接数
ExpectContinueTimeout: 30 * time.Second, // 等待服务第一响应的超时时间
IdleConnTimeout: 60 * time.Second, // 空闲连接的超时时间
},
}
}
// CheckRespStatus 状态检查
func CheckRespStatus(resp *http.Response) ([]byte, error) {
bodyBytes, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode >= 200 && resp.StatusCode < 400 {
return bodyBytes, nil
}
return nil, errors.New(string(bodyBytes))
}
// Request 建立http请求
func Request(url, token, body string, headerSet map[string]string, method string) (respStatusCode int, respBytes []byte, err error) {
request, err := http.NewRequest(method, url, strings.NewReader(body))
if err != nil {
return 401, nil, err
}
//添加token
if token != "" {
request.Header.Set("Authorization", "Bearer "+token)
}
// header 添加字段
if headerSet != nil {
for k, v := range headerSet {
request.Header.Set(k, v)
}
}
resp, err := Client.Do(request)
if err != nil {
return 401, nil, err
}
defer resp.Body.Close()
// 返回的状态码
respBytes, err = CheckRespStatus(resp)
respStatusCode = resp.StatusCode
return
}
封装clusterManager
type ClusterManager struct {
ClientSet *kubernetes.Clientset
Metrics *metrics.Clientset
DynamicClient dynamic.Interface
}
const (
//DefaultQPS High enough QPS to fit all expected use cases.
DefaultQPS = 1e6
//DefaultBurst High enough Burst to fit all expected use cases.
DefaultBurst = 1e6
)
func buildConfig(clusterName string) (*rest.Config, error) {
var clientConfig *rest.Config
var configV1 *clientcmdapiv1.Config
var dbCluster models.Cluster
var err error
var host string
rows := bootstrap.DB.Where("cluster_name = ?", clusterName).Find(&dbCluster).RowsAffected
if rows == 0 {
return nil, errors.New("the database does not have this information")
}
if dbCluster.KubeConfigSecret != "" {
kubeConfigBytes, err := base64.StdEncoding.DecodeString(dbCluster.KubeConfigSecret)
kubeConfigJson, err := yaml.YAMLToJSON(kubeConfigBytes)
err = json.Unmarshal(kubeConfigJson, &configV1)
if err != nil {
logrus.Error(err.Error())
}
// 切换匹配的版本
configObject, err := clientcmdlatest.Scheme.ConvertToVersion(configV1, clientcmdapi.SchemeGroupVersion)
configInternal := configObject.(*clientcmdapi.Config)
// 实例化配置信息
clientConfig, err = clientcmd.NewDefaultClientConfig(*configInternal, &clientcmd.ConfigOverrides{}).ClientConfig()
clientConfig.QPS = DefaultQPS
clientConfig.Burst = DefaultBurst
} else if dbCluster.Token != "" {
var addresses []dto.Addresses
err := json.Unmarshal([]byte(dbCluster.APIServer), &addresses)
for _, address := range addresses {
if address.Type == "Real" {
host = fmt.Sprintf("https://") + address.Host + fmt.Sprintf(":") + strconv.Itoa(address.Port)
break
}
}
if err != nil {
return nil, errors.New("request connection cluster failed")
}
clientConfig = &rest.Config{
Host: host,
APIPath: "",
ContentConfig: rest.ContentConfig{},
Username: "",
Password: "",
BearerToken: dbCluster.Token,
BearerTokenFile: "",
Impersonate: rest.ImpersonationConfig{},
AuthProvider: nil,
AuthConfigPersister: nil,
ExecProvider: nil,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
UserAgent: "",
DisableCompression: false,
Transport: nil,
WrapTransport: nil,
QPS: DefaultQPS,
Burst: DefaultBurst,
RateLimiter: nil,
WarningHandler: nil,
Timeout: 0,
Dial: nil,
Proxy: nil,
}
} else {
return nil, errors.New("build client config error")
}
return clientConfig, nil
}
func BuildApiServerClient(clusterName string) (*ClusterManager, error) {
clientConfig, err := buildConfig(clusterName)
if err != nil {
return nil, err
}
if clientConfig == nil {
return nil, errors.New("err: error BuildApiServerClient")
}
clientSet, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
// 这里一定要调用Discovery().ServerVersion(),探测Kube apiServer是否可用,因为kubernetes.NewForConfig(restConfig)不会去检查服务是否可用,当服务不可用时,该方法不会返回错误的
_, err = clientSet.Discovery().ServerVersion()
if err != nil {
return nil, err
}
m, err := metrics.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
d, err := dynamic.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
clusterManager := &ClusterManager{
clientSet,
m,
d,
}
return clusterManager, nil
}
封装reponseApi
type ApiResponse struct {
Code int `json:"code"`
Msg string `json:"message"`
Data interface{} `json:"data"`
}
// PaginateResponse 显然这个结构体可以复用 ApiResponse, 但是 swagger 不认识!
type PaginateResponse struct {
Code int `json:"code"`
Msg string `json:"message"`
Data Paginate `json:"data"`
}
type Paginate struct {
CurPage int `json:"cur_page"` // 当前页
CurPageSize int `json:"cur_page_size"` // 每页展示数据量
Total int `json:"total"` // 总共数据量
TotalPage int `json:"total_page"` // 总共页数
Data interface{} `json:"data"` // 数据
}
// SuccessResponse API成功返回
func SuccessResponse(c *gin.Context, data interface{}) {
response(c, ecode.Success, data)
}
// SuccessPaginateResponse 分页返回
func SuccessPaginateResponse(c *gin.Context, data interface{}, total int, curPage int, curPageSize int) {
c.JSON(http.StatusOK, PaginateResponse{
Code: int(ecode.Success),
Msg: ecode.ErrMsg[ecode.Success],
Data: Paginate{CurPage: curPage, CurPageSize: curPageSize, Total: total, TotalPage: int(math.Ceil(float64(total) / float64(curPageSize))), Data: data},
})
c.Abort()
}
// ErrorResponse API失败返回
func ErrorResponse(c *gin.Context, code ecode.ErrCode, data interface{}) {
response(c, code, data)
}
func NotFoundResponse(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"message": "页面未找到",
"data": "",
})
}
func response(c *gin.Context, code ecode.ErrCode, data interface{}) {
c.JSON(http.StatusOK, ApiResponse{
Code: int(code),
Msg: ecode.ErrMsg[code],
Data: data,
})
c.Abort()
}
几个调kubernetes的例子【认为比较有趣的例子】
# 倒序输出事件
events, err := w.workloadKubernetes.FindEvents(clientSet, namespace, name)
if err != nil {
return nil, err
}
var eventsWorkloadReps []*dto.EventsWorkloadRep
t := time.Time{}
for i := len(events) - 1; i >= 0; i-- {
if events[i].FirstTimestamp.Time == t {
events[i].FirstTimestamp.Time = events[i].EventTime.Time
}
if events[i].LastTimestamp.Time == t {
events[i].LastTimestamp.Time = events[i].EventTime.Time
}
eventsWorkloadReps = append(eventsWorkloadReps, &dto.EventsWorkloadRep{
WorkloadUUID: id,
FirstTimestamp: events[i].FirstTimestamp.Time,
LastTimestamp: events[i].LastTimestamp.Time,
Type: events[i].Type,
Kind: events[i].InvolvedObject.Kind,
Name: events[i].Name,
Reason: events[i].Reason,
Message: events[i].Message,
Count: events[i].Count,
})
}
if len(eventsWorkloadReps) == 0 {
return make([]*dto.EventsWorkloadRep, 0), nil
}
return eventsWorkloadReps, nil
# pod log
limit, _ := strconv.ParseInt(tailLines, 10, 64)
req := clientSet.CoreV1().Pods(namespace).GetLogs(name, &coreV1.PodLogOptions{Container: container, Timestamps: true, TailLines: &limit})
podLogs, err := req.Stream(context.TODO())
if err != nil {
return "error in opening stream"
}
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "error in copy information from podLogs to buf"
}
str := buf.String()
return str
# 根据kubeconfig 获取ApiServer\CertFile\Token
decoded, err := base64.StdEncoding.DecodeString(kubeConfig)
decodestr := string(decoded)
// 认证方式为kubeConfig
// 通过kubeConfig获取 api / token / certFile
kubeConfigJson, err := syaml.YAMLToJSON([]byte(decodestr))
var configV1 *clientcmdapiv1.Config
err = json.Unmarshal(kubeConfigJson, &configV1)
if err != nil {
return nil, nil, err
}
c, err := clientcmd.RESTConfigFromKubeConfig(decoded)
if err != nil {
return nil, nil, err
}
clientSet, err := kubernetes.NewForConfig(c)
if err != nil {
return nil, nil, err
}
sa, err := clientSet.CoreV1().ServiceAccounts("kube-system").Get(context.TODO(), "admin-user", metaV1.GetOptions{})
secrets, err := clientSet.CoreV1().Secrets("kube-system").Get(context.TODO(), sa.Secrets[0].Name, metaV1.GetOptions{})
if err != nil {
return nil, err
}
importClusterReq.ApiServer = configV1.Clusters[0].Cluster.Server
encoded := base64.StdEncoding.EncodeToString(configV1.Clusters[0].Cluster.CertificateAuthorityData)
importClusterReq.CertFile = encoded
importClusterReq.Token = string(secrets.Data["token"])
# 实现apply yaml 【......写成了x了】
func (y *Yaml) ApplyYaml(dynamicClient dynamic.Interface, clientSet *kubernetes.Clientset, yamlBody []byte) (interface{}, error) {
data, err := yamlutil.ToJSON(yamlBody)
var applyYaml dto.ApplyYaml
if err = json.Unmarshal(data, &applyYaml); err != nil {
return nil, err
}
var applyYamlRep string
decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewReader(yamlBody), len(yamlBody))
var rawObj runtime.RawExtension
if err := decoder.Decode(&rawObj); err != nil {
return nil, err
}
obj, gvk, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil)
unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}
unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap}
// 获取支持的资源类型列表
gr, err := restmapper.GetAPIGroupResources(clientSet.Discovery())
if err != nil {
return nil, err
}
// 创建 'Discovery REST Mapper',获取查询的资源的类型
mapper := restmapper.NewDiscoveryRESTMapper(gr)
// 查找 Group/Version/Kind 的 REST 映射
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
var dri dynamic.ResourceInterface
// 需要为 namespace 范围内的资源提供不同的接口
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
if unstructuredObj.GetNamespace() == "" {
unstructuredObj.SetNamespace("default")
}
dri = dynamicClient.Resource(mapping.Resource).Namespace(unstructuredObj.GetNamespace())
} else {
dri = dynamicClient.Resource(mapping.Resource)
}
if applyYaml.Metadata.Namespace == "" {
applyYaml.Metadata.Namespace = "default"
}
// 查询k8s是否有该资源类型
switch applyYaml.Kind {
case "Deployment":
_, err = clientSet.AppsV1().Deployments(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "StatefulSet":
_, err = clientSet.AppsV1().StatefulSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "DaemonSet":
_, err = clientSet.AppsV1().DaemonSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ReplicaSet":
_, err = clientSet.AppsV1().ReplicaSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "CronJob":
_, err = clientSet.BatchV1().CronJobs(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "Job":
_, err = clientSet.BatchV1().Jobs(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "Service":
_, err = clientSet.CoreV1().Services(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ConfigMap":
_, err = clientSet.CoreV1().ConfigMaps(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "Ingress":
_, err = clientSet.ExtensionsV1beta1().Ingresses(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ServiceAccount":
_, err = clientSet.CoreV1().ServiceAccounts(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ClusterRole":
_, err = clientSet.RbacV1().ClusterRoles().Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "RoleBinding":
_, err = clientSet.RbacV1().RoleBindings(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ClusterRoleBinding":
_, err = clientSet.RbacV1().ClusterRoleBindings().Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "APIService":
_, err := dri.Create(context.Background(), unstructuredObj, metaV1.CreateOptions{})
if err != nil {
return nil, err
}
}
if err != nil {
if !errors.IsNotFound(err) {
return nil, err
}
// 不存在则创建
obj2, err := dri.Create(context.Background(), unstructuredObj, metaV1.CreateOptions{})
fmt.Println("obj2", obj2)
if err != nil {
return nil, err
}
applyYamlRep = fmt.Sprintf("%s/%s/%s created", obj2.GetNamespace(), obj2.GetKind(), obj2.GetName())
} else { // 已存在则更新
obj2, err := dri.Update(context.Background(), unstructuredObj, metaV1.UpdateOptions{})
if err != nil {
return nil, err
}
applyYamlRep = fmt.Sprintf("%s/%s/%s update", obj2.GetNamespace(), obj2.GetKind(), obj2.GetName())
}
return applyYamlRep, nil
}
以registry的命名空间新建为例 来个适配器的demo 【伪代码】
# controller层
func CreateContainerRegistryNamespace(c *gin.Context) {
var param dto.CreateContainerRegistryNamespaceReq
if err := c.ShouldBindJSON(¶m); err != nil {
controller.ErrorResponse(c, ecode.PARAMETER_ERR, err.Error())
return
}
cred, err := service.NewCredentialService().GetPlainTextCredential(param.CredentialID)
if err != nil {
ErrorResponse(c, ecode.ParameterErr, err.Error())
return
}
param.Token = cred.Token
if data, err := service.NewRegistryService(param.Source,param.Token).CreateContainerRegistryNamespace(param); err != nil {
controller.ErrorResponse(c, ecode.PARAMETER_ERR, err.Error())
} else {
controller.SuccessResponse(c, data)
}
}
# service层
type registryService struct {
cli registry.Registry
registryDao dao.RegistryDao
}
func NewRegistryService(source,token string) *registryService {
return ®istryService{
cli: registry.NewRegistryCli(source,token),
}
}
func (r *registryService) CreateContainerRegistryNamespace(param dto.CreateContainerRegistryNamespaceReq) (interface{}, error) {
rep, err := r.cli.CreateContainerRegistryNamespace(param.DisplayName, param.Describe, param.Visibility)
return rep, err
}
# interface层
type Registry interface {
CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*dto.ListContainerRegistryNamespacesRep, error)
}
func NewRegistryCli(source,token string) Registry {
var cli Registry
switch source {
case "tke":
cli = tke.NewTkeClient(token)
}
return cli
}
# 方法层
type tkeClient struct {
token string
}
func NewTkeClient(token) *tkeClient {
return &tkeClient{
token: token
}
}
func (t *tkeClient) CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*dto.ListContainerRegistryNamespacesRep, error) {
url := viper.GetString("url") + "/apis/registry.tkestack.io/v1/namespaces/"
createJson := tDto.CreateContainerRegistryReq{
APIVersion: "registry.tkestack.io/v1",
Kind: "Namespace",
Spec: tDto.CreateContainerRegistryReqSpec{
Name: name,
DisplayName: describe,
Visibility: visibility,
},
}
jsonData, errs := json.Marshal(createJson)
if errs != nil {
return nil, errs
}
_, rep, err := pkg.Request(url, token, string(jsonData), nil, http.MethodPost)
if err != nil {
errRep := gojsonq.New().FromString(err.Error()).Find("message")
return nil, errors.New(errRep.(string))
}
var listRep *tDto.ListRepItems
if err = json.NewDecoder(strings.NewReader(string(rep))).Decode(&listRep); err != nil {
errRep := gojsonq.New().FromString(err.Error()).Find("message")
return nil, errors.New(errRep.(string))
}
var createRep dto.ListContainerRegistryNamespacesRep
createRep.DisplayName = listRep.Spec.Name
createRep.Describe = listRep.Spec.DisplayName
createRep.Visibility = listRep.Spec.Visibility
createRep.Name = listRep.Metadata.Name
createRep.RepoCount = listRep.Status.RepoCount
return &createRep, nil
}