一:实现流量监控

SDN实验---Ryu的应用开发(三)流量监控-LMLPHP

(一)流量监控原理

SDN实验---Ryu的应用开发(三)流量监控-LMLPHP

其中控制器向交换机周期下发获取统计消息,请求交换机消息------是主动下发过程
流速公式:是(t1时刻的流量-t0时刻的流量)/(t1-t0)
剩余带宽公式:链路总带宽-流速--------是这一个这一个,例如s2-s3(不是一条,例如:h1->s1->s2->s3->h2)的剩余带宽
路径有效带宽是只:这一整条路径中,按照最小的剩余带宽处理

SDN实验---Ryu的应用开发(三)流量监控-LMLPHP

二:代码实现

(一)代码框架

from ryu.app import simple_switch_13
from ryu.controller.handler import set_ev_cls
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER class MyMonitor(simple_switch_13): #simple_switch_13 is same as the last experiment which named self_learn_switch
'''
design a class to achvie managing the quantity of flow
''' def __init__(self,*args,**kwargs):
super(MyMonitor,self).__init__(*args,**kwargs) @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER,DEAD_DISPATCHER])
def _state_change_handler(self,ev):
'''
design a handler to get switch state transition condition
'''
pass def _monitor(self):
'''
design a monitor on timing system to request switch infomations about port and flow
'''
pass def _request_stats(self,datapath):
'''
the function is to send requery to datapath
'''
pass @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER)
def _port_stats_reply_handler(self,ev):
'''
monitor to require the port state, then this function is to get infomation for port`s info
'''
pass @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER)
def _port_stats_reply_handler(self,ev):
'''
monitor to require the flow state, then this function is to get infomation for flow`s info
'''
pass

(二)推文:协程https://www.cnblogs.com/ssyfj/p/9030165.html

(三)全部代码实现

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller.handler import set_ev_cls
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER
from ryu.lib import hub class MyMonitor(simple_switch_13.SimpleSwitch13): #simple_switch_13 is same as the last experiment which named self_learn_switch
'''
design a class to achvie managing the quantity of flow
''' def __init__(self,*args,**kwargs):
super(MyMonitor,self).__init__(*args,**kwargs)
self.datapaths = {}
#use gevent to start monitor
self.monitor_thread = hub.spawn(self._monitor) @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER,DEAD_DISPATCHER])
def _state_change_handler(self,ev):
'''
design a handler to get switch state transition condition
'''
#first get ofprocotol info
datapath = ev.datapath
ofproto = datapath.ofproto
ofp_parser = datapath.ofproto_parser #judge datapath`s status to decide how to operate
if datapath.state == MAIN_DISPATCHER: #should save info to dictation
if datapath.id not in self.datapaths:
self.datapaths[datapath.id] = datapath
self.logger.debug("Regist datapath: %16x",datapath.id)
elif datapath.state == DEAD_DISPATCHER: #should remove info from dictation
if datapath.id in self.datapaths:
del self.datapaths[datapath.id]
self.logger.debug("Unregist datapath: %16x",datapath.id) def _monitor(self):
'''
design a monitor on timing system to request switch infomations about port and flow
'''
while True: #initiatie to request port and flow info all the time
for dp in self.datapaths.values():
self._request_stats(dp)
hub.sleep() #pause to sleep to wait reply, and gave time to other gevent to request def _request_stats(self,datapath):
'''
the function is to send requery to datapath
'''
self.logger.debug("send stats reques to datapath: %16x for port and flow info",datapath.id) ofproto = datapath.ofproto
parser = datapath.ofproto_parser req = parser.OFPFlowStatsRequest(datapath)
datapath.send_msg(req) req = parser.OFPPortStatsRequest(datapath, , ofproto.OFPP_ANY)
datapath.send_msg(req) @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER)
def _port_stats_reply_handler(self,ev):
'''
monitor to require the port state, then this function is to get infomation for port`s info
print("6666666666port info:")
print(ev.msg)
print(dir(ev.msg))
'''
body = ev.msg.body
self.logger.info('datapath port '
'rx_packets tx_packets'
'rx_bytes tx_bytes'
'rx_errors tx_errors'
)
self.logger.info('--------------- --------'
'-------- --------'
'-------- --------'
'-------- --------'
)
for port_stat in sorted(body,key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id,port_stat.port_no,port_stat.rx_packets,port_stat.tx_packets,
port_stat.rx_bytes,port_stat.tx_bytes,port_stat.rx_errors,port_stat.tx_errors
) @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER)
def _flow_stats_reply_handler(self,ev):
'''
monitor to require the flow state, then this function is to get infomation for flow`s info
print("777777777flow info:")
print(ev.msg)
print(dir(ev.msg))
'''
body = ev.msg.body self.logger.info('datapath '
'in_port eth_src'
'out_port eth_dst'
'packet_count byte_count'
)
self.logger.info('--------------- '
'---- -----------------'
'---- -----------------'
'--------- ---------'
)
for flow_stat in sorted([flow for flow in body if flow.priority==],
key=lambda flow:(flow.match['in_port'],flow.match['eth_src'])):
self.logger.info('%016x %8x %17s %8x %17s %8d %8d',
ev.msg.datapath.id,flow_stat.match['in_port'],flow_stat.match['eth_src'],
flow_stat.instructions[].actions[].port,flow_stat.match['eth_dst'],
flow_stat.packet_count,flow_stat.byte_count
)

