本文介绍了如何使用 futures.rs 和 Redis PubSub 为阻塞调用实现期货流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个系统,我的应用程序可以通过该系统从 Redis PubSub 通道接收流数据并对其进行处理.我正在使用的 Redis 驱动程序,以及我使用的所有其他 Rust Redis 驱动程序看到了,使用阻塞操作从通道中获取数据,只有在接收到数据时才返回一个值:

I'm trying to create a system by which my application can receive streaming data from a Redis PubSub channel and process it. The Redis driver that I'm using, along with all other Redis drivers for Rust that I've seen, use a blocking operation to get data from the channel that only returns a value when it receives data:

let msg = match pubsub.get_message() {
        Ok(m) => m,
        Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
    Ok(s) => s,
    Err(_) => panic!("Could not convert redis message to string!")
};

我想在将来使用 futures-rs 库来包装这个阻塞函数调用这样我就可以在等待输入的同时在我的应用程序中执行其他任务.

I wanted to use the futures-rs library to wrap this blocking function call in a future so that I can perform other tasks within my application while waiting for input.

我阅读了期货的教程并尝试创建一个 Stream 会在 PubSub 收到数据时发出信号,但我不知道该怎么做.

I read the tutorial for futures and tried to create a Stream that would signal when there data is received by the PubSub, but I can't figure out how to do so.

如何为阻塞 pubsub.get_message() 函数创建 schedulepoll 函数?

How can I create schedule and poll functions for the blocking pubsub.get_message() function?

推荐答案

严重警告 我以前从未使用过这个库,我对某些概念的了解有点低... 不足.我主要是在阅读教程.我很确定任何做过异步工作的人都会读到这篇文章并笑出声来,但这对其他人来说可能是一个有用的起点.警告清空者!

Heavy caveat I've never used this library before, and my low-level knowledge of some of the concepts is a bit... lacking. Mostly I'm reading through the tutorial. I'm pretty sure anyone who has done async work will read this and laugh, but it may be a useful starting point for other people. Caveat emptor!

让我们从更简单的事情开始,演示 Stream 的工作原理.我们可以将 Result 的迭代器转换为流:

Let's start with something a bit simpler, demonstrating how a Stream works. We can convert an iterator of Results into a stream:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

这向我们展示了一种消费流的方式.我们使用 and_then 对每个有效载荷做一些事情(这里只是打印出来)然后 for_eachStream 转换回 未来.然后我们可以通过调用奇怪命名的 forget 方法来运行未来.

This shows us one way to consume the stream. We use and_then to do something to each payload (here just printing it out) and then for_each to convert the Stream back into a Future. We can then run the future by calling the strangely named forget method.

接下来是将 Redis 库结合起来,只处理一条消息.由于 get_message() 方法是阻塞的,我们需要在混合中引入一些线程.在这种类型的异步系统中执行大量工作并不是一个好主意,因为其他一切都会被阻塞.例如:

Next is to tie the Redis library into the mix, handling just one message. Since the get_message() method is blocking, we need to introduce some threads into the mix. It's not a good idea to perform large amount of work in this type of asynchronous system as everything else will be blocked. For example:

除非另有安排,否则应确保此功能的实现非常快.

在理想的世界中,redis crate 将构建在类似 futures 的库之上,并在本机公开所有这些.

In an ideal world, the redis crate would be built atop a library like futures and expose all this natively.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

我的理解在这里变得更加模糊.在一个单独的线程中,我们阻塞消息并在我们收到消息时将其推送到通道中.我不明白的是为什么我们需要抓住线程的句柄.我希望 foo.forget 会阻塞自己,等待流为空.

My understanding gets fuzzier here. In a separate thread, we block for the message and push it into the channel when we get it. What I don't understand is why we need to hold onto the thread's handle. I would expect that foo.forget would be blocking itself, waiting until the stream is empty.

在到 Redis 服务器的 telnet 连接中,发送:

In a telnet connection to the Redis server, send this:

publish rust awesome

你会看到它有效.添加打印语句表明(对我而言)foo.forget 语句在线程产生之前运行.

And you will see it works. Adding print statements shows that the (for me) the foo.forget statement is run before the thread is spawned.

多条消息比较棘手.Sender 消耗自身以防止生成方领先于消费方太远.这是通过从 send 返回另一个未来来实现的!我们需要将它从那里穿梭回来,以便在循环的下一次迭代中重新使用它:

Multiple messages is trickier. The Sender consumes itself to prevent the generating side from getting too far ahead of the consuming side. This is accomplished by returning another future from send! We need to shuttle it back out of there to reuse it for the next iteration of the loop:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

我相信随着时间的推移,这种类型的互操作将会有更多的生态系统.例如,futures-cpupool crate 可能扩展以支持与此类似的用例.

I'm sure that there will be more ecosystem for this type of interoperation as time goes on. For example, the futures-cpupool crate could probably be extended to support a similar usecase to this.

这篇关于如何使用 futures.rs 和 Redis PubSub 为阻塞调用实现期货流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-07 03:04
查看更多