一、搭建etcd环境

 

我将使用 etcd v3版本, so,本地先建个单机版本ETCD环境。

 

docker pull xieyanze/etcd3:latest

 

docker run --name etcd-v3.0.9 -d -v /tmp:/data \
          -p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest

          

docker exec -it etcd-v3.0.9 sh

当前 etcdctl 默认仍使用 v2 API,需要通过设置环境变量 ETCDCTL_API=3 切换到 v3 版本:

export ETCDCTL_API=3

 

etcdctl put /test/ok 11

etcdctl put /test/ok 22

 

etcdctl del  /test/gg

#删除所有/test前缀的节点

etcdctl del  /test --prefix

 

etcdctl get /test/ok

前缀查询

etcdctl get /test/ok --prefix

 

二、软件逻辑结构

 

 

1. k8s master cluster
dev-7
dev-8

2. k8s slave cluster 1 env1
dev-1
dev-2
dev-3

3. k8s slave cluster 2 env2
dev-4
dev-5
dev-6

三、controller / agent 服务注册与发现

实现原理:

注意:etcd v3 版本中,k/v 的超时时间(TTL)最小为 5 秒钟。
1. 每隔 2 秒钟,每个服务向 etcd 发送一次心跳包,证明自己还活着。
2. 当服务退出时主动删除 etcd 中对应的 key;否则等到 TTL 超时之后自动下线。
3. controller 需要获得 agent 的状态时,直接 GET [ingress/agent/${env_uuid}/] 就能获得当前 agent 的在线状态。
4. agent 需要获得 controller 的状态时,直接 GET [ingress/controller] 就能获得当前 controller 的在线状态。

 

 

controller 目录

目录

值

TTL

ingress/controller/dev7_xxx

{"ip":xxx}

5

ingress/controller/dev8_xxx

{"ip":xxx}

5

 

agent 目录

目录

值

TTL

ingress/agent/env1/dev1_xxx

{"ip":xxx}

5

ingress/agent/env1/dev2_xxx

{"ip":xxx}

5

ingress/agent/env1/dev3_yyy

{"ip":xxx}

5

ingress/agent/env2/dev4_xxx

{"ip":xxx}

5

ingress/agent/env2/dev5_xxx

{"ip":xxx}

5

ingress/agent/env2/dev6_yyy

{"ip":xxx}

5

 

 

四、软件业务的实现

 

4.1 controller side:

 

1. 客户端调用controller restful api.controller 直接写入ETCD,同时写入副本到mysql.

2. controller 如果关注于agent的变化.只需要watch ingress/agent这个目录

3. controller 是无状态,不需要同步多个实例之间的数据,可以任意的scale它的实例数.

4. 如果controller挂掉之后,重启加载mysql的数据库同步到etcd.

 

4.2 controller需要了解规则执行状态

 

 

 

etcd 目录

目录

值

TTL

ingress/ingress_config/env1/${config_uuid1}/status/dev1_xxx

1

5

ingress/ingress_config/env1/${config_uuid1}/status/dev2_xxx

1

5

ingress/ingress_config/env1/${config_uuid1}/status/dev3_xxx

1

5

ingress/ingress_config/env1/${config_uuid2}/status/dev1_xxx

1

5

ingress/ingress_config/env1/${config_uuid2}/status/dev2_xxx

1

5

ingress/ingress_config/env1/${config_uuid2}/status/dev3_xxx

1

5

ingress/ingress_config/env2/${config_uuid3}/status/dev4_xxx

1

5

ingress/ingress_config/env2/${config_uuid3}/status/dev5_xxx

1

5

ingress/ingress_config/env2/${config_uuid3}/status/dev6_xxx

1

5

 

 

agent 的执行状态直接写入配置状态目录中。
先获得当前 ingress/agent/env1 目录下的 agent 列表,再与 ingress/ingress_config/env1/${config_uuid1}/status 目录下规则执行完成后的反馈列表对比;当列表中的每一个 agent 都存在反馈时,则表示全部执行成功。

