kani/
futures.rs

1// Copyright Kani Contributors
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! This module contains functions to work with futures (and async/.await) in Kani.
5
6use std::{
7    future::Future,
8    pin::Pin,
9    task::{Context, RawWaker, RawWakerVTable, Waker},
10};
11
12/// A very simple executor: it polls the future in a busy loop until completion
13///
14/// This is intended as a drop-in replacement for `futures::block_on`, which Kani cannot handle.
15/// Whereas a clever executor like `block_on` in `futures` or `tokio` would interact with the OS scheduler
16/// to be woken up when a resource becomes available, this is not supported by Kani.
17/// As a consequence, this function completely ignores the waker infrastructure and just polls the given future in a busy loop.
18///
19/// Note that [`spawn`] is not supported with this function. Use [`block_on_with_spawn`] if you need it.
20#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
21pub fn block_on<T>(mut fut: impl Future<Output = T>) -> T {
22    let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
23    let cx = &mut Context::from_waker(&waker);
24    // SAFETY: we shadow the original binding, so it cannot be accessed again for the rest of the scope.
25    // This is the same as what the pin_mut! macro in the futures crate does.
26    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
27    loop {
28        match fut.as_mut().poll(cx) {
29            std::task::Poll::Ready(res) => return res,
30            std::task::Poll::Pending => continue,
31        }
32    }
33}
34
/// A waker that does nothing; it only exists because [`Future::poll`] requires a waker.
///
/// The data pointer is null and is never read: cloning yields the same constant again, and
/// waking, waking by reference and dropping are all no-ops.
const NOOP_RAW_WAKER: RawWaker = {
    /// Shared implementation of `wake`, `wake_by_ref` and `drop`.
    #[inline]
    unsafe fn do_nothing(_data: *const ()) {}

    /// Cloning a no-op waker just hands out the same constant again.
    #[inline]
    unsafe fn clone_noop(_data: *const ()) -> RawWaker {
        NOOP_RAW_WAKER
    }

    // Argument order of `RawWakerVTable::new`: clone, wake, wake_by_ref, drop.
    const NOOP_VTABLE: RawWakerVTable =
        RawWakerVTable::new(clone_noop, do_nothing, do_nothing, do_nothing);

    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
};
47
/// The global executor used by [`spawn`] and [`block_on_with_spawn`] to run tasks.
///
/// It is `None` except while a call to [`block_on_with_spawn`] is in progress: that function
/// installs a fresh [`Scheduler`] on entry and resets this to `None` once the scheduler has finished.
static mut GLOBAL_EXECUTOR: Option<Scheduler> = None;

