feat(p2p): implement P2pNode swarm with gossipsub, kademlia, identify, ping

This commit is contained in:
Mukan Erkin TÖRÜK 2026-04-24 11:48:27 +03:00
parent 07493e43ed
commit 2c496771de
8 changed files with 3927 additions and 12 deletions

View file

@ -7,6 +7,19 @@ Format: [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased] ## [Unreleased]
## [0.2.0] — 2026-04-24
### Added
- `src/node.rs``P2pNode::start()`: libp2p swarm oluşturur, TCP + Noise + Yamux ile dinler, bootstrap peer'lara dial eder
- `P2pCommand` enum: `Publish { topic, message }` ve `Shutdown``mpsc::Receiver` üzerinden dışarıdan kontrol
- Gossipsub topic subscription: `nu/blocks/1`, `nu/txs/1`, `nu/votes/1`, `nu/validators/1`
- Bağlantı event'leri: `ConnectionEstablished/Closed``PeerRegistry` güncellenir
- Gelen gossip mesajları `NetworkMessage` olarak deserialize edilip loglanır
- `src/main.rs``--listen` ve `--bootstrap` CLI argümanları, `clap` derive
### Changed
- `Cargo.toml`: `libp2p macros` feature eklendi (`NetworkBehaviour` derive için), `tracing` `"1"``"0.1"` düzeltildi
## [0.1.0] — 2026-04-24 ## [0.1.0] — 2026-04-24
### Added ### Added

3723
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -18,11 +18,13 @@ libp2p = { version = "0.54", features = [
"kad", "kad",
"identify", "identify",
"ping", "ping",
"macros",
] } ] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
anyhow = "1" anyhow = "1"
thiserror = "1" thiserror = "1"
tracing = "1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
futures = "0.3" futures = "0.3"
clap = { version = "4", features = ["derive"] }

View file

@ -1,8 +1,4 @@
// libp2p combined behaviour: Gossipsub + Kademlia DHT + Identify + Ping use libp2p::{gossipsub, identify, kad, ping, swarm::NetworkBehaviour};
// Full implementation in Faz 1; scaffold defines the composited behaviour struct.
use libp2p::{gossipsub, identify, kad, ping};
use libp2p::swarm::NetworkBehaviour;
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
pub struct NuBehaviour { pub struct NuBehaviour {

View file

@ -1,4 +1,5 @@
pub mod behaviour; pub mod behaviour;
pub mod messages;
pub mod peer;
pub mod config; pub mod config;
pub mod messages;
pub mod node;
pub mod peer;

View file

@ -1,14 +1,38 @@
use anyhow::Result; use anyhow::Result;
use clap::Parser;
use tokio::sync::mpsc;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use nu_p2p::{
config::P2pConfig,
node::{P2pNode, P2pCommand},
};
#[derive(Parser)]
#[command(name = "nu-p2p", version, about = "Narrative Union P2P node")]
struct Cli {
#[arg(long, default_value = "/ip4/0.0.0.0/tcp/30333")]
listen: String,
#[arg(long)]
bootstrap: Vec<String>,
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env()) .with_env_filter(EnvFilter::from_default_env())
.init(); .init();
tracing::info!("nu-p2p starting..."); let cli = Cli::parse();
// TODO Faz 1: build libp2p swarm with NuBehaviour, connect bootstrap peers, run event loop let config = P2pConfig {
Ok(()) listen_addr: cli.listen,
bootstrap_peers: cli.bootstrap,
max_peers: nu_p2p::config::MAX_PEERS,
};
let (_cmd_tx, cmd_rx) = mpsc::channel::<P2pCommand>(32);
P2pNode::new(config).start(cmd_rx).await
} }

157
src/node.rs Normal file
View file