补充:注意---每个事件的属性可能不同,需要我们进行Debug,例如上面就出现了ev.msg.body(之前hub实现中没有)

(四)代码讲解

1.class MyMonitor(simple_switch_13.SimpleSwitch13):

simple_switch_13.SimpleSwitch13是样例代码,其中实现了和我们上一次实验中,自学习交换机类似的功能
(稍微多了个关于交换机是否上传全部packet还是只上传buffer_id),所以我们直接继承,可以减少写代码时间

2.协程实现伪并发self.monitor_thread = hub.spawn(self._monitor)

    def __init__(self,*args,**kwargs):
super(MyMonitor,self).__init__(*args,**kwargs)
self.datapaths = {}
#use gevent to start monitor
self.monitor_thread = hub.spawn(self._monitor)

3.在协程中实现周期请求交换机信息

    def _monitor(self):
'''
design a monitor on timing system to request switch infomations about port and flow
'''
while True: #initiatie to request port and flow info all the time
for dp in self.datapaths.values():
self._request_stats(dp)
hub.sleep(5) #pause to sleep to wait reply, and gave time to other gevent to request

4.主动下发消息,请求交换机信息OFPFlowStatsRequest------注意:我们这里请求两个(端口和协议信息),所以我们要使用两个函数来分别处理port和flow响应

    def _request_stats(self,datapath):
'''
the function is to send requery to datapath
'''
self.logger.debug("send stats reques to datapath: %16x for port and flow info",datapath.id) ofproto = datapath.ofproto
parser = datapath.ofproto_parser req = parser.OFPFlowStatsRequest(datapath)
datapath.send_msg(req) req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)  #可以向上面一样省略默认参数
datapath.send_msg(req)

源码查看参数

@_set_stats_type(ofproto.OFPMP_FLOW, OFPFlowStats)
@_set_msg_type(ofproto.OFPT_MULTIPART_REQUEST)
class OFPFlowStatsRequest(OFPFlowStatsRequestBase):
"""
Individual flow statistics request message The controller uses this message to query individual flow statistics. ================ ======================================================
Attribute Description
================ ======================================================
flags Zero or ``OFPMPF_REQ_MORE``
table_id ID of table to read
out_port Require matching entries to include this as an output
port
out_group Require matching entries to include this as an output
group
cookie Require matching entries to contain this cookie value
cookie_mask Mask used to restrict the cookie bits that must match
match Instance of ``OFPMatch``
================ ====================================================== Example:: def send_flow_stats_request(self, datapath):
ofp = datapath.ofproto
ofp_parser = datapath.ofproto_parser cookie = cookie_mask =
match = ofp_parser.OFPMatch(in_port=)
req = ofp_parser.OFPFlowStatsRequest(datapath, ,
ofp.OFPTT_ALL,
ofp.OFPP_ANY, ofp.OFPG_ANY,
cookie, cookie_mask,
match)
datapath.send_msg(req)
""" def __init__(self, datapath, flags=0, table_id=ofproto.OFPTT_ALL,
out_port=ofproto.OFPP_ANY,
out_group=ofproto.OFPG_ANY,
cookie=0, cookie_mask=0, match=None, type_=None
):

5.获取端口响应信息ofp_event.EventOFPPortStatsReply

    @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER)
