1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Copyright Kani Contributors
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! This module contains functions to work with futures (and async/.await) in Kani.

use std::{
    future::Future,
    pin::Pin,
    task::{Context, RawWaker, RawWakerVTable, Waker},
};

/// A very simple executor: it polls the future in a busy loop until completion
///
/// This is intended as a drop-in replacement for `futures::block_on`, which Kani cannot handle.
/// Whereas a clever executor like `block_on` in `futures` or `tokio` would interact with the OS scheduler
/// to be woken up when a resource becomes available, this is not supported by Kani.
/// As a consequence, this function completely ignores the waker infrastructure and just polls the given future in a busy loop.
///
/// Note that [`spawn`] is not supported with this function. Use [`block_on_with_spawn`] if you need it.
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
pub fn block_on<T>(mut fut: impl Future<Output = T>) -> T {
    let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
    let cx = &mut Context::from_waker(&waker);
    // SAFETY: we shadow the original binding, so it cannot be accessed again for the rest of the scope.
    // This is the same as what the pin_mut! macro in the futures crate does.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    loop {
        match fut.as_mut().poll(cx) {
            std::task::Poll::Ready(res) => return res,
            std::task::Poll::Pending => continue,
        }
    }
}

/// A dummy waker, which is needed to call [`Future::poll`]
const NOOP_RAW_WAKER: RawWaker = {
    #[inline]
    unsafe fn clone_waker(_: *const ()) -> RawWaker {
        NOOP_RAW_WAKER
    }

    #[inline]
    unsafe fn noop(_: *const ()) {}

    RawWaker::new(std::ptr::null(), &RawWakerVTable::new(clone_waker, noop, noop, noop))
};

/// The global executor used by [`spawn`] and [`block_on_with_spawn`] to run tasks.
static mut GLOBAL_EXECUTOR: Option<Scheduler> = None;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Sync + 'static>>;

/// Indicates to the scheduler whether it can `kani::assume` that the returned task is running.
///
/// This is useful if the task was picked nondeterministically using `kani::any()`.
/// For more information, see [`SchedulingStrategy`].
pub enum SchedulingAssumption {
    CanAssumeRunning,
    CannotAssumeRunning,
}

/// Trait that determines the possible sequence of tasks scheduling for a harness.
///
/// If your harness spawns several tasks, Kani's scheduler has to decide in what order to poll them.
/// This order may depend on the needs of your verification goal.
/// For example, you sometimes may wish to verify all possible schedulings, i.e. a nondeterministic scheduling strategy.
///
/// Nondeterministic scheduling strategies can be very slow to verify because they require Kani to check a large number of permutations of tasks.
/// So if you want to verify a harness that uses `spawn`, but don't care about concurrency issues, you can simply use a deterministic scheduling strategy,
/// such as [`RoundRobin`], which polls each task in turn.
///
/// Finally, you have the option of providing your own scheduling strategy by implementing this trait.
/// This can be useful, for example, if you want to verify that things work correctly for a very specific task ordering.
pub trait SchedulingStrategy {
    /// Picks the next task to be scheduled whenever the scheduler needs to pick a task to run next, and whether it can be assumed that the picked task is still running
    ///
    /// Tasks are numbered `0..num_tasks`.
    /// For example, if pick_task(4) returns (2, CanAssumeRunning) than it picked the task with index 2 and allows Kani to `assume` that this task is still running.
    /// This is useful if the task is chosen nondeterministicall (`kani::any()`) and allows the verifier to discard useless execution branches (such as polling a completed task again).
    ///
    /// As a rule of thumb:
    /// if the scheduling strategy picks the next task nondeterministically (using `kani::any()`), return CanAssumeRunning, otherwise CannotAssumeRunning.
    /// When returning `CanAssumeRunning`, the scheduler will then assume that the picked task is still running, which cuts off "useless" paths where a completed task is polled again.
    /// It is even necessary to make things terminate if nondeterminism is involved:
    /// if we pick the task nondeterministically, and don't have the restriction to still running tasks, we could poll the same task over and over again.
    ///
    /// However, for most deterministic scheduling strategies, e.g. the round robin scheduling strategy, assuming that the picked task is still running is generally not possible
    /// because if that task has ended, we are saying assume(false) and the verification effectively stops (which is undesirable, of course).
    /// In such cases, return `CannotAssumeRunning` instead.
    fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption);
}

