mod block_loop; mod genesis; mod p2p; use std::sync::Arc; use anyhow::Result; use clap::Parser; use tokio::sync::{mpsc, Mutex}; use tracing_subscriber::EnvFilter; use nu_mempool::Mempool; use nu_rpc::server::RpcServer; use nu_state::StateDb; use p2p::{NodeP2pEvent, P2pSender}; use tokio::sync::mpsc as rpc_mpsc; #[derive(Parser)] #[command(name = "nu-node", version)] struct Cli { /// Single-validator dev mode — no consensus, produces blocks every slot #[arg(long)] dev: bool, /// Act as block-producing validator #[arg(long)] validator: bool, /// Validator address #[arg(long, default_value = "0xDEV0000000000000000000000000000000000000")] validator_addr: String, /// JSON-RPC HTTP bind address #[arg(long, default_value = "0.0.0.0:9545")] rpc_addr: String, /// RocksDB data directory #[arg(long, default_value = "./data/state")] db_path: String, /// Chain identifier #[arg(long, default_value = "nu-devnet-1")] chain_id: String, /// nu-p2p publish API address (empty = P2P disabled) /// e.g. http://127.0.0.1:30334 #[arg(long)] p2p_api: Option, /// Path to genesis.json — applied once on first start if DB is empty #[arg(long)] genesis: Option, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); let cli = Cli::parse(); let db = Arc::new(Mutex::new(StateDb::open(&cli.db_path)?)); let mempool = Arc::new(Mutex::new(Mempool::new())); tracing::info!("State DB opened at {}", cli.db_path); // Apply genesis if provided and DB is empty if let Some(ref genesis_path) = cli.genesis { let db_guard = db.lock().await; if genesis::needs_seed(&db_guard) { genesis::apply(&db_guard, genesis_path)?; } else { tracing::info!("genesis already applied — skipping"); } } // P2P event channel — block_loop and RPC publish, forwarded to nu-p2p via HTTP let (p2p_sender, rpc_p2p_tx) = if let Some(ref api_url) = cli.p2p_api { let (block_tx, mut block_rx) = mpsc::channel::(128); let (rpc_tx, mut rpc_rx) = rpc_mpsc::channel::>(256); let publish_url = format!("{}/publish", api_url.trim_end_matches('/')); let client = reqwest::Client::new(); tokio::spawn(async move { loop { tokio::select! { Some(event) = block_rx.recv() => { let body = match &event { NodeP2pEvent::BlockAnnounce { height, hash } => { tracing::info!(height, hash = %&hash[..8], "p2p: block announce"); serde_json::json!({ "type": "BlockAnnounce", "data": { "height": height, "hash": hash } }) } NodeP2pEvent::TxGossip { raw_tx } => { tracing::debug!(bytes = raw_tx.len(), "p2p: tx gossip"); serde_json::json!({ "type": "TxGossip", "data": { "raw_tx": raw_tx } }) } }; let _ = client.post(&publish_url).json(&body).send().await; } Some(raw) = rpc_rx.recv() => { tracing::debug!(bytes = raw.len(), "p2p: rpc tx gossip"); let body = serde_json::json!({ "type": "TxGossip", "data": { "raw_tx": raw } }); let _ = client.post(&publish_url).json(&body).send().await; } else => break, } } }); (Some(P2pSender(block_tx)), Some(rpc_tx)) } else { (None, None) }; if cli.dev && cli.validator { let cfg = block_loop::BlockLoopConfig { validator_addr: cli.validator_addr.clone(), chain_id: cli.chain_id.clone(), dev_mode: cli.dev, }; tokio::spawn(block_loop::run( cfg, Arc::clone(&db), Arc::clone(&mempool), p2p_sender.clone(), )); } let rpc = RpcServer::with_p2p( cli.rpc_addr.clone(), Arc::clone(&db), Arc::clone(&mempool), cli.chain_id.clone(), rpc_p2p_tx, ); tracing::info!( chain_id = %cli.chain_id, rpc_addr = %cli.rpc_addr, dev = cli.dev, validator = cli.validator, p2p = cli.p2p_api.as_deref().unwrap_or("disabled"), "nu-node ready" ); rpc.run().await?; Ok(()) }