utils/
async_ecs.rs

//! This module handles asynchronous operations in the ECS.
//!
//! Most notably, it defines the [`AsyncComponent`] struct, which tracks the state of an async
//! operation and automatically applies the commands that operation emits to the world.
5
6use bevy::ecs::world::CommandQueue;
7use bevy::prelude::{BevyError, Commands, Component, Entity, Event, Query, World};
8use bevy::tasks::futures_lite::future;
9use bevy::tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, Task, block_on};
10pub use crossbeam_channel::{Receiver, Sender};
11use crossbeam_channel::{SendError, unbounded};
12use std::future::Future;
13
14/// Represents an ongoing asynchronous operation that will be polled for progress and/or completion.
15///
16/// To create instances, see [`AsyncComponent::new_async`], [`AsyncComponent::new_compute`] and
17/// [`AsyncComponent::new_io`].
18#[derive(Component)]
19pub struct AsyncComponent {
20    /// The task tracking if the asynchronous operation is completed.
21    task: Task<()>,
22    /// The channel through which updates are emitted.
23    ///
24    /// Updates are modelled through Bevy's [`CommandQueue`](https://docs.rs/bevy/latest/bevy/ecs/world/struct.CommandQueue.html),
25    /// which allows async tasks to send commands back to the main World.
26    receiver: Receiver<CommandQueue>,
27}
28
/// Generates an [`AsyncComponent`] constructor that spawns its task on one specific task pool.
///
/// Bevy's task pools do not share a common trait to be generic over, so the same constructor
/// body is stamped out once per pool type by this macro.
///
/// Arguments: any number of attributes (typically doc comments) to put on the generated
/// function, the function name, and the pool type the task is spawned on.
macro_rules! new_async_component {
    ($( #[$meta:meta] )* $fn_name:ident, $pool:ty) => {
        $( #[$meta] )*
        #[must_use = "AsyncComponent will not be polled without being spawned"]
        pub fn $fn_name<F, Fut, EF>(task: F, handler: EF) -> Self
        where
            F: FnOnce(Sender<CommandQueue>) -> Fut + Send + 'static,
            Fut: Future<Output = Result<(), BevyError>> + Send + 'static,
            EF: FnOnce(BevyError, Sender<CommandQueue>) + Send + 'static,
        {
            let (sender, receiver) = unbounded::<CommandQueue>();

            let task = <$pool>::get().spawn(async move {
                // The task only gets a clone of the sender: the original is kept back so the
                // error handler can still emit commands if the task fails.
                let outcome = task(sender.clone()).await;
                if let Err(error) = outcome {
                    handler(error, sender);
                }
            });

            Self { task, receiver }
        }
    };
}
54
55impl AsyncComponent {
56    new_async_component!(
57        /// Generates a new [`AsyncComponent`] for the `task`, scheduled on the [`AsyncComputeTaskPool`].
58        /// Like the [`AsyncComputeTaskPool`], this is intended for CPU-intensive work that may span
59        /// across multiple frames.
60        ///
61        /// The `handler` passed in will be called if `task` returns an error.
62        /// This allows handling errors within the context of the task.
63        new_async, AsyncComputeTaskPool
64    );
65
66    new_async_component!(
67    /// Generates a new [`AsyncComponent`] for the `task`, scheduled on the [`ComputeTaskPool`].
68    /// Like the [`ComputeTaskPool`], this is intended for CPU-intensive work that must be completed
69    /// to deliver the next frame.
70    ///
71    /// The `handler` passed in will be called if `task` returns an error.
72    /// This allows handling errors within the context of the task.
73        new_compute, ComputeTaskPool
74    );
75
76    new_async_component!(
77    /// Generates a new [`AsyncComponent`] for the `task`, scheduled on the [`IoTaskPool`].
78    /// Like the [`IoTaskPool`], this is intended for IO-intensive work.
79    ///
80    /// The `handler` passed in will be called if `task` returns an error.
81    /// This allows handling errors within the context of the task.
82        new_io, IoTaskPool
83    );
84}
85
86/// A helper function for reporting progress from a task controlled by [`AsyncComponent`].
87///
88/// This method is a shorthand for creating a `CommandQueue`, pushing a single command that dispatches
89/// an event (of type `E`) and sends it over the `sender`.
90///
91/// # Example
92/// ```rust
93/// # use bevy::prelude::*;
94/// # use utils::{AsyncComponent, report_progress};
95/// #[derive(Event)]
96/// struct FooEvent;
97///
98/// # fn main() {
99/// #     let mut app = App::new();
100/// #     app.add_plugins(TaskPoolPlugin::default());
101/// #     app.add_event::<FooEvent>();
102/// #     app.add_systems(Startup, setup);
103/// #     app.run();
104/// # }
105/// #
106/// # fn setup(mut commands: Commands) {
107/// #    commands.spawn(AsyncComponent::new_async(async |sender| {
108/// report_progress(&sender, FooEvent)?;
109/// #        Ok(())
110/// #    }, |_, _| {}));
111/// # }
112/// ```
113/// # Errors
114/// This method forwards the `Result` received from calling `sender.send(...)`.
115pub fn report_progress<E>(
116    sender: &Sender<CommandQueue>,
117    event: E,
118) -> Result<(), SendError<CommandQueue>>
119where
120    E: Event,
121{
122    let mut queue = CommandQueue::default();
123    queue.push(move |world: &mut World| {
124        world.send_event(event);
125    });
126
127    sender.send(queue)
128}
129
130/// Polls each [`AsyncComponent`] in the ECS tree and checks for progress and/or completion.
131///
132/// If an [`AsyncComponent`]'s `Receiver` contains updates, they are appended to the current world.
133/// The [`AsyncComponent::task`] is polled for completion, and once completed the component is removed
134/// from the world.
135pub(crate) fn handle_async_components(
136    mut commands: Commands,
137    mut query: Query<(Entity, &mut AsyncComponent)>,
138) {
139    for (entity, mut component) in &mut query {
140        let queue = component.receiver.try_iter().reduce(|mut acc, mut queue| {
141            acc.append(&mut queue);
142            acc
143        });
144
145        if let Some(mut queue) = queue {
146            commands.append(&mut queue);
147        }
148
149        if block_on(future::poll_once(&mut component.task)).is_some() {
150            commands.entity(entity).despawn();
151        }
152    }
153}