4.3 agent side:

1. 不同集群的 agent 通过 etcd 的 watch 功能,在第一时间监听到所有数据的变化(新建、删除、更新)。

2. 不同集群的 agent 每隔 3 分钟定时获取自己环境下的列表信息,并同步处理相关信息。

3. 如果 agent 挂了,重启之后会重新加载一次 etcd 中所有的 ingress_config。

五、代码实例

 

5.1 etcd clientv3 的封装

1.连接管理,支持TLS
2.键值的写入、读取与删除,支持带自动超时(TTL)的设值
3.watch 监听目录或KEY的值的变化(PUT,DELETE)

package main

 

import (

"fmt"

"time"

// "github.com/coreos/etcd/pkg/transport"

"github.com/coreos/etcd/clientv3"

"golang.org/x/net/context"

// "sync"

)

 

 

// EtcdData is one key/value pair read back from etcd, with both the key and
// the value held as strings.
type EtcdData struct {
	Key, Value string
}

 

type EtcdHelper struct {

RequestTimeout   time.Duration

Client           *clientv3.Client

}

 

func NewEtcdHelper() *EtcdHelper {

 

//tlsInfo := transport.TLSInfo{

// CertFile:      "/tmp/test-certs/test-name-1.pem",

// KeyFile:       "/tmp/test-certs/test-name-1-key.pem",

// TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",

//}

//tlsInfo := transport.TLSInfo{

// CertFile:      "./tls/apiserver.crt",

// KeyFile:       "./tls/apiserver.key",

//}

//tlsConfig, err := tlsInfo.ClientConfig()

//if err != nil {

// fmt.Printf("%s", err.Error())

// return nil

//}

 

//cli, err := clientv3.New(clientv3.Config{

// Endpoints: []string{"dev-7:2379"},

// DialTimeout: 3 * time.Second,

// TLS:         tlsConfig,

//})

 

cli, err := clientv3.New(clientv3.Config{

Endpoints: []string{"127.0.0.1:2379"},

//Endpoints: []string{"http://dev-7:2379"},

DialTimeout: 3 * time.Second,

})

 

if err != nil {

fmt.Printf("%s", err.Error())

return nil

}

 

return &EtcdHelper{

RequestTimeout: 5 *time.Second,

Client: cli,

}

}

 

// Release closes the underlying etcd client when one is set. The value
// returned by Close is discarded.
func (c *EtcdHelper) Release() {
	if c.Client == nil {
		return
	}
	c.Client.Close()
}

 

func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {

ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)

defer cancel()

 

// minimum lease TTL is 5-second

resp, err := c.Client.Grant(context.TODO(), ttl)

if err != nil {

fmt.Printf("%s\\n", err.Error())

return err

}

 

_, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))

if err != nil {

fmt.Printf("%s\\n", err.Error())

return err

}

 

return nil

}

 

func (c *EtcdHelper) SetValue(key string, value string) error {

ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)

defer cancel()

 

_, err := c.Client.Put(ctx, key, value)

if err != nil {

fmt.Printf("%s\\n", err.Error())

return err

}

 

return nil

}

 

func (c *EtcdHelper) GetValue(key string) []EtcdData {

ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)

defer cancel()

resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())

if err != nil {

fmt.Printf("%s\\n", err.Error())

return nil

}

 

var kv_slice []EtcdData

for _, ev := range resp.Kvs {

//fmt.Printf("%s : %s\\n", ev.Key, ev.Value)

 

kv := EtcdData{string(ev.Key), string(ev.Value)}

kv_slice = append(kv_slice, kv)

}

 

return kv_slice

}

 

func (c *EtcdHelper) DelValue(key string) error {

ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)

defer cancel()

_, err := c.Client.Delete(ctx, key, clientv3.WithPrefix())

if err != nil {

fmt.Printf("%s\\n", err.Error())

return err

}

 

 return nil

}

 

