Master部分

1、master初始化

  • 以node name创建一个distributed logical router
  • 创建两个load balancer用于处理east-west traffic,一个处理TCP,另一个处理UDP
  • 创建一个名为"join"的logical switch用于连接gateway router和distributed router。"join"的IP地址范围为100.64.1.0/24
  • 将distributed router和"join"相连接

2、创建management port

这部分的主要内容为master node创建一个logical switch用于连接distributed router。这个switch仅有一个logical port(A OVS internal interface),它有如下用途:

  • 通过该logical port可以用私有IP访问其他node上运行的容器
  • 当在master node上创建该port时,集群中的容器就不用通过NAT也能访问k8s daemon了
  • node可以用pod的IP地址对pod进行health-check

具体操作步骤如下:

  • 创建一个router port,并且给它分配该local_subnet(该node分配到的IP地址区间)的第一个地址。
  • 创建一个logical switch并且设置它的子网范围,名字就为node_name
  • 将该logical switch连接到router上
  • 如果br-int不存在,则创建之,并且在它上面创建一个OVS internal interface,名字为"k8s-%s" % (node_name[:11])
  • 创建一个OVN logical port,名字为"k8s-" + node_name,之后再对上文的internal interface进行配置
  • 首先启动该interface,如果该interface之前就已经存在了,就删除其上的路由和IP,再给该interface配置IP,并添加到达整个集群的路由
  • 最后,将load balancer添加到logical switch上

Minion部分

对于minion部分,如果所在的平台不是windos,则先调用_linux_init()进行初始化配置,之后再创建management port,做法和master完全一样

1、_linux_init()初始化

这部分主要是对于CNI的配置,具体操作如下:

  • 首先找到ovn-k8s-cni-overlay所在的位置cni_plugin_path,如果cni插件的可执行文件CNI_LINK_PATH/ovn_cni不存在,则构造一个软链接,因此最终k8s调用的cni插件就是ovn-k8s-cni-overlay
  • 接着写入cni network的配置文件,直接创建CNI_CONF_PATH/10-net.conf文件,具体内容如下:
        data = {
"name": "net",
"type": "ovn_cni",
"bridge": "br-int",
"isGateway": "true",
"ipMasq": "false",
"ipam": {
"type": "host-local",
"subnet": args.minion_switch_subnet
}
}

  

 Gateway部分

  • 不能同时指定参数args.physical_interface和args.bridge_interface,两个参数代表了两种方式
  • 首先判断gateway router之前是否已经有被创建,如果没有的话,创建之,且名字为"GR_%s" % (node_name)
  • 接着再将gateway router和上文中的"join"相连接
  • 在gateway router中添加静态路由,将distributed router作为nexthop,而gateway和各个distributed router构成的子网为100.64.1.0/24,连接端口的ip地址通过generate_gateway_ip()生成
  • 在distributed router中添加静态路由,并且将第一个gateway router作为默认网关
  • 为每个gateway router创建两个north-south load-balancer
  • 创建一个external switch用于连接physical interface,名字为"ext_%s" % (node_name)
  • 当我们使用网桥方案,例如将eth0绑定到网桥breth0上,首先进行操作,保证网桥mac地址不变(因为通常有新端口加入时,网桥的端口会发生变化)
  • 设置iface_id为"%s_%s" % (args.bridge_interface, node_name)
  • 创建patch port将br-int和breth0相连
  • 在external_switch上创建一个external interface,名字就为上文中的iface_id,并且地址为"unknown",外部世界通过该端口相连
  • 将gateway router上添加端口,地址是breth0的mac地址,ip是breth0的ip,名字为"rtoe-" + gateway_router
  • 在gateway router上添加默认路由,网关为参数中指定的默认网关
  • 将external_switch和gateway router相连,且端口为"etor-" + gateway_router
  • 在gateway router上设置默认的SNAT

