我正在玩Tokio和Rust,例如,我正在尝试编写一个简单的UDP代理,该代理将仅在一个套接字上接受UDP数据包并将其发送到其他多个目的地。但是,我遇到了需要将接收到的数据包发送到多个地址的情况,并且不确定如何以惯用的方式进行处理。

代码我到目前为止:

extern crate bytes;
extern crate futures;

use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;

fn main() {
    let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
    let forwarder = {
        let socket = UdpSocket::bind(&listen_address).unwrap();
        let peers = vec![
            "192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
            "192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
        ];
        UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).for_each(
            move |(bytes, _from)| {
                // These are the problematic lines
                for peer in peers.iter() {
                    socket.send_dgram(&bytes, &peer);
                }
                Ok(())
            },
        )
    };

    tokio::run({
        forwarder
            .map_err(|err| println!("Error: {}", err))
            .map(|_| ())
    });
}

有问题的线路正在尝试使用新绑定(bind)的套接字将接收到的数据包发送到其他多个地址。

现有示例全部将数据包转发到单个目标,或者在内部使用mpsc channel 在内部任务之间进行通信。我认为这不是必需的,并且不必在每个监听套接字中产生多个任务就可以执行此操作。

更新:感谢@Ömer-erden,我得到了可以运行的代码。
extern crate bytes;
extern crate futures;

use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listen_address = "0.0.0.0:4711".parse::<SocketAddr>()?;
    let socket = UdpSocket::bind(&listen_address)?;
    let peers: Vec<SocketAddr> = vec!["192.168.1.136:8080".parse()?, "192.168.1.136:8081".parse()?];
    let (mut writer, reader) = UdpFramed::new(socket, BytesCodec::new()).split();
    let forwarder = reader.for_each(move |(bytes, _from)| {
        for peer in peers.iter() {
            writer.start_send((bytes.clone().into(), peer.clone()))?;
        }
        writer.poll_complete()?;
        Ok(())
    });

    tokio::run({
        forwarder
            .map_err(|err| println!("Error: {}", err))
            .map(|_| ())
    });
    Ok(())
}

注意:
  • 不必为每个poll_completion都调用start_send:仅在派出所有start_send之后才需要调用它。
  • 出于某种原因,在调用之间会破坏peer的内容(但没有编译器错误),从而生成错误22(通常是因为给sendto(2)分配了错误的地址)。

    在调试器中查看,很明显,第二次,对等地址指向无效的内存。我选择克隆peer
  • 我删除了对unwrap()的调用,而是向上传播Result
  • 最佳答案

    您的代码有一个逻辑错误:您尝试两次将相同的地址分别绑定(bind)为发送者和接收者,所以绑定(bind)相同的地址。相反,您可以使用流并接收UdpFramed具有提供此功能的功能,请参见 Sink :



    let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
    let forwarder = {
        let (mut socket_sink, socket_stream) =
            UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).split();
        let peers = vec![
            "192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
            "192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
        ];
    
        socket_stream.for_each(move |(bytes, _from)| {
            for peer in peers.iter() {
                socket_sink.start_send((bytes.clone().into(), *peer));
                socket_sink.poll_complete();
            }
            Ok(())
        })
    };
    
    tokio::run({
        forwarder
            .map_err(|err| println!("Error: {}", err))
            .map(|_| ())
    });
    

    关于sockets - 使用Tokio future 的多播UDP数据包,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56578876/

    10-10 23:19
    查看更多