1. 背景

 

在k8s分布式系统中,通讯是重要的组成部分。本文分享一下如何使用通讯中间件。

本文代码相关技术如下:

rabbitmq

redis

golang

 

k8s集群与集群之间通讯, 我们都可以使用相同的中间件rabbitmq


本文使用最简单的模式 LB,单实例的RPC调用。

 

2. 分布式调用结构

 

2.1 rabbitmq lb模式调用

 

 

 

 

1. agent1、agent2、agent3 同时上报自己在线时,rabbitmq 自动调用 controller1、controller2 中的其中一个实例,再由 controller X 写入 redis 中。当 controller1、controller2 需要所有 agent 的状态时,读取的都是 redis 中的数据,所以结果都是一致的。

 

2. agent1、agent2、agent3 获取配置信息时,rabbitmq 也自动调用 controller1、controller2 中的其中一个实例,再由 controller X 读取 redis 或者 mysql 数据,然后返回给 agent。不论调用到 controller1 还是 controller2,返回的数据都是一致的。

 


2.2 rabbitmq 单实例模式调用

 

 

 

controller 实例下发配置信息时:

 

step1. 获得当前在线的agent

step2. 单实例模式rpc调用。向所有的agent发送配置信息。

step3. 可以明确了解有没有agent下发配置失败。如果都失败,则本次调用失败。只要有一个失败,就可以认为需要重发一次命令。

 

3. 代码实现

 

3.1. rabbitmq rpc 调用 客户端实现

 

package ingress

 

import (

"fmt"

"time"

"context"

"github.com/wzhliang/xing"

"wise2c/wisecloud-ingress-agent/communicate"

"wise2c/wisecloud-ingress-agent/log"

"wise2c/wisecloud-ingress-agent/common"

 

)

 

type ControllerClient struct {

Producer  *xing.Client

Client    communicate.ControllerHelperClient

}

 

func NewControllerClient() *ControllerClient {

 

agent := &ControllerClient{}

 

//amqp_url := "amqp://guest:guest@localhost:5672/"

amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",

common.MQUser,

common.MQPassword,

common.MQHost,

common.MQPort,

)

 

var err error

agent.Producer, err = xing.NewClient(

globalRPCAgentName,

amqp_url,

xing.SetIdentifier(&xing.NoneIdentifier{}),

xing.SetSerializer(&xing.JSONSerializer{}),

)

if err != nil {

log.Error("xing.NewClient() is failed.%s", err.Error())

return agent

}

 

//LB RPC

target := fmt.Sprint("ingress.controller")

agent.Client = communicate.NewControllerHelperClient(target, agent.Producer)

 

return agent

}

 

func (this *ControllerClient) Close() {

if this.Producer == nil {

return

}

 

this.Producer.Close()

//this.closed = true

}

 

func (this *ControllerClient) OnlineAgent(name string) error {


ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)

defer cancel()

 

log.Debug("OnlineAgent(%s)", name)

_, err := this.Client.OnlineAgent(ctx,

&communicate.OnlineAgentRequest{Name: name, })

if err != nil {

return err

}

 

return nil

}

 

func (this *ControllerClient) GetIngressConfigs(uuid string) (string, error){


ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)

defer cancel()

 

response, err := this.Client.GetIngressConfigs(ctx,

&communicate.GetIngressConfigsRequest{

Uuid:   uuid,

})

if err != nil {

return "error", err

}

 

return response.Content, err

}

 

 

func AgentHeartbeatToController() {

if globalControllerClient == nil {

return

}

 

var err error

content := ""

for {

 

err = globalControllerClient.OnlineAgent(globalRPCAgentName)

if err != nil {

log.Error(err.Error())

}

 

//1 time / 2 second.

time.Sleep(time.Millisecond*2000)

}

}

 

3.2. rabbitmq rpc 调用 服务端实现

 

package ingress

 

