kani/futures.rs
1// Copyright Kani Contributors
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! This module contains functions to work with futures (and async/.await) in Kani.
5
6use std::{
7 future::Future,
8 pin::Pin,
9 task::{Context, RawWaker, RawWakerVTable, Waker},
10};
11
12/// A very simple executor: it polls the future in a busy loop until completion
13///
14/// This is intended as a drop-in replacement for `futures::block_on`, which Kani cannot handle.
15/// Whereas a clever executor like `block_on` in `futures` or `tokio` would interact with the OS scheduler
16/// to be woken up when a resource becomes available, this is not supported by Kani.
17/// As a consequence, this function completely ignores the waker infrastructure and just polls the given future in a busy loop.
18///
19/// Note that [`spawn`] is not supported with this function. Use [`block_on_with_spawn`] if you need it.
20#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
21pub fn block_on<T>(mut fut: impl Future<Output = T>) -> T {
22 let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
23 let cx = &mut Context::from_waker(&waker);
24 // SAFETY: we shadow the original binding, so it cannot be accessed again for the rest of the scope.
25 // This is the same as what the pin_mut! macro in the futures crate does.
26 let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
27 loop {
28 match fut.as_mut().poll(cx) {
29 std::task::Poll::Ready(res) => return res,
30 std::task::Poll::Pending => continue,
31 }
32 }
33}
34
/// A waker that does nothing; it exists only because [`Future::poll`] requires one
const NOOP_RAW_WAKER: RawWaker = {
    /// Cloning a no-op waker yields another no-op waker.
    #[inline]
    unsafe fn clone_noop(_data: *const ()) -> RawWaker {
        NOOP_RAW_WAKER
    }

    /// Shared implementation of `wake`, `wake_by_ref` and `drop`: there is nothing to do.
    #[inline]
    unsafe fn ignore(_data: *const ()) {}

    /// Virtual table wiring all four waker operations to the helpers above.
    const VTABLE: RawWakerVTable = RawWakerVTable::new(clone_noop, ignore, ignore, ignore);

    // The data pointer is never read by any vtable function, so null is fine.
    RawWaker::new(std::ptr::null(), &VTABLE)
};
47
/// The global executor used by [`spawn`] and [`block_on_with_spawn`] to run tasks.
///
/// It is set to `Some` by [`block_on_with_spawn`] for the duration of that call and reset to
/// `None` before it returns; [`spawn`] panics while it is `None`.
static mut GLOBAL_EXECUTOR: Option<Scheduler> = None;

