diff --git a/crates/nu-consensus/src/validator_set.rs b/crates/nu-consensus/src/validator_set.rs index e63384b..e2a0f96 100644 --- a/crates/nu-consensus/src/validator_set.rs +++ b/crates/nu-consensus/src/validator_set.rs @@ -11,7 +11,31 @@ impl ValidatorSet { } pub fn register(&mut self, record: ValidatorRecord) { - self.validators.push(record); + if !self.validators.iter().any(|v| v.address == record.address) { + self.validators.push(record); + } + } + + pub fn update(&mut self, address: &str, f: impl FnOnce(&mut ValidatorRecord)) { + if let Some(v) = self.validators.iter_mut().find(|v| v.address == address) { + f(v); + } + } + + pub fn get(&self, address: &str) -> Option<&ValidatorRecord> { + self.validators.iter().find(|v| v.address == address) + } + + pub fn active_count(&self) -> usize { + self.validators.iter().filter(|v| v.is_active).count() + } + + /// Returns the expected block producer for the given slot. + /// In single-validator dev mode this always returns that validator. + pub fn slot_producer(&self, slot: u32, prev_block_hash: &str) -> Option { + let schedule = self.schedule(slot, prev_block_hash); + let idx = (slot as usize) % schedule.len().max(1); + schedule.into_iter().nth(idx) } /// Weighted shuffle seeded by slot + prev_block_hash; filters ineligible validators. @@ -23,9 +47,13 @@ impl ValidatorSet { .iter() .filter(|v| v.is_active && v.stake >= MIN_VALIDATOR_STAKE) .filter(|v| v.consecutive_blocks < MAX_CONSECUTIVE_BLOCKS) + .filter(|v| !v.is_banned(slot)) .collect(); - // Weighted shuffle: higher pon_score → higher probability of early position + if candidates.is_empty() { + return vec![]; + } + Self::weighted_shuffle(&mut candidates, &seed); candidates @@ -43,7 +71,6 @@ impl ValidatorSet { } fn weighted_shuffle(candidates: &mut Vec<&ValidatorRecord>, seed: &[u8; 32]) { - // Fisher-Yates with pon_score-weighted random keys derived from seed for i in (1..candidates.len()).rev() { let key = u64::from_le_bytes(seed[..8].try_into().unwrap()) .wrapping_add(i as u64) @@ -53,3 +80,9 @@ impl ValidatorSet { } } } + +impl Default for ValidatorSet { + fn default() -> Self { + Self::new() + } +} diff --git a/src/block_loop.rs b/src/block_loop.rs index 81da6ea..da62f46 100644 --- a/src/block_loop.rs +++ b/src/block_loop.rs @@ -7,20 +7,26 @@ use nu_block::{ builder::BlockBuilder, types::{Block, RawTransaction, TxPayload}, }; -use nu_consensus::slot::current_slot; +use nu_consensus::{ + slot::current_slot, + types::ValidatorRecord, + validator_set::ValidatorSet, +}; use nu_mempool::Mempool; -use nu_state::StateDb; +use nu_state::{story_node::StoryNodeState, ValidatorState, StateDb}; use nu_vm::execute_block; use crate::p2p::P2pSender; -const MAX_TX_PER_BLOCK: usize = 500; -const BLOCK_INTERVAL_MS: u64 = 6_000; -const SCHEDULER_ADDR: &str = "0x0000000000000000000000000000SCHEDULER"; +const MAX_TX_PER_BLOCK: usize = 500; +const BLOCK_INTERVAL_MS: u64 = 6_000; +const SCHEDULER_ADDR: &str = "0x0000000000000000000000000000SCHEDULER"; pub struct BlockLoopConfig { pub validator_addr: String, pub chain_id: String, + /// dev mode: skip rotation check, always produce + pub dev_mode: bool, } pub async fn run( @@ -29,13 +35,14 @@ pub async fn run( mempool: Arc>, p2p: Option, ) { - let mut ticker = interval(Duration::from_millis(BLOCK_INTERVAL_MS)); + let mut ticker = interval(Duration::from_millis(BLOCK_INTERVAL_MS)); let mut height: u64 = 1; - let mut prev_hash = "0".repeat(64); + let mut prev_hash = "0".repeat(64); tracing::info!( chain_id = %config.chain_id, validator = %config.validator_addr, + dev_mode = config.dev_mode, "block loop started" ); @@ -45,6 +52,28 @@ pub async fn run( let slot = current_slot(); let now_ms = chrono::Utc::now().timestamp_millis(); + // Load validator set from DB + let validator_set = { + let db_guard = db.lock().await; + load_validator_set(&db_guard) + }; + + // Rotation check: in non-dev mode skip if not our slot + if !config.dev_mode { + let expected = validator_set.slot_producer(slot, &prev_hash); + match &expected { + Some(addr) if addr != &config.validator_addr => { + tracing::debug!(slot, expected = %addr, "not our slot — skipping"); + continue; + } + None => { + tracing::warn!(slot, "no eligible validator — skipping"); + continue; + } + _ => {} + } + } + // Scheduler: inject auto-txs for pending nodes { let db_guard = db.lock().await; @@ -93,9 +122,7 @@ pub async fn run( { let mut pool = mempool.lock().await; for r in &result.receipts { - if r.success { - pool.remove(&r.tx_id); - } + if r.success { pool.remove(&r.tx_id); } } } @@ -106,6 +133,8 @@ pub async fn run( if let Err(e) = db_guard.put(&format!("block:{height}"), &block) { tracing::error!(height, "failed to persist block: {e}"); } + // Update validator PoN: honest block produced + update_validator_pon(&db_guard, &config.validator_addr); } if let Some(ref sender) = p2p { @@ -117,8 +146,9 @@ pub async fn run( tracing::info!( slot, height, - applied = result.applied, - failed = result.failed, + applied = result.applied, + failed = result.failed, + validator = %config.validator_addr, "block produced" ); @@ -126,13 +156,51 @@ pub async fn run( } } -/// Scans pending nodes and produces VotingOpen / NodeApprove / NodeReject auto-txs. +fn load_validator_set(db: &StateDb) -> ValidatorSet { + let mut set = ValidatorSet::new(); + let records: Vec = db.scan_prefix("validator:"); + for vs in records { + set.register(ValidatorRecord { + address: vs.address, + stake: vs.stake, + pon_score: vs.pon_score, + is_active: vs.is_active, + last_block: vs.last_block, + slash_count: vs.slash_count, + skip_count: vs.skip_count, + consecutive_blocks: vs.consecutive_blocks, + ban_until_slot: vs.ban_until_slot, + }); + } + set +} + +fn update_validator_pon(db: &StateDb, address: &str) { + use nu_consensus::pon_score::update_on_honest_block; + if let Some(mut vs) = db.get::(&format!("validator:{address}")).ok().flatten() { + let mut record = ValidatorRecord { + address: vs.address.clone(), + stake: vs.stake, + pon_score: vs.pon_score, + is_active: vs.is_active, + last_block: vs.last_block, + slash_count: vs.slash_count, + skip_count: vs.skip_count, + consecutive_blocks: vs.consecutive_blocks, + ban_until_slot: vs.ban_until_slot, + }; + update_on_honest_block(&mut record); + vs.pon_score = record.pon_score; + vs.consecutive_blocks += 1; + vs.skip_count = 0; + let _ = db.put(&format!("validator:{address}"), &vs); + } +} + fn generate_scheduler_txs(db: &StateDb, now_ms: i64) -> Vec { - use nu_state::story_node::StoryNodeState; use sha2::{Digest, Sha256}; let mut txs = Vec::new(); - let nodes: Vec = db.scan_prefix("node_temp:"); for node in nodes { @@ -140,10 +208,9 @@ fn generate_scheduler_txs(db: &StateDb, now_ms: i64) -> Vec { Some(TxPayload::VotingOpen { node_id: node.temp_id.clone() }) } else if node.ready_for_finalization(now_ms) { if node.is_approved() { - let canonical_id = node.temp_id.clone(); // simplified — real impl derives from tree depth Some(TxPayload::NodeApprove { temp_id: node.temp_id.clone(), - canonical_id, + canonical_id: node.temp_id.clone(), }) } else { Some(TxPayload::NodeReject { node_id: node.temp_id.clone() }) diff --git a/src/main.rs b/src/main.rs index a5f6f8a..835e139 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,6 +101,7 @@ async fn main() -> Result<()> { let cfg = block_loop::BlockLoopConfig { validator_addr: cli.validator_addr.clone(), chain_id: cli.chain_id.clone(), + dev_mode: cli.dev, }; tokio::spawn(block_loop::run( cfg,