feat(node): wire P2pSender into block_loop; add --p2p-addr and --bootstrap CLI flags

This commit is contained in:
Mukan Erkin TÖRÜK 2026-04-24 11:52:44 +03:00
parent e30652f264
commit 04e5f3d166
3 changed files with 84 additions and 12 deletions

View file

@ -9,8 +9,10 @@ use nu_mempool::Mempool;
use nu_state::StateDb; use nu_state::StateDb;
use nu_vm::execute_block; use nu_vm::execute_block;
use crate::p2p::P2pSender;
const MAX_TX_PER_BLOCK: usize = 500; const MAX_TX_PER_BLOCK: usize = 500;
const BLOCK_INTERVAL_MS: u64 = 6_000; // one slot const BLOCK_INTERVAL_MS: u64 = 6_000;
pub struct BlockLoopConfig { pub struct BlockLoopConfig {
pub validator_addr: String, pub validator_addr: String,
@ -21,6 +23,7 @@ pub async fn run(
config: BlockLoopConfig, config: BlockLoopConfig,
db: Arc<Mutex<StateDb>>, db: Arc<Mutex<StateDb>>,
mempool: Arc<Mutex<Mempool>>, mempool: Arc<Mutex<Mempool>>,
p2p: Option<P2pSender>,
) { ) {
let mut ticker = interval(Duration::from_millis(BLOCK_INTERVAL_MS)); let mut ticker = interval(Duration::from_millis(BLOCK_INTERVAL_MS));
let mut height: u64 = 1; let mut height: u64 = 1;
@ -80,6 +83,8 @@ pub async fn run(
} }
} }
let hash = block_hash(&block);
let block_key = format!("block:{height}"); let block_key = format!("block:{height}");
{ {
let db_guard = db.lock().await; let db_guard = db.lock().await;
@ -88,7 +93,11 @@ pub async fn run(
} }
} }
prev_hash = block_hash(&block); if let Some(ref sender) = p2p {
sender.send_block_announce(height, hash.clone());
}
prev_hash = hash;
tracing::info!( tracing::info!(
slot, slot,

View file

@ -1,16 +1,19 @@
mod block_loop; mod block_loop;
mod p2p;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use clap::Parser; use clap::Parser;
use tokio::sync::Mutex; use tokio::sync::{mpsc, Mutex};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use nu_mempool::Mempool; use nu_mempool::Mempool;
use nu_rpc::server::RpcServer; use nu_rpc::server::RpcServer;
use nu_state::StateDb; use nu_state::StateDb;
use p2p::{NodeP2pEvent, P2pSender};
#[derive(Parser)] #[derive(Parser)]
#[command(name = "nu-node", version)] #[command(name = "nu-node", version)]
struct Cli { struct Cli {
@ -22,7 +25,7 @@ struct Cli {
#[arg(long)] #[arg(long)]
validator: bool, validator: bool,
/// Validator address (required when --validator is set) /// Validator address
#[arg(long, default_value = "0xDEV0000000000000000000000000000000000000")] #[arg(long, default_value = "0xDEV0000000000000000000000000000000000000")]
validator_addr: String, validator_addr: String,
@ -37,6 +40,14 @@ struct Cli {
/// Chain identifier /// Chain identifier
#[arg(long, default_value = "nu-devnet-1")] #[arg(long, default_value = "nu-devnet-1")]
chain_id: String, chain_id: String,
/// P2P listen address (empty = P2P disabled)
#[arg(long)]
p2p_addr: Option<String>,
/// Bootstrap peer multiaddrs
#[arg(long)]
bootstrap: Vec<String>,
} }
#[tokio::main] #[tokio::main]
@ -48,17 +59,42 @@ async fn main() -> Result<()> {
let cli = Cli::parse(); let cli = Cli::parse();
let db = Arc::new(Mutex::new(StateDb::open(&cli.db_path)?)); let db = Arc::new(Mutex::new(StateDb::open(&cli.db_path)?));
tracing::info!("State DB opened at {}", cli.db_path);
let mempool = Arc::new(Mutex::new(Mempool::new())); let mempool = Arc::new(Mutex::new(Mempool::new()));
// Spawn block production loop in dev mode tracing::info!("State DB opened at {}", cli.db_path);
// P2P event channel — block_loop publishes, this node forwards to nu-p2p
let p2p_sender = if cli.p2p_addr.is_some() {
let (tx, mut rx) = mpsc::channel::<NodeP2pEvent>(128);
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
match &event {
NodeP2pEvent::BlockAnnounce { height, hash } => {
tracing::info!(height, hash = %&hash[..8], "p2p: block announce");
}
NodeP2pEvent::TxGossip { raw_tx } => {
tracing::debug!(bytes = raw_tx.len(), "p2p: tx gossip");
}
}
// TODO Faz 1: forward to nu-p2p swarm via IPC or embedded swarm
}
});
Some(P2pSender(tx))
} else {
None
};
if cli.dev && cli.validator { if cli.dev && cli.validator {
let cfg = block_loop::BlockLoopConfig { let cfg = block_loop::BlockLoopConfig {
validator_addr: cli.validator_addr.clone(), validator_addr: cli.validator_addr.clone(),
chain_id: cli.chain_id.clone(), chain_id: cli.chain_id.clone(),
}; };
tokio::spawn(block_loop::run(cfg, Arc::clone(&db), Arc::clone(&mempool))); tokio::spawn(block_loop::run(
cfg,
Arc::clone(&db),
Arc::clone(&mempool),
p2p_sender.clone(),
));
} }
let rpc = RpcServer::new( let rpc = RpcServer::new(
@ -73,6 +109,7 @@ async fn main() -> Result<()> {
rpc_addr = %cli.rpc_addr, rpc_addr = %cli.rpc_addr,
dev = cli.dev, dev = cli.dev,
validator = cli.validator, validator = cli.validator,
p2p = cli.p2p_addr.as_deref().unwrap_or("disabled"),
"nu-node ready" "nu-node ready"
); );

26
src/p2p.rs Normal file
View file

@ -0,0 +1,26 @@
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
/// Minimal P2P event types nu-node yayınlar.
/// nu-p2p swarm bunları alıp gossipsub'a iletir.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum NodeP2pEvent {
BlockAnnounce { height: u64, hash: String },
TxGossip { raw_tx: Vec<u8> },
}
/// Block loop ve RPC handler'larının P2P olaylarını göndermesini sağlar.
/// Karşı uçta bir dinleyici yoksa (nu-p2p bağlı değilse) sessizce drop edilir.
#[derive(Clone)]
pub struct P2pSender(pub mpsc::Sender<NodeP2pEvent>);
impl P2pSender {
pub fn send_block_announce(&self, height: u64, hash: String) {
let _ = self.0.try_send(NodeP2pEvent::BlockAnnounce { height, hash });
}
pub fn send_tx_gossip(&self, raw_tx: Vec<u8>) {
let _ = self.0.try_send(NodeP2pEvent::TxGossip { raw_tx });
}
}