嵌入式经常有类似通过串口发送指令然后等待响应再做出进一步反应的需求。比如,通过串口以AT命令来操作蓝牙模块执行扫描、连接,需要根据实际情况进行操作,复杂的可能需要执行7、8条指令才能完成连接。
对于这样的需求,如果用异步编程的方式来做,由于收发双方被解除了耦合,所以需要设置多种状态标记来对齐收发双方的步调,逻辑上非常不直观、操作上也非常繁琐,稍有疏漏就会出现bug,而且也不便于扩展【新功能需要重新修订状态空间,这就会对现有功能产生潜在的干扰】。所以我们一般都是将其串行化来简化编程。
在用c基于rt-thread开发时,这样的异步操作串行化我是通过系统提供的线程间通信工具【信号量】+用宏进行简写来实现的。
现在,当我们基于Embassy来开发嵌入式时,rust提供的async/await就是一套标准化的将并发操作进行串行化的工具。
理解await
rust中async/await的核心就是Future【一个需要系统介入的trait】:
1、async就是告诉编译器:我不是普通函数,我是一个Future,需要特殊对待
2、await则通知编译器,对于一个Future,需要按如下方式来操作:
- 为调用我,需要生成一个任务
- 将这个任务打包成一个可wake的上下文【当然还包括其它准备工作】
- 设置一个等待点,当任务执行完毕时会返回到该处并带回结果,然后从等待点继续执行
- 控制流转向这个任务,即启动这个任务,然后在其中执行我的poll函数【以该上下文为参数】
这个任务属于系统任务,由rust的运行时负责管理与调度。
我们完整描述一下await的异步操作串行化过程:
- 当前执行流启动异步操作后,发出await指令
- await指令会导致控制权被转移给rust的系统运行时
- 系统运行时启动一个用于检查该异步操作是否执行完毕的poll任务,这个poll任务将由系统按需调度
- poll任务每次被调度执行时都会检查异步操作的执行情况
- 检查后,如果通知异步操作还没执行完,系统就挂起poll任务并等待下一次调度机会
- 检查后,如果通知异步操作执行完毕了,系统就结束poll任务并带着异步操作的执行结果返回到await指令发出点
- 原来的执行流得到异步操作的执行结果,然后继续执行
大家可以看到,异步操作串行化需要完成大量的工作,在没有await的情况下,如我基于rtt的实现中,就全部需要我们自己来实现了。当然,我们自己的实现不需要考虑通用性,自然也就不会这么复杂,但主要的逻辑都是需要覆盖到的。
由我们自己编写的、负责检查异步任务是否完成的poll函数就两个返回值:
- Pending:指示异步操作还没有完成,则将poll任务丢进等待队列沉睡,等待wake后重新检查【即再次执行poll函数】
- Ready(data):指示异步操作执行完毕,则将poll任务从等待队列中踢出去,然后将Ready所包裹的数据作为结果返回调用任务时的等待点继续执行
现在就只有一个问题了:poll任务什么时候会被投入运行呢?!准备好的就可以执行。
那么,怎么才叫准备好的呢?!没执行过的、从沉睡中被wake的。
那么,怎么wake呢?!三种方式:
- 我们手动提取到waker,交给对应的系统部件,如放入时钟任务队列,由系统wake
- 我们手动提取到waker,自己保存起来,等条件满足时,我们自己wake
- 系统提取到waker,自动帮我们wake
就上述过程,Embassy/rust中存在两种特例:
1、Timer超时
如果我们需要原地等待上一段时间,我们一般会:
//等待10毫秒
Timer::after_millis(10).await;
这个时候并没有需要等待的异步操作,就单纯的等待时间的流逝。所以,就是把waker推入系统时钟队列,等待唤醒即可。
2、async标记的函数【rust中就是一个Future】
这个函数就是需要等待的异步操作。
这个时候,出现了三个执行流:
- 原执行流,由于await指令的调用,该执行流暂停
- 系统执行流,由于await指令的调用,系统运行时获得了执行权,其需要完成一系列的相关工作
- 异步执行流,系统运行时会创建一个新的任务,并在其中执行被async标记的异步函数,该函数什么时候会得到执行,理论上由系统运行时按需进行调度
当然,我们说的是理论上,不同的运行时自然有自己的考虑。
考虑到大家的习惯,我们一般都把异步操作封装到异步函数中来执行。但大家一定要理解:await是一个系统指令,和是否调用了async标记的函数没有必然的联系。
我们完全可以先创建一个Future,然后执行一系列准备工作并启动一个异步操作,然后await这个Future。而这,也正是await的标准使用方法,只不过我们经常会将执行准备工作和启动异步操作都放到一个函数中来做,最后由这个函数返回一个Future。所以大家才形成了async/await配对使用的印象。
但这个印象当然是错误的:只要是Future就可以await,async只不过简化了【创建一个匿名struct、为其编写poll函数及Future、将异步函数封装进一个隐式函数中完成创建该匿名struct、调用函数、返回刚创建的匿名struct】这一繁琐的过程而已。
作为运行时的Embassy
Embassy实现了一个任务执行器,rust编译器会自动将其装配成系统运行时,我们写好自己的Future,然后在需要时await即可,rust编译器会配合Embassy的任务执行器帮助我们自动完成上面罗列的一大堆工作。
Embassy还实现了时钟任务队列,我们可以借用来实现超时。但需要我们显式导入,即首先需要在Cargo.toml中声明对应的依赖:
[dependencies]
...其它依赖
embassy-time-queue-driver = "0.1.0"
然后在我们的程序中使用:
let waker = cx.waker();
embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), waker);
举个例子
我们的目标是实现:串口发送命令后等待如下响应
- 接收正确
- 接收错误
- 接收超时【未得到任何响应】
注:何为正确、何为错误,这属于业务上的判断,对我们这个例子来说,不需要关心
我们需要为这个响应设计一个数据结构:
#[derive(Clone)]
pub struct ExpectResult {
//超时检测的最后期限
expires_at: Instant,
//响应处理完毕的标记
ready: bool,
//负责叫醒任务的服务员
waker: Option<core::task::Waker>,
//区分响应状态的一个结果码枚举
rc: ExpectResultCode,
//执行结果,根据我们的业务来设计,各种形式都可以
result: (...我们需要的数据...),
}
注:考虑rust的借用,一般需要抽离成一个内部的struct来做Future,外部只做包裹以及必要的操作接口。但为便于说明,这一部分就略去了
一般来说,作为基础结构,我们都会用泛型实现通用代码。通用代码应该包括两块:结果指示码、执行结果。其它语言一般都只能把结果指示码放入执行结果、在ExpectResult中用单独的timeout指示码指示是否超时,但rust则可以将其合并到结果指示码中,这样的话,处理时就可以统一处理:接收正确、接收错误、超时了。业务逻辑上更顺畅。
结果码我们可以如下定义:
#[derive(Clone, Copy)]
pub enum ExpectResultCode<T: Copy> {
None,
//框架不关心的业务码
BPCode(T),
//框架需要处理的超时,但也是业务层面的超时
Timeout,
}
通过rust强大的枚举,原本由框架管理的Timeout就依然和BPCode一样具有业务层级的语义,但又不受业务代码的干扰。
然后我们可以根据我们的业务需要为其准备各种操作,但其中应该有两个:
impl ExpectResult {
...其它业务代码...
//代替new来创建期望结果的数据结构,如果不需要超时,就直接用new
pub fn timeout(duration: Duration) -> Self {
Self {
expires_at: Instant::now() + duration,
ready: false,
waker: None,
rc: ExpectResultCode::None,
result: (...现在还没有结果,应该就是一堆None/0之类...),
}
}
//异步操作完毕,设置结果;TC是业务结果码的泛型
pub fn set_result(&mut self, rc: TC, result: ...) {
self.rc = ExpectResultCode::BPCode(rc);
self.result = ...;
//异步操作结束
self.ready = true;
//唤醒等待中的poll任务
if let Some(waker) = &self.waker {
waker.wake();
}
}
}
然后我们就可以来实现Future了:
impl Unpin for ExpectResult {}
impl Future for ExpectResult {
type Output = (ExpectResultCode, ...我们需要的结果类型...);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.ready {
//异步操作执行完毕,即有人执行了set_result
Poll::Ready((self.rc, ...set_result函数送入的结果...))
} else if self.expires_at < Instant::now() {
//超时
Poll::Ready((ExpectResultCode::Timeout, ...结果自然是没有了...))
} else {
//第一次调用,为上两个分支准备waker
let waker: &core::task::Waker = cx.waker();
//为set_result准备唤醒服务
self.waker = Some(waker.clone());
//为超时准备唤醒服务
embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), waker);
//最后,告诉系统:我先洗洗睡了,好了叫我
Poll::Pending
}
}
}
ok,那该怎么用呢?!
//以各种方式创建一个ExpectResult并为其按业务需要装订数据
//如等待50毫秒
let es = ExpectResult::timeout(Duration::from_millis(50));
...其它操作,如通过串口发送命令...
//准备完毕,异步操作已经启动,开始等待响应结果
//如你所见,对的,await针对的就是一个struct,而不是函数。
//啊?!可async修饰的就是一个函数啊?!不是async和await配合使用吗?!
//我们的ExpectResult已经在上面实现了Future,所以它可以直接用await进行串行化
//而如果是async函数,rust编译器会好心的给我们准备一个匿名struct的来实现我上面讲的东东的
//然后把async所修饰的函数作为需要等待执行完毕的异步操作丢进另一个线程/协程中执行
//并在该异步函数执行完毕时自动帮我们wake并返回该异步函数执行的结果
let (rc, ...我们需要的结果...) = es.await;
match rc {
ExpectResultCode::Timeout => {
//超时处理,如重发三次等等
...
},
...
}
相比我们自己的实现,await机制,由于系统运行时和编译器配合着帮我们做了相当多的工作,自然非常轻便与灵活,而且编写起来也比较简单、逻辑清晰。