From ba51b72ed2579a54644c4670b88470864fd0402f224a1610f3d87e16acf9158d Mon Sep 17 00:00:00 2001 From: Mukan Erkin Date: Fri, 24 Apr 2026 16:49:26 +0300 Subject: [PATCH] feat(p2p): add HTTP publish API for nu-node integration Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 8 ++ CLAUDE.md | 19 ++++- Cargo.lock | 214 ++++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 1 + src/main.rs | 56 ++++++++++++- src/messages.rs | 26 ++++-- 6 files changed, 303 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5acc5a3..eb4b941 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ Format: [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased] +## [0.3.0] — 2026-04-24 + +### Added +- `--api-addr` CLI argümanı (varsayılan: `127.0.0.1:30334`) — nu-node'un mesaj yayınlayabileceği yerel HTTP endpoint +- `POST /publish` endpoint (axum) — `NetworkMessage` JSON alır, gossipsub'a `P2pCommand::Publish` olarak iletir; nu-node ↔ nu-p2p entegrasyonu tamamlandı +- `NetworkMessage::topic()` — her varyant için doğru gossip topic'i döner (messages.rs) +- `axum = "0.7"` bağımlılığı eklendi + ## [0.2.0] — 2026-04-24 ### Added diff --git a/CLAUDE.md b/CLAUDE.md index aed1c26..334d78e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -25,11 +25,22 @@ nu/votes/1 → VoteAnnounce nu/validators/1 → ValidatorHeartbeat ``` -## Geliştirme +## nu-node Entegrasyonu + +nu-p2p ayrı bir process olarak çalışır. nu-node, `--p2p-api` flag'i ile bu servisin HTTP adresini alır ve mesajları `POST /publish` üzerinden iletir. ```bash -cargo run --bin nu-p2p +# Nu-p2p başlat (publish API dahil) +cargo run --bin nu-p2p -- \ + --listen /ip4/0.0.0.0/tcp/30333 \ + --api-addr 127.0.0.1:30334 -# Faz 1: iki node arası gossip testi -RUST_LOG=debug cargo run --bin nu-p2p -- --bootstrap /ip4/127.0.0.1/tcp/30333 +# Nu-node'u P2P'ye bağlı başlat +cd ../nu-node && cargo run -- --dev --validator --p2p-api http://127.0.0.1:30334 + +# İki node gossip testi +cargo run --bin nu-p2p -- \ + --listen /ip4/0.0.0.0/tcp/30335 \ + --api-addr 127.0.0.1:30336 \ + --bootstrap /ip4/127.0.0.1/tcp/30333 ``` diff --git a/Cargo.lock b/Cargo.lock index 5ba57e6..a74d7aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,13 +201,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "attohttpc" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" dependencies = [ - "http", + "http 0.2.12", "log", "url", ] @@ -218,6 +224,61 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.9.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "base-x" version = "0.2.11" @@ -877,7 +938,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -1001,6 +1062,16 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1008,7 +1079,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.4.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -1035,8 +1129,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1048,6 +1142,41 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "bytes", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.9.0", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "icu_collections" version = "2.2.0" @@ -1194,8 +1323,8 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.32", "log", "rand 0.8.6", "tokio", @@ -1775,12 +1904,24 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1952,6 +2093,7 @@ name = "nu-p2p" version = "0.1.0" dependencies = [ "anyhow", + "axum", "clap", "futures", "libp2p", @@ -2558,6 +2700,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "scopeguard" version = "1.2.0" @@ -2613,6 +2761,29 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2758,6 +2929,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.2" @@ -2936,6 +3113,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -2948,6 +3147,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 44536cc..45edc48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,4 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } futures = "0.3" clap = { version = "4", features = ["derive"] } +axum = "0.7" diff --git a/src/main.rs b/src/main.rs index bcd990b..acbaad9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,14 @@ use anyhow::Result; +use axum::{extract::State, routing::post, Json, Router}; use clap::Parser; -use tokio::sync::mpsc; +use std::sync::Arc; +use tokio::{net::TcpListener, sync::mpsc}; use tracing_subscriber::EnvFilter; use nu_p2p::{ config::P2pConfig, - node::{P2pNode, P2pCommand}, + messages::NetworkMessage, + node::{P2pCommand, P2pNode}, }; #[derive(Parser)] @@ -16,6 +19,27 @@ struct Cli { #[arg(long)] bootstrap: Vec, + + /// Local HTTP API for nu-node to publish messages (e.g. 127.0.0.1:30334) + #[arg(long, default_value = "127.0.0.1:30334")] + api_addr: String, +} + +struct ApiState { + cmd_tx: mpsc::Sender, +} + +async fn publish_handler( + State(state): State>, + Json(msg): Json, +) -> axum::http::StatusCode { + let topic = msg.topic(); + let cmd = P2pCommand::Publish { topic, message: msg }; + if state.cmd_tx.try_send(cmd).is_ok() { + axum::http::StatusCode::OK + } else { + axum::http::StatusCode::SERVICE_UNAVAILABLE + } } #[tokio::main] @@ -32,7 +56,31 @@ async fn main() -> Result<()> { max_peers: nu_p2p::config::MAX_PEERS, }; - let (_cmd_tx, cmd_rx) = mpsc::channel::(32); + let (cmd_tx, cmd_rx) = mpsc::channel::(256); - P2pNode::new(config).start(cmd_rx).await + // Spawn P2P swarm + let swarm_handle = { + let node = P2pNode::new(config); + tokio::spawn(async move { node.start(cmd_rx).await }) + }; + + // Publish API — nu-node posts here to gossip messages + let api_state = Arc::new(ApiState { cmd_tx }); + let router = Router::new() + .route("/publish", post(publish_handler)) + .with_state(api_state); + + let listener = TcpListener::bind(&cli.api_addr).await?; + tracing::info!("P2P publish API listening on {}", cli.api_addr); + + tokio::select! { + res = axum::serve(listener, router) => { + if let Err(e) = res { tracing::error!("API server error: {e}"); } + } + res = swarm_handle => { + if let Err(e) = res { tracing::error!("Swarm task error: {e}"); } + } + } + + Ok(()) } diff --git a/src/messages.rs b/src/messages.rs index 1f5c4de..284564c 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -9,11 +9,25 @@ pub const TOPIC_VALIDATORS: &str = "nu/validators/1"; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", content = "data")] pub enum NetworkMessage { - BlockAnnounce { height: u64, hash: String }, - BlockRequest { height: u64 }, - BlockResponse { raw_block: Vec }, - TxGossip { raw_tx: Vec }, - PeerExchange { peers: Vec }, // multiaddrs + BlockAnnounce { height: u64, hash: String }, + BlockRequest { height: u64 }, + BlockResponse { raw_block: Vec }, + TxGossip { raw_tx: Vec }, + PeerExchange { peers: Vec }, ValidatorHeartbeat { address: String, slot: u32 }, - VoteAnnounce { node_id: String, voter: String, approve: bool }, + VoteAnnounce { node_id: String, voter: String, approve: bool }, +} + +impl NetworkMessage { + pub fn topic(&self) -> String { + match self { + Self::BlockAnnounce { .. } + | Self::BlockRequest { .. } + | Self::BlockResponse { .. } => TOPIC_BLOCKS.to_string(), + Self::TxGossip { .. } => TOPIC_TXS.to_string(), + Self::VoteAnnounce { .. } => TOPIC_VOTES.to_string(), + Self::ValidatorHeartbeat { .. } + | Self::PeerExchange { .. } => TOPIC_VALIDATORS.to_string(), + } + } }