From 72e4bad8d99bc2fbff005b02d4f9263b7743f7a9 Mon Sep 17 00:00:00 2001 From: elton-cs Date: Mon, 30 Jun 2025 14:59:57 -0500 Subject: [PATCH 1/3] feat: torii with bevy tasks v1 --- Cargo.lock | 1 + Cargo.toml | 1 + README.md | 19 +++ examples/intro_v2.rs | 244 ++++++++++++++++++++++++++++ src/lib.rs | 5 +- src/torii_v2.rs | 369 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 638 insertions(+), 1 deletion(-) create mode 100644 examples/intro_v2.rs create mode 100644 src/torii_v2.rs diff --git a/Cargo.lock b/Cargo.lock index 5fc411e..bc8dec1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2657,6 +2657,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bevy", + "crossbeam-channel", "dojo-types", "futures", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 32cfe77..ee44f27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ starknet = "0.13" url = "2" tokio = { version = "1.0", features = ["full"] } futures = "0.3" +crossbeam-channel = "0.5" torii-grpc-client = { git = "https://github.com/dojoengine/torii", rev = "ee8756a" } dojo-types = { git = "https://github.com/dojoengine/dojo", rev = "4145801" } reqwest = { version = "0.11.27", features = [ "json", "rustls-tls" ], default-features = false } diff --git a/README.md b/README.md index 5db2a55..4672744 100644 --- a/README.md +++ b/README.md @@ -34,9 +34,28 @@ torii --config ./torii_dev.toml 5. Run this example: ```bash +# Original plugin with external Tokio runtime cargo run --example intro + +# OR use the new v2 plugin with native Bevy tasks +cargo run --example intro_v2 ``` +## Plugin Versions + +This repository contains two plugin implementations: + +### Original Plugin (`DojoPlugin`) +- Uses external Tokio runtime (`TokioRuntime` resource required) +- Full Dojo functionality (Torii + Starknet) +- Proven and stable + +### V2 Plugin (`DojoPluginV2`) +- Uses native Bevy task system (no external Tokio dependency) +- Same functionality as original but more efficient +- Better integration with Bevy's async systems +- Recommended for new projects + ## How to play More is coming with better UI but currently you can: diff --git a/examples/intro_v2.rs b/examples/intro_v2.rs new file mode 100644 index 0000000..56b684e --- /dev/null +++ b/examples/intro_v2.rs @@ -0,0 +1,244 @@ +//! Basic example of Dojo v2 plugin usage using native Bevy tasks. +//! +//! This example demonstrates the same functionality as intro.rs but using +//! the new v2 plugin with native Bevy task integration. + +use bevy::input::ButtonState; +use bevy::{input::keyboard::KeyboardInput, prelude::*}; +use dojo_types::schema::Struct; +use starknet::core::types::Call; +use starknet::core::types::Felt; +use starknet::macros::selector; +use std::collections::HashSet; +use torii_grpc_client::types::{Pagination, PaginationDirection, Query as ToriiQuery}; + +use dojo_bevy_plugin::{ + DojoEntityUpdatedV2, DojoInitializedEventV2, DojoPluginV2, DojoResourceV2, +}; + +const TORII_URL: &str = "http://localhost:8080"; +const KATANA_URL: &str = "http://0.0.0.0:5050"; + +// Manifest related constants. +const WORLD_ADDRESS: Felt = + Felt::from_hex_unchecked("0x04d9778a74d2c9e6e7e4a24cbe913998a80de217c66ee173a604d06dea5469c3"); +const ACTION_ADDRESS: Felt = + Felt::from_hex_unchecked("0x00b056c9813fdc442118bdfead6fda526e5daa5fd7d543304117ed80154ea752"); +const SPAWN_SELECTOR: Felt = selector!("spawn"); +const MOVE_SELECTOR: Felt = selector!("move"); + +/// This event will be triggered every time the position is updated. +#[derive(Event)] +struct PositionUpdatedEvent(pub Position); + +/// A very simple cube to represent the player. +#[derive(Component)] +pub struct Cube { + pub player: Felt, +} + +#[derive(Resource, Default)] +struct EntityTracker { + existing_entities: HashSet, +} + +/// Main entry point. +fn main() { + App::new() + .add_plugins(DefaultPlugins) + .add_plugins(DojoPluginV2) // Use the v2 plugin + .init_resource::() // Use v2 resource + .init_resource::() + .add_event::() + .add_systems(Startup, setup) + .add_systems( + Update, + ( + handle_keyboard_input, + on_dojo_events, + (update_cube_position).after(on_dojo_events), + ), + ) + .run(); +} + +/// This system is responsible for handling the keyboard input. +fn handle_keyboard_input( + mut dojo: ResMut, // Use v2 resource + mut keyboard_input_events: EventReader, +) { + for event in keyboard_input_events.read() { + let key_code = event.key_code; + let is_pressed = event.state == ButtonState::Pressed; + + match key_code { + KeyCode::KeyC if is_pressed => { + // Connect using v2 methods (no tokio runtime needed) + dojo.connect_torii(TORII_URL.to_string(), WORLD_ADDRESS); + dojo.connect_predeployed_account(KATANA_URL.to_string(), 0); + } + KeyCode::Space if is_pressed => { + info!("Spawning (v2)."); + let calls = vec![Call { + to: ACTION_ADDRESS, + selector: SPAWN_SELECTOR, + calldata: vec![], + }]; + dojo.queue_tx(calls); // No tokio runtime needed + } + KeyCode::KeyS if is_pressed => { + info!("Setting up Torii subscription (v2)."); + dojo.subscribe_entities("position".to_string(), None); + } + KeyCode::ArrowLeft | KeyCode::ArrowRight | KeyCode::ArrowUp | KeyCode::ArrowDown + if is_pressed => + { + let direction = match key_code { + KeyCode::ArrowLeft => 0, + KeyCode::ArrowRight => 1, + KeyCode::ArrowUp => 2, + KeyCode::ArrowDown => 3, + _ => panic!("Invalid key code"), + }; + + let calls = vec![Call { + to: ACTION_ADDRESS, + selector: MOVE_SELECTOR, + calldata: vec![Felt::from(direction)], + }]; + + dojo.queue_tx(calls); // No tokio runtime needed + } + _ => continue, + } + } +} + +/// Updates the cube position by reacting to the dedicated event +/// for new position updates. +fn update_cube_position( + mut commands: Commands, + mut meshes: ResMut>, + mut materials: ResMut>, + mut entity_tracker: ResMut, + mut ev_position_updated: EventReader, + mut query: Query<(&mut Transform, &Cube)>, +) { + for ev in ev_position_updated.read() { + let Position { x, y, player } = ev.0; + + if !entity_tracker.existing_entities.contains(&player) { + commands.spawn(( + Mesh3d(meshes.add(Cuboid::new(0.5, 0.5, 0.5))), + MeshMaterial3d(materials.add(Color::srgb(0.8, 0.7, 0.2))), // Different color for v2 + Cube { player }, + Transform::from_xyz(x as f32, y as f32, 0.0), + )); + + entity_tracker.existing_entities.insert(player); + } else { + for (mut transform, cube) in query.iter_mut() { + if cube.player == player { + transform.translation = Vec3::new(x as f32, y as f32, 0.0); + } + } + } + } +} + +/// Reacts on Dojo v2 events. +fn on_dojo_events( + mut dojo: ResMut, + mut ev_initialized: EventReader, // Use v2 events + mut ev_retrieve_entities: EventReader, // Use v2 events + mut ev_position_updated: EventWriter, +) { + for _ in ev_initialized.read() { + info!("Dojo v2 initialized."); + + // Initial fetch using v2 resource + dojo.queue_retrieve_entities(ToriiQuery { + clause: None, + pagination: Pagination { + limit: 100, + cursor: None, + direction: PaginationDirection::Forward, + order_by: vec![], + }, + no_hashed_keys: false, + models: vec![], + historical: false, + }); + } + + for ev in ev_retrieve_entities.read() { + info!(entity_id = ?ev.entity_id, "Torii v2 update"); + + if ev.entity_id == Felt::ZERO { + continue; + } + + for m in &ev.models { + debug!("model: {:?}", &m); + + match m.name.as_str() { + "di-Position" => { + ev_position_updated.write(PositionUpdatedEvent(m.into())); + } + name if name == "di-Moves".to_string() => {} + _ => { + warn!("Model not handled: {:?}", m); + } + } + } + } +} + +/// The position of the player in the game. +#[derive(Component, Debug)] +pub struct Position { + pub player: Felt, + pub x: u32, + pub y: u32, +} + +/// Manual conversion from Dojo struct to Position. +impl From<&Struct> for Position { + fn from(struct_value: &Struct) -> Self { + let player = struct_value + .get("player") + .unwrap() + .as_primitive() + .unwrap() + .as_contract_address() + .unwrap(); + let x = struct_value + .get("x") + .unwrap() + .as_primitive() + .unwrap() + .as_u32() + .unwrap(); + let y = struct_value + .get("y") + .unwrap() + .as_primitive() + .unwrap() + .as_u32() + .unwrap(); + + Position { player, x, y } + } +} + +/// Setups the scene with basic light. +pub fn setup(mut commands: Commands) { + commands.spawn(( + DirectionalLight::default(), + Transform::from_xyz(0.0, 0.0, 30.0).looking_at(Vec3::ZERO, Vec3::Y), + )); + commands.spawn(( + Camera3d::default(), + Transform::from_xyz(0.0, 0.0, 30.0).looking_at(Vec3::ZERO, Vec3::Y), + )); +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 17ff32f..8fab897 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,6 @@ +// mod bevytasks_torii; // Disabled due to missing modules mod plugin; +mod torii_v2; -pub use plugin::*; +pub use plugin::*; +pub use torii_v2::*; diff --git a/src/torii_v2.rs b/src/torii_v2.rs new file mode 100644 index 0000000..186a061 --- /dev/null +++ b/src/torii_v2.rs @@ -0,0 +1,369 @@ +//! Dojo v2 plugin using native Bevy tasks instead of external Tokio runtime. +//! +//! This plugin provides the same functionality as the original Dojo plugin but uses +//! Bevy's native task system for better integration and performance. + +use bevy::prelude::*; +use bevy::tasks::{IoTaskPool, Task}; +use crossbeam_channel::{Receiver, Sender, unbounded}; +use dojo_types::schema::Struct; +use futures::StreamExt; +use starknet::accounts::single_owner::SignError; +use starknet::accounts::{Account, AccountError, ExecutionEncoding, SingleOwnerAccount}; +use starknet::core::types::{BlockId, BlockTag, Call, InvokeTransactionResult}; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Provider}; +use starknet::signers::local_wallet::SignError as LocalWalletSignError; +use starknet::signers::{LocalWallet, SigningKey}; +use starknet::{core::types::Felt, providers::AnyProvider}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::sync::Mutex; +use torii_grpc_client::WorldClient; +use torii_grpc_client::types::proto::world::RetrieveEntitiesResponse; +use torii_grpc_client::types::{Clause, Query as ToriiQuery}; +use url::Url; + +/// The Dojo v2 plugin using native Bevy tasks. +pub struct DojoPluginV2; + +impl Plugin for DojoPluginV2 { + fn build(&self, app: &mut App) { + app.add_event::(); + app.add_event::(); + app.add_systems(Update, (check_torii_task_v2, check_sn_task_v2)); + } +} + +/// Event emitted when Dojo v2 is initialized. +#[derive(Event)] +pub struct DojoInitializedEventV2; + +/// Event emitted when an entity is updated from Torii. +#[derive(Event, Debug)] +pub struct DojoEntityUpdatedV2 { + pub entity_id: Felt, + pub models: Vec, +} + +/// Starknet connection state using Bevy tasks. +#[derive(Default)] +pub struct StarknetConnectionV2 { + pub connecting_task: Option>>>, + pub account: Option>>, + pub pending_txs: VecDeque>>>>, +} + +/// Torii connection state using Bevy tasks. +#[derive(Default)] +pub struct ToriiConnectionV2 { + pub init_task: Option>>, + pub client: Option>>, + pub pending_retrieve_entities: VecDeque>>, + pub subscriptions: Arc>>>, + pub subscription_sender: Option)>>, + pub subscription_receiver: Option)>>, +} + +/// Main Dojo resource using Bevy tasks. +#[derive(Resource, Default)] +pub struct DojoResourceV2 { + pub sn: StarknetConnectionV2, + pub torii: ToriiConnectionV2, +} + +impl DojoResourceV2 { + /// Connects to Torii using Bevy tasks. + pub fn connect_torii(&mut self, torii_url: String, world_address: Felt) { + info!("Connecting to Torii (v2)."); + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + WorldClient::new(torii_url, world_address).await + }); + self.torii.init_task = Some(task); + + let (sender, receiver) = unbounded(); + self.torii.subscription_sender = Some(sender); + self.torii.subscription_receiver = Some(receiver); + } + + /// Connects to a Starknet account using Bevy tasks. + pub fn connect_account(&mut self, rpc_url: String, account_addr: Felt, private_key: Felt) { + info!("Connecting to Starknet (v2)."); + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + connect_to_starknet_v2(rpc_url, account_addr, private_key).await + }); + self.sn.connecting_task = Some(task); + } + + /// Connects to a predeployed account using Bevy tasks. + pub fn connect_predeployed_account(&mut self, rpc_url: String, account_idx: usize) { + info!("Connecting to Starknet (predeployed, v2)."); + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + connect_predeployed_account_v2(rpc_url, account_idx).await + }); + self.sn.connecting_task = Some(task); + } + + /// Queues a transaction using Bevy tasks. + pub fn queue_tx(&mut self, calls: Vec) { + if let Some(account) = self.sn.account.clone() { + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + let tx = account.execute_v3(calls); + tx.send().await + }); + self.sn.pending_txs.push_back(task); + } else { + warn!("No Starknet account initialized, skipping transaction."); + } + } + + /// Queues an entity retrieval query using Bevy tasks. + pub fn queue_retrieve_entities(&mut self, query: ToriiQuery) { + if let Some(client) = self.torii.client.clone() { + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + let mut client = client.lock().await; + client.retrieve_entities(query).await + }); + self.torii.pending_retrieve_entities.push_back(task); + } else { + warn!("No Torii client initialized, skipping query."); + } + } + + /// Subscribes to entity updates using Bevy tasks. + pub fn subscribe_entities(&mut self, id: String, clause: Option) { + if let Some(client) = self.torii.client.clone() { + let sender = self.torii.subscription_sender.clone(); + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + let subscription_result = { + let mut client = client.lock().await; + client.subscribe_entities(clause).await + }; + + match subscription_result { + Ok(mut subscription) => { + while let Some(Ok((n, e))) = subscription.next().await { + debug!("Torii subscribe entities update: {} {:?}", n, e); + if let Some(ref sender) = sender { + let _ = sender.send((e.hashed_keys, e.models)); + } + } + } + Err(e) => { + error!("Failed to subscribe to entities: {:?}", e); + } + } + }); + + // Store the subscription task - we'll need to handle this async + let subscriptions = self.torii.subscriptions.clone(); + let task_id = id.clone(); + let store_task: Task> = IoTaskPool::get().spawn(async move { + let mut subs = subscriptions.lock().await; + subs.insert(task_id, task); + Ok(()) + }); + + // We could store this task too, but for simplicity we'll just fire and forget + std::mem::drop(store_task); + } else { + warn!("No Torii client initialized, skipping subscription."); + } + } +} + +/// System to check Torii tasks and handle responses. +fn check_torii_task_v2( + mut dojo: ResMut, + mut ev_retrieve_entities: EventWriter, + mut ev_initialized: EventWriter, +) { + // Check if Torii client initialization is complete + if let Some(mut task) = dojo.torii.init_task.take() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(&mut task)) { + match result { + Ok(client) => { + info!("Torii client initialized (v2)."); + dojo.torii.client = Some(Arc::new(Mutex::new(client))); + ev_initialized.write(DojoInitializedEventV2); + } + Err(e) => { + error!("Failed to initialize Torii client: {:?}", e); + // Put the task back if it failed + dojo.torii.init_task = Some(task); + } + } + } else { + // Task not ready yet, put it back + dojo.torii.init_task = Some(task); + } + } + + // Check pending entity retrieval tasks + let mut completed_tasks = Vec::new(); + for (index, task) in dojo.torii.pending_retrieve_entities.iter_mut().enumerate() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { + completed_tasks.push((index, result)); + } + } + + // Process completed tasks in reverse order to maintain indices + for (index, result) in completed_tasks.into_iter().rev() { + dojo.torii.pending_retrieve_entities.remove(index); + + match result { + Ok(response) => { + debug!("Retrieve entities response: {:?}", response); + for e in response.entities { + ev_retrieve_entities.write(DojoEntityUpdatedV2 { + entity_id: Felt::from_bytes_be_slice(&e.hashed_keys), + models: e.models.into_iter().map(|m| m.try_into().unwrap()).collect(), + }); + } + } + Err(e) => { + error!("Failed to retrieve entities: {:?}", e); + } + } + } + + // Check for subscription updates + if let Some(receiver) = &dojo.torii.subscription_receiver { + while let Ok((entity_id, models)) = receiver.try_recv() { + debug!("Torii subscription update: {:?}", (entity_id, &models)); + ev_retrieve_entities.write(DojoEntityUpdatedV2 { entity_id, models }); + } + } +} + +/// System to check Starknet tasks and handle responses. +fn check_sn_task_v2(mut dojo: ResMut) { + // Check if Starknet account connection is complete + if let Some(mut task) = dojo.sn.connecting_task.take() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(&mut task)) { + info!("Connected to Starknet (v2)."); + dojo.sn.account = Some(result); + } else { + // Task not ready yet, put it back + dojo.sn.connecting_task = Some(task); + } + } + + // Check pending transactions + if !dojo.sn.pending_txs.is_empty() && dojo.sn.account.is_some() { + let mut completed_tasks = Vec::new(); + for (index, task) in dojo.sn.pending_txs.iter_mut().enumerate() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { + completed_tasks.push((index, result)); + } + } + + // Process completed tasks in reverse order to maintain indices + for (index, result) in completed_tasks.into_iter().rev() { + dojo.sn.pending_txs.remove(index); + + match result { + Ok(tx_result) => { + info!("Transaction completed: {:#x}", tx_result.transaction_hash); + } + Err(e) => { + error!("Transaction failed: {:?}", e); + } + } + } + } +} + +/// Connects to a Starknet account (v2). +async fn connect_to_starknet_v2( + rpc_url: String, + account_addr: Felt, + private_key: Felt, +) -> Arc> { + let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new( + Url::parse(&rpc_url).expect("Expecting valid Starknet RPC URL"), + ))); + + let chain_id = provider.chain_id().await.unwrap(); + let signer = LocalWallet::from(SigningKey::from_secret_scalar(private_key)); + + Arc::new(SingleOwnerAccount::new( + provider, + signer, + account_addr, + chain_id, + ExecutionEncoding::New, + )) +} + +/// Connects to a predeployed account (v2). +pub async fn connect_predeployed_account_v2( + rpc_url: String, + account_idx: usize, +) -> Arc> { + let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new( + Url::parse(&rpc_url).unwrap(), + ))); + + let client = reqwest::Client::new(); + let response = client + .post(&rpc_url) + .json(&serde_json::json!({ + "jsonrpc": "2.0", + "method": "dev_predeployedAccounts", + "params": [], + "id": 1 + })) + .send() + .await + .expect("Failed to fetch predeployed accounts."); + + let result: serde_json::Value = response + .json() + .await + .expect("Failed to parse predeployed accounts."); + + if let Some(vals) = result.get("result").and_then(|v| v.as_array()) { + let chain_id = provider.chain_id().await.expect("Failed to get chain id."); + + for (i, a) in vals.iter().enumerate() { + let address = a["address"].as_str().unwrap(); + + let private_key = if let Some(pk) = a["privateKey"].as_str() { + pk + } else { + continue; + }; + + let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new( + Url::parse(&rpc_url).unwrap(), + ))); + + let signer = LocalWallet::from(SigningKey::from_secret_scalar( + Felt::from_hex(private_key).unwrap(), + )); + + let mut account = SingleOwnerAccount::new( + provider, + signer, + Felt::from_hex(address).unwrap(), + chain_id, + ExecutionEncoding::New, + ); + + account.set_block_id(BlockId::Tag(BlockTag::Pending)); + + if i == account_idx { + return Arc::new(account); + } + } + } + + panic!("Account index out of bounds."); +} \ No newline at end of file From b60b3d3743e7e2944c58d3925ef813723d8cc620 Mon Sep 17 00:00:00 2001 From: elton-cs Date: Mon, 30 Jun 2025 15:19:16 -0500 Subject: [PATCH 2/3] fix: switch mutex to futures crate and remove tokio deps --- Cargo.lock | 1 - Cargo.toml | 2 +- src/lib.rs | 4 ++-- src/torii_v2.rs | 39 +++++++++++++++++++++------------------ 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc8dec1..f9e2678 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2664,7 +2664,6 @@ dependencies = [ "serde", "serde_json", "starknet 0.13.0", - "tokio", "torii-grpc-client", "url", ] diff --git a/Cargo.toml b/Cargo.toml index ee44f27..474fd48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ anyhow = "1" bevy = "0.16.0" starknet = "0.13" url = "2" -tokio = { version = "1.0", features = ["full"] } +# tokio = { version = "1.0", features = ["full"] } futures = "0.3" crossbeam-channel = "0.5" torii-grpc-client = { git = "https://github.com/dojoengine/torii", rev = "ee8756a" } diff --git a/src/lib.rs b/src/lib.rs index 8fab897..cdfe0b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ // mod bevytasks_torii; // Disabled due to missing modules -mod plugin; +// mod plugin; mod torii_v2; -pub use plugin::*; +// pub use plugin::*; pub use torii_v2::*; diff --git a/src/torii_v2.rs b/src/torii_v2.rs index 186a061..b47341e 100644 --- a/src/torii_v2.rs +++ b/src/torii_v2.rs @@ -8,6 +8,7 @@ use bevy::tasks::{IoTaskPool, Task}; use crossbeam_channel::{Receiver, Sender, unbounded}; use dojo_types::schema::Struct; use futures::StreamExt; +use futures::lock::Mutex; use starknet::accounts::single_owner::SignError; use starknet::accounts::{Account, AccountError, ExecutionEncoding, SingleOwnerAccount}; use starknet::core::types::{BlockId, BlockTag, Call, InvokeTransactionResult}; @@ -18,7 +19,6 @@ use starknet::signers::{LocalWallet, SigningKey}; use starknet::{core::types::Felt, providers::AnyProvider}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; -use tokio::sync::Mutex; use torii_grpc_client::WorldClient; use torii_grpc_client::types::proto::world::RetrieveEntitiesResponse; use torii_grpc_client::types::{Clause, Query as ToriiQuery}; @@ -51,7 +51,9 @@ pub struct DojoEntityUpdatedV2 { pub struct StarknetConnectionV2 { pub connecting_task: Option>>>, pub account: Option>>, - pub pending_txs: VecDeque>>>>, + pub pending_txs: VecDeque< + Task>>>, + >, } /// Torii connection state using Bevy tasks. @@ -59,7 +61,8 @@ pub struct StarknetConnectionV2 { pub struct ToriiConnectionV2 { pub init_task: Option>>, pub client: Option>>, - pub pending_retrieve_entities: VecDeque>>, + pub pending_retrieve_entities: + VecDeque>>, pub subscriptions: Arc>>>, pub subscription_sender: Option)>>, pub subscription_receiver: Option)>>, @@ -77,9 +80,7 @@ impl DojoResourceV2 { pub fn connect_torii(&mut self, torii_url: String, world_address: Felt) { info!("Connecting to Torii (v2)."); let task_pool = IoTaskPool::get(); - let task = task_pool.spawn(async move { - WorldClient::new(torii_url, world_address).await - }); + let task = task_pool.spawn(async move { WorldClient::new(torii_url, world_address).await }); self.torii.init_task = Some(task); let (sender, receiver) = unbounded(); @@ -91,9 +92,8 @@ impl DojoResourceV2 { pub fn connect_account(&mut self, rpc_url: String, account_addr: Felt, private_key: Felt) { info!("Connecting to Starknet (v2)."); let task_pool = IoTaskPool::get(); - let task = task_pool.spawn(async move { - connect_to_starknet_v2(rpc_url, account_addr, private_key).await - }); + let task = task_pool + .spawn(async move { connect_to_starknet_v2(rpc_url, account_addr, private_key).await }); self.sn.connecting_task = Some(task); } @@ -101,9 +101,8 @@ impl DojoResourceV2 { pub fn connect_predeployed_account(&mut self, rpc_url: String, account_idx: usize) { info!("Connecting to Starknet (predeployed, v2)."); let task_pool = IoTaskPool::get(); - let task = task_pool.spawn(async move { - connect_predeployed_account_v2(rpc_url, account_idx).await - }); + let task = task_pool + .spawn(async move { connect_predeployed_account_v2(rpc_url, account_idx).await }); self.sn.connecting_task = Some(task); } @@ -145,7 +144,7 @@ impl DojoResourceV2 { let mut client = client.lock().await; client.subscribe_entities(clause).await }; - + match subscription_result { Ok(mut subscription) => { while let Some(Ok((n, e))) = subscription.next().await { @@ -169,7 +168,7 @@ impl DojoResourceV2 { subs.insert(task_id, task); Ok(()) }); - + // We could store this task too, but for simplicity we'll just fire and forget std::mem::drop(store_task); } else { @@ -216,14 +215,18 @@ fn check_torii_task_v2( // Process completed tasks in reverse order to maintain indices for (index, result) in completed_tasks.into_iter().rev() { dojo.torii.pending_retrieve_entities.remove(index); - + match result { Ok(response) => { debug!("Retrieve entities response: {:?}", response); for e in response.entities { ev_retrieve_entities.write(DojoEntityUpdatedV2 { entity_id: Felt::from_bytes_be_slice(&e.hashed_keys), - models: e.models.into_iter().map(|m| m.try_into().unwrap()).collect(), + models: e + .models + .into_iter() + .map(|m| m.try_into().unwrap()) + .collect(), }); } } @@ -267,7 +270,7 @@ fn check_sn_task_v2(mut dojo: ResMut) { // Process completed tasks in reverse order to maintain indices for (index, result) in completed_tasks.into_iter().rev() { dojo.sn.pending_txs.remove(index); - + match result { Ok(tx_result) => { info!("Transaction completed: {:#x}", tx_result.transaction_hash); @@ -366,4 +369,4 @@ pub async fn connect_predeployed_account_v2( } panic!("Account index out of bounds."); -} \ No newline at end of file +} From 1bb1831e831705a66c26395e6b6ef1d908bb49c7 Mon Sep 17 00:00:00 2001 From: elton-cs Date: Mon, 30 Jun 2025 17:07:18 -0500 Subject: [PATCH 3/3] feat: handle removing multiple instances of same id subscriptions --- examples/intro_v2.rs | 22 +++++----- src/torii_v2.rs | 100 +++++++++++++++++++++++++++++++++---------- 2 files changed, 88 insertions(+), 34 deletions(-) diff --git a/examples/intro_v2.rs b/examples/intro_v2.rs index 56b684e..bbdf5e3 100644 --- a/examples/intro_v2.rs +++ b/examples/intro_v2.rs @@ -1,6 +1,6 @@ //! Basic example of Dojo v2 plugin usage using native Bevy tasks. //! -//! This example demonstrates the same functionality as intro.rs but using +//! This example demonstrates the same functionality as intro.rs but using //! the new v2 plugin with native Bevy task integration. use bevy::input::ButtonState; @@ -12,9 +12,7 @@ use starknet::macros::selector; use std::collections::HashSet; use torii_grpc_client::types::{Pagination, PaginationDirection, Query as ToriiQuery}; -use dojo_bevy_plugin::{ - DojoEntityUpdatedV2, DojoInitializedEventV2, DojoPluginV2, DojoResourceV2, -}; +use dojo_bevy_plugin::{DojoEntityUpdatedV2, DojoInitializedEventV2, DojoPluginV2, DojoResourceV2}; const TORII_URL: &str = "http://localhost:8080"; const KATANA_URL: &str = "http://0.0.0.0:5050"; @@ -46,8 +44,8 @@ struct EntityTracker { fn main() { App::new() .add_plugins(DefaultPlugins) - .add_plugins(DojoPluginV2) // Use the v2 plugin - .init_resource::() // Use v2 resource + .add_plugins(DojoPluginV2) // Use the v2 plugin + .init_resource::() // Use v2 resource .init_resource::() .add_event::() .add_systems(Startup, setup) @@ -64,7 +62,7 @@ fn main() { /// This system is responsible for handling the keyboard input. fn handle_keyboard_input( - mut dojo: ResMut, // Use v2 resource + mut dojo: ResMut, // Use v2 resource mut keyboard_input_events: EventReader, ) { for event in keyboard_input_events.read() { @@ -84,7 +82,7 @@ fn handle_keyboard_input( selector: SPAWN_SELECTOR, calldata: vec![], }]; - dojo.queue_tx(calls); // No tokio runtime needed + dojo.queue_tx(calls); // No tokio runtime needed } KeyCode::KeyS if is_pressed => { info!("Setting up Torii subscription (v2)."); @@ -107,7 +105,7 @@ fn handle_keyboard_input( calldata: vec![Felt::from(direction)], }]; - dojo.queue_tx(calls); // No tokio runtime needed + dojo.queue_tx(calls); // No tokio runtime needed } _ => continue, } @@ -149,8 +147,8 @@ fn update_cube_position( /// Reacts on Dojo v2 events. fn on_dojo_events( mut dojo: ResMut, - mut ev_initialized: EventReader, // Use v2 events - mut ev_retrieve_entities: EventReader, // Use v2 events + mut ev_initialized: EventReader, // Use v2 events + mut ev_retrieve_entities: EventReader, // Use v2 events mut ev_position_updated: EventWriter, ) { for _ in ev_initialized.read() { @@ -241,4 +239,4 @@ pub fn setup(mut commands: Commands) { Camera3d::default(), Transform::from_xyz(0.0, 0.0, 30.0).looking_at(Vec3::ZERO, Vec3::Y), )); -} \ No newline at end of file +} diff --git a/src/torii_v2.rs b/src/torii_v2.rs index b47341e..b6ce35c 100644 --- a/src/torii_v2.rs +++ b/src/torii_v2.rs @@ -24,6 +24,12 @@ use torii_grpc_client::types::proto::world::RetrieveEntitiesResponse; use torii_grpc_client::types::{Clause, Query as ToriiQuery}; use url::Url; +/// Represents the state of a subscription task +pub struct SubscriptionTaskState { + pub task: Task<()>, + pub is_active: bool, +} + /// The Dojo v2 plugin using native Bevy tasks. pub struct DojoPluginV2; @@ -63,9 +69,10 @@ pub struct ToriiConnectionV2 { pub client: Option>>, pub pending_retrieve_entities: VecDeque>>, - pub subscriptions: Arc>>>, + pub subscriptions: Arc>>, pub subscription_sender: Option)>>, pub subscription_receiver: Option)>>, + pub pending_subscription_stores: VecDeque>>, } /// Main Dojo resource using Bevy tasks. @@ -160,17 +167,31 @@ impl DojoResourceV2 { } }); - // Store the subscription task - we'll need to handle this async + // Store the subscription task with proper cleanup of old subscriptions let subscriptions = self.torii.subscriptions.clone(); let task_id = id.clone(); - let store_task: Task> = IoTaskPool::get().spawn(async move { + let store_task: Task> = IoTaskPool::get().spawn(async move { let mut subs = subscriptions.lock().await; - subs.insert(task_id, task); + + // Clean up old subscription if it exists + if let Some(_old_state) = subs.remove(&task_id) { + // Mark old task as inactive (it will naturally terminate) + debug!("Replacing existing subscription: {}", task_id); + } + + subs.insert( + task_id, + SubscriptionTaskState { + task, + is_active: true, + }, + ); + Ok(()) }); - // We could store this task too, but for simplicity we'll just fire and forget - std::mem::drop(store_task); + // Store the subscription storage task to track completion + self.torii.pending_subscription_stores.push_back(store_task); } else { warn!("No Torii client initialized, skipping subscription."); } @@ -204,6 +225,32 @@ fn check_torii_task_v2( } } + // Check pending subscription storage tasks + let mut completed_stores = Vec::new(); + for (index, task) in dojo + .torii + .pending_subscription_stores + .iter_mut() + .enumerate() + { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { + completed_stores.push((index, result)); + } + } + + // Process completed subscription storage tasks + for (index, result) in completed_stores.into_iter().rev() { + dojo.torii.pending_subscription_stores.remove(index); + match result { + Ok(_) => { + debug!("Subscription successfully stored"); + } + Err(e) => { + error!("Failed to store subscription: {}", e); + } + } + } + // Check pending entity retrieval tasks let mut completed_tasks = Vec::new(); for (index, task) in dojo.torii.pending_retrieve_entities.iter_mut().enumerate() { @@ -258,27 +305,36 @@ fn check_sn_task_v2(mut dojo: ResMut) { } } - // Check pending transactions - if !dojo.sn.pending_txs.is_empty() && dojo.sn.account.is_some() { - let mut completed_tasks = Vec::new(); - for (index, task) in dojo.sn.pending_txs.iter_mut().enumerate() { - if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { - completed_tasks.push((index, result)); + // Check pending transactions - only if we have an account and pending transactions + if !dojo.sn.pending_txs.is_empty() { + if dojo.sn.account.is_some() { + let mut completed_tasks = Vec::new(); + for (index, task) in dojo.sn.pending_txs.iter_mut().enumerate() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { + completed_tasks.push((index, result)); + } } - } - // Process completed tasks in reverse order to maintain indices - for (index, result) in completed_tasks.into_iter().rev() { - dojo.sn.pending_txs.remove(index); + // Process completed tasks in reverse order to maintain indices + for (index, result) in completed_tasks.into_iter().rev() { + dojo.sn.pending_txs.remove(index); - match result { - Ok(tx_result) => { - info!("Transaction completed: {:#x}", tx_result.transaction_hash); - } - Err(e) => { - error!("Transaction failed: {:?}", e); + match result { + Ok(tx_result) => { + info!("Transaction completed: {:#x}", tx_result.transaction_hash); + } + Err(e) => { + error!("Transaction failed: {:?}", e); + } } } + } else { + // Clear pending transactions if no account is available + warn!( + "Clearing {} pending transactions - no account available", + dojo.sn.pending_txs.len() + ); + dojo.sn.pending_txs.clear(); } } }