/// A pinned, heap-allocated future without an output value, as stored in the scheduler's task list.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Sync + 'static>>;
52
/// Indicates to the scheduler whether it can `kani::assume` that the returned task is running.
///
/// This is useful if the task was picked nondeterministically using `kani::any()`.
/// For more information, see [`SchedulingStrategy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingAssumption {
    /// The scheduler may assume that the picked task has not completed yet.
    CanAssumeRunning,
    /// The picked task may already have completed; the scheduler must not assume otherwise.
    CannotAssumeRunning,
}
61
/// Trait that determines the possible sequences of task scheduling for a harness.
///
/// If your harness spawns several tasks, Kani's scheduler has to decide in what order to poll them.
/// This order may depend on the needs of your verification goal.
/// For example, you sometimes may wish to verify all possible schedulings, i.e. a nondeterministic scheduling strategy.
///
/// Nondeterministic scheduling strategies can be very slow to verify because they require Kani to check a large number of permutations of tasks.
/// So if you want to verify a harness that uses `spawn`, but don't care about concurrency issues, you can simply use a deterministic scheduling strategy,
/// such as [`RoundRobin`], which polls each task in turn.
///
/// Finally, you have the option of providing your own scheduling strategy by implementing this trait.
/// This can be useful, for example, if you want to verify that things work correctly for a very specific task ordering.
pub trait SchedulingStrategy {
    /// Picks the next task to be scheduled whenever the scheduler needs to pick a task to run next, and whether it can be assumed that the picked task is still running
    ///
    /// Tasks are numbered `0..num_tasks`.
    /// For example, if `pick_task(4)` returns `(2, CanAssumeRunning)`, then it picked the task with index 2 and allows Kani to `assume` that this task is still running.
    /// This is useful if the task is chosen nondeterministically (`kani::any()`) and allows the verifier to discard useless execution branches (such as polling a completed task again).
    ///
    /// As a rule of thumb:
    /// if the scheduling strategy picks the next task nondeterministically (using `kani::any()`), return `CanAssumeRunning`, otherwise `CannotAssumeRunning`.
    /// When returning `CanAssumeRunning`, the scheduler will then assume that the picked task is still running, which cuts off "useless" paths where a completed task is polled again.
    /// It is even necessary to make things terminate if nondeterminism is involved:
    /// if we pick the task nondeterministically, and don't have the restriction to still running tasks, we could poll the same task over and over again.
    ///
    /// However, for most deterministic scheduling strategies, e.g. the round robin scheduling strategy, assuming that the picked task is still running is generally not possible
    /// because if that task has ended, we are saying `assume(false)` and the verification effectively stops (which is undesirable, of course).
    /// In such cases, return `CannotAssumeRunning` instead.
    fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption);
}
92
/// Keeps cycling through the tasks in a deterministic order
///
/// Each call to `pick_task` moves on to the index after the previously returned one,
/// wrapping around modulo the number of tasks.
/// A default-constructed value starts with index 0, so its first pick is `1 % num_tasks`.
#[derive(Default)]
pub struct RoundRobin {
    /// Index returned by the most recent call to `pick_task` (0 before the first call)
    index: usize,
}
98
99impl SchedulingStrategy for RoundRobin {
100 #[inline]
101 fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption) {
102 self.index = (self.index + 1) % num_tasks;
103 (self.index, SchedulingAssumption::CannotAssumeRunning)
104 }
105}
106
/// The task list behind [`spawn`] and [`block_on_with_spawn`].
pub(crate) struct Scheduler {
    /// One slot per spawned task, in spawn order; a slot becomes `None` once its future has completed
    tasks: Vec<Option<BoxFuture>>,
    /// Number of tasks that have been spawned but have not completed yet
    num_running: usize,
}
111
112impl Scheduler {
113 /// Creates a scheduler with an empty task list
114 #[inline]
115 pub(crate) const fn new() -> Scheduler {
116 Scheduler { tasks: Vec::new(), num_running: 0 }
117 }
118
119 /// Adds a future to the scheduler's task list, returning a JoinHandle
120 pub(crate) fn spawn<F: Future<Output = ()> + Sync + 'static>(&mut self, fut: F) -> JoinHandle {
121 let index = self.tasks.len();
122 self.tasks.push(Some(Box::pin(fut)));
123 self.num_running += 1;
124 JoinHandle { index }
125 }
126
127 /// Runs the scheduler with the given scheduling plan until all tasks have completed
128 fn run(&mut self, mut scheduling_plan: impl SchedulingStrategy) {
129 let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
130 let cx = &mut Context::from_waker(&waker);
131 while self.num_running > 0 {
132 let (index, assumption) = scheduling_plan.pick_task(self.tasks.len());
133 let task = &mut self.tasks[index];
134 if let Some(fut) = task.as_mut() {
135 match fut.as_mut().poll(cx) {
136 std::task::Poll::Ready(()) => {
137 self.num_running -= 1;
138 let _prev = task.take();
139 }
140 std::task::Poll::Pending => (),
141 }
142 } else if let SchedulingAssumption::CanAssumeRunning = assumption {
143 crate::assume(false); // useful so that we can assume that a nondeterministically picked task is still running
144 }
145 }
146 }
147
148 /// Polls the given future and the tasks it may spawn until all of them complete
149 fn block_on<F: Future<Output = ()> + Sync + 'static>(
150 &mut self,
151 fut: F,
152 scheduling_plan: impl SchedulingStrategy,
153 ) {
154 self.spawn(fut);
155 self.run(scheduling_plan);
156 }
157}
158
/// Result of spawning a task.
///
/// If you `.await` a JoinHandle, this will wait for the spawned task to complete.
pub struct JoinHandle {
    /// Position of the spawned task in the global executor's task list
    index: usize,
}
165
166#[allow(static_mut_refs)]
167impl Future for JoinHandle {
168 type Output = ();
169
170 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
171 if unsafe { GLOBAL_EXECUTOR.as_mut().unwrap().tasks[self.index].is_some() } {
172 std::task::Poll::Pending
173 } else {
174 cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
175 std::task::Poll::Ready(())
176 }
177 }
178}
179
180/// Spawns a task on the current global executor (which is set by [`block_on_with_spawn`])
181///
182/// This function can only be called inside a future passed to [`block_on_with_spawn`].
183#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
184#[allow(static_mut_refs)]
185pub fn spawn<F: Future<Output = ()> + Sync + 'static>(fut: F) -> JoinHandle {
186 unsafe {
187 if let Some(executor) = GLOBAL_EXECUTOR.as_mut() {
188 executor.spawn(fut)
189 } else {
190 // An explicit panic instead of `.expect(...)` has better location information in Kani's output
191 panic!("`spawn` should only be called within `block_on_with_spawn`")
192 }
193 }
194}
195
196/// Polls the given future and the tasks it may spawn until all of them complete
197///
198/// Contrary to [`block_on`], this allows `spawn`ing other futures
199#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
200#[allow(static_mut_refs)]
201pub fn block_on_with_spawn<F: Future<Output = ()> + Sync + 'static>(
202 fut: F,
203 scheduling_plan: impl SchedulingStrategy,
204) {
205 unsafe {
206 assert!(GLOBAL_EXECUTOR.is_none(), "`block_on_with_spawn` should not be nested");
207 GLOBAL_EXECUTOR = Some(Scheduler::new());
208 GLOBAL_EXECUTOR.as_mut().unwrap().block_on(fut, scheduling_plan);
209 GLOBAL_EXECUTOR = None;
210 }
211}
212
213/// Suspends execution of the current future, to allow the scheduler to poll another future
214///
215/// Specifically, it returns a future that isn't ready until the second time it is polled.
216#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
217pub fn yield_now() -> impl Future<Output = ()> {
218 struct YieldNow {
219 yielded: bool,
220 }
221
222 impl Future for YieldNow {
223 type Output = ();
224
225 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
226 if self.yielded {
227 cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
228 std::task::Poll::Ready(())
229 } else {
230 self.yielded = true;
231 std::task::Poll::Pending
232 }
233 }
234 }
235
236 YieldNow { yielded: false }
237}