Awesome-omni-skill rust-dpdk
用户态网络专家。处理 DPDK, 用户态驱动, 高性能网络, packet processing, 零拷贝, RSS 负载均衡
install
source · Clone the upstream repo
git clone https://github.com/diegosouzapw/awesome-omni-skill
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/cli-automation/rust-dpdk" ~/.claude/skills/diegosouzapw-awesome-omni-skill-rust-dpdk && rm -rf "$T"
manifest:
skills/cli-automation/rust-dpdk/SKILL.mdsource content
用户态网络 (DPDK)
核心问题
如何实现百万级 PPS 的高性能网络数据包处理?
传统内核网络栈有太多上下文切换和内存拷贝开销。
DPDK vs 内核网络栈
| 特性 | 内核网络栈 | DPDK |
|---|---|---|
| 上下文切换 | 每次包都切换 | 轮询模式,无切换 |
| 内存拷贝 | 多次拷贝 | 零拷贝 |
| 中断 | 频繁中断 | 轮询 (poll mode driver) |
| 延迟 | 较高 | 微秒级 |
| 吞吐量 | 万级 PPS | 百万级 PPS |
| CPU 利用率 | 较低但有开销 | 高但高效 |
核心组件
// DPDK 核心结构 struct DpdkContext { memory_pool: Mempool, // 内存池 ports: Vec<Port>, // 网卡端口 rx_queues: Vec<RxQueue>, // 接收队列 tx_queues: Vec<TxQueue>, // 发送队列 cpu_cores: Vec<Core>, // CPU 核心分配 } struct Port { port_id: u16, mac_addr: [u8; 6], link_speed: u32, max_queues: u16, } struct Mempool { name: String, buffer_size: usize, cache_size: usize, total_buffers: u32, }
内存池管理
// 创建 DPDK 内存池 fn create_mempool() -> Result<Mempool, DpdkError> { let mempool = unsafe { rte_mempool_create( b"packet_pool\0".as_ptr() as *const c_char, NUM_BUFFERS as u32, // 缓冲区数量 BUFFER_SIZE as u16, // 每个缓冲区大小 CACHE_SIZE as u32, // CPU 缓存大小 0, // 私有数据大小 Some(rte_pktmbuf_pool_init), // 初始化函数 std::ptr::null(), // 初始化参数 Some(rte_pktmbuf_init), // 对象初始化函数 std::ptr::null(), // 对象参数 rte_socket_id() as i32, // 内存所在 Socket 0, // 标志位 ) }; if mempool.is_null() { Err(DpdkError::MempoolCreateFailed) } else { Ok(Mempool { inner: mempool }) } } // 分配缓冲区 fn alloc_mbuf(mempool: &Mempool) -> Option<*mut rte_mbuf> { unsafe { let mbuf = rte_pktmbuf_alloc(mempool.inner); if mbuf.is_null() { None } else { Some(mbuf) } } }
零拷贝接收
// 零拷贝接收数据包 fn process_packets( port_id: u16, queue_id: u16, bufs: &mut [*mut rte_mbuf; MAX_BURST_SIZE], ) -> usize { let num_received = unsafe { rte_eth_rx_burst( port_id, queue_id, bufs.as_mut_ptr(), MAX_BURST_SIZE as u16, ) }; // 直接处理 mbuf,不需要拷贝 for i in 0..num_received { let mbuf = bufs[i]; // 访问数据(零拷贝) let data_ptr = unsafe { rte_pktmbuf_mtod(mbuf, *const u8) }; let data_len = unsafe { rte_pktmbuf_pkt_len(mbuf) }; // 处理数据包 process_packet(data_ptr, data_len); // 释放 mbuf 回内存池 unsafe { rte_pktmbuf_free(mbuf); } } num_received }
批量发送
// 批量发送数据包 fn transmit_packets( port_id: u16, queue_id: u16, packets: &[Packet], ) -> usize { let mut mbufs: Vec<*mut rte_mbuf> = packets .iter() .map(|p| p.to_mbuf()) .collect(); let sent = unsafe { rte_eth_tx_burst( port_id, queue_id, mbufs.as_mut_ptr(), mbufs.len() as u16, ) }; // 释放未发送的 mbuf for i in sent..mbufs.len() { unsafe { rte_pktmbuf_free(mbufs[i]); } } sent }
RSS 负载均衡
// 配置 RSS (Receive Side Scaling) fn configure_rss(port_id: u16) -> Result<(), DpdkError> { let mut port_info: rte_eth_dev_info = unsafe { std::mem::zeroed() }; unsafe { rte_eth_dev_info_get(port_id, &mut port_info); } // 配置 RSS 哈希 let mut rss_conf: rte_eth_rss_conf = unsafe { std::mem::zeroed() }; rss_conf.rss_key_len = 40; rss_conf.rss_hf = RTE_ETH_RSS_TCP | RTE_ETH_RSS_UDP | RTE_ETH_RSS_IPV4; unsafe { let ret = rte_eth_dev_rss_hash_conf_update( port_id, &rss_conf, ); if ret < 0 { return Err(DpdkError::RssConfigFailed); } } Ok(()) } // 根据哈希值分配队列 fn get_queue_by_hash(hash: u32, num_queues: u16) -> u16 { // 使用简单的取模分发 (hash % num_queues as u32) as u16 }
多队列配置
// 配置多队列 fn configure_multi_queue(port_id: u16, num_queues: u16) -> Result<(), DpdkError> { let mut port_conf: rte_eth_conf = unsafe { std::mem::zeroed() }; port_conf.rxmode.split_hdr_size = 0; port_conf.rxmode.mq_mode = rte_eth_mq_mode::ETH_MQ_RX_RSS; port_conf.txmode.mq_mode = rte_eth_mq_mode::ETH_MQ_TX_NONE; // 配置 RX 队列 let mut rx_conf: rte_eth_rxconf = unsafe { std::mem::zeroed() }; rx_conf.rx_free_thresh = 32; rx_conf.rx_drop_en = 0; // 配置 TX 队列 let mut tx_conf: rte_eth_txconf = unsafe { std::mem::zeroed() }; tx_conf.tx_free_thresh = 32; // 分配 RX 队列 for queue in 0..num_queues { unsafe { let ret = rte_eth_rx_queue_setup( port_id, queue, 1024, // 队列深度 rte_socket_id() as u32, &rx_conf, mempool.inner, ); if ret < 0 { return Err(DpdkError::QueueSetupFailed); } } } // 分配 TX 队列 for queue in 0..num_queues { unsafe { let ret = rte_eth_tx_queue_setup( port_id, queue, 1024, rte_socket_id() as u32, &tx_conf, ); if ret < 0 { return Err(DpdkError::QueueSetupFailed); } } } Ok(()) }
CPU 亲和性
use std::os::raw::c_int; use std::thread; fn set_cpu_affinity(core_id: u32) -> Result<(), DpdkError> { let mut cpuset: cpu_set_t = unsafe { std::mem::zeroed() }; unsafe { CPU_SET(core_id as usize, &mut cpuset); let ret = pthread_setaffinity_np( pthread_self(), std::mem::size_of::<cpu_set_t>(), &cpuset, ); if ret != 0 { return Err(DpdkError::AffinitySetFailed); } } Ok(()) } // 为每个 RX 队列分配专用核心 fn allocate_cores_for_queues(num_queues: u16) { for queue in 0..num_queues { thread::spawn(move || { set_cpu_affinity(queue as u32).unwrap(); process_queue(queue); }); } }
性能优化
| 优化点 | 方法 |
|---|---|
| 内存对齐 | 缓存行对齐 (64 字节) |
| 无锁队列 | 使用 SPSC 队列 |
| 批处理 | 批量收发减少系统调用 |
| CPU 亲和性 | 核心绑定减少上下文切换 |
| Hugepages | 2MB/1GB 大页减少 TLB miss |
与其他技能关联
rust-dpdk │ ├─► rust-performance → 性能优化 ├─► rust-embedded → no_std 环境 └─► rust-concurrency → 并发模型