ovn-k8s-gateway-helper部分

  • 首先获取参数,例如--physical-bridge和--physical-interface,接着确认ovs.dirs.RUNDIR和ovs.dirs.LOGDIR的路径(一般为"/var/run/openvswitch"和"/var/log/openvswitch"),最后确定k8s api server
  • 确认physical-bridge是否存在,存在则调用["ovs-vsctl", "get", "interface", $physical_interface, "ofport"]获取physical interface,且命名为physical_interface_ofport
  • 创建一对patch port将physical bridge和br-int相连,再调用命令获取physical bridge上所在的patch port的ofport,且命名为br_int_ofport
  • 调用pool.spawn(_unixctl_run)创建一个"线程",其中_unixctl_run调用ovs.unixctl.server.UnixctlServer.Create(None)创建unixctls,并在一个while死循环中持续运行unixctl_server.run()
  • 调用函数add_conntrack_rules(),在physical bridge上创建如下的flow:
    • 在table 0中,对于来自pod并且发往外部世界的包,提交给connection,从而相反的流量能够返回pods
    • 在table 0中,对于来自外部世界的包,将它通过conntrack提交到table 1,从而了解连接的状态
    • 在table 1中,将established and related connections发往pod
    • 最后,其余的连接的action都为NORMAL
  • 接着调用syn_services():
    • 从api server获取所有的services
    • 遍历所有的services,对于类型不为"NodePort"的service直接跳过,如果service中没有“ports”也直接跳过
    • 如果"ports"中不存在"nodePort"也直接跳过,如果获取的"protocol"不为"udp"和"tcp"则直接跳过,否则将"%s_%s" % (protocol, port)添加到node_ports缓存中
  • 利用'ovs-ofctl dump-flows'命令获取缓存的flow,再将过时的与node ports相关的flow删除
  • 最后一个死循环,遍历service stream,并调用service_events(event):
    • 首先从中获取event type,service name,namespace以及service type等参数,如果service type不为"NodePort"则直接跳过
    • 以"%s_%s" % (namespace, service_name)为ket获取缓存,如果event_type不为"DELETE"且缓存存在,则直接返回
    • 获取service ports,遍历之,如果port中没有"nodePort"则直接跳过,再从中获取protocol,设置protocol_dst = "%s_dst" % (protocol),node_port_key = "%s_%s" % (protocol, port)
    • 若event type为"DELETE",则先将node_port_key从nodes_port_cache中删除,再删除相应的flow
    • 否则,对于其他类型,则先将node_port_key加入node_ports_cache,再添加flow,flow的内容为["add-flow", physical_bridge, "priority=100", "in_port=physical_interface_ofport", protocol, protocol_dst=port, "action=br_int_ofport"]

Watcher部分

1、首先确定ovs的rundir和logdir以及OVN ND unix socket的路径

2、接着,确定k8s api server是否运行, cluster router,两个load balancer是否已经创建

3、首先调用pool.spawn(conn_processor.run_processor)启动一个processor用于处理之后的各种event

4、调用_create_k8s_pod_watcher(),_create_k8s_service_watcher()和_create_k8s_endpoint_watcher()创建三个watcher,pod_watcher_inst的创建过程如下所示:

pod watcher:

  • 首先调用kubernetes.watch_pods(variable.K8S_API_SERVER)创建pod_stream
  • 调用_sync_k8s_pods()进行k8s pod和ovn配置的同步
    • 先调用mode = ovn_k8s.modes.overlay.OvnNB()创建对象用以操控OVN
    • 接着调用pods = kubernetes.get_all_pods(variable.K8S_API_SEVER)获取所有的pod信息
    • 若pods不为空,则调用mode.sync_pods()进行具体的操作,总的来说,syn_pods()就是将已被删除的pod的logical_port删除

      • 创建expected_logical_ports = set()
      • 遍历pods,创建logical_port = "%s_%s" % (namespace, pod_name)获取对应pod的logical_port,并将其加入expected_logical_ports。接着从annotations中获取ip_address,接着再调用self._add_k8s_l4_port_name_cache()将其加入cache中
      • 调用existing_logical_ports = ovn_nbctl("--data=bare", "--no-heading", "--columns=name", "find", "logical_switch_port", "external_ids:pod=true").split()获取之前保存的logical_port
      • 最后,遍历existing_logical_ports - expected_logical_ports将pod已被删除的logical_ports,调用ovn_nbclt()删除
  • 调用pool.spawn(_process_func, pod_watch_inst, _create_k8s_pod_watcher)创建一个"线程"启动监听,对于service和endpoint的watcher同理。第一个参数_process_func是一个函数,而后两个参数是该函数的参数
  • _process_func(watcher, watcher_recycle_func)非常简单,就是无限循环地调用调用watcher.process(),如果发生异常,则再次调用watcher = watcher_recycle_func()重启一个watcher
  • PodWatcher的process()方法非常简单,就是调用util.process_stream(self._pod_stream, self._process_pod_event)
  • process_stream(data_stream, event_callback)首先调用line = next(data_stream)获取一个pod信息,如果没有发生异常的话,就直接调用event_callback(json.load(line))
  • _process_pod_event(self, event)
    • 首先从event中获取pod_data = event['object'],即pod信息和event_type = event['type'],即事件类型
    • 再创建cache_key = "%s_%s" % (namespace, pod_name),其实就是logical_port,再调用self._update_pod_cache(event_type, cache_key, pod_data)更新pod cache
    • 如果之前没有该pod的缓存,或者event_type为"DELETE",则调用self._send_connectivity_event(event_type, pod_name, pod_data)
    • _send_connectivity_event()首先调用ev  = ovn_k8s.processor.Event(event_type, source = pod_name, metadata = pod_data)创建一个Event类,接着调用conn_processor.get_event_queue().put(ev)将该事件加入队列中