// Watch subscribes to every key that starts with key and prints one line per
// event (type, key, value). It blocks until the watch channel is closed; since
// the watch runs on context.Background(), the caller has no handle to cancel it.
func (c *EtcdHelper) Watch(key string) {
	events := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
	for wresp := range events {
		for _, ev := range wresp.Events {
			// Terminate each event with a real newline; the format used to end
			// in a literal backslash followed by 'n'.
			fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

 

// Listen opens a prefix watch on key and hands the resulting channel to the
// caller, who is responsible for draining it. The watch is created with
// context.Background().
func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
	ctx := context.Background()
	events := c.Client.Watch(ctx, key, clientv3.WithPrefix())
	return events
}

5.2 controller 的代码实现

 

1.controller上线,下线功能
2.controller定时发送心跳包到etcd.
3.controller监听agent的变化.(1-3)完成服务注册与发现
4.controller通过下发配置到etcd,通知所有watch ingress_config变化的agent

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type ControllerClient struct {

Period   time.Duration

Name     string

IP       string

Helper   *EtcdHelper

 

StopCha  chan int

 

//Lock     *sync.Mutex

}

 

func NewControllerClient(name string, host_ip string) *ControllerClient {

return &ControllerClient{

Period:  2,

Name:    name,

IP:      host_ip,

Helper:  NewEtcdHelper(),

StopCha: make(chan int, 10),

//Lock:    new(sync.Mutex),

}

}

 

func (cc *ControllerClient) Init(display bool) {

go func() {

cc.OnLine()

 

for {

select {

case

fmt.Printf("online goroutinue is exited.")

return

case

cc.OnLine()

}

}

}()

 

 

if display {

go func() {

watch_chan := cc.Helper.Listen("/ingress/agent")

for wresp := range watch_chan {

for _, ev := range wresp.Events {

fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

}

}

}()

}

}

 

func (cc *ControllerClient) OnLine() {

key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

 

//cc.Lock.Lock()

//defer cc.Lock.Unlock()

err := cc.Helper.PutValue(key, "1", 5)

 

if err != nil  {

fmt.Printf(err.Error())

}

}

 

func (cc *ControllerClient) OffLine() {

close(cc.StopCha)

 

key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

 

//cc.Lock.Lock()

//defer cc.Lock.Unlock()

err := cc.Helper.DelValue(key)

 

if err != nil  {

fmt.Printf(err.Error())

}

}

 

func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {

//TODO. first save to mysql.

key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)

//cc.Lock.Lock()

//defer cc.Lock.Unlock()

return cc.Helper.GetValue(key)

 

 

}

 

func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {

//TODO. first save to mysql.

 

key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)

//cc.Lock.Lock()

//defer cc.Lock.Unlock()

err := cc.Helper.SetValue(key, config)

 

if err != nil  {

fmt.Printf(err.Error())

}

}

 

func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {

//TODO. first update to mysql.

 

key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)

 

//cc.Lock.Lock()

//defer cc.Lock.Unlock()

err := cc.Helper.DelValue(key)

 

if err != nil {

fmt.Printf(err.Error())

}

}

 

5.3 agent代码实现

 

1.agent上线,下线功能
2.agent定时发送心跳包到etcd.
3.agent监听(watch) controller的变化.(1-3)完成服务注册与发现
4.agent监听(watch) ingress_config的变化,实时完成更新或设置配置、删除配置功能.

 

 

 

////////////////////////////////////////////////////////////////////////////////////////////

type AgentClient struct {

LivePeriod         time.Duration

FirstConfigPerid   time.Duration

SyncConfigPeriod   time.Duration

 

 

Name         string

EnvUUID      string

IP           string

Helper       *EtcdHelper

 

StopCha      chan struct{}

}

 

func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {

return &AgentClient{

LivePeriod:         2,

FirstConfigPerid:   3,

SyncConfigPeriod:   60,

 

 

Name:           name,

EnvUUID:        env_uuid,

IP:             host_ip,

Helper:         NewEtcdHelper(),

StopCha:        make(chan struct{}, 1),

}

}

 

