From 265097375a850aa29c0068a84afc1f3656d28e870972a42b9746b29266d73437 Mon Sep 17 00:00:00 2001 From: Mukan Erkin Date: Fri, 24 Apr 2026 10:54:48 +0300 Subject: [PATCH] feat(nu-node): single-validator block production loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - nu-vm/engine.rs: execute_block runs all txs in a block against StateDb; TokenTransfer fully applied, other variants return "not implemented" receipt - StateAccessor trait: set_balance/inc_nonce now take &self (RocksDB interior mutability) - src/block_loop.rs: tokio task that produces a block each slot (6s) in dev mode; drains mempool, executes txs, removes successful ones, persists block to RocksDB - StateDb wrapped in Arc — block loop holds write lock per block, RPC holds read lock for nu_getAccount - main.rs: spawns block_loop when --dev --validator flags are set Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 5 ++ Cargo.toml | 4 ++ crates/nu-rpc/src/handlers.rs | 28 +++----- crates/nu-rpc/src/server.rs | 10 +-- crates/nu-state/src/accessor.rs | 11 ++-- crates/nu-vm/Cargo.toml | 1 + crates/nu-vm/src/engine.rs | 73 +++++++++++++++++++++ crates/nu-vm/src/executor.rs | 4 +- crates/nu-vm/src/lib.rs | 3 + src/block_loop.rs | 109 ++++++++++++++++++++++++++++++++ src/main.rs | 33 +++++++--- 11 files changed, 241 insertions(+), 40 deletions(-) create mode 100644 crates/nu-vm/src/engine.rs create mode 100644 src/block_loop.rs diff --git a/Cargo.lock b/Cargo.lock index 82e97cf..f69eeed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,7 +808,9 @@ name = "nu-node" version = "0.1.0" dependencies = [ "anyhow", + "chrono", "clap", + "hex", "nu-block", "nu-consensus", "nu-mempool", @@ -816,6 +818,8 @@ dependencies = [ "nu-state", "nu-vm", "serde", + "serde_json", + "sha2", "tokio", "tracing", "tracing-subscriber", @@ -855,6 +859,7 @@ name = "nu-vm" version = "0.1.0" dependencies = [ "anyhow", + "nu-block", "nu-state", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index d451991..7c0062e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,10 @@ anyhow.workspace = true tracing.workspace = true tracing-subscriber.workspace = true clap.workspace = true +sha2.workspace = true +hex.workspace = true +chrono.workspace = true +serde_json.workspace = true nu-consensus = { path = "crates/nu-consensus" } nu-mempool = { path = "crates/nu-mempool" } nu-state = { path = "crates/nu-state" } diff --git a/crates/nu-rpc/src/handlers.rs b/crates/nu-rpc/src/handlers.rs index cb3f29c..345cebf 100644 --- a/crates/nu-rpc/src/handlers.rs +++ b/crates/nu-rpc/src/handlers.rs @@ -10,7 +10,7 @@ use nu_state::account::AccountState; pub async fn dispatch(req: JsonRpcRequest, state: &AppState) -> JsonRpcResponse { match req.method.as_str() { "nu_chainInfo" => handle_chain_info(&req, state), - "nu_getAccount" => handle_get_account(&req, state), + "nu_getAccount" => handle_get_account(&req, state).await, "nu_sendRawTx" => handle_send_raw_tx(&req, state).await, "nu_getBlock" => not_implemented(&req, "nu_getBlock"), "nu_getTx" => not_implemented(&req, "nu_getTx"), @@ -33,14 +33,15 @@ fn handle_chain_info(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse ) } -fn handle_get_account(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse { +async fn handle_get_account(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse { let address = match req.params.get(0).and_then(|v| v.as_str()) { Some(a) => a.to_string(), None => return JsonRpcResponse::err(req.id.clone(), -32602, "Missing address param".into()), }; let key = format!("account:{address}"); - match state.db.get::(&key) { + let db = state.db.lock().await; + match db.get::(&key) { Ok(Some(account)) => { JsonRpcResponse::ok(req.id.clone(), serde_json::to_value(account).unwrap()) } @@ -55,23 +56,16 @@ fn handle_get_account(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse async fn handle_send_raw_tx(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse { let raw_value = match req.params.get(0) { Some(v) => v.clone(), - None => { - return JsonRpcResponse::err(req.id.clone(), -32602, "Missing tx param".into()) - } + None => return JsonRpcResponse::err(req.id.clone(), -32602, "Missing tx param".into()), }; let tx: RawTransaction = match serde_json::from_value(raw_value) { Ok(t) => t, Err(e) => { - return JsonRpcResponse::err( - req.id.clone(), - -32602, - format!("Invalid tx format: {e}"), - ) + return JsonRpcResponse::err(req.id.clone(), -32602, format!("Invalid tx format: {e}")) } }; - // Basic structural validation before touching mempool if tx.tx_id.is_empty() { return JsonRpcResponse::err(req.id.clone(), -32602, "tx_id is required".into()); } @@ -79,7 +73,7 @@ async fn handle_send_raw_tx(req: &JsonRpcRequest, state: &AppState) -> JsonRpcRe return JsonRpcResponse::err(req.id.clone(), -32602, "sender is required".into()); } - let tx_id = tx.tx_id.clone(); + let tx_id = tx.tx_id.clone(); let now_ms = chrono::Utc::now().timestamp_millis(); let mut pool = state.mempool.lock().await; @@ -89,14 +83,10 @@ async fn handle_send_raw_tx(req: &JsonRpcRequest, state: &AppState) -> JsonRpcRe } if pool.insert(tx, now_ms) { - tracing::info!("tx accepted into mempool: {tx_id} (pool size: {})", pool.len()); + tracing::info!("tx accepted: {tx_id} (pool size: {})", pool.len()); JsonRpcResponse::ok(req.id.clone(), json!({ "tx_id": tx_id })) } else { - JsonRpcResponse::err( - req.id.clone(), - -32000, - "mempool full or sender limit reached".into(), - ) + JsonRpcResponse::err(req.id.clone(), -32000, "mempool full or sender limit reached".into()) } } diff --git a/crates/nu-rpc/src/server.rs b/crates/nu-rpc/src/server.rs index 1ff5ecc..aa89fb7 100644 --- a/crates/nu-rpc/src/server.rs +++ b/crates/nu-rpc/src/server.rs @@ -14,10 +14,10 @@ use crate::{ types::{JsonRpcRequest, JsonRpcResponse}, }; use nu_mempool::Mempool; -use nu_state::db::StateDb; +use nu_state::StateDb; pub struct AppState { - pub db: Arc, + pub db: Arc>, pub mempool: Arc>, pub chain_id: String, } @@ -30,9 +30,9 @@ pub struct RpcServer { impl RpcServer { pub fn new( bind_addr: impl Into, - db: Arc, - mempool: Arc>, - chain_id: String, + db: Arc>, + mempool: Arc>, + chain_id: String, ) -> Self { Self { bind_addr: bind_addr.into(), diff --git a/crates/nu-state/src/accessor.rs b/crates/nu-state/src/accessor.rs index ad72efd..92c8c56 100644 --- a/crates/nu-state/src/accessor.rs +++ b/crates/nu-state/src/accessor.rs @@ -1,10 +1,13 @@ use crate::{account::AccountState, db::StateDb}; +/// Read/write access to account state. +/// `set_balance` and `inc_nonce` take `&self` because StateDb uses RocksDB's +/// interior mutability — writes do not require exclusive access at the Rust level. pub trait StateAccessor { fn get_balance(&self, address: &str) -> u64; fn get_nonce(&self, address: &str) -> u64; - fn set_balance(&mut self, address: &str, balance: u64); - fn inc_nonce(&mut self, address: &str); + fn set_balance(&self, address: &str, balance: u64); + fn inc_nonce(&self, address: &str); } impl StateAccessor for StateDb { @@ -26,7 +29,7 @@ impl StateAccessor for StateDb { .unwrap_or(0) } - fn set_balance(&mut self, address: &str, balance: u64) { + fn set_balance(&self, address: &str, balance: u64) { let key = format!("account:{address}"); let mut account = self .get::(&key) @@ -37,7 +40,7 @@ impl StateAccessor for StateDb { let _ = self.put(&key, &account); } - fn inc_nonce(&mut self, address: &str) { + fn inc_nonce(&self, address: &str) { let key = format!("account:{address}"); let mut account = self .get::(&key) diff --git a/crates/nu-vm/Cargo.toml b/crates/nu-vm/Cargo.toml index 8e4808b..6670f3d 100644 --- a/crates/nu-vm/Cargo.toml +++ b/crates/nu-vm/Cargo.toml @@ -10,3 +10,4 @@ anyhow.workspace = true thiserror.workspace = true tracing.workspace = true nu-state = { path = "../nu-state" } +nu-block = { path = "../nu-block" } diff --git a/crates/nu-vm/src/engine.rs b/crates/nu-vm/src/engine.rs new file mode 100644 index 0000000..b2e710d --- /dev/null +++ b/crates/nu-vm/src/engine.rs @@ -0,0 +1,73 @@ +use nu_block::types::{Block, RawTransaction, TxPayload, TxReceipt}; +use nu_state::StateAccessor; + +use crate::executor::{execute_token_transfer, ExecutionContext}; + +pub struct BlockResult { + pub receipts: Vec, + pub applied: u32, + pub failed: u32, +} + +/// Executes all transactions in a block against the given state. +/// Each tx is validated and applied atomically; failures produce a receipt +/// but do not roll back already-applied txs. +pub fn execute_block( + block: &Block, + state: &dyn StateAccessor, + now_ms: i64, +) -> BlockResult { + let mut receipts = Vec::with_capacity(block.transactions.len()); + let mut applied = 0u32; + let mut failed = 0u32; + + for tx in &block.transactions { + let receipt = execute_tx(tx, state, block.header.height, now_ms); + if receipt.success { applied += 1; } else { failed += 1; } + receipts.push(receipt); + } + + BlockResult { receipts, applied, failed } +} + +fn execute_tx( + tx: &RawTransaction, + state: &dyn StateAccessor, + block_height: u64, + now_ms: i64, +) -> TxReceipt { + let ctx = ExecutionContext { state, block_height, now_ms }; + + let result = match &tx.payload { + TxPayload::TokenTransfer { to, amount } => { + execute_token_transfer(&ctx, &tx.sender, to, *amount, tx.fee, tx.nonce) + } + // Remaining variants return "not yet implemented" — applied in later Faz 1 tasks + other => { + let name = payload_name(other); + Err(crate::errors::VmError::Unknown(format!("{name} execution not yet implemented"))) + } + }; + + match result { + Ok(()) => TxReceipt { tx_id: tx.tx_id.clone(), success: true, error: String::new() }, + Err(e) => TxReceipt { tx_id: tx.tx_id.clone(), success: false, error: e.to_string() }, + } +} + +fn payload_name(payload: &TxPayload) -> &'static str { + match payload { + TxPayload::TokenTransfer { .. } => "TokenTransfer", + TxPayload::NodeSubmit { .. } => "NodeSubmit", + TxPayload::VoteRegister { .. } => "VoteRegister", + TxPayload::VoteCast { .. } => "VoteCast", + TxPayload::NftTransfer { .. } => "NftTransfer", + TxPayload::CollectionClaim { .. } => "CollectionClaim", + TxPayload::StakeOp { .. } => "StakeOp", + TxPayload::ValidatorRegister { .. }=> "ValidatorRegister", + TxPayload::NodeApprove { .. } => "NodeApprove", + TxPayload::NftMint { .. } => "NftMint", + TxPayload::NodeReject { .. } => "NodeReject", + TxPayload::VotingOpen { .. } => "VotingOpen", + } +} diff --git a/crates/nu-vm/src/executor.rs b/crates/nu-vm/src/executor.rs index 0628bb8..df03301 100644 --- a/crates/nu-vm/src/executor.rs +++ b/crates/nu-vm/src/executor.rs @@ -2,13 +2,13 @@ use crate::errors::VmError; use nu_state::StateAccessor; pub struct ExecutionContext<'a> { - pub state: &'a mut dyn StateAccessor, + pub state: &'a dyn StateAccessor, pub block_height: u64, pub now_ms: i64, } pub fn execute_token_transfer( - ctx: &mut ExecutionContext, + ctx: &ExecutionContext, sender: &str, to: &str, amount: u64, diff --git a/crates/nu-vm/src/lib.rs b/crates/nu-vm/src/lib.rs index f31399d..fdeaed4 100644 --- a/crates/nu-vm/src/lib.rs +++ b/crates/nu-vm/src/lib.rs @@ -1,4 +1,7 @@ +pub mod engine; pub mod executor; pub mod rewards; pub mod slashing; pub mod errors; + +pub use engine::{execute_block, BlockResult}; diff --git a/src/block_loop.rs b/src/block_loop.rs new file mode 100644 index 0000000..9e01d7b --- /dev/null +++ b/src/block_loop.rs @@ -0,0 +1,109 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; +use tokio::time::{interval, Duration}; + +use nu_block::{builder::BlockBuilder, types::Block}; +use nu_consensus::slot::current_slot; +use nu_mempool::Mempool; +use nu_state::StateDb; +use nu_vm::execute_block; + +const MAX_TX_PER_BLOCK: usize = 500; +const BLOCK_INTERVAL_MS: u64 = 6_000; // one slot + +pub struct BlockLoopConfig { + pub validator_addr: String, + pub chain_id: String, +} + +pub async fn run( + config: BlockLoopConfig, + db: Arc>, + mempool: Arc>, +) { + let mut ticker = interval(Duration::from_millis(BLOCK_INTERVAL_MS)); + let mut height: u64 = 1; + let mut prev_hash = "0".repeat(64); + + tracing::info!( + chain_id = %config.chain_id, + validator = %config.validator_addr, + "block loop started" + ); + + loop { + ticker.tick().await; + + let slot = current_slot(); + let now_ms = chrono::Utc::now().timestamp_millis(); + + let pending = { + let mut pool = mempool.lock().await; + pool.select_for_block(MAX_TX_PER_BLOCK, now_ms) + }; + + if pending.is_empty() { + tracing::debug!(slot, height, "no pending txs — skipping block"); + continue; + } + + let mut builder = BlockBuilder::new( + height, + prev_hash.clone(), + slot, + config.validator_addr.clone(), + ); + for p in &pending { + builder.add_tx(p.tx.clone()); + } + + let block = match builder.build("0".repeat(64), vec![]) { + Ok(b) => b, + Err(e) => { + tracing::error!(slot, height, "block build error: {e}"); + continue; + } + }; + + let result = { + let db_guard = db.lock().await; + execute_block(&block, &*db_guard, now_ms) + }; + + { + let mut pool = mempool.lock().await; + for r in &result.receipts { + if r.success { + pool.remove(&r.tx_id); + } + } + } + + let block_key = format!("block:{height}"); + { + let db_guard = db.lock().await; + if let Err(e) = db_guard.put(&block_key, &block) { + tracing::error!(height, "failed to persist block: {e}"); + } + } + + prev_hash = block_hash(&block); + + tracing::info!( + slot, + height, + applied = result.applied, + failed = result.failed, + "block produced" + ); + + height += 1; + } +} + +fn block_hash(block: &Block) -> String { + use sha2::{Digest, Sha256}; + let data = serde_json::to_vec(&block.header).unwrap_or_default(); + hex::encode(Sha256::digest(&data)) +} diff --git a/src/main.rs b/src/main.rs index 6edd683..401c707 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +mod block_loop; + use std::sync::Arc; use anyhow::Result; @@ -12,14 +14,18 @@ use nu_state::StateDb; #[derive(Parser)] #[command(name = "nu-node", version)] struct Cli { - /// Run in single-validator dev mode (no consensus required) + /// Single-validator dev mode — no consensus, produces blocks every slot #[arg(long)] dev: bool, - /// Act as validator in this session + /// Act as block-producing validator #[arg(long)] validator: bool, + /// Validator address (required when --validator is set) + #[arg(long, default_value = "0xDEV0000000000000000000000000000000000000")] + validator_addr: String, + /// JSON-RPC HTTP bind address #[arg(long, default_value = "0.0.0.0:9545")] rpc_addr: String, @@ -41,15 +47,20 @@ async fn main() -> Result<()> { let cli = Cli::parse(); - if cli.dev { - tracing::info!("Starting in --dev mode (single validator, consensus disabled)"); - } - - let db = Arc::new(StateDb::open(&cli.db_path)?); + let db = Arc::new(Mutex::new(StateDb::open(&cli.db_path)?)); tracing::info!("State DB opened at {}", cli.db_path); let mempool = Arc::new(Mutex::new(Mempool::new())); + // Spawn block production loop in dev mode + if cli.dev && cli.validator { + let cfg = block_loop::BlockLoopConfig { + validator_addr: cli.validator_addr.clone(), + chain_id: cli.chain_id.clone(), + }; + tokio::spawn(block_loop::run(cfg, Arc::clone(&db), Arc::clone(&mempool))); + } + let rpc = RpcServer::new( cli.rpc_addr.clone(), Arc::clone(&db), @@ -58,9 +69,11 @@ async fn main() -> Result<()> { ); tracing::info!( - "nu-node ready — chain_id={} rpc={}", - cli.chain_id, - cli.rpc_addr + chain_id = %cli.chain_id, + rpc_addr = %cli.rpc_addr, + dev = cli.dev, + validator = cli.validator, + "nu-node ready" ); rpc.run().await?;