import (

"fmt"

"context"

"github.com/wzhliang/xing"

"wise2c/wisecloud-ingress-controller/communicate"

"wise2c/wisecloud-ingress-controller/log"

"wise2c/wisecloud-ingress-controller/common"

)

 

// ControllerServerImp implements the controller-side RPC handlers; RunRPCServer
// registers it through communicate.RegisterControllerHelperHandler.
type ControllerServerImp struct{}

 

func (g *ControllerServerImp) OnlineAgent(ctx context.Context, req *communicate.OnlineAgentRequest, rsp *communicate.Void) error {

log.Debug("OnlineAgent(%s)", req.Name)

 

err := globalAgentClient.manager.OnlineAgent(req.Name)

if err != nil {

return err

}

return nil

}

 

func (g *ControllerServerImp) GetIngressConfigs(ctx context.Context, req *communicate.GetIngressConfigsRequest, rsp *communicate.GetIngressConfigsResponse) error {

 

log.Info("GetIngressConfigs(%s)", req.Uuid)

rsp.Content = "ok"

 

return nil

}

 

func RunRPCServer() {

//globalRPCControllerName = fmt.Sprintf("host.controller.%s", common.GetGuid())

 

//LB RPC.

globalRPCControllerName = fmt.Sprintf("ingress.controller")

 

//amqp_url := "amqp://guest:guest@localhost:5672/"

amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",

common.MQUser,

common.MQPassword,

common.MQHost,

common.MQPort,

)

 

svc, err := xing.NewService(

globalRPCControllerName,

amqp_url,

xing.SetSerializer(&xing.JSONSerializer{}),

xing.SetBrokerTimeout(155),

)

if err != nil {

log.Error(fmt.Sprintf("MQURL=%s NewService is failed. %s", amqp_url, err.Error()))

}

 

communicate.RegisterControllerHelperHandler(svc, &ControllerServerImp{})

 

log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqp_url)

err = svc.Run()

if err != nil {

log.Error(err.Error())

}

}

 

 

3.3. rabbitmq 单实例rpc调用 客户端实现

1. rpc lb调用实时上报agent是否在线,实现了类似consul的服务发现的功能.

2. ClientManager可以通过redis中的实时数据,管理所有的rpc client。agent 下线,或者3秒之内没有上报状态,则清除指定的rpc client.

3. 这样每次下发配置时,可以实时发送到每个rpc单实例服务器实例.

 

package ingress

 

import (

"fmt"

"sync"

"time"

"errors"

"context"

"github.com/wzhliang/xing"

"github.com/astaxie/beego/utils"

"wise2c/wisecloud-ingress-controller/communicate"

"wise2c/wisecloud-ingress-controller/log"

"wise2c/wisecloud-ingress-controller/common"

)

 

type AgentHelper struct {

mutex         *sync.Mutex

closed       bool

 

Helper        communicate.AgentHelperClient

Producer      *xing.Client

}

 

func NewAgentHelper(agent_name string) *AgentHelper {

var err error

 

agent := &AgentHelper{

mutex:   new(sync.Mutex),

closed:  false,

}

 

//amqp_url := "amqp://guest:guest@localhost:5672/"

amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",

common.MQUser,

common.MQPassword,

common.MQHost,

common.MQPort,

)

 

agent.Producer, err = xing.NewClient(

globalRPCControllerName,

amqp_url,

xing.SetIdentifier(&xing.NoneIdentifier{}),

xing.SetSerializer(&xing.JSONSerializer{}),

)

if err != nil {

log.Error(fmt.Sprintf("MQURL=%s NewClient is failed. %s",

amqp_url,

err.Error()))

return agent

}

 

//target := fmt.Sprint("ingress.agent.%s", agent_name)

agent.Helper = communicate.NewAgentHelperClient(agent_name, agent.Producer)

 

return agent

}

 

