dpdk的l2fwd主要做二层转发,代码分析如下。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <sys/types.h>
#include <sys/queue.h>
#include <netinet/in.h>
#include <setjmp.h>
#include <stdarg.h>
#include <ctype.h>
#include <errno.h>
#include <getopt.h>

#include <rte_common.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_eal.h>
#include <rte_per_lcore.h>
#include <rte_launch.h>
#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_lcore.h>
#include <rte_per_lcore.h>
#include <rte_branch_prediction.h>
#include <rte_interrupts.h>
#include <rte_pci.h>
#include <rte_random.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_ethdev.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>

/* Log type used by the RTE_LOG(..., L2FWD, ...) calls in this file. */
#define RTE_LOGTYPE_L2FWD RTE_LOGTYPE_USER1

/* Size of one mempool element: 2048 bytes plus the mbuf header and headroom. */
#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Number of elements requested when the mbuf pool is created. */
#define NB_MBUF 8192

/* Maximum number of packets handled in one RX or TX burst. */
#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RTE_TEST_RX_DESC_DEFAULT 128
#define RTE_TEST_TX_DESC_DEFAULT 512
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;

/* Ethernet (MAC) addresses of the ports, indexed by port id. */
static struct ether_addr l2fwd_ports_eth_addr[RTE_MAX_ETHPORTS];

/* Bitmask of enabled ports (bit N set means port N is enabled); set by -p. */
static uint32_t l2fwd_enabled_port_mask = 0;

/* Destination port for each source port, indexed by source port id. */
static uint32_t l2fwd_dst_ports[RTE_MAX_ETHPORTS];

/* Number of RX ports handled by each lcore; default 1, overridden by -q. */
static unsigned int l2fwd_rx_queue_per_lcore = 1;

/* Per-port buffer of packets waiting to be sent in one burst. */
struct mbuf_table {
	unsigned len;                            /* packets currently buffered */
	struct rte_mbuf *m_table[MAX_PKT_BURST]; /* the buffered packets */
};

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT 16

/* Per-lcore configuration: the ports it polls and its TX buffers. */
struct lcore_queue_conf {
	unsigned n_rx_port;                            /* entries used in rx_port_list */
	unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE]; /* ports this lcore receives from */
	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];  /* pending TX packets, per output port */
} __rte_cache_aligned;
struct lcore_queue_conf lcore_queue_conf[RTE_MAX_LCORE];

/* Port configuration passed to rte_eth_dev_configure() for every enabled port. */
static const struct rte_eth_conf port_conf = {
	.rxmode = {
		.split_hdr_size = 0,
		.header_split   = 0, /**< Header Split disabled */
		.hw_ip_checksum = 0, /**< IP checksum offload disabled */
		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
		.hw_strip_crc   = 0, /**< 0: hardware CRC stripping not requested */
	},
	.txmode = {
		.mq_mode = ETH_MQ_TX_NONE,
	},
};

/* Packet buffer pool shared by all ports; created in main(). */
struct rte_mempool * l2fwd_pktmbuf_pool = NULL;

/* Per-port statistics struct */
struct l2fwd_port_statistics {
	uint64_t tx;      /* packets sent */
	uint64_t rx;      /* packets received */
	uint64_t dropped; /* packets that could not be sent and were freed */
} __rte_cache_aligned;
/* Global statistics array, indexed by port id. */
struct l2fwd_port_statistics port_statistics[RTE_MAX_ETHPORTS];

/* A tsc-based timer responsible for triggering statistics printout */
#define TIMER_MILLISECOND 2000000ULL /* around 1ms at 2 Ghz */
#define MAX_TIMER_PERIOD 86400 /* 1 day max */
static int64_t timer_period = 10 * TIMER_MILLISECOND * 1000; /* default period is 10 seconds */

