一、搭建etcd环境
我将使用 etcd v3 版本，所以先在本地搭建一个单机版的 etcd 环境。
docker pull xieyanze/etcd3:latest
docker run --name etcd-v3.0.9 -d -v /tmp:/data \
-p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest
docker exec -it etcd-v3.0.9 sh
etcdctl 当前默认仍使用 v2 API，需要通过设置环境变量 export ETCDCTL_API=3 切换到 v3 API。
export ETCDCTL_API=3
etcdctl put /test/ok 11
etcdctl put /test/ok 22
etcdctl del /test/gg
#删除所有/test前缀的节点
etcdctl del /test --prefix
etcdctl get /test/ok
# 前缀查询
etcdctl get /test/ok --prefix
二.软件逻辑结构
1. k8s master cluster
dev-7
dev-8
2. k8s slave cluster 1 env1
dev-1
dev-2
dev-3
3. k8s slave cluster 2 env2
dev-4
dev-5
dev-6
三. controller 与 agent 服务注册与发现
实现原理:
注意：在 etcd v3 中，k/v 的超时时间（TTL）最小为 5 秒。
1.每2秒钟,每个服务向etcd发送一次心跳包,证明自己还活着.
2.当服务退出时, 主动删除etcd的key. 或者等到TTL超时之后,自动下线.
3.controller需要获得agent的状态,直接GET [ingress/agent/${env_uuid}/]就能获得当前agent在线状态
4.agent需要获得controller的状态,直接GET [ingress/controller]就能获得当前controller在线状态
controller 目录
目录 | 值 | TTL |
ingress/controller/dev7_xxx | {"ip":xxx} | 5 |
ingress/controller/dev8_xxx | {"ip":xxx} | 5 |
agent 目录
目录 | 值 | TTL |
ingress/agent/env1/dev1_xxx | {"ip":xxx} | 5 |
ingress/agent/env1/dev2_xxx | {"ip":xxx} | 5 |
ingress/agent/env1/dev3_yyy | {"ip":xxx} | 5 |
ingress/agent/env2/dev4_xxx | {"ip":xxx} | 5 |
ingress/agent/env2/dev5_xxx | {"ip":xxx} | 5 |
ingress/agent/env2/dev6_yyy | {"ip":xxx} | 5 |
四.软件业务的实现.
4.1 controller side:
1. 客户端调用 controller 的 RESTful API，controller 直接写入 etcd，同时将副本写入 MySQL。
2. 如果 controller 需要关注 agent 的变化，只需 watch ingress/agent 这个目录。
3. controller 是无状态的，不需要在多个实例之间同步数据，可以任意扩缩（scale）其实例数。
4. 如果 controller 挂掉，重启后从 MySQL 加载数据并同步到 etcd 中。
4.2 controller需要了解规则执行状态
etcd 目录
目录 | 值 | TTL |
ingress/ingress_config/env1/${config_uuid1}/status/dev1_xxx | 1 | 5 |
ingress/ingress_config/env1/${config_uuid1}/status/dev2_xxx | 1 | 5 |
ingress/ingress_config/env1/${config_uuid1}/status/dev3_xxx | 1 | 5 |
ingress/ingress_config/env1/${config_uuid2}/status/dev1_xxx | 1 | 5 |
ingress/ingress_config/env1/${config_uuid2}/status/dev2_xxx | 1 | 5 |
ingress/ingress_config/env1/${config_uuid2}/status/dev3_xxx | 1 | 5 |
ingress/ingress_config/env2/${config_uuid3}/status/dev4_xxx | 1 | 5 |
ingress/ingress_config/env2/${config_uuid3}/status/dev5_xxx | 1 | 5 |
ingress/ingress_config/env2/${config_uuid3}/status/dev6_xxx | 1 | 5 |
agent 的执行状态直接写入配置状态目录中。
判断方法：先获取 ingress/agent/env1 目录下的 agent 列表，再与 ingress/ingress_config/env1/${config_uuid1}/status 目录下各 agent 完成规则后反馈的列表进行对比；当列表中的每个 agent 都存在反馈时，说明该规则已全部执行成功。
4.3 agent side:
1. 不同集群的 agent 通过 etcd 的 watch 功能，第一时间获得所监听数据的全部变化（新建、删除、更新）。
2. 不同集群的 agent 每 3 分钟定时获取自己环境下的列表信息，并同步处理相关信息。
3. 如果 agent 挂掉，重启后重新加载一次 etcd 中所有的 ingress_config。
五. 代码实例
5.1 etcd clientv3 的封装
1.连接管理,支持TLS
2.增,删,查, 支持自动超时的设值
3.watch 监听目录或KEY的值的变化(PUT,DELETE)
package main
import (
"fmt"
"time"
// "github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
// "sync"
)
// EtcdData is a single key/value pair read back from etcd, with both the key
// and the value converted from []byte to string.
type EtcdData struct {
	Key, Value string
}
// EtcdHelper bundles an etcd v3 client with the timeout that is applied to
// each individual Put/Get/Delete request issued through it.
type EtcdHelper struct {
	// RequestTimeout bounds each request made by the helper methods.
	RequestTimeout time.Duration
	// Client is the underlying etcd v3 connection (nil-checked by Release).
	Client *clientv3.Client
}
// NewEtcdHelper connects to the etcd endpoint 127.0.0.1:2379 with a 3-second
// dial timeout and returns a helper whose requests time out after 5 seconds.
// If the client cannot be created, the error is printed and nil is returned.
//
// TLS: to connect securely (e.g. to dev-7:2379), build a *tls.Config with
// transport.TLSInfo{CertFile: ..., KeyFile: ..., TrustedCAFile: ...}.ClientConfig()
// from github.com/coreos/etcd/pkg/transport and set it as clientv3.Config.TLS.
func NewEtcdHelper() *EtcdHelper {
	cfg := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	}
	cli, err := clientv3.New(cfg)
	if err != nil {
		fmt.Printf("%s", err.Error())
		return nil
	}
	helper := &EtcdHelper{Client: cli}
	helper.RequestTimeout = 5 * time.Second
	return helper
}
// Release closes the underlying etcd client, if there is one. The error
// returned by Close is discarded.
func (c *EtcdHelper) Release() {
	if c.Client == nil {
		return
	}
	c.Client.Close()
}
// PutValue stores key=value attached to a newly granted lease of ttl seconds,
// so the key is removed automatically when the lease expires.
//
// Both the lease Grant and the Put share a single context bounded by
// c.RequestTimeout. Previously, Grant used context.TODO() and could hang
// indefinitely. Errors are printed and returned.
//
// NOTE(review): every call grants a brand-new lease. Callers that refresh a key
// periodically (heartbeats) could instead keep one lease alive with KeepAlive.
func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	// minimum lease TTL is 5-second
	lease, err := c.Client.Grant(ctx, ttl)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	if _, err = c.Client.Put(ctx, key, value, clientv3.WithLease(lease.ID)); err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}