func (this *AgentHelper) Close() {

this.mutex.Lock()

defer this.mutex.Unlock()

 

if this.Producer != nil {

this.Producer.Close()

}

this.closed = true

}

 

func (this *AgentHelper) SetIngressConfig(content string) error  {

this.mutex.Lock()

defer this.mutex.Unlock()

 

if this.closed {

return errors.New("the client is closed.")

}

 

ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)

defer cancel()

 

log.Info("SetIngressConfig(%s)", content)

_, err := this.Helper.SetIngressConfig(ctx,

&communicate.SetIngressConfigRequest{

content,

})

 

return err

}

 

func (this *AgentHelper) DelIngressConfig(uuid string) error {

this.mutex.Lock()

defer this.mutex.Unlock()

 

if this.closed {

return errors.New("the client is closed.")

}

 

ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)

defer cancel()

 

log.Info("DelIngressConfig(%s)", uuid)

_, err := this.Helper.DelIngressConfig(ctx,

&communicate.DelIngressConfigRequest{

uuid,

})

 

return err

}

 

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type ClientManager struct {

mutex           *sync.Mutex

Pool   *utils.BeeMap

}

 

func NewClientManager() *ClientManager {

return &ClientManager{

mutex:      new(sync.Mutex),

Pool:       utils.NewBeeMap(),

}

}

 

// Init starts RunConnect in a background goroutine and returns immediately.
func (p *ClientManager) Init() {
	go func() {
		p.RunConnect()
	}()
}

 

func (p *ClientManager) RunConnect() {

for {

names, err := globalRedisClient.GetAgentNames()

if err != nil {

log.Error(err.Error())

}

 

name_map := map[interface{}]int{}

for _, name := range names {

name_map[name] = 1

}

 

for key, v := range p.Pool.Items() {

//log.Error("key=%s", key)

_, ok := name_map[key]

if ok {

//log.Warning("find the %s", key)

continue

}

 

if v != nil {

log.Warning("Close the AgentHelper %s", key)

v.(*AgentHelper).Close()

}

 

log.Warning("Delete the Pool %s", key)

p.Pool.Delete(key)

}

 

for _, name := range names {

if !p.Pool.Check(name) {

log.Warning("New the AgentHelper %s", name)

p.Pool.Set(name, NewAgentHelper(name))

}

}

 

 

time.Sleep(time.Second * 1)

}

}

 

// GetClients returns the pool's items: agent name mapped to *AgentHelper.
func (p *ClientManager) GetClients() map[interface{}]interface{} {
	clients := p.Pool.Items()
	return clients
}

 

func (p *ClientManager) OnlineAgent(agent_name string) error {

p.mutex.Lock()

defer p.mutex.Unlock()

 

globalRedisClient.OnlineAgent(agent_name, "1")

 

if !p.Pool.Check(agent_name) {

p.Pool.Set(agent_name, NewAgentHelper(agent_name))

}

 

return nil

}

 

func (p *ClientManager) OfflineAgent(agent_name string) error {

p.mutex.Lock()

defer p.mutex.Unlock()

 

globalRedisClient.OfflineAgent(agent_name)

 

agent_helper :=  p.Pool.Get(agent_name)

if agent_helper != nil {

agent_helper.(*AgentHelper).Close()

p.Pool.Delete(agent_name)

}

 

return nil

}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type AgentClient struct {

manager *ClientManager

}

 

func NewAgentClient() *AgentClient {

return &AgentClient{

manager:    NewClientManager(),

}

}

 

// Init initialises the underlying ClientManager.
func (client *AgentClient) Init() {
	manager := client.manager
	manager.Init()
}

 

// AgentHandlerCallback is the per-agent action run by AgentClient.AgentHandler.
// It receives the request and response values passed to AgentHandler unchanged,
// plus the helper of the agent being processed.
type AgentHandlerCallback func(request, response interface{}, helper *AgentHelper) error

 