def _port_stats_reply_handler(self,ev):
'''
monitor to require the port state, then this function is to get infomation for port`s info
print("6666666666port info:")
print(ev.msg)
print(dir(ev.msg))
'''
body = ev.msg.body
self.logger.info('datapath port '
'rx_packets tx_packets'
'rx_bytes tx_bytes'
'rx_errors tx_errors'
)
self.logger.info('--------------- --------'
'-------- --------'
'-------- --------'
'-------- --------'
)
for port_stat in sorted(body,key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id,port_stat.port_no,port_stat.rx_packets,port_stat.tx_packets,
port_stat.rx_bytes,port_stat.tx_bytes,port_stat.rx_errors,port_stat.tx_errors
)

端口信息:《参考》

6666666666port info:
version=0x4,msg_type=0x13,msg_len=0x1d0,xid=0x8dcd9187,
OFPPortStatsReply(
body=[
OFPPortStats(port_no=,rx_packets=,tx_packets=,rx_bytes=,tx_bytes=,rx_dropped=,tx_dropped=,rx_errors=,tx_errors=,rx_frame_err=,rx_over_err=,rx_crc_err=,collisions=,duration_sec=,duration_nsec=), OFPPortStats(port_no=,rx_packets=,tx_packets=,rx_bytes=,tx_bytes=,rx_dropped=,tx_dropped=,rx_errors=,tx_errors=,rx_frame_err=,rx_over_err=,rx_crc_err=,collisions=,duration_sec=,duration_nsec=), OFPPortStats(port_no=,rx_packets=,tx_packets=,rx_bytes=,tx_bytes=,rx_dropped=,tx_dropped=,rx_errors=,tx_errors=,rx_frame_err=,rx_over_err=,rx_crc_err=,collisions=,duration_sec=,duration_nsec=), OFPPortStats(port_no=,rx_packets=,tx_packets=,rx_bytes=,tx_bytes=,rx_dropped=,tx_dropped=,rx_errors=,tx_errors=,rx_frame_err=,rx_over_err=,rx_crc_err=,collisions=,duration_sec=,duration_nsec=)
]
,flags=,type=) OFPPortStats(
port_no=, ----------
rx_packets=, ----------
tx_packets=, ----------
rx_bytes=, ----------
tx_bytes=, ----------
rx_dropped=,
tx_dropped=,
rx_errors=, ----------
tx_errors=, ----------
rx_frame_err=,
rx_over_err=,
rx_crc_err=,
collisions=,
duration_sec=,
duration_nsec=) ['_STATS_MSG_TYPES', '_TYPE', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_base_attributes', '_class_prefixes', '_class_suffixes', '_decode_value', '_encode_value', '_get_decoder', '_get_default_decoder', '_get_default_encoder', '_get_encoder', '_get_type', '_is_class', '_opt_attributes', '_restore_args', '_serialize_body', '_serialize_header', '_serialize_pre', 'body', 'buf', 'cls_body_single_struct', 'cls_from_jsondict_key', 'cls_msg_type', 'cls_stats_body_cls', 'cls_stats_type',
'datapath' ----------
, 'flags', 'from_jsondict', 'msg_len', 'msg_type', 'obj_from_jsondict', 'parser', 'parser_stats', 'parser_stats_body', 'register_stats_type', 'serialize', 'set_buf', 'set_classes', 'set_headers', 'set_xid', 'stringify_attrs', 'to_jsondict', 'type', 'version', 'xid']

6.获取flow协议响应信息ofp_event.EventOFPFlowStatsReply

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER)
def _flow_stats_reply_handler(self,ev):
'''
monitor to require the flow state, then this function is to get infomation for flow`s info
print("777777777flow info:")
print(ev.msg)
print(dir(ev.msg))
'''
body = ev.msg.body self.logger.info('datapath '
'in_port eth_src'
'out_port eth_dst'
'packet_count byte_count'
)
self.logger.info('--------------- '
'---- -----------------'
'---- -----------------'
'--------- ---------'
)
for flow_stat in sorted([flow for flow in body if flow.priority==],
key=lambda flow:(flow.match['in_port'],flow.match['eth_src'])):
self.logger.info('%016x %8x %17s %8x %17s %8d %8d',
ev.msg.datapath.id,flow_stat.match['in_port'],flow_stat.match['eth_src'],
flow_stat.instructions[].actions[].port,flow_stat.match['eth_dst'],
flow_stat.packet_count,flow_stat.byte_count
)

