diff --git a/Cargo.lock b/Cargo.lock index 5fc411e..f9e2678 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2657,13 +2657,13 @@ version = "0.1.0" dependencies = [ "anyhow", "bevy", + "crossbeam-channel", "dojo-types", "futures", "reqwest", "serde", "serde_json", "starknet 0.13.0", - "tokio", "torii-grpc-client", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 32cfe77..474fd48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,9 @@ anyhow = "1" bevy = "0.16.0" starknet = "0.13" url = "2" -tokio = { version = "1.0", features = ["full"] } +# tokio = { version = "1.0", features = ["full"] } futures = "0.3" +crossbeam-channel = "0.5" torii-grpc-client = { git = "https://github.com/dojoengine/torii", rev = "ee8756a" } dojo-types = { git = "https://github.com/dojoengine/dojo", rev = "4145801" } reqwest = { version = "0.11.27", features = [ "json", "rustls-tls" ], default-features = false } diff --git a/README.md b/README.md index 5db2a55..4672744 100644 --- a/README.md +++ b/README.md @@ -34,9 +34,28 @@ torii --config ./torii_dev.toml 5. Run this example: ```bash +# Original plugin with external Tokio runtime cargo run --example intro + +# OR use the new v2 plugin with native Bevy tasks +cargo run --example intro_v2 ``` +## Plugin Versions + +This repository contains two plugin implementations: + +### Original Plugin (`DojoPlugin`) +- Uses external Tokio runtime (`TokioRuntime` resource required) +- Full Dojo functionality (Torii + Starknet) +- Proven and stable + +### V2 Plugin (`DojoPluginV2`) +- Uses native Bevy task system (no external Tokio dependency) +- Same functionality as original but more efficient +- Better integration with Bevy's async systems +- Recommended for new projects + ## How to play More is coming with better UI but currently you can: diff --git a/examples/intro_v2.rs b/examples/intro_v2.rs new file mode 100644 index 0000000..bbdf5e3 --- /dev/null +++ b/examples/intro_v2.rs @@ -0,0 +1,242 @@ +//! Basic example of Dojo v2 plugin usage using native Bevy tasks. +//! +//! This example demonstrates the same functionality as intro.rs but using +//! the new v2 plugin with native Bevy task integration. + +use bevy::input::ButtonState; +use bevy::{input::keyboard::KeyboardInput, prelude::*}; +use dojo_types::schema::Struct; +use starknet::core::types::Call; +use starknet::core::types::Felt; +use starknet::macros::selector; +use std::collections::HashSet; +use torii_grpc_client::types::{Pagination, PaginationDirection, Query as ToriiQuery}; + +use dojo_bevy_plugin::{DojoEntityUpdatedV2, DojoInitializedEventV2, DojoPluginV2, DojoResourceV2}; + +const TORII_URL: &str = "http://localhost:8080"; +const KATANA_URL: &str = "http://0.0.0.0:5050"; + +// Manifest related constants. +const WORLD_ADDRESS: Felt = + Felt::from_hex_unchecked("0x04d9778a74d2c9e6e7e4a24cbe913998a80de217c66ee173a604d06dea5469c3"); +const ACTION_ADDRESS: Felt = + Felt::from_hex_unchecked("0x00b056c9813fdc442118bdfead6fda526e5daa5fd7d543304117ed80154ea752"); +const SPAWN_SELECTOR: Felt = selector!("spawn"); +const MOVE_SELECTOR: Felt = selector!("move"); + +/// This event will be triggered every time the position is updated. +#[derive(Event)] +struct PositionUpdatedEvent(pub Position); + +/// A very simple cube to represent the player. +#[derive(Component)] +pub struct Cube { + pub player: Felt, +} + +#[derive(Resource, Default)] +struct EntityTracker { + existing_entities: HashSet, +} + +/// Main entry point. +fn main() { + App::new() + .add_plugins(DefaultPlugins) + .add_plugins(DojoPluginV2) // Use the v2 plugin + .init_resource::() // Use v2 resource + .init_resource::() + .add_event::() + .add_systems(Startup, setup) + .add_systems( + Update, + ( + handle_keyboard_input, + on_dojo_events, + (update_cube_position).after(on_dojo_events), + ), + ) + .run(); +} + +/// This system is responsible for handling the keyboard input. +fn handle_keyboard_input( + mut dojo: ResMut, // Use v2 resource + mut keyboard_input_events: EventReader, +) { + for event in keyboard_input_events.read() { + let key_code = event.key_code; + let is_pressed = event.state == ButtonState::Pressed; + + match key_code { + KeyCode::KeyC if is_pressed => { + // Connect using v2 methods (no tokio runtime needed) + dojo.connect_torii(TORII_URL.to_string(), WORLD_ADDRESS); + dojo.connect_predeployed_account(KATANA_URL.to_string(), 0); + } + KeyCode::Space if is_pressed => { + info!("Spawning (v2)."); + let calls = vec![Call { + to: ACTION_ADDRESS, + selector: SPAWN_SELECTOR, + calldata: vec![], + }]; + dojo.queue_tx(calls); // No tokio runtime needed + } + KeyCode::KeyS if is_pressed => { + info!("Setting up Torii subscription (v2)."); + dojo.subscribe_entities("position".to_string(), None); + } + KeyCode::ArrowLeft | KeyCode::ArrowRight | KeyCode::ArrowUp | KeyCode::ArrowDown + if is_pressed => + { + let direction = match key_code { + KeyCode::ArrowLeft => 0, + KeyCode::ArrowRight => 1, + KeyCode::ArrowUp => 2, + KeyCode::ArrowDown => 3, + _ => panic!("Invalid key code"), + }; + + let calls = vec![Call { + to: ACTION_ADDRESS, + selector: MOVE_SELECTOR, + calldata: vec![Felt::from(direction)], + }]; + + dojo.queue_tx(calls); // No tokio runtime needed + } + _ => continue, + } + } +} + +/// Updates the cube position by reacting to the dedicated event +/// for new position updates. +fn update_cube_position( + mut commands: Commands, + mut meshes: ResMut>, + mut materials: ResMut>, + mut entity_tracker: ResMut, + mut ev_position_updated: EventReader, + mut query: Query<(&mut Transform, &Cube)>, +) { + for ev in ev_position_updated.read() { + let Position { x, y, player } = ev.0; + + if !entity_tracker.existing_entities.contains(&player) { + commands.spawn(( + Mesh3d(meshes.add(Cuboid::new(0.5, 0.5, 0.5))), + MeshMaterial3d(materials.add(Color::srgb(0.8, 0.7, 0.2))), // Different color for v2 + Cube { player }, + Transform::from_xyz(x as f32, y as f32, 0.0), + )); + + entity_tracker.existing_entities.insert(player); + } else { + for (mut transform, cube) in query.iter_mut() { + if cube.player == player { + transform.translation = Vec3::new(x as f32, y as f32, 0.0); + } + } + } + } +} + +/// Reacts on Dojo v2 events. +fn on_dojo_events( + mut dojo: ResMut, + mut ev_initialized: EventReader, // Use v2 events + mut ev_retrieve_entities: EventReader, // Use v2 events + mut ev_position_updated: EventWriter, +) { + for _ in ev_initialized.read() { + info!("Dojo v2 initialized."); + + // Initial fetch using v2 resource + dojo.queue_retrieve_entities(ToriiQuery { + clause: None, + pagination: Pagination { + limit: 100, + cursor: None, + direction: PaginationDirection::Forward, + order_by: vec![], + }, + no_hashed_keys: false, + models: vec![], + historical: false, + }); + } + + for ev in ev_retrieve_entities.read() { + info!(entity_id = ?ev.entity_id, "Torii v2 update"); + + if ev.entity_id == Felt::ZERO { + continue; + } + + for m in &ev.models { + debug!("model: {:?}", &m); + + match m.name.as_str() { + "di-Position" => { + ev_position_updated.write(PositionUpdatedEvent(m.into())); + } + name if name == "di-Moves".to_string() => {} + _ => { + warn!("Model not handled: {:?}", m); + } + } + } + } +} + +/// The position of the player in the game. +#[derive(Component, Debug)] +pub struct Position { + pub player: Felt, + pub x: u32, + pub y: u32, +} + +/// Manual conversion from Dojo struct to Position. +impl From<&Struct> for Position { + fn from(struct_value: &Struct) -> Self { + let player = struct_value + .get("player") + .unwrap() + .as_primitive() + .unwrap() + .as_contract_address() + .unwrap(); + let x = struct_value + .get("x") + .unwrap() + .as_primitive() + .unwrap() + .as_u32() + .unwrap(); + let y = struct_value + .get("y") + .unwrap() + .as_primitive() + .unwrap() + .as_u32() + .unwrap(); + + Position { player, x, y } + } +} + +/// Setups the scene with basic light. +pub fn setup(mut commands: Commands) { + commands.spawn(( + DirectionalLight::default(), + Transform::from_xyz(0.0, 0.0, 30.0).looking_at(Vec3::ZERO, Vec3::Y), + )); + commands.spawn(( + Camera3d::default(), + Transform::from_xyz(0.0, 0.0, 30.0).looking_at(Vec3::ZERO, Vec3::Y), + )); +} diff --git a/src/lib.rs b/src/lib.rs index 17ff32f..cdfe0b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,6 @@ -mod plugin; +// mod bevytasks_torii; // Disabled due to missing modules +// mod plugin; +mod torii_v2; -pub use plugin::*; +// pub use plugin::*; +pub use torii_v2::*; diff --git a/src/torii_v2.rs b/src/torii_v2.rs new file mode 100644 index 0000000..b6ce35c --- /dev/null +++ b/src/torii_v2.rs @@ -0,0 +1,428 @@ +//! Dojo v2 plugin using native Bevy tasks instead of external Tokio runtime. +//! +//! This plugin provides the same functionality as the original Dojo plugin but uses +//! Bevy's native task system for better integration and performance. + +use bevy::prelude::*; +use bevy::tasks::{IoTaskPool, Task}; +use crossbeam_channel::{Receiver, Sender, unbounded}; +use dojo_types::schema::Struct; +use futures::StreamExt; +use futures::lock::Mutex; +use starknet::accounts::single_owner::SignError; +use starknet::accounts::{Account, AccountError, ExecutionEncoding, SingleOwnerAccount}; +use starknet::core::types::{BlockId, BlockTag, Call, InvokeTransactionResult}; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Provider}; +use starknet::signers::local_wallet::SignError as LocalWalletSignError; +use starknet::signers::{LocalWallet, SigningKey}; +use starknet::{core::types::Felt, providers::AnyProvider}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use torii_grpc_client::WorldClient; +use torii_grpc_client::types::proto::world::RetrieveEntitiesResponse; +use torii_grpc_client::types::{Clause, Query as ToriiQuery}; +use url::Url; + +/// Represents the state of a subscription task +pub struct SubscriptionTaskState { + pub task: Task<()>, + pub is_active: bool, +} + +/// The Dojo v2 plugin using native Bevy tasks. +pub struct DojoPluginV2; + +impl Plugin for DojoPluginV2 { + fn build(&self, app: &mut App) { + app.add_event::(); + app.add_event::(); + app.add_systems(Update, (check_torii_task_v2, check_sn_task_v2)); + } +} + +/// Event emitted when Dojo v2 is initialized. +#[derive(Event)] +pub struct DojoInitializedEventV2; + +/// Event emitted when an entity is updated from Torii. +#[derive(Event, Debug)] +pub struct DojoEntityUpdatedV2 { + pub entity_id: Felt, + pub models: Vec, +} + +/// Starknet connection state using Bevy tasks. +#[derive(Default)] +pub struct StarknetConnectionV2 { + pub connecting_task: Option>>>, + pub account: Option>>, + pub pending_txs: VecDeque< + Task>>>, + >, +} + +/// Torii connection state using Bevy tasks. +#[derive(Default)] +pub struct ToriiConnectionV2 { + pub init_task: Option>>, + pub client: Option>>, + pub pending_retrieve_entities: + VecDeque>>, + pub subscriptions: Arc>>, + pub subscription_sender: Option)>>, + pub subscription_receiver: Option)>>, + pub pending_subscription_stores: VecDeque>>, +} + +/// Main Dojo resource using Bevy tasks. +#[derive(Resource, Default)] +pub struct DojoResourceV2 { + pub sn: StarknetConnectionV2, + pub torii: ToriiConnectionV2, +} + +impl DojoResourceV2 { + /// Connects to Torii using Bevy tasks. + pub fn connect_torii(&mut self, torii_url: String, world_address: Felt) { + info!("Connecting to Torii (v2)."); + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { WorldClient::new(torii_url, world_address).await }); + self.torii.init_task = Some(task); + + let (sender, receiver) = unbounded(); + self.torii.subscription_sender = Some(sender); + self.torii.subscription_receiver = Some(receiver); + } + + /// Connects to a Starknet account using Bevy tasks. + pub fn connect_account(&mut self, rpc_url: String, account_addr: Felt, private_key: Felt) { + info!("Connecting to Starknet (v2)."); + let task_pool = IoTaskPool::get(); + let task = task_pool + .spawn(async move { connect_to_starknet_v2(rpc_url, account_addr, private_key).await }); + self.sn.connecting_task = Some(task); + } + + /// Connects to a predeployed account using Bevy tasks. + pub fn connect_predeployed_account(&mut self, rpc_url: String, account_idx: usize) { + info!("Connecting to Starknet (predeployed, v2)."); + let task_pool = IoTaskPool::get(); + let task = task_pool + .spawn(async move { connect_predeployed_account_v2(rpc_url, account_idx).await }); + self.sn.connecting_task = Some(task); + } + + /// Queues a transaction using Bevy tasks. + pub fn queue_tx(&mut self, calls: Vec) { + if let Some(account) = self.sn.account.clone() { + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + let tx = account.execute_v3(calls); + tx.send().await + }); + self.sn.pending_txs.push_back(task); + } else { + warn!("No Starknet account initialized, skipping transaction."); + } + } + + /// Queues an entity retrieval query using Bevy tasks. + pub fn queue_retrieve_entities(&mut self, query: ToriiQuery) { + if let Some(client) = self.torii.client.clone() { + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + let mut client = client.lock().await; + client.retrieve_entities(query).await + }); + self.torii.pending_retrieve_entities.push_back(task); + } else { + warn!("No Torii client initialized, skipping query."); + } + } + + /// Subscribes to entity updates using Bevy tasks. + pub fn subscribe_entities(&mut self, id: String, clause: Option) { + if let Some(client) = self.torii.client.clone() { + let sender = self.torii.subscription_sender.clone(); + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(async move { + let subscription_result = { + let mut client = client.lock().await; + client.subscribe_entities(clause).await + }; + + match subscription_result { + Ok(mut subscription) => { + while let Some(Ok((n, e))) = subscription.next().await { + debug!("Torii subscribe entities update: {} {:?}", n, e); + if let Some(ref sender) = sender { + let _ = sender.send((e.hashed_keys, e.models)); + } + } + } + Err(e) => { + error!("Failed to subscribe to entities: {:?}", e); + } + } + }); + + // Store the subscription task with proper cleanup of old subscriptions + let subscriptions = self.torii.subscriptions.clone(); + let task_id = id.clone(); + let store_task: Task> = IoTaskPool::get().spawn(async move { + let mut subs = subscriptions.lock().await; + + // Clean up old subscription if it exists + if let Some(_old_state) = subs.remove(&task_id) { + // Mark old task as inactive (it will naturally terminate) + debug!("Replacing existing subscription: {}", task_id); + } + + subs.insert( + task_id, + SubscriptionTaskState { + task, + is_active: true, + }, + ); + + Ok(()) + }); + + // Store the subscription storage task to track completion + self.torii.pending_subscription_stores.push_back(store_task); + } else { + warn!("No Torii client initialized, skipping subscription."); + } + } +} + +/// System to check Torii tasks and handle responses. +fn check_torii_task_v2( + mut dojo: ResMut, + mut ev_retrieve_entities: EventWriter, + mut ev_initialized: EventWriter, +) { + // Check if Torii client initialization is complete + if let Some(mut task) = dojo.torii.init_task.take() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(&mut task)) { + match result { + Ok(client) => { + info!("Torii client initialized (v2)."); + dojo.torii.client = Some(Arc::new(Mutex::new(client))); + ev_initialized.write(DojoInitializedEventV2); + } + Err(e) => { + error!("Failed to initialize Torii client: {:?}", e); + // Put the task back if it failed + dojo.torii.init_task = Some(task); + } + } + } else { + // Task not ready yet, put it back + dojo.torii.init_task = Some(task); + } + } + + // Check pending subscription storage tasks + let mut completed_stores = Vec::new(); + for (index, task) in dojo + .torii + .pending_subscription_stores + .iter_mut() + .enumerate() + { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { + completed_stores.push((index, result)); + } + } + + // Process completed subscription storage tasks + for (index, result) in completed_stores.into_iter().rev() { + dojo.torii.pending_subscription_stores.remove(index); + match result { + Ok(_) => { + debug!("Subscription successfully stored"); + } + Err(e) => { + error!("Failed to store subscription: {}", e); + } + } + } + + // Check pending entity retrieval tasks + let mut completed_tasks = Vec::new(); + for (index, task) in dojo.torii.pending_retrieve_entities.iter_mut().enumerate() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { + completed_tasks.push((index, result)); + } + } + + // Process completed tasks in reverse order to maintain indices + for (index, result) in completed_tasks.into_iter().rev() { + dojo.torii.pending_retrieve_entities.remove(index); + + match result { + Ok(response) => { + debug!("Retrieve entities response: {:?}", response); + for e in response.entities { + ev_retrieve_entities.write(DojoEntityUpdatedV2 { + entity_id: Felt::from_bytes_be_slice(&e.hashed_keys), + models: e + .models + .into_iter() + .map(|m| m.try_into().unwrap()) + .collect(), + }); + } + } + Err(e) => { + error!("Failed to retrieve entities: {:?}", e); + } + } + } + + // Check for subscription updates + if let Some(receiver) = &dojo.torii.subscription_receiver { + while let Ok((entity_id, models)) = receiver.try_recv() { + debug!("Torii subscription update: {:?}", (entity_id, &models)); + ev_retrieve_entities.write(DojoEntityUpdatedV2 { entity_id, models }); + } + } +} + +/// System to check Starknet tasks and handle responses. +fn check_sn_task_v2(mut dojo: ResMut) { + // Check if Starknet account connection is complete + if let Some(mut task) = dojo.sn.connecting_task.take() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(&mut task)) { + info!("Connected to Starknet (v2)."); + dojo.sn.account = Some(result); + } else { + // Task not ready yet, put it back + dojo.sn.connecting_task = Some(task); + } + } + + // Check pending transactions - only if we have an account and pending transactions + if !dojo.sn.pending_txs.is_empty() { + if dojo.sn.account.is_some() { + let mut completed_tasks = Vec::new(); + for (index, task) in dojo.sn.pending_txs.iter_mut().enumerate() { + if let Some(result) = bevy::tasks::block_on(bevy::tasks::poll_once(task)) { + completed_tasks.push((index, result)); + } + } + + // Process completed tasks in reverse order to maintain indices + for (index, result) in completed_tasks.into_iter().rev() { + dojo.sn.pending_txs.remove(index); + + match result { + Ok(tx_result) => { + info!("Transaction completed: {:#x}", tx_result.transaction_hash); + } + Err(e) => { + error!("Transaction failed: {:?}", e); + } + } + } + } else { + // Clear pending transactions if no account is available + warn!( + "Clearing {} pending transactions - no account available", + dojo.sn.pending_txs.len() + ); + dojo.sn.pending_txs.clear(); + } + } +} + +/// Connects to a Starknet account (v2). +async fn connect_to_starknet_v2( + rpc_url: String, + account_addr: Felt, + private_key: Felt, +) -> Arc> { + let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new( + Url::parse(&rpc_url).expect("Expecting valid Starknet RPC URL"), + ))); + + let chain_id = provider.chain_id().await.unwrap(); + let signer = LocalWallet::from(SigningKey::from_secret_scalar(private_key)); + + Arc::new(SingleOwnerAccount::new( + provider, + signer, + account_addr, + chain_id, + ExecutionEncoding::New, + )) +} + +/// Connects to a predeployed account (v2). +pub async fn connect_predeployed_account_v2( + rpc_url: String, + account_idx: usize, +) -> Arc> { + let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new( + Url::parse(&rpc_url).unwrap(), + ))); + + let client = reqwest::Client::new(); + let response = client + .post(&rpc_url) + .json(&serde_json::json!({ + "jsonrpc": "2.0", + "method": "dev_predeployedAccounts", + "params": [], + "id": 1 + })) + .send() + .await + .expect("Failed to fetch predeployed accounts."); + + let result: serde_json::Value = response + .json() + .await + .expect("Failed to parse predeployed accounts."); + + if let Some(vals) = result.get("result").and_then(|v| v.as_array()) { + let chain_id = provider.chain_id().await.expect("Failed to get chain id."); + + for (i, a) in vals.iter().enumerate() { + let address = a["address"].as_str().unwrap(); + + let private_key = if let Some(pk) = a["privateKey"].as_str() { + pk + } else { + continue; + }; + + let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new( + Url::parse(&rpc_url).unwrap(), + ))); + + let signer = LocalWallet::from(SigningKey::from_secret_scalar( + Felt::from_hex(private_key).unwrap(), + )); + + let mut account = SingleOwnerAccount::new( + provider, + signer, + Felt::from_hex(address).unwrap(), + chain_id, + ExecutionEncoding::New, + ); + + account.set_block_id(BlockId::Tag(BlockTag::Pending)); + + if i == account_idx { + return Arc::new(account); + } + } + } + + panic!("Account index out of bounds."); +}