/* Print out statistics on packets dropped */
static void //打印数据包丢失等统计信息
print_stats(void)
{
uint64_t total_packets_dropped, total_packets_tx, total_packets_rx;
unsigned portid; total_packets_dropped = ;
total_packets_tx = ;
total_packets_rx = ; const char clr[] = { , '[', '', 'J', '\0' };
const char topLeft[] = { , '[', '', ';', '', 'H','\0' }; /* Clear screen and move to top left */
printf("%s%s", clr, topLeft); printf("\nPort statistics ===================================="); for (portid = ; portid < RTE_MAX_ETHPORTS; portid++) {
/* skip disabled ports */
if ((l2fwd_enabled_port_mask & ( << portid)) == )
continue;
printf("\nStatistics for port %u ------------------------------"
"\nPackets sent: %24"PRIu64
"\nPackets received: %20"PRIu64
"\nPackets dropped: %21"PRIu64,
portid,
port_statistics[portid].tx,
port_statistics[portid].rx,
port_statistics[portid].dropped); total_packets_dropped += port_statistics[portid].dropped;
total_packets_tx += port_statistics[portid].tx;
total_packets_rx += port_statistics[portid].rx;
}
printf("\nAggregate statistics ==============================="
"\nTotal packets sent: %18"PRIu64
"\nTotal packets received: %14"PRIu64
"\nTotal packets dropped: %15"PRIu64,
total_packets_tx,
total_packets_rx,
total_packets_dropped);
printf("\n====================================================\n");
} /* Send the burst of packets on an output interface */
/*
 * Transmit the first n packets buffered for `port` in qconf->tx_mbufs on
 * TX queue 0 of that port.
 *
 * Packets the driver does not accept are counted as dropped and freed, so
 * after this call none of the n buffered mbufs is owned by the caller.
 * The caller is responsible for resetting qconf->tx_mbufs[port].len.
 *
 * Always returns 0.
 */
static int
l2fwd_send_burst(struct lcore_queue_conf *qconf, unsigned n, uint8_t port)
{
	const unsigned queueid = 0;
	struct rte_mbuf **m_table = qconf->tx_mbufs[port].m_table;
	unsigned sent;
	unsigned i;

	/* Hand the whole burst to the driver; it reports how many it took. */
	sent = rte_eth_tx_burst(port, (uint16_t) queueid, m_table, (uint16_t) n);
	port_statistics[port].tx += sent;

	if (unlikely(sent < n)) {
		/* The tail of the burst was not accepted: account for it and free it. */
		port_statistics[port].dropped += (n - sent);
		for (i = sent; i < n; i++)
			rte_pktmbuf_free(m_table[i]);
	}

	return 0;
}

