utils/async_ecs.rs
1//! This module handles asynchronous operations in ECS.
2//!
3//! Most notably, it defines the `AsyncComponent` struct which tracks the state of an async operation
4//! and automatically executes commands emitted on the world the component is attached to.
5
6use bevy::ecs::world::CommandQueue;
7use bevy::prelude::{BevyError, Commands, Component, Entity, Event, Query, World};
8use bevy::tasks::futures_lite::future;
9use bevy::tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, Task, block_on};
10pub use crossbeam_channel::{Receiver, Sender};
11use crossbeam_channel::{SendError, unbounded};
12use std::future::Future;
13
14/// Represents an ongoing asynchronous operation that will be polled for progress and/or completion.
15///
16/// To create instances, see [`AsyncComponent::new_async`], [`AsyncComponent::new_compute`] and
17/// [`AsyncComponent::new_io`].
18#[derive(Component)]
19pub struct AsyncComponent {
20 /// The task tracking if the asynchronous operation is completed.
21 task: Task<()>,
22 /// The channel through which updates are emitted.
23 ///
24 /// Updates are modelled through Bevy's [`CommandQueue`](https://docs.rs/bevy/latest/bevy/ecs/world/struct.CommandQueue.html),
25 /// which allows async tasks to send commands back to the main World.
26 receiver: Receiver<CommandQueue>,
27}
28
/// Stamps out an [`AsyncComponent`] constructor bound to one specific task pool.
///
/// Bevy's task pools do not share a common trait, so instead of one generic constructor this
/// macro is invoked once per pool type. The invocation takes optional attributes (typically the
/// doc comment), the name of the constructor to generate, and the pool type to spawn on.
macro_rules! new_async_component {
    ($( #[$attr:meta] )* $ctor:ident, $task_pool:ty) => {
        $( #[$attr] )*
        #[must_use = "AsyncComponent will not be polled without being spawned"]
        pub fn $ctor<F, Fut, EF>(task: F, handler: EF) -> Self
        where
            F: FnOnce(Sender<CommandQueue>) -> Fut + Send + 'static,
            Fut: Future<Output = Result<(), BevyError>> + Send + 'static,
            EF: FnOnce(BevyError, Sender<CommandQueue>) + Send + 'static,
        {
            let (tx, rx) = unbounded::<CommandQueue>();

            let handle = <$task_pool>::get().spawn(async move {
                // The task works on a clone so the original sender is still available to the
                // error handler if the task fails.
                if let Err(error) = task(tx.clone()).await {
                    handler(error, tx);
                }
            });

            Self {
                task: handle,
                receiver: rx,
            }
        }
    };
}
54
55impl AsyncComponent {
56 new_async_component!(
57 /// Generates a new [`AsyncComponent`] for the `task`, scheduled on the [`AsyncComputeTaskPool`].
58 /// Like the [`AsyncComputeTaskPool`], this is intended for CPU-intensive work that may span
59 /// across multiple frames.
60 ///
61 /// The `handler` passed in will be called if `task` returns an error.
62 /// This allows handling errors within the context of the task.
63 new_async, AsyncComputeTaskPool
64 );
65
66 new_async_component!(
67 /// Generates a new [`AsyncComponent`] for the `task`, scheduled on the [`ComputeTaskPool`].
68 /// Like the [`ComputeTaskPool`], this is intended for CPU-intensive work that must be completed
69 /// to deliver the next frame.
70 ///
71 /// The `handler` passed in will be called if `task` returns an error.
72 /// This allows handling errors within the context of the task.
73 new_compute, ComputeTaskPool
74 );
75
76 new_async_component!(
77 /// Generates a new [`AsyncComponent`] for the `task`, scheduled on the [`IoTaskPool`].
78 /// Like the [`IoTaskPool`], this is intended for IO-intensive work.
79 ///
80 /// The `handler` passed in will be called if `task` returns an error.
81 /// This allows handling errors within the context of the task.
82 new_io, IoTaskPool
83 );
84}
85
86/// A helper function for reporting progress from a task controlled by [`AsyncComponent`].
87///
88/// This method is a shorthand for creating a `CommandQueue`, pushing a single command that dispatches
89/// an event (of type `E`) and sends it over the `sender`.
90///
91/// # Example
92/// ```rust
93/// # use bevy::prelude::*;
94/// # use utils::{AsyncComponent, report_progress};
95/// #[derive(Event)]
96/// struct FooEvent;
97///
98/// # fn main() {
99/// # let mut app = App::new();
100/// # app.add_plugins(TaskPoolPlugin::default());
101/// # app.add_event::<FooEvent>();
102/// # app.add_systems(Startup, setup);
103/// # app.run();
104/// # }
105/// #
106/// # fn setup(mut commands: Commands) {
107/// # commands.spawn(AsyncComponent::new_async(async |sender| {
108/// report_progress(&sender, FooEvent)?;
109/// # Ok(())
110/// # }, |_, _| {}));
111/// # }
112/// ```
113/// # Errors
114/// This method forwards the `Result` received from calling `sender.send(...)`.
115pub fn report_progress<E>(
116 sender: &Sender<CommandQueue>,
117 event: E,
118) -> Result<(), SendError<CommandQueue>>
119where
120 E: Event,
121{
122 let mut queue = CommandQueue::default();
123 queue.push(move |world: &mut World| {
124 world.send_event(event);
125 });
126
127 sender.send(queue)
128}
129
130/// Polls each [`AsyncComponent`] in the ECS tree and checks for progress and/or completion.
131///
132/// If an [`AsyncComponent`]'s `Receiver` contains updates, they are appended to the current world.
133/// The [`AsyncComponent::task`] is polled for completion, and once completed the component is removed
134/// from the world.
135pub(crate) fn handle_async_components(
136 mut commands: Commands,
137 mut query: Query<(Entity, &mut AsyncComponent)>,
138) {
139 for (entity, mut component) in &mut query {
140 let queue = component.receiver.try_iter().reduce(|mut acc, mut queue| {
141 acc.append(&mut queue);
142 acc
143 });
144
145 if let Some(mut queue) = queue {
146 commands.append(&mut queue);
147 }
148
149 if block_on(future::poll_once(&mut component.task)).is_some() {
150 commands.entity(entity).despawn();
151 }
152 }
153}