func (ac *AgentClient) Init(display bool) {

//我还活着,不要干掉我.

go func() {

ac.OnLine()

 

for {

select {

case

return

case

ac.OnLine()

}

}

}()

 

//if display {

// go func() {

// watch_chan := cc.Helper.Listen("/ingress/agent")

// for wresp := range watch_chan {

// for _, ev := range wresp.Events {

// fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

// }

// }

// }()

//}

 

//重启之后,第一次同步  定期同步.

//go func() {

//

// time.Sleep(time.Second * ac.FirstConfigPerid)

// ac.SyncIngressConfigs()

//

// for {

// select {

// case

// return

// case

// ac.SyncIngressConfigs()

// }

// }

//}()

 

if display {

//监听controller变化(等待处理掉线自动重连后,重监听)

go func() {

watch_chan := ac.Helper.Listen("/ingress/controller")

for wresp := range watch_chan {

for _, ev := range wresp.Events {

fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

}

}

}()

}

 

//监听本环境下ingress_config的变化(等待处理掉线自动重连重监听)

go func() {

key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)

watch_chan := ac.Helper.Listen(key)

for wresp := range watch_chan {

for _, ev := range wresp.Events {

fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

 

switch ev.Type.String() {

case "PUT":

fmt.Printf("agent=%s SetIngressConfig(%s, %s)\\n", ac.Name, ev.Kv.Key, ev.Kv.Value)

//TODO: SetIngressConfig(key, value)

break

case "DELETE":

fmt.Printf("agent=%s DelIngressConfig(%s)\\n", ac.Name,  ev.Kv.Key)

//TODO: DelIngressConfig(key)

break

}

}

}

}()

}

 

func (ac *AgentClient) OnLine() {

key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)

err := ac.Helper.PutValue(key, fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP), 5)

if err != nil  {

fmt.Printf(err.Error())

}

}

 

func (ac *AgentClient) OffLine() {

//ac.StopCha

close(ac.StopCha)

 

key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)

err := ac.Helper.DelValue(key)

if err != nil  {

fmt.Printf(err.Error())

}

}

 

func (ac *AgentClient) UpdateIngressStatus(uuid string) {

key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)

err := ac.Helper.DelValue(key)

if err != nil  {

fmt.Printf(err.Error())

}

}

 

// SyncIngressConfigs reads every key under
// "/ingress/ingress_config/<EnvUUID>" and prints one line per pair. It is
// meant to run once after a service restart and then periodically.
func (ac *AgentClient) SyncIngressConfigs() {
	key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)

	// TODO: ingressConfig.SyncIngressConfigs(pairs)
	// Ranging over a nil slice is a no-op, so no nil check is needed.
	for _, kv := range ac.Helper.GetValue(key) {
		fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, kv.Key, kv.Value)
	}
}

 

 

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

func main() {

 

controller1 := NewControllerClient("dev-7_001", "192.168.0.10")

controller1.Init(false)

controller2 := NewControllerClient("dev-8_002", "192.168.0.11")

controller2.Init(false)

controller3 := NewControllerClient("dev-8_003", "192.168.0.12")

controller3.Init(false)

 

 

agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")

agent1.Init(false)

 

agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")

agent2.Init(false)

 

agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")

agent3.Init(false)

 

agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")

agent4.Init(false)

 

agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")

agent5.Init(false)

 

agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")

agent6.Init(false)

 

agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")

agent7.Init(false)

 

agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")

agent8.Init(false)

 

agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")

agent9.Init(false)

 

agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")

agent10.Init(false)

 

 

time.Sleep(time.Second*1)

controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)

controller3.DelIngressConfig("1", "0001")

 

controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)

controller3.DelIngressConfig("1", "0002")

 

controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)

controller3.DelIngressConfig("1", "0003")

 

controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)

controller3.DelIngressConfig("1", "0004")

 

controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)

controller3.DelIngressConfig("1", "0005")

 

 

 

 

forever := make(chan struct{})

}

12-21 14:23