/// A pinned, heap-allocated, type-erased task: any `Sync + 'static` future whose output is `()`.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Sync + 'static>>;
52
/// Indicates to the scheduler whether it can `kani::assume` that the returned task is running.
///
/// This is useful if the task was picked nondeterministically using `kani::any()`.
/// For more information, see [`SchedulingStrategy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingAssumption {
    /// The scheduler may assume that the picked task has not completed yet.
    /// If the picked task has in fact completed, the scheduler calls `assume(false)`,
    /// which discards that execution path.
    CanAssumeRunning,
    /// The scheduler must not assume anything about the picked task.
    /// Picking a task that has already completed is then simply a no-op for that round.
    CannotAssumeRunning,
}
61
/// Trait that determines the possible sequences of task scheduling for a harness.
///
/// If your harness spawns several tasks, Kani's scheduler has to decide in what order to poll them.
/// This order may depend on the needs of your verification goal.
/// For example, you sometimes may wish to verify all possible schedulings, i.e. a nondeterministic scheduling strategy.
///
/// Nondeterministic scheduling strategies can be very slow to verify because they require Kani to check a large number of permutations of tasks.
/// So if you want to verify a harness that uses `spawn`, but don't care about concurrency issues, you can simply use a deterministic scheduling strategy,
/// such as [`RoundRobin`], which polls each task in turn.
///
/// Finally, you have the option of providing your own scheduling strategy by implementing this trait.
/// This can be useful, for example, if you want to verify that things work correctly for a very specific task ordering.
pub trait SchedulingStrategy {
    /// Picks the task to be polled next, and states whether the scheduler may assume that the picked task is still running.
    ///
    /// Tasks are numbered `0..num_tasks`.
    /// For example, if `pick_task(4)` returns `(2, CanAssumeRunning)`, then it picked the task with index 2 and allows Kani to `assume` that this task is still running.
    /// This is useful if the task is chosen nondeterministically (`kani::any()`) and allows the verifier to discard useless execution branches (such as polling a completed task again).
    ///
    /// As a rule of thumb:
    /// if the scheduling strategy picks the next task nondeterministically (using `kani::any()`), return `CanAssumeRunning`, otherwise `CannotAssumeRunning`.
    /// When returning `CanAssumeRunning`, the scheduler will then assume that the picked task is still running, which cuts off "useless" paths where a completed task is polled again.
    /// It is even necessary to make things terminate if nondeterminism is involved:
    /// if we pick the task nondeterministically, and don't have the restriction to still running tasks, we could poll the same task over and over again.
    ///
    /// However, for most deterministic scheduling strategies, e.g. the round robin scheduling strategy, assuming that the picked task is still running is generally not possible
    /// because if that task has ended, we are saying `assume(false)` and the verification effectively stops (which is undesirable, of course).
    /// In such cases, return `CannotAssumeRunning` instead.
    fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption);
}
92
93/// Keeps cycling through the tasks in a deterministic order
94#[derive(Default)]
95pub struct RoundRobin {
96    index: usize,
97}
98
99impl SchedulingStrategy for RoundRobin {
100    #[inline]
101    fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption) {
102        self.index = (self.index + 1) % num_tasks;
103        (self.index, SchedulingAssumption::CannotAssumeRunning)
104    }
105}
106
107pub(crate) struct Scheduler {
108    tasks: Vec<Option<BoxFuture>>,
109    num_running: usize,
110}
111
112impl Scheduler {
113    /// Creates a scheduler with an empty task list
114    #[inline]
115    pub(crate) const fn new() -> Scheduler {
116        Scheduler { tasks: Vec::new(), num_running: 0 }
117    }
118
119    /// Adds a future to the scheduler's task list, returning a JoinHandle
120    pub(crate) fn spawn<F: Future<Output = ()> + Sync + 'static>(&mut self, fut: F) -> JoinHandle {
121        let index = self.tasks.len();
122        self.tasks.push(Some(Box::pin(fut)));
123        self.num_running += 1;
124        JoinHandle { index }
125    }
126
127    /// Runs the scheduler with the given scheduling plan until all tasks have completed
128    fn run(&mut self, mut scheduling_plan: impl SchedulingStrategy) {
129        let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
130        let cx = &mut Context::from_waker(&waker);
131        while self.num_running > 0 {
132            let (index, assumption) = scheduling_plan.pick_task(self.tasks.len());
133            let task = &mut self.tasks[index];
134            if let Some(fut) = task.as_mut() {
135                match fut.as_mut().poll(cx) {
136                    std::task::Poll::Ready(()) => {
137                        self.num_running -= 1;
138                        let _prev = task.take();
139                    }
140                    std::task::Poll::Pending => (),
141                }
142            } else if let SchedulingAssumption::CanAssumeRunning = assumption {
143                crate::assume(false); // useful so that we can assume that a nondeterministically picked task is still running
144            }
145        }
146    }
147
148    /// Polls the given future and the tasks it may spawn until all of them complete
149    fn block_on<F: Future<Output = ()> + Sync + 'static>(
150        &mut self,
151        fut: F,
152        scheduling_plan: impl SchedulingStrategy,
153    ) {
154        self.spawn(fut);
155        self.run(scheduling_plan);
156    }
157}
158
159/// Result of spawning a task.
160///
161/// If you `.await` a JoinHandle, this will wait for the spawned task to complete.
162pub struct JoinHandle {
163    index: usize,
164}
165
166#[allow(static_mut_refs)]
167impl Future for JoinHandle {
168    type Output = ();
169
170    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
171        if unsafe { GLOBAL_EXECUTOR.as_mut().unwrap().tasks[self.index].is_some() } {
172            std::task::Poll::Pending
173        } else {
174            cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
175            std::task::Poll::Ready(())
176        }
177    }
178}
179
180/// Spawns a task on the current global executor (which is set by [`block_on_with_spawn`])
181///
182/// This function can only be called inside a future passed to [`block_on_with_spawn`].
183#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
184#[allow(static_mut_refs)]
185pub fn spawn<F: Future<Output = ()> + Sync + 'static>(fut: F) -> JoinHandle {
186    unsafe {
187        if let Some(executor) = GLOBAL_EXECUTOR.as_mut() {
188            executor.spawn(fut)
189        } else {
190            // An explicit panic instead of `.expect(...)` has better location information in Kani's output
191            panic!("`spawn` should only be called within `block_on_with_spawn`")
192        }
193    }
194}
195
196/// Polls the given future and the tasks it may spawn until all of them complete
197///
198/// Contrary to [`block_on`], this allows `spawn`ing other futures
199#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
200#[allow(static_mut_refs)]
201pub fn block_on_with_spawn<F: Future<Output = ()> + Sync + 'static>(
202    fut: F,
203    scheduling_plan: impl SchedulingStrategy,
204) {
205    unsafe {
206        assert!(GLOBAL_EXECUTOR.is_none(), "`block_on_with_spawn` should not be nested");
207        GLOBAL_EXECUTOR = Some(Scheduler::new());
208        GLOBAL_EXECUTOR.as_mut().unwrap().block_on(fut, scheduling_plan);
209        GLOBAL_EXECUTOR = None;
210    }
211}
212
213/// Suspends execution of the current future, to allow the scheduler to poll another future
214///
215/// Specifically, it returns a future that isn't ready until the second time it is polled.
216#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
217pub fn yield_now() -> impl Future<Output = ()> {
218    struct YieldNow {
219        yielded: bool,
220    }
221
222    impl Future for YieldNow {
223        type Output = ();
224
225        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
226            if self.yielded {
227                cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
228                std::task::Poll::Ready(())
229            } else {
230                self.yielded = true;
231                std::task::Poll::Pending
232            }
233        }
234    }
235
236    YieldNow { yielded: false }
237}