/* Enqueue packets for TX and prepare them to be sent */
static int //把数据包入队到发送缓冲区
l2fwd_send_packet(struct rte_mbuf *m, uint8_t port)
{
unsigned lcore_id, len;
struct lcore_queue_conf *qconf; lcore_id = rte_lcore_id(); //取得正在运行的lcore编号 qconf = &lcore_queue_conf[lcore_id];//取得lcore_queue的配置
len = qconf->tx_mbufs[port].len; //得到发包缓存区中数据包的个数
qconf->tx_mbufs[port].m_table[len] = m;//指向数据包
len++; /* enough pkts to be sent */
if (unlikely(len == MAX_PKT_BURST)) { //如果累计到32个数据包
l2fwd_send_burst(qconf, MAX_PKT_BURST, port); //实际发送数据包
len = ;
} qconf->tx_mbufs[port].len = len;//更新发包缓存区中的数据包的个数
return ;
} static void
l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
{ struct ether_hdr *eth;
void *tmp;
unsigned dst_port; dst_port = l2fwd_dst_ports[portid];
eth = rte_pktmbuf_mtod(m, struct ether_hdr *); /* 02:00:00:00:00:xx 修改目的mac地址 */
tmp = &eth->d_addr.addr_bytes[];
*((uint64_t *)tmp) = 0x000000000002 + ((uint64_t)dst_port << ); /* src addr 修改进入包的目的mac地址为转发包的源mac地址 */
ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], &eth->s_addr); l2fwd_send_packet(m, (uint8_t) dst_port); //在dst_port上发送数据包
} /* main processing loop */
static void //线程的主处理循环
l2fwd_main_loop(void)
{
struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
struct rte_mbuf *m;
unsigned lcore_id;
uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc;
unsigned i, j, portid, nb_rx;
struct lcore_queue_conf *qconf;
const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - ) / US_PER_S * BURST_TX_DRAIN_US; prev_tsc = ;
timer_tsc = ; lcore_id = rte_lcore_id(); //获取当期lcore的编号
qconf = &lcore_queue_conf[lcore_id]; //读取此lcore上的配置信息 if (qconf->n_rx_port == ) { //如果此lcore上的用于接收的物理端口数量为0
RTE_LOG(INFO, L2FWD, "lcore %u has nothing to do\n", lcore_id);
return; //那么结束该线程
} RTE_LOG(INFO, L2FWD, "entering main loop on lcore %u\n", lcore_id); for (i = ; i < qconf->n_rx_port; i++) { //遍历所有的用于接收数据包的物理端口 portid = qconf->rx_port_list[i];//一个lcore可能负责多个接收用的物理端口
RTE_LOG(INFO, L2FWD, " -- lcoreid=%u portid=%u\n", lcore_id,
portid);
} while () { //死循环 cur_tsc = rte_rdtsc(); /*
* TX burst queue drain
*/
diff_tsc = cur_tsc - prev_tsc;
if (unlikely(diff_tsc > drain_tsc)) { for (portid = ; portid < RTE_MAX_ETHPORTS; portid++) {
if (qconf->tx_mbufs[portid].len == )
continue;
l2fwd_send_burst(&lcore_queue_conf[lcore_id],
qconf->tx_mbufs[portid].len,
(uint8_t) portid);
qconf->tx_mbufs[portid].len = ;
} /* if timer is enabled */
if (timer_period > ) { //如果定时器启动 /* advance the timer */
timer_tsc += diff_tsc; /* if timer has reached its timeout */
if (unlikely(timer_tsc >= (uint64_t) timer_period)) { /* do this only on master core */
if (lcore_id == rte_get_master_lcore()) {
print_stats(); //十秒钟打印一次收包统计信息
/* reset the timer */
timer_tsc = ;
}
}
} prev_tsc = cur_tsc;
} /*
* Read packet from RX queues
*/
for (i = ; i < qconf->n_rx_port; i++) { //遍历所有的用于接收数据包的物理端口 portid = qconf->rx_port_list[i]; //第i个物理端口
nb_rx = rte_eth_rx_burst((uint8_t) portid, , //接收数据包,返回实际个数
pkts_burst, MAX_PKT_BURST); port_statistics[portid].rx += nb_rx; //记录物理端口上收包数量 for (j = ; j < nb_rx; j++) { //遍历实际接收到的所有的数据包
m = pkts_burst[j];
rte_prefetch0(rte_pktmbuf_mtod(m, void *)); //预取
l2fwd_simple_forward(m, portid);//简单的二层转发数据包
}
}
}
} static int
l2fwd_launch_one_lcore(__attribute__((unused)) void *dummy)
{
l2fwd_main_loop();//线程执行函数
return ;
} /* display usage */
/* Print the command-line synopsis of the application to stdout. */
static void
l2fwd_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK [-q NQ]\n"
	       "  -p PORTMASK: hexadecimal bitmask of ports to configure\n"
	       "  -q NQ: number of queue (=ports) per lcore (default is 1)\n"
	       "  -T PERIOD: statistics will be refreshed each PERIOD seconds (0 to disable, 10 default, 86400 maximum)\n",
	       prgname);
}

/*
 * Parse a hexadecimal port mask.
 *
 * Returns the mask, or 0 when the string is empty, has trailing characters,
 * is zero, or does not fit in 32 bits. The caller stores the result in a
 * uint32_t and tests it against 0, so 0 (not -1, which would read back as
 * "all ports enabled") is the error value.
 */
static int
l2fwd_parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	/* parse hexadecimal string */
	errno = 0;
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;
	if (errno == ERANGE || pm > UINT32_MAX)
		return 0;

	/* A zero mask needs no special case: 0 is also the error value. */
	return (int)(uint32_t)pm;
}