// SetValue stores key=value with no lease, so the key persists until it is
// explicitly deleted. The request is bounded by c.RequestTimeout. Errors are
// printed (one per line) and returned.
func (c *EtcdHelper) SetValue(key string, value string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	if _, err := c.Client.Put(ctx, key, value); err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}
// GetValue performs a prefix query and returns every key/value pair whose key
// starts with key. It returns nil both when nothing matches and when the
// request fails; failures are printed (one per line). The request is bounded by
// c.RequestTimeout.
func (c *EtcdHelper) GetValue(key string) []EtcdData {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return nil
	}
	var kvs []EtcdData
	for _, ev := range resp.Kvs {
		kvs = append(kvs, EtcdData{Key: string(ev.Key), Value: string(ev.Value)})
	}
	return kvs
}
// DelValue deletes key and every other key that has key as a prefix. The
// request is bounded by c.RequestTimeout. Errors are printed (one per line)
// and returned.
//
// NOTE(review): because this is a prefix delete, deleting ".../dev-1" also
// removes ".../dev-10". Callers that mean a single key should end it with a
// unique suffix.
func (c *EtcdHelper) DelValue(key string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	if _, err := c.Client.Delete(ctx, key, clientv3.WithPrefix()); err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}
// Watch blocks and prints every PUT/DELETE event for keys with prefix key, one
// event per line, until the watch channel is closed. The watch is opened with
// context.Background(), so no context ever cancels it.
func (c *EtcdHelper) Watch(key string) {
	for wresp := range c.Client.Watch(context.Background(), key, clientv3.WithPrefix()) {
		for _, ev := range wresp.Events {
			fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}
// Listen opens a prefix watch on key and returns the raw watch channel, so the
// caller can process events itself. The watch uses context.Background(), so no
// context ever cancels it.
func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
	opts := []clientv3.OpOption{clientv3.WithPrefix()}
	return c.Client.Watch(context.Background(), key, opts...)
}
5.2 controller 的代码实现
1.controller上线,下线功能
2.controller定时发送心跳包到etcd.
3.controller监听agent的变化.(1-3)完成服务注册与发现
4.controller通过下发配置到etcd,通知所有watch ingress_config变化的agent
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ControllerClient registers one controller instance in etcd and publishes
// ingress configs under /ingress/ingress_config for agents to pick up.
type ControllerClient struct {
	// Period is the heartbeat interval. NewControllerClient stores the bare
	// value 2, so it is presumably scaled by time.Second where used.
	Period time.Duration
	// Name identifies the instance in its registration key; IP is its host address.
	Name, IP string
	// Helper is the etcd connection used for all reads and writes.
	Helper *EtcdHelper
	// StopCha is closed by OffLine to signal shutdown.
	StopCha chan int
}
// NewControllerClient builds a controller with a heartbeat period of 2, its own
// etcd connection, and a stop channel buffered to 10. The helper may be nil if
// connecting to etcd failed.
func NewControllerClient(name string, host_ip string) *ControllerClient {
	cc := new(ControllerClient)
	cc.Period = 2
	cc.Name = name
	cc.IP = host_ip
	cc.Helper = NewEtcdHelper()
	cc.StopCha = make(chan int, 10)
	return cc
}
func (cc *ControllerClient) Init(display bool) {
go func() {
cc.OnLine()
for {
select {
case
fmt.Printf("online goroutinue is exited.")
return
case
cc.OnLine()
}
}
}()
if display {
go func() {
watch_chan := cc.Helper.Listen("/ingress/agent")
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
}
}
// OnLine (re)registers the controller by writing "1" to
// /ingress/controller/<Name> with a 5-second TTL. It is called once per
// heartbeat. Failures are printed, not returned.
func (cc *ControllerClient) OnLine() {
	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
	if err := cc.Helper.PutValue(key, "1", 5); err != nil {
		// Constant format string: err text containing '%' must not be
		// interpreted as formatting verbs (go vet printf check).
		fmt.Printf("%s\n", err)
	}
}
// OffLine takes the controller offline. It first closes StopCha (the shutdown
// signal for the heartbeat goroutine), then deletes the registration key
// /ingress/controller/<Name> right away instead of waiting for its TTL to expire.
//
// Call it at most once: closing StopCha a second time panics.
func (cc *ControllerClient) OffLine() {
	close(cc.StopCha)
	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
	if err := cc.Helper.DelValue(key); err != nil {
		fmt.Printf("%s\n", err)
	}
}
// GetIngressConfig reads the config stored at
// /ingress/ingress_config/<env_uuid>/<uuid>. The lookup is a prefix query, so
// any keys nested under (or starting with) that path are returned too.
func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
	// TODO: first save to mysql.
	return cc.Helper.GetValue("/ingress/ingress_config/" + env_uuid + "/" + uuid)
}
// SetIngressConfig publishes config at /ingress/ingress_config/<env_uuid>/<uuid>
// with no TTL. Agents watching that environment see this as a PUT event.
// Failures are printed, not returned.
func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
	// TODO: first save to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	if err := cc.Helper.SetValue(key, config); err != nil {
		fmt.Printf("%s\n", err)
	}
}
// DelIngressConfig removes the config at /ingress/ingress_config/<env_uuid>/<uuid>.
// Agents watching that environment see DELETE events. Because DelValue deletes by
// prefix, keys nested under (or starting with) that path are removed too.
// Failures are printed, not returned.
func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
	// TODO: first update to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	if err := cc.Helper.DelValue(key); err != nil {
		fmt.Printf("%s\n", err)
	}
}
5.3 agent代码实现
1.agent上线,下线功能
2.agent定时发送心跳包到etcd.
3.agent监听(watch) controller的变化.(1-3)完成服务注册与发现
4.agent 监听（watch）ingress_config 的变化，实时完成配置的新增/更新与删除。
////////////////////////////////////////////////////////////////////////////////////////////
// AgentClient registers one agent of an environment in etcd and follows the
// ingress configs published for that environment.
//
// The three period fields are stored as bare counts by NewAgentClient (2, 3
// and 60); presumably they are scaled by time.Second where used.
type AgentClient struct {
	LivePeriod       time.Duration // heartbeat interval
	FirstConfigPerid time.Duration // delay before the first full config sync (name kept as-is for compatibility)
	SyncConfigPeriod time.Duration // interval between full config syncs

	// Name identifies the agent, EnvUUID its environment, IP its host address.
	Name, EnvUUID, IP string

	Helper  *EtcdHelper   // etcd connection used for all reads and writes
	StopCha chan struct{} // closed by OffLine to signal shutdown
}
// NewAgentClient builds an agent for environment env_uuid with periods 2
// (heartbeat), 3 (first sync) and 60 (resync), its own etcd connection, and a
// stop channel buffered to 1. The helper may be nil if connecting to etcd failed.
func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
	ac := &AgentClient{Name: name, EnvUUID: env_uuid, IP: host_ip}
	ac.LivePeriod, ac.FirstConfigPerid, ac.SyncConfigPeriod = 2, 3, 60
	ac.Helper = NewEtcdHelper()
	ac.StopCha = make(chan struct{}, 1)
	return ac
}
func (ac *AgentClient) Init(display bool) {
//我还活着,不要干掉我.
go func() {
ac.OnLine()
for {
select {
case
return
case
ac.OnLine()
}
}
}()
//if display {
// go func() {
// watch_chan := cc.Helper.Listen("/ingress/agent")
// for wresp := range watch_chan {
// for _, ev := range wresp.Events {
// fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// }
// }
// }()
//}
//重启之后,第一次同步 和 定期同步.
//go func() {
//
// time.Sleep(time.Second * ac.FirstConfigPerid)
// ac.SyncIngressConfigs()
//
// for {
// select {
// case
// return
// case
// ac.SyncIngressConfigs()
// }
// }
//}()
if display {
//监听controller变化(等待处理掉线自动重连后,重监听)
go func() {
watch_chan := ac.Helper.Listen("/ingress/controller")
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
}
//监听本环境下ingress_config的变化(等待处理掉线自动重连, 重监听)
go func() {
key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
watch_chan := ac.Helper.Listen(key)
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
switch ev.Type.String() {
case "PUT":
fmt.Printf("agent=%s SetIngressConfig(%s, %s)\\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
//TODO: SetIngressConfig(key, value)
break
case "DELETE":
fmt.Printf("agent=%s DelIngressConfig(%s)\\n", ac.Name, ev.Kv.Key)
//TODO: DelIngressConfig(key)
break
}
}
}
}()
}
// OnLine (re)registers the agent by writing a small JSON document with its
// name, environment and IP to /ingress/agent/<EnvUUID>/<Name>, with a
// 5-second TTL. Failures are printed, not returned.
//
// NOTE(review): the JSON is built with Sprintf and is not escaped, so a name or
// IP containing '"' would produce invalid JSON.
func (ac *AgentClient) OnLine() {
	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	value := fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP)
	if err := ac.Helper.PutValue(key, value, 5); err != nil {
		fmt.Printf("%s\n", err)
	}
}
// OffLine takes the agent offline. It first closes StopCha (the shutdown
// signal for the heartbeat goroutine), then deletes the registration key
// /ingress/agent/<EnvUUID>/<Name> right away instead of waiting for its TTL.
//
// Call it at most once: closing StopCha a second time panics.
func (ac *AgentClient) OffLine() {
	close(ac.StopCha)
	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	if err := ac.Helper.DelValue(key); err != nil {
		fmt.Printf("%s\n", err)
	}
}
// UpdateIngressStatus reports that this agent has applied config uuid. It
// writes "1" to /ingress/ingress_config_status/<EnvUUID>/<uuid>/<Name> with a
// 5-second TTL, matching the status table (value 1, TTL 5) in the design notes.
// The previous code deleted this key, so a status could never be reported.
//
// The key sits outside /ingress/ingress_config/<EnvUUID>/, so the agent's own
// config watch does not receive these writes. Failures are printed, not returned.
func (ac *AgentClient) UpdateIngressStatus(uuid string) {
	key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
	if err := ac.Helper.PutValue(key, "1", 5); err != nil {
		fmt.Printf("%s\n", err)
	}
}
// SyncIngressConfigs fetches every ingress config stored for this agent's
// environment. It is meant to be called once right after a restart and then
// periodically. For now it only prints what it finds.
func (ac *AgentClient) SyncIngressConfigs() {
	// The trailing slash keeps env "1" from also matching envs "10", "11", ...
	// because GetValue is a prefix query.
	key := fmt.Sprintf("/ingress/ingress_config/%s/", ac.EnvUUID)
	kvs := ac.Helper.GetValue(key)
	// TODO: ingressConfig.SyncIngressConfigs(kvs)
	for _, kv := range kvs { // ranging over a nil slice is a no-op
		fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, kv.Key, kv.Value)
	}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func main() {
controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
controller1.Init(false)
controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
controller2.Init(false)
controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
controller3.Init(false)
agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")
agent1.Init(false)
agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")
agent2.Init(false)
agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")
agent3.Init(false)
agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")
agent4.Init(false)
agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")
agent5.Init(false)
agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")
agent6.Init(false)
agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")
agent7.Init(false)
agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")
agent8.Init(false)
agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")
agent9.Init(false)
agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")
agent10.Init(false)
time.Sleep(time.Second*1)
controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0001")
controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0002")
controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0003")
controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0004")
controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0005")
forever := make(chan struct{})
}