feat(node): wire P2P gossip channel to RPC sendRawTx and block announce

This commit is contained in:
Mukan Erkin TÖRÜK 2026-04-24 11:55:27 +03:00
parent 04e5f3d166
commit 9837edfb1f
4 changed files with 58 additions and 19 deletions

View file

@ -7,6 +7,14 @@ Format: [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased] ## [Unreleased]
## [0.4.0] — 2026-04-24
### Added
- `src/p2p.rs``P2pSender` ve `NodeP2pEvent` (BlockAnnounce, TxGossip); block loop ve RPC handler'larının P2P event'i yayınlamasını sağlar
- `--p2p-addr` ve `--bootstrap` CLI flag'leri — P2P etkinleştirildiğinde event channel başlatılır
- `RpcServer::with_p2p` constructor — RPC katmanına P2P gossip sender enjekte eder
- `nu_sendRawTx`: tx kabul edildiğinde raw bytes P2P gossip channel'ına iletilir
## [0.3.0] — 2026-04-24 ## [0.3.0] — 2026-04-24
### Added ### Added

View file

@ -59,7 +59,7 @@ async fn handle_send_raw_tx(req: &JsonRpcRequest, state: &AppState) -> JsonRpcRe
None => return JsonRpcResponse::err(req.id.clone(), -32602, "Missing tx param".into()), None => return JsonRpcResponse::err(req.id.clone(), -32602, "Missing tx param".into()),
}; };
let tx: RawTransaction = match serde_json::from_value(raw_value) { let tx: RawTransaction = match serde_json::from_value(raw_value.clone()) {
Ok(t) => t, Ok(t) => t,
Err(e) => { Err(e) => {
return JsonRpcResponse::err(req.id.clone(), -32602, format!("Invalid tx format: {e}")) return JsonRpcResponse::err(req.id.clone(), -32602, format!("Invalid tx format: {e}"))
@ -84,6 +84,11 @@ async fn handle_send_raw_tx(req: &JsonRpcRequest, state: &AppState) -> JsonRpcRe
if pool.insert(tx, now_ms) { if pool.insert(tx, now_ms) {
tracing::info!("tx accepted: {tx_id} (pool size: {})", pool.len()); tracing::info!("tx accepted: {tx_id} (pool size: {})", pool.len());
if let Some(ref p2p_tx) = state.p2p_tx {
if let Ok(raw) = serde_json::to_vec(&raw_value) {
let _ = p2p_tx.try_send(raw);
}
}
JsonRpcResponse::ok(req.id.clone(), json!({ "tx_id": tx_id })) JsonRpcResponse::ok(req.id.clone(), json!({ "tx_id": tx_id }))
} else { } else {
JsonRpcResponse::err(req.id.clone(), -32000, "mempool full or sender limit reached".into()) JsonRpcResponse::err(req.id.clone(), -32000, "mempool full or sender limit reached".into())

View file

@ -7,7 +7,7 @@ use axum::{
routing::post, routing::post,
Json, Router, Json, Router,
}; };
use tokio::{net::TcpListener, sync::Mutex}; use tokio::{net::TcpListener, sync::{mpsc, Mutex}};
use crate::{ use crate::{
handlers::dispatch, handlers::dispatch,
@ -20,6 +20,8 @@ pub struct AppState {
pub db: Arc<Mutex<StateDb>>, pub db: Arc<Mutex<StateDb>>,
pub mempool: Arc<Mutex<Mempool>>, pub mempool: Arc<Mutex<Mempool>>,
pub chain_id: String, pub chain_id: String,
/// Raw serialized tx bytes forwarded to P2P gossip on acceptance
pub p2p_tx: Option<mpsc::Sender<Vec<u8>>>,
} }
pub struct RpcServer { pub struct RpcServer {
@ -33,10 +35,20 @@ impl RpcServer {
db: Arc<Mutex<StateDb>>, db: Arc<Mutex<StateDb>>,
mempool: Arc<Mutex<Mempool>>, mempool: Arc<Mutex<Mempool>>,
chain_id: String, chain_id: String,
) -> Self {
Self::with_p2p(bind_addr, db, mempool, chain_id, None)
}
pub fn with_p2p(
bind_addr: impl Into<String>,
db: Arc<Mutex<StateDb>>,
mempool: Arc<Mutex<Mempool>>,
chain_id: String,
p2p_tx: Option<mpsc::Sender<Vec<u8>>>,
) -> Self { ) -> Self {
Self { Self {
bind_addr: bind_addr.into(), bind_addr: bind_addr.into(),
state: Arc::new(AppState { db, mempool, chain_id }), state: Arc::new(AppState { db, mempool, chain_id, p2p_tx }),
} }
} }

View file

@ -13,6 +13,7 @@ use nu_rpc::server::RpcServer;
use nu_state::StateDb; use nu_state::StateDb;
use p2p::{NodeP2pEvent, P2pSender}; use p2p::{NodeP2pEvent, P2pSender};
use tokio::sync::mpsc as rpc_mpsc;
#[derive(Parser)] #[derive(Parser)]
#[command(name = "nu-node", version)] #[command(name = "nu-node", version)]
@ -63,11 +64,15 @@ async fn main() -> Result<()> {
tracing::info!("State DB opened at {}", cli.db_path); tracing::info!("State DB opened at {}", cli.db_path);
// P2P event channel — block_loop publishes, this node forwards to nu-p2p // P2P event channel — block_loop and RPC publish, forwarded to nu-p2p swarm
let p2p_sender = if cli.p2p_addr.is_some() { let (p2p_sender, rpc_p2p_tx) = if cli.p2p_addr.is_some() {
let (tx, mut rx) = mpsc::channel::<NodeP2pEvent>(128); let (block_tx, mut block_rx) = mpsc::channel::<NodeP2pEvent>(128);
let (rpc_tx, mut rpc_rx) = rpc_mpsc::channel::<Vec<u8>>(256);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(event) = rx.recv().await { loop {
tokio::select! {
Some(event) = block_rx.recv() => {
match &event { match &event {
NodeP2pEvent::BlockAnnounce { height, hash } => { NodeP2pEvent::BlockAnnounce { height, hash } => {
tracing::info!(height, hash = %&hash[..8], "p2p: block announce"); tracing::info!(height, hash = %&hash[..8], "p2p: block announce");
@ -76,12 +81,20 @@ async fn main() -> Result<()> {
tracing::debug!(bytes = raw_tx.len(), "p2p: tx gossip"); tracing::debug!(bytes = raw_tx.len(), "p2p: tx gossip");
} }
} }
// TODO Faz 1: forward to nu-p2p swarm via IPC or embedded swarm // TODO: forward to nu-p2p swarm
}
Some(raw) = rpc_rx.recv() => {
tracing::debug!(bytes = raw.len(), "p2p: rpc tx gossip");
// TODO: forward to nu-p2p swarm
}
else => break,
}
} }
}); });
Some(P2pSender(tx))
(Some(P2pSender(block_tx)), Some(rpc_tx))
} else { } else {
None (None, None)
}; };
if cli.dev && cli.validator { if cli.dev && cli.validator {
@ -97,11 +110,12 @@ async fn main() -> Result<()> {
)); ));
} }
let rpc = RpcServer::new( let rpc = RpcServer::with_p2p(
cli.rpc_addr.clone(), cli.rpc_addr.clone(),
Arc::clone(&db), Arc::clone(&db),
Arc::clone(&mempool), Arc::clone(&mempool),
cli.chain_id.clone(), cli.chain_id.clone(),
rpc_p2p_tx,
); );
tracing::info!( tracing::info!(