/*
 * Parse the number of RX ports per lcore (decimal).
 *
 * Returns the value, or 0 when the string is not a valid number, is zero,
 * or is not below MAX_RX_QUEUE_PER_LCORE.
 */
static unsigned int
l2fwd_parse_nqueue(const char *q_arg)
{
	char *end = NULL;
	unsigned long n;

	/* parse decimal string */
	n = strtoul(q_arg, &end, 10);
	if ((q_arg[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;
	if (n == 0)
		return 0;
	if (n >= MAX_RX_QUEUE_PER_LCORE)
		return 0;

	return (unsigned int)n;
}

/*
 * Parse the statistics refresh period in seconds (decimal).
 *
 * Returns the period (0 .. MAX_TIMER_PERIOD inclusive, matching the
 * "86400 maximum" stated in the usage text), or -1 when the string is not a
 * valid number or is out of range.
 */
static int
l2fwd_parse_timer_period(const char *q_arg)
{
	char *end = NULL;
	long n;

	/* parse number string */
	n = strtol(q_arg, &end, 10);
	if ((q_arg[0] == '\0') || (end == NULL) || (*end != '\0'))
		return -1;
	/* Range-check as long before narrowing so large inputs cannot wrap. */
	if (n < 0 || n > MAX_TIMER_PERIOD)
		return -1;

	return (int)n;
}

/* Parse the argument given in the command line of the application */
/*
 * Parse the application options that follow the EAL options:
 *   -p PORTMASK  hexadecimal mask of enabled ports (l2fwd_enabled_port_mask)
 *   -q NQ        number of RX ports per lcore (l2fwd_rx_queue_per_lcore)
 *   -T PERIOD    statistics period in seconds, 0 disables (timer_period)
 *
 * On success returns the number of argv entries consumed minus one and
 * stores the program name in the last consumed slot. On any invalid or
 * unknown option prints the usage text and returns -1.
 */
static int
l2fwd_parse_args(int argc, char **argv)
{
	int opt, ret;
	int secs;
	char **argvopt;
	int option_index;
	char *prgname = argv[0];
	/* No long options are defined; the table holds only the terminator. */
	static struct option lgopts[] = {
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:q:T:",
				  lgopts, &option_index)) != -1) {

		switch (opt) {
		/* portmask */
		case 'p':
			l2fwd_enabled_port_mask = l2fwd_parse_portmask(optarg);
			if (l2fwd_enabled_port_mask == 0) {
				printf("invalid portmask\n");
				l2fwd_usage(prgname);
				return -1;
			}
			break;

		/* nqueue */
		case 'q':
			l2fwd_rx_queue_per_lcore = l2fwd_parse_nqueue(optarg);
			if (l2fwd_rx_queue_per_lcore == 0) {
				printf("invalid queue number\n");
				l2fwd_usage(prgname);
				return -1;
			}
			break;

		/* timer period */
		case 'T':
			/*
			 * Validate before scaling: multiplying a negative error
			 * value by the unsigned TIMER_MILLISECOND would rely on
			 * wrap-around to stay negative.
			 */
			secs = l2fwd_parse_timer_period(optarg);
			if (secs < 0) {
				printf("invalid timer period\n");
				l2fwd_usage(prgname);
				return -1;
			}
			/* seconds -> milliseconds -> timer ticks */
			timer_period = (int64_t)secs * 1000 * (int64_t)TIMER_MILLISECOND;
			break;

		/* long options */
		case 0:
			l2fwd_usage(prgname);
			return -1;

		default:
			l2fwd_usage(prgname);
			return -1;
		}
	}

	/* Only touch argv[optind-1] when that index is valid. */
	if (optind >= 1)
		argv[optind-1] = prgname;

	ret = optind-1;
	optind = 0; /* reset getopt lib */
	return ret;
}

/* Check the link status of all ports in up to 9s, and print them finally */
static void //检查物理端口的连接状态
check_all_ports_link_status(uint8_t port_num, uint32_t port_mask)
{
#define CHECK_INTERVAL 100 /* 100ms */
#define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */
uint8_t portid, count, all_ports_up, print_flag = ;
struct rte_eth_link link; printf("\nChecking link status");
fflush(stdout);
for (count = ; count <= MAX_CHECK_TIME; count++) {
all_ports_up = ;
for (portid = ; portid < port_num; portid++) {
if ((port_mask & ( << portid)) == )
continue;
memset(&link, , sizeof(link));
rte_eth_link_get_nowait(portid, &link);
/* print link status if flag set */
if (print_flag == ) {
if (link.link_status)
printf("Port %d Link Up - speed %u "
"Mbps - %s\n", (uint8_t)portid,
(unsigned)link.link_speed,
(link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
("full-duplex") : ("half-duplex\n"));
else
printf("Port %d Link Down\n",
(uint8_t)portid);
continue;
}
/* clear all_ports_up flag if any link down */
if (link.link_status == ) {
all_ports_up = ;
break;
}
}
/* after finally printing all link status, get out */
if (print_flag == )
break; if (all_ports_up == ) {
printf(".");
fflush(stdout);
rte_delay_ms(CHECK_INTERVAL);
} /* set the print_flag if all ports up or timeout */
if (all_ports_up == || count == (MAX_CHECK_TIME - )) {
print_flag = ;
printf("done\n");
}
}
} int //主函数
main(int argc, char **argv)
{
struct lcore_queue_conf *qconf;
struct rte_eth_dev_info dev_info;
int ret;
uint8_t nb_ports;
uint8_t nb_ports_available;
uint8_t portid, last_port;
unsigned lcore_id, rx_lcore_id;
unsigned nb_ports_in_mask = ; /* init EAL */
ret = rte_eal_init(argc, argv); //初始化环境抽象层,并解析相关参数
if (ret < )
rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
argc -= ret;
argv += ret; /* parse application arguments (after the EAL ones) */
ret = l2fwd_parse_args(argc, argv); //解析l2fwd相关的参数: -p -q -P
if (ret < )
rte_exit(EXIT_FAILURE, "Invalid L2FWD arguments\n"); /* create the mbuf pool */
l2fwd_pktmbuf_pool = //创建mbuf pool
rte_mempool_create("mbuf_pool", NB_MBUF,
MBUF_SIZE, ,
sizeof(struct rte_pktmbuf_pool_private),
rte_pktmbuf_pool_init, NULL,
rte_pktmbuf_init, NULL,
rte_socket_id(), );
if (l2fwd_pktmbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n"); nb_ports = rte_eth_dev_count(); //得到物理端口的实际数量
if (nb_ports == )
rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n"); if (nb_ports > RTE_MAX_ETHPORTS) //如果物理端口的数量超过限制
nb_ports = RTE_MAX_ETHPORTS; /* 重置目的物理端口的数组 reset l2fwd_dst_ports */
for (portid = ; portid < RTE_MAX_ETHPORTS; portid++)
l2fwd_dst_ports[portid] = ;//清零
last_port = ; /* 每个lcore用在一个专用的发送队列上
* Each logical core is assigned a dedicated TX queue on each port.
*/
for (portid = ; portid < nb_ports; portid++) {//遍历所有的物理端口
/* 忽略未启用的物理端口 skip ports that are not enabled */
if ((l2fwd_enabled_port_mask & ( << portid)) == )
continue; if (nb_ports_in_mask % ) { //如果是有偶数个物理端口,设为相邻两个物理端口对发
l2fwd_dst_ports[portid] = last_port; //奇数号的目的物理端口为偶数号
l2fwd_dst_ports[last_port] = portid; //偶数号的目的物理端口为奇数号
}
else //如果是奇数个物理端口
last_port = portid; nb_ports_in_mask++; //更新已启用的物理端口的总数 rte_eth_dev_info_get(portid, &dev_info);
}
if (nb_ports_in_mask % ) { //如果已启用的物理端口的总数是奇数
printf("Notice: odd number of ports in portmask.\n");
l2fwd_dst_ports[last_port] = last_port;//last_port的目的物理端口还是last_port
} rx_lcore_id = ;
qconf = NULL; /* Initialize the port/queue configuration of each logical core */
for (portid = ; portid < nb_ports; portid++) { //遍历所有的物理端口
/* 忽略未启用的物理端口 skip ports that are not enabled */
if ((l2fwd_enabled_port_mask & ( << portid)) == )
continue; /* 得到此物理端口的lcore编号 get the lcore_id for this port */
while (rte_lcore_is_enabled(rx_lcore_id) == || //如果此lcore未启用
lcore_queue_conf[rx_lcore_id].n_rx_port == //如果lcore上负责接收的物理端口的实际数量等于
l2fwd_rx_queue_per_lcore) {//每个lcore负责的接收队列的实际数量(-q参数值)
rx_lcore_id++;//接收lcore的编号自增
if (rx_lcore_id >= RTE_MAX_LCORE) //如果接收lcore编号超过lcore最大数量
rte_exit(EXIT_FAILURE, "Not enough cores\n");
} if (qconf != &lcore_queue_conf[rx_lcore_id])
/* Assigned a new logical core in the loop above. */
qconf = &lcore_queue_conf[rx_lcore_id]; qconf->rx_port_list[qconf->n_rx_port] = portid;
qconf->n_rx_port++;//用于接收数据包的物理端口数量自增
printf("Lcore %u: RX port %u\n", rx_lcore_id, (unsigned) portid);
} nb_ports_available = nb_ports; /*初始化每个物理端口 Initialise each port */
for (portid = ; portid < nb_ports; portid++) { //遍历所有的物理端口
/* 忽略未使能的物理端口 skip ports that are not enabled */
if ((l2fwd_enabled_port_mask & ( << portid)) == ) {
printf("Skipping disabled port %u\n", (unsigned) portid);
nb_ports_available--;
continue;
}
/* 初始化某个物理端口 init port */
printf("Initializing port %u... ", (unsigned) portid);
fflush(stdout);
ret = rte_eth_dev_configure(portid, , , &port_conf); //第一步,设为1个发送队列和1个接收队列
if (ret < )
rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n",
ret, (unsigned) portid); rte_eth_macaddr_get(portid,&l2fwd_ports_eth_addr[portid]);//获取mac地址 /* 在每个物理端口上建立一个接收队列 init one RX queue */
fflush(stdout);
ret = rte_eth_rx_queue_setup(portid, , nb_rxd, //第二步,0代表接收队列的编号
rte_eth_dev_socket_id(portid),
NULL,
l2fwd_pktmbuf_pool);
if (ret < )
rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u\n",
ret, (unsigned) portid); /* 在每个物理端口上建立一个发送队列 init one TX queue on each port */
fflush(stdout);
ret = rte_eth_tx_queue_setup(portid, , nb_txd,//第三步,0代表发送队列的编号
rte_eth_dev_socket_id(portid),
NULL);
if (ret < )
rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
ret, (unsigned) portid); /*启动设备 Start device */
ret = rte_eth_dev_start(portid); //第四步,启动物理端口
if (ret < )
rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n",
ret, (unsigned) portid); printf("done: \n"); rte_eth_promiscuous_enable(portid); printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
(unsigned) portid,
l2fwd_ports_eth_addr[portid].addr_bytes[],
l2fwd_ports_eth_addr[portid].addr_bytes[],
l2fwd_ports_eth_addr[portid].addr_bytes[],
l2fwd_ports_eth_addr[portid].addr_bytes[],
l2fwd_ports_eth_addr[portid].addr_bytes[],
l2fwd_ports_eth_addr[portid].addr_bytes[]); /*清空物理端口的统计信息 initialize port stats */
memset(&port_statistics, , sizeof(port_statistics));
} if (!nb_ports_available) {
rte_exit(EXIT_FAILURE,
"All available ports are disabled. Please set portmask.\n");
} check_all_ports_link_status(nb_ports, l2fwd_enabled_port_mask); /* 在每个lcore上启动线程 launch per-lcore init on every lcore */
rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER);
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
if (rte_eal_wait_lcore(lcore_id) < ) //等待线程完成工作
return -;
} return ;
}
05-16 07:48