func (client *AgentClient) AgentHandler(request, response interface{}, callback AgentHandlerCallback) (err error) {

maps := client.manager.GetClients()

 

count := 0

for k, v := range maps {

name := k.(string)

if v == nil {

continue

}

 

helper := v.(*AgentHelper)

if callback == nil {

continue

}

 

err = callback(request, response, helper)

if err != nil {

return errors.New(fmt.Sprintf("%s %s", name, err.Error()))

}

 

count += 1

}

 

if count > 0 {

return nil

}

 

return errors.New("no agent online.")

}

 

// SetIngressConfig sends request, which must hold a string, to every pooled
// agent via AgentHandler. response is passed through and not used here.
func (client *AgentClient) SetIngressConfig(request, response interface{}) error {
	push := func(req, _ interface{}, helper *AgentHelper) error {
		return helper.SetIngressConfig(req.(string))
	}
	return client.AgentHandler(request, response, push)
}

 

// DelIngressConfig sends the UUID held in request, which must be a string, to
// every pooled agent via AgentHandler. response is passed through and not
// used here.
func (client *AgentClient) DelIngressConfig(request, response interface{}) error {
	remove := func(req, _ interface{}, helper *AgentHelper) error {
		return helper.DelIngressConfig(req.(string))
	}
	return client.AgentHandler(request, response, remove)
}

 

 


3.4. rabbitmq 单实例rpc调用.服务端实现

 

package ingress

 

import (

"fmt"

"context"

"github.com/wzhliang/xing"

"wise2c/wisecloud-ingress-agent/communicate"

"wise2c/wisecloud-ingress-agent/log"

"wise2c/wisecloud-ingress-agent/common"

)

 

 

// AgentServerImp implements the agent-side RPC handlers; RunRPCServer registers
// it through communicate.RegisterAgentHelperHandler.
type AgentServerImp struct{}

 

func (g *AgentServerImp) SetIngressConfig(ctx context.Context, req *communicate.SetIngressConfigRequest, rsp *communicate.Void) error {

log.Info("SetIngressConfig(%s)", req.Content)

 

config := &Wise2cIngressConfig{}

err := config.Parse([]byte(req.Content))

if err != nil {

log.Error(err.Error())

return err

}

 

globalIngressProcess.SetIngressConfig(config)

 

return nil

}

 

func (g *AgentServerImp) DelIngressConfig(ctx context.Context, req *communicate.DelIngressConfigRequest, rsp *communicate.Void) error {

log.Info("DelIngressConfig(%s)", req.Uuid)

 

globalIngressProcess.DelIngressConfig(req.Uuid)

 

return nil

}

 

func RunRPCServer() {

//amqp_url := "amqp://guest:guest@localhost:5672/"

amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",

common.MQUser,

common.MQPassword,

common.MQHost,

common.MQPort,

)

 

svc, err := xing.NewService(

globalRPCAgentName,

amqp_url,

xing.SetSerializer(&xing.JSONSerializer{}),

)

if err != nil {

log.Error(fmt.Sprintf("MQURL=%s, NewService() is failed. %s", amqp_url,  err.Error()))

}

 

communicate.RegisterAgentHelperHandler(svc, &AgentServerImp{})

 

log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqp_url)

go LoopRPC(svc)

}

 

// LoopRPC runs svc until Run returns and logs the error, if any.
func LoopRPC(svc *xing.Client) {
	if err := svc.Run(); err != nil {
		log.Error(err.Error())
	}
}

 

4. 总结

 

● 通过 rabbitmq lb调用方式,可以实现从agent侧上报数据到controller侧或者agent侧拉取需要的数据。

 

● 通过 rabbitmq 单实例调用方式,借助之前 lb 上报的 agent 状态(或者使用第三方 consul、etcd 的服务发现功能),我们可以实现从 controller 侧下发配置到每一个 agent,在每个 agent 实例中完成相同的功能。

09-03 08:24