/// Keeps cycling through the tasks in a deterministic order
#[derive(Default)]
pub struct RoundRobin {
    index: usize,
}

impl SchedulingStrategy for RoundRobin {
    #[inline]
    fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingAssumption) {
        self.index = (self.index + 1) % num_tasks;
        (self.index, SchedulingAssumption::CannotAssumeRunning)
    }
}

pub(crate) struct Scheduler {
    tasks: Vec<Option<BoxFuture>>,
    num_running: usize,
}

impl Scheduler {
    /// Creates a scheduler with an empty task list
    #[inline]
    pub(crate) const fn new() -> Scheduler {
        Scheduler { tasks: Vec::new(), num_running: 0 }
    }

    /// Adds a future to the scheduler's task list, returning a JoinHandle
    pub(crate) fn spawn<F: Future<Output = ()> + Sync + 'static>(&mut self, fut: F) -> JoinHandle {
        let index = self.tasks.len();
        self.tasks.push(Some(Box::pin(fut)));
        self.num_running += 1;
        JoinHandle { index }
    }

    /// Runs the scheduler with the given scheduling plan until all tasks have completed
    fn run(&mut self, mut scheduling_plan: impl SchedulingStrategy) {
        let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
        let cx = &mut Context::from_waker(&waker);
        while self.num_running > 0 {
            let (index, assumption) = scheduling_plan.pick_task(self.tasks.len());
            let task = &mut self.tasks[index];
            if let Some(fut) = task.as_mut() {
                match fut.as_mut().poll(cx) {
                    std::task::Poll::Ready(()) => {
                        self.num_running -= 1;
                        let _prev = task.take();
                    }
                    std::task::Poll::Pending => (),
                }
            } else if let SchedulingAssumption::CanAssumeRunning = assumption {
                crate::assume(false); // useful so that we can assume that a nondeterministically picked task is still running
            }
        }
    }

    /// Polls the given future and the tasks it may spawn until all of them complete
    fn block_on<F: Future<Output = ()> + Sync + 'static>(
        &mut self,
        fut: F,
        scheduling_plan: impl SchedulingStrategy,
    ) {
        self.spawn(fut);
        self.run(scheduling_plan);
    }
}

/// Result of spawning a task.
///
/// If you `.await` a JoinHandle, this will wait for the spawned task to complete.
pub struct JoinHandle {
    index: usize,
}

impl Future for JoinHandle {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
        if unsafe { GLOBAL_EXECUTOR.as_mut().unwrap().tasks[self.index].is_some() } {
            std::task::Poll::Pending
        } else {
            cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
            std::task::Poll::Ready(())
        }
    }
}

/// Spawns a task on the current global executor (which is set by [`block_on_with_spawn`])
///
/// This function can only be called inside a future passed to [`block_on_with_spawn`].
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
pub fn spawn<F: Future<Output = ()> + Sync + 'static>(fut: F) -> JoinHandle {
    unsafe {
        if let Some(executor) = GLOBAL_EXECUTOR.as_mut() {
            executor.spawn(fut)
        } else {
            // An explicit panic instead of `.expect(...)` has better location information in Kani's output
            panic!("`spawn` should only be called within `block_on_with_spawn`")
        }
    }
}

/// Polls the given future and the tasks it may spawn until all of them complete
///
/// Contrary to [`block_on`], this allows `spawn`ing other futures
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
pub fn block_on_with_spawn<F: Future<Output = ()> + Sync + 'static>(
    fut: F,
    scheduling_plan: impl SchedulingStrategy,
) {
    unsafe {
        assert!(GLOBAL_EXECUTOR.is_none(), "`block_on_with_spawn` should not be nested");
        GLOBAL_EXECUTOR = Some(Scheduler::new());
        GLOBAL_EXECUTOR.as_mut().unwrap().block_on(fut, scheduling_plan);
        GLOBAL_EXECUTOR = None;
    }
}

/// Suspends execution of the current future, to allow the scheduler to poll another future
///
/// Specifically, it returns a future that isn't ready until the second time it is polled.
#[crate::unstable(feature = "async-lib", issue = 2559, reason = "experimental async support")]
pub fn yield_now() -> impl Future<Output = ()> {
    struct YieldNow {
        yielded: bool,
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
            if self.yielded {
                cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
                std::task::Poll::Ready(())
            } else {
                self.yielded = true;
                std::task::Poll::Pending
            }
        }
    }

    YieldNow { yielded: false }
}