feat(nu-node): implement nu_sendRawTx with mempool integration

- nu-block: TxPayload enum with all variants (user + auto/scheduler)
- nu-mempool: PendingTx wraps RawTransaction; priority derived from TxPayload
- nu-rpc: nu_sendRawTx decodes JSON tx, deduplicates, inserts into mempool
- AppState: holds Arc<Mutex<Mempool>> alongside StateDb
- main.rs: initializes mempool and passes to RpcServer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mukan Erkin TÖRÜK 2026-04-24 10:32:26 +03:00
parent a42ca0f8d3
commit fd829ba1dd
10 changed files with 213 additions and 56 deletions

4
Cargo.lock generated
View file

@ -796,6 +796,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"nu-block",
"serde",
"thiserror",
"tokio",
@ -826,6 +827,9 @@ version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"chrono",
"nu-block",
"nu-mempool",
"nu-state",
"serde",
"serde_json",

View file

@ -13,6 +13,75 @@ pub struct BlockHeader {
pub slot: u32,
}
/// All transaction payload variants. Matches nu-proto TxPayload oneof.
/// "Auto" variants (NodeApprove, NftMint, NodeReject, VotingOpen) are
/// produced by the validator scheduler — never submitted by users.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TxPayload {
TokenTransfer {
to: String,
amount: u64,
},
NodeSubmit {
story_id: String,
parent_node_id: String,
content_hash: String, // IPFS CID
temp_id: String, // client UUID, replaced at approval
entry_fee: u64,
},
VoteRegister {
node_id: String,
stake_lock: u64,
},
VoteCast {
node_id: String,
approve: bool,
},
NftTransfer {
nft_id: String,
to: String,
},
CollectionClaim {
nft_ids: Vec<String>,
},
StakeOp {
stake: bool, // true = stake, false = unstake
amount: u64,
},
ValidatorRegister {
stake: u64,
},
// --- Auto variants (validator-only) ---
NodeApprove {
temp_id: String,
canonical_id: String,
},
NftMint {
node_id: String,
recipient: String,
},
NodeReject {
node_id: String,
},
VotingOpen {
node_id: String,
},
}
impl TxPayload {
pub fn priority_hint(&self) -> u8 {
match self {
Self::VoteCast { .. } | Self::VoteRegister { .. } => 1,
Self::NodeApprove { .. }
| Self::NftMint { .. }
| Self::NodeReject { .. }
| Self::VotingOpen { .. } => 2,
_ => 0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTransaction {
pub tx_id: String,
@ -20,7 +89,7 @@ pub struct RawTransaction {
pub nonce: u64,
pub fee: u64,
pub sig: Vec<u8>,
pub payload: serde_json::Value,
pub payload: TxPayload,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -31,7 +100,7 @@ pub struct Block {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxReceipt {
pub tx_id: String,
pub success: bool,
pub error: String,
pub tx_id: String,
pub success: bool,
pub error: String,
}

View file

@ -10,3 +10,4 @@ anyhow.workspace = true
thiserror.workspace = true
tracing.workspace = true
chrono.workspace = true
nu-block = { path = "../nu-block" }

View file

@ -2,4 +2,4 @@ pub mod pool;
pub mod priority;
pub mod types;
pub use pool::Mempool;
pub use pool::{Mempool, PendingTx};

View file

@ -1,15 +1,15 @@
use crate::types::*;
use crate::priority::TxPriority;
use std::collections::HashMap;
use nu_block::types::RawTransaction;
use crate::priority::TxPriority;
use crate::types::{MEMPOOL_MAX_TX, MEMPOOL_TTL_MS, MAX_TX_PER_SENDER};
#[derive(Debug, Clone)]
pub struct PendingTx {
pub tx_id: String,
pub sender: String,
pub fee: u64,
pub priority: TxPriority,
pub tx: RawTransaction,
pub priority: TxPriority,
pub received_at: i64, // Unix epoch ms
pub raw: Vec<u8>,
}
pub struct Mempool {
@ -19,50 +19,62 @@ pub struct Mempool {
impl Mempool {
pub fn new() -> Self {
Self {
txs: vec![],
sender_counts: HashMap::new(),
}
Self { txs: vec![], sender_counts: HashMap::new() }
}
pub fn insert(&mut self, tx: PendingTx) -> bool {
pub fn insert(&mut self, tx: RawTransaction, now_ms: i64) -> bool {
if self.txs.len() >= MEMPOOL_MAX_TX {
return false;
}
let priority = TxPriority::from_payload(&tx.payload);
let count = self.sender_counts.entry(tx.sender.clone()).or_insert(0);
if *count >= MAX_TX_PER_SENDER && tx.priority == TxPriority::Normal {
if *count >= MAX_TX_PER_SENDER && priority == TxPriority::Normal {
return false;
}
*count += 1;
self.txs.push(tx);
self.txs.push(PendingTx { tx, priority, received_at: now_ms });
true
}
pub fn contains(&self, tx_id: &str) -> bool {
self.txs.iter().any(|p| p.tx.tx_id == tx_id)
}
pub fn select_for_block(&mut self, max_tx: usize, now_ms: i64) -> Vec<PendingTx> {
self.evict_expired(now_ms);
let mut sorted = self.txs.clone();
sorted.sort_by(|a, b| {
b.priority.cmp(&a.priority)
.then(b.fee.cmp(&a.fee))
b.priority.cmp(&a.priority).then(b.tx.fee.cmp(&a.tx.fee))
});
sorted.into_iter().take(max_tx).collect()
}
pub fn remove(&mut self, tx_id: &str) {
if let Some(pos) = self.txs.iter().position(|t| t.tx_id == tx_id) {
let tx = self.txs.remove(pos);
if let Some(c) = self.sender_counts.get_mut(&tx.sender) {
if let Some(pos) = self.txs.iter().position(|p| p.tx.tx_id == tx_id) {
let removed = self.txs.remove(pos);
if let Some(c) = self.sender_counts.get_mut(&removed.tx.sender) {
*c = c.saturating_sub(1);
}
}
}
fn evict_expired(&mut self, now_ms: i64) {
self.txs.retain(|t| now_ms - t.received_at < MEMPOOL_TTL_MS);
}
pub fn len(&self) -> usize {
self.txs.len()
}
fn evict_expired(&mut self, now_ms: i64) {
let mut removed_senders: Vec<String> = vec![];
self.txs.retain(|p| {
let keep = now_ms - p.received_at < MEMPOOL_TTL_MS;
if !keep {
removed_senders.push(p.tx.sender.clone());
}
keep
});
for sender in removed_senders {
if let Some(c) = self.sender_counts.get_mut(&sender) {
*c = c.saturating_sub(1);
}
}
}
}

View file

@ -1,9 +1,21 @@
// NodeApprove and VoteCast transactions are prioritized over regular transfers.
// Within same priority tier, higher fee wins.
use nu_block::types::TxPayload;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TxPriority {
Normal = 0,
High = 1, // VoteCast, VoteRegister
Critical = 2, // NodeApprove, VotingOpen, NodeReject (auto-scheduler)
Normal = 0,
High = 1, // VoteCast, VoteRegister
Critical = 2, // NodeApprove, NftMint, NodeReject, VotingOpen (auto-scheduler)
}
impl TxPriority {
pub fn from_payload(payload: &TxPayload) -> Self {
match payload {
TxPayload::VoteCast { .. } | TxPayload::VoteRegister { .. } => Self::High,
TxPayload::NodeApprove { .. }
| TxPayload::NftMint { .. }
| TxPayload::NodeReject { .. }
| TxPayload::VotingOpen { .. } => Self::Critical,
_ => Self::Normal,
}
}
}

View file

@ -10,4 +10,7 @@ serde_json.workspace = true
anyhow.workspace = true
tracing.workspace = true
axum.workspace = true
chrono.workspace = true
nu-state = { path = "../nu-state" }
nu-mempool = { path = "../nu-mempool" }
nu-block = { path = "../nu-block" }

View file

@ -4,13 +4,14 @@ use crate::{
server::AppState,
types::{JsonRpcRequest, JsonRpcResponse},
};
use nu_block::types::RawTransaction;
use nu_state::account::AccountState;
pub fn dispatch(req: JsonRpcRequest, state: &AppState) -> JsonRpcResponse {
pub async fn dispatch(req: JsonRpcRequest, state: &AppState) -> JsonRpcResponse {
match req.method.as_str() {
"nu_chainInfo" => handle_chain_info(&req, state),
"nu_getAccount" => handle_get_account(&req, state),
"nu_sendRawTx" => handle_send_raw_tx(&req, state),
"nu_sendRawTx" => handle_send_raw_tx(&req, state).await,
"nu_getBlock" => not_implemented(&req, "nu_getBlock"),
"nu_getTx" => not_implemented(&req, "nu_getTx"),
"nu_getStory" => not_implemented(&req, "nu_getStory"),
@ -26,7 +27,7 @@ fn handle_chain_info(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse
JsonRpcResponse::ok(
req.id.clone(),
json!({
"chain_id": state.chain_id,
"chain_id": state.chain_id,
"node_version": env!("CARGO_PKG_VERSION"),
}),
)
@ -40,9 +41,10 @@ fn handle_get_account(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse
let key = format!("account:{address}");
match state.db.get::<AccountState>(&key) {
Ok(Some(account)) => JsonRpcResponse::ok(req.id.clone(), serde_json::to_value(account).unwrap()),
Ok(Some(account)) => {
JsonRpcResponse::ok(req.id.clone(), serde_json::to_value(account).unwrap())
}
Ok(None) => {
// Return empty account — address exists conceptually with zero balance
let empty = AccountState::new(address);
JsonRpcResponse::ok(req.id.clone(), serde_json::to_value(empty).unwrap())
}
@ -50,19 +52,52 @@ fn handle_get_account(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse
}
}
fn handle_send_raw_tx(req: &JsonRpcRequest, _state: &AppState) -> JsonRpcResponse {
// Faz 1: accepts tx hex, decodes, validates, adds to mempool
// For now: echo back a stub tx_id so platform integration can proceed
let _raw = match req.params.get(0).and_then(|v| v.as_str()) {
Some(r) => r,
None => return JsonRpcResponse::err(req.id.clone(), -32602, "Missing raw tx param".into()),
async fn handle_send_raw_tx(req: &JsonRpcRequest, state: &AppState) -> JsonRpcResponse {
let raw_value = match req.params.get(0) {
Some(v) => v.clone(),
None => {
return JsonRpcResponse::err(req.id.clone(), -32602, "Missing tx param".into())
}
};
JsonRpcResponse::err(
req.id.clone(),
-32000,
"nu_sendRawTx: mempool integration pending (Faz 1)".into(),
)
let tx: RawTransaction = match serde_json::from_value(raw_value) {
Ok(t) => t,
Err(e) => {
return JsonRpcResponse::err(
req.id.clone(),
-32602,
format!("Invalid tx format: {e}"),
)
}
};
// Basic structural validation before touching mempool
if tx.tx_id.is_empty() {
return JsonRpcResponse::err(req.id.clone(), -32602, "tx_id is required".into());
}
if tx.sender.is_empty() {
return JsonRpcResponse::err(req.id.clone(), -32602, "sender is required".into());
}
let tx_id = tx.tx_id.clone();
let now_ms = chrono::Utc::now().timestamp_millis();
let mut pool = state.mempool.lock().await;
if pool.contains(&tx_id) {
return JsonRpcResponse::err(req.id.clone(), -32000, "tx already in mempool".into());
}
if pool.insert(tx, now_ms) {
tracing::info!("tx accepted into mempool: {tx_id} (pool size: {})", pool.len());
JsonRpcResponse::ok(req.id.clone(), json!({ "tx_id": tx_id }))
} else {
JsonRpcResponse::err(
req.id.clone(),
-32000,
"mempool full or sender limit reached".into(),
)
}
}
fn not_implemented(req: &JsonRpcRequest, method: &str) -> JsonRpcResponse {

View file

@ -7,16 +7,18 @@ use axum::{
routing::post,
Json, Router,
};
use tokio::net::TcpListener;
use tokio::{net::TcpListener, sync::Mutex};
use crate::{
handlers::dispatch,
types::{JsonRpcRequest, JsonRpcResponse},
};
use nu_mempool::Mempool;
use nu_state::db::StateDb;
pub struct AppState {
pub db: Arc<StateDb>,
pub mempool: Arc<Mutex<Mempool>>,
pub chain_id: String,
}
@ -26,10 +28,15 @@ pub struct RpcServer {
}
impl RpcServer {
pub fn new(bind_addr: impl Into<String>, db: Arc<StateDb>, chain_id: String) -> Self {
pub fn new(
bind_addr: impl Into<String>,
db: Arc<StateDb>,
mempool: Arc<Mutex<Mempool>>,
chain_id: String,
) -> Self {
Self {
bind_addr: bind_addr.into(),
state: Arc::new(AppState { db, chain_id }),
state: Arc::new(AppState { db, mempool, chain_id }),
}
}
@ -61,6 +68,6 @@ async fn rpc_handler(
}
};
let resp = dispatch(req, &state);
let resp = dispatch(req, &state).await;
(StatusCode::OK, Json(resp))
}

View file

@ -2,8 +2,10 @@ use std::sync::Arc;
use anyhow::Result;
use clap::Parser;
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter;
use nu_mempool::Mempool;
use nu_rpc::server::RpcServer;
use nu_state::StateDb;
@ -46,8 +48,20 @@ async fn main() -> Result<()> {
let db = Arc::new(StateDb::open(&cli.db_path)?);
tracing::info!("State DB opened at {}", cli.db_path);
let rpc = RpcServer::new(cli.rpc_addr.clone(), Arc::clone(&db), cli.chain_id.clone());
tracing::info!("nu-node ready — chain_id={} rpc={}", cli.chain_id, cli.rpc_addr);
let mempool = Arc::new(Mutex::new(Mempool::new()));
let rpc = RpcServer::new(
cli.rpc_addr.clone(),
Arc::clone(&db),
Arc::clone(&mempool),
cli.chain_id.clone(),
);
tracing::info!(
"nu-node ready — chain_id={} rpc={}",
cli.chain_id,
cli.rpc_addr
);
rpc.run().await?;
Ok(())