协议信息《参考》

777777777flow info:
version=0x4,msg_type=0x13,msg_len=0x200,xid=0x9e448a1a,
OFPFlowStatsReply(
body=[
OFPFlowStats(byte_count=,cookie=,duration_nsec=,duration_sec=,flags=,hard_timeout=,idle_timeout=,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=,max_len=,port=,type=)],len=,type=)],
length=,match=OFPMatch(oxm_fields={'in_port': , 'eth_src': '8a:06:6a:2c:10:fc', 'eth_dst': '26:20:2f:85:5a:9a'}),packet_count=,priority=,table_id=), OFPFlowStats(byte_count=,cookie=,duration_nsec=,duration_sec=,flags=,hard_timeout=,idle_timeout=,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=,max_len=,port=,type=)],len=,type=)],
length=,match=OFPMatch(oxm_fields={'in_port': , 'eth_src': '26:20:2f:85:5a:9a', 'eth_dst': '8a:06:6a:2c:10:fc'}),packet_count=,priority=,table_id=), OFPFlowStats(byte_count=,cookie=,duration_nsec=,duration_sec=,flags=,hard_timeout=,idle_timeout=,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=,max_len=,port=,type=)],len=,type=)],
length=,match=OFPMatch(oxm_fields={'in_port': , 'eth_src': 'ca:9e:a1:af:b9:5f', 'eth_dst': '26:20:2f:85:5a:9a'}),packet_count=,priority=,table_id=), OFPFlowStats(byte_count=,cookie=,duration_nsec=,duration_sec=,flags=,hard_timeout=,idle_timeout=,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=,max_len=,port=,type=)],len=,type=)]
,length=,match=OFPMatch(oxm_fields={'in_port': , 'eth_src': '26:20:2f:85:5a:9a', 'eth_dst': 'ca:9e:a1:af:b9:5f'}),packet_count=,priority=,table_id=), OFPFlowStats(byte_count=,cookie=,duration_nsec=,duration_sec=,flags=,hard_timeout=,idle_timeout=,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=,max_len=,port=,type=)],len=,type=)],
length=,match=OFPMatch(oxm_fields={}),packet_count=,priority=,table_id=)
]
,flags=,type=) OFPFlowStats(
byte_count=, ----------
cookie=,
duration_nsec=,
duration_sec=,
flags=,
hard_timeout=,
idle_timeout=,
instructions=[
OFPInstructionActions(
actions=[
OFPActionOutput(
len=,
max_len=,
port=, ----------
type=)
],
len=,
type=
)
],
length=,
match=OFPMatch(oxm_fields={
'in_port': , ----------
'eth_src': '8a:06:6a:2c:10:fc', ----------
'eth_dst': '26:20:2f:85:5a:9a' ----------
}),
packet_count=, ----------
priority=,
table_id=
) ['_STATS_MSG_TYPES', '_TYPE', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_base_attributes', '_class_prefixes', '_class_suffixes', '_decode_value', '_encode_value', '_get_decoder', '_get_default_decoder', '_get_default_encoder', '_get_encoder', '_get_type', '_is_class', '_opt_attributes', '_restore_args', '_serialize_body', '_serialize_header', '_serialize_pre', 'body', 'buf', 'cls_body_single_struct', 'cls_from_jsondict_key', 'cls_msg_type', 'cls_stats_body_cls', 'cls_stats_type',
'datapath' ----------
, 'flags', 'from_jsondict', 'msg_len', 'msg_type', 'obj_from_jsondict', 'parser', 'parser_stats', 'parser_stats_body', 'register_stats_type', 'serialize', 'set_buf', 'set_classes', 'set_headers', 'set_xid', 'stringify_attrs', 'to_jsondict', 'type', 'version', 'xid']

三:实验演示

(一)开启Ryu

ryu-manager my_monitor.py

SDN实验---Ryu的应用开发(三)流量监控-LMLPHP

(二)开启Mininet

sudo mn --topo=tree,, --controller=remote --mac 

SDN实验---Ryu的应用开发(三)流量监控-LMLPHP

SDN实验---Ryu的应用开发(三)流量监控-LMLPHP

(三)Ryu显示结果

SDN实验---Ryu的应用开发(三)流量监控-LMLPHP

(四)还需要去了解返回的字段含义才可以---以后做,最近没时间了

05-17 07:02