service watcher

  • 整个过程和pod watcher是类似的,首先调用kubernetes.watch_services(variable.K8S_API_SERVER)获取service_stream
  • 调用_sync_k8s_services()进行k8s service和ovn配置的同步:
    • 先调用mode = ovn_k8s.modes.overlay.OvnNB()创建对象以操控OVN
    • 接着调用services = kubernetes.get_all_services(variables.K8S_API_SERVER)获取所有的service信息
    • 若services不为空,则调用mode.sync_services(services)进行具体的操作:
      • 对于所有的"clusterIP" services创建cluster_services = {'TCP': [], 'UDP': []},内部填充'IP:port',对于所有的NodePort service,创建nodeport_services = {'TCP': [], 'UDP': []},内部填充nodeport或者'external_ip:port'
      • 遍历从api server中获取 的service,如果service type 既不为"ClusterIP"又不为"NodePort",或者获取的"clusterIP"(service_ip)和"ports"(service_ports)为空,则跳过该service
      • 遍历service_ports,如果service_type为"NodePort"则获取port为"nodePort",否则获取port为"port",若port为空,则跳过
      • 调用service_port.get('protocol', 'TCP')获取protocol,再根据service_type和protocol分别向nodeport_services和cluster_service添加记录
      • 遍历获取的external_ips,根据protocol向nodeport_services中添加记录,记录的内容为"%s:%s" % (external_ip, port)
    • 对于OVN cluster,如果有vip在OVN load-balancer中存在,但是在当前的k8s service中不存在,则将其删除
    • 对于每个gateway,删除所有不存在于"nodeport_services"上的vip
  • 调用pool.spawn(_process_func, service_watcher_inst, _create_k8s_service_watcher)创建一个"线程"监听
  • _process_func(watcher, watcher_recycle_func)非常简单,就是无限循环地调用调用watcher.process(),如果发生异常,则再次调用watcher = watcher_recycle_func()重启一个watcher
  • ServiceWatcher的process()方法非常简单,就是调用util.process_stream(self._service_stream, self._process_service_event)
  • process_stream(data_stream, event_callback)首先调用line = next(data_stream)获取一个service信息,如果没有发生异常的话,就直接调用event_callback(json.load(line))
  • _process_service_event(self, event):
    • 创建service_data = event['object']
    • 调用cluster_ip = service_data['spec'].get('clusterIP')获取cluster_ip(vip),不过有可能在service创建的时候,还并没有分配一个cluster_ip,这时候就直接返回
    • 获取service_name,namespace和event_type,创建cache_key = "%s_%s" % (namespace, service_name),并且由此获取cache_service,并调用self._update_service_cache(event_type, cache_key, service_data)进行更新
    • 当cache_service为空,或者event_type为"DELETE",则调用self._send_connectivity_event(event_type, service_name, service_data),将其放入消息队列中
  • 最终,service的创建工作由update_vip(self, event)完成:

    • 从event中获取service_type和service_name,如果service_type不为”clusterIP“或者"nodePort"就忽略
    • 从event中获取event_type和namespace,创建cache_key = "%s_%s" % (namespace, service_name)
    • 调用self._update_service_cache(event_type, cache_key, service_data),如果event.event_type为"DELETED",则调用self._update_vip(service_data, None)
    • _update_vip(self, service_data, ips):
      • 从service_data中获取service_type, namespace, service_ip,service_ports,external_ips
      • 遍历service_ports,获取port, protocol, target_port,如果service_type为"NodePort"则调用self._create_gateways_vip(namespace, ips, port, target_port, protocol),如果service_type为"ClusterIP"则调用self._create_cluster_vip(namespace, service_ip, ips, port, target_port, protocol)
      • 遍历external_ips,将"external_ip:port"作为VIP加入gateway load-balancer,调用self._create_external_vip(namespace, external_ip, ips, port, target_port, protocol)
05-11 11:07