@ -0,0 +1,157 @@
use std::time::Duration;
use anyhow::Result;
use futures::StreamExt;
use libp2p::{
gossipsub, identify, kad, noise, ping,
swarm::SwarmEvent,
tcp, yamux, Multiaddr, PeerId, SwarmBuilder,
};
use tokio::sync::mpsc;
use tracing::{info, warn};
use crate::{
behaviour::{NuBehaviour, NuBehaviourEvent},
config::P2pConfig,
messages::{NetworkMessage, TOPIC_BLOCKS, TOPIC_TXS, TOPIC_VALIDATORS, TOPIC_VOTES},
peer::{PeerInfo, PeerRegistry},
};
pub enum P2pCommand {
Publish { topic: String, message: NetworkMessage },
Shutdown,
}
pub struct P2pNode {
config: P2pConfig,
}
impl P2pNode {
pub fn new(config: P2pConfig) -> Self {
Self { config }
}
pub async fn start(self, mut cmd_rx: mpsc::Receiver<P2pCommand>) -> Result<()> {
let local_key = libp2p::identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
info!("Local peer id: {local_peer_id}");
let mut swarm = SwarmBuilder::with_existing_identity(local_key.clone())
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|key| -> Result<NuBehaviour, Box<dyn std::error::Error + Send + Sync>> {
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10))
.validation_mode(gossipsub::ValidationMode::Strict)
.build()
.map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e))?;
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
gossipsub_config,
)
.map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e))?;
let kademlia = kad::Behaviour::new(
local_peer_id,
kad::store::MemoryStore::new(local_peer_id),
);
let identify = identify::Behaviour::new(identify::Config::new(
"/nu/1.0.0".into(),
key.public(),
));
let ping = ping::Behaviour::new(ping::Config::default());
Ok(NuBehaviour { gossipsub, kademlia, identify, ping })
})?
.build();
for topic_str in [TOPIC_BLOCKS, TOPIC_TXS, TOPIC_VOTES, TOPIC_VALIDATORS] {
let topic = gossipsub::IdentTopic::new(topic_str);
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
}
let listen_addr: Multiaddr = self.config.listen_addr.parse()?;
swarm.listen_on(listen_addr)?;
for addr_str in &self.config.bootstrap_peers {
match addr_str.parse::<Multiaddr>() {
Ok(addr) => { let _ = swarm.dial(addr); }
Err(e) => warn!("Invalid bootstrap addr {addr_str}: {e}"),
}
}
let mut registry = PeerRegistry::new(self.config.max_peers);
loop {
tokio::select! {
event = swarm.select_next_some() => {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
info!("Listening on {address}");
}
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
info!("Connected: {peer_id}");
registry.add(PeerInfo {
peer_id: peer_id.to_string(),
addr: String::new(),
is_validator: false,
});
info!("Peers: {}", registry.count());
}
SwarmEvent::ConnectionClosed { peer_id, .. } => {
info!("Disconnected: {peer_id}");
registry.remove(&peer_id.to_string());
}
SwarmEvent::Behaviour(event) => {
handle_behaviour_event(event);
}
_ => {}
}
}
cmd = cmd_rx.recv() => {
match cmd {
Some(P2pCommand::Publish { topic, message }) => {
let topic = gossipsub::IdentTopic::new(&topic);
if let Ok(data) = serde_json::to_vec(&message) {
let _ = swarm.behaviour_mut().gossipsub.publish(topic, data);
}
}
Some(P2pCommand::Shutdown) | None => {
info!("P2P node shutting down");
break;
}
}
}
}
}
Ok(())
}
}
fn handle_behaviour_event(event: NuBehaviourEvent) {
match event {
NuBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
match serde_json::from_slice::<NetworkMessage>(&message.data) {
Ok(msg) => info!("Gossip received: {:?}", msg),
Err(e) => warn!("Gossip decode error: {e}"),
}
}
NuBehaviourEvent::Identify(identify::Event::Received { peer_id, info, .. }) => {
info!("Identified peer {peer_id}: {}", info.protocol_version);
}
NuBehaviourEvent::Ping(ping::Event { peer, result, .. }) => {
if let Ok(rtt) = result {
info!("Ping {peer}: {rtt:?}");
}
}
_ => {}
}
}

View file

@ -1,4 +1,3 @@
use std::collections::HashSet;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PeerInfo { pub struct PeerInfo {