feat: WS kline stream, çıkış butonu, client temizliği

This commit is contained in:
Mukan Erkin TÖRÜK 2026-04-19 10:33:50 +03:00
parent e5e8fe02e0
commit 5a43e42a0e
6 changed files with 395 additions and 163 deletions

204
Cargo.lock generated
View file

@ -205,6 +205,12 @@ version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.11.1"
@ -253,6 +259,16 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570"
[[package]]
name = "core-foundation"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@ -278,6 +294,12 @@ dependencies = [
"typenum",
]
[[package]]
name = "data-encoding"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea"
[[package]]
name = "digest"
version = "0.10.7"
@ -393,6 +415,17 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.32"
@ -412,6 +445,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-macro",
"futures-sink",
"futures-task",
"pin-project-lite",
"slab",
@ -940,6 +975,7 @@ dependencies = [
"dotenvy",
"env_logger",
"futures-core",
"futures-util",
"hex",
"hmac",
"log",
@ -950,6 +986,7 @@ dependencies = [
"sha2",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tower 0.4.13",
"tower-http 0.5.2",
"uuid",
@ -976,6 +1013,12 @@ version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "openssl-probe"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
[[package]]
name = "parking_lot"
version = "0.12.5"
@ -1083,7 +1126,7 @@ dependencies = [
"rustc-hash",
"rustls",
"socket2",
"thiserror",
"thiserror 2.0.18",
"tokio",
"tracing",
"web-time",
@ -1098,13 +1141,13 @@ dependencies = [
"bytes",
"getrandom 0.3.4",
"lru-slab",
"rand",
"rand 0.9.4",
"ring",
"rustc-hash",
"rustls",
"rustls-pki-types",
"slab",
"thiserror",
"thiserror 2.0.18",
"tinyvec",
"tracing",
"web-time",
@ -1145,14 +1188,35 @@ version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
[[package]]
name = "rand"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea"
dependencies = [
"rand_chacha",
"rand_core",
"rand_chacha 0.9.0",
"rand_core 0.9.5",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.4",
]
[[package]]
@ -1162,7 +1226,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.9.5",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.17",
]
[[package]]
@ -1298,6 +1371,18 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rustls-native-certs"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63"
dependencies = [
"openssl-probe",
"rustls-pki-types",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pki-types"
version = "1.14.0"
@ -1331,12 +1416,44 @@ version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]]
name = "schannel"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "security-framework"
version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "semver"
version = "1.0.28"
@ -1409,6 +1526,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sha2"
version = "0.10.9"
@ -1501,13 +1629,33 @@ dependencies = [
"syn",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
dependencies = [
"thiserror-impl",
"thiserror-impl 2.0.18",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -1596,6 +1744,22 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "tokio-tungstenite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
dependencies = [
"futures-util",
"log",
"rustls",
"rustls-native-certs",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
@ -1717,6 +1881,26 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand 0.8.6",
"rustls",
"rustls-pki-types",
"sha1",
"thiserror 1.0.69",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.19.0"
@ -1759,6 +1943,12 @@ dependencies = [
"serde",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8_iter"
version = "1.0.4"

View file

@ -50,3 +50,7 @@ dotenvy = "0.15"
# SSE
tokio-stream = { version = "0.1", features = ["sync"] }
futures-core = "0.3"
# WebSocket
tokio-tungstenite = { version = "0.24", features = ["rustls-tls-native-roots"] }
futures-util = "0.3"

View file

@ -2,7 +2,7 @@ use anyhow::{anyhow, Result};
use reqwest::Client;
use serde_json::Value;
use super::models::{FilledOrder, Kline, OrderSide, SymbolFilters, Timeframe};
use super::models::{FilledOrder, Kline, OrderSide, SymbolFilters};
const BINANCE_BASE_URL: &str = "https://api.binance.com";
const BINANCE_TESTNET_URL: &str = "https://testnet.binance.vision";
@ -28,35 +28,6 @@ impl BinanceClient {
}
}
/// Son kapanmış mumu getirir (listenin sondan bir önceki elemanı)
pub async fn get_last_closed_kline(
&self,
symbol: &str,
timeframe: &Timeframe,
) -> Result<Kline> {
let url = format!("{}/api/v3/klines", self.base_url);
let response = self
.http
.get(&url)
.query(&[
("symbol", symbol),
("interval", timeframe.as_str()),
("limit", "2"),
])
.send()
.await?
.json::<Value>()
.await?;
// index 0 = kapanmış mum, index 1 = hâlâ açık mum
let kline_data = response
.as_array()
.and_then(|arr| arr.first())
.ok_or_else(|| anyhow!("Kline verisi alınamadı"))?;
parse_kline(kline_data)
}
/// Market alım emri gönderir, gerçekleşen fiyat ve miktarı döner
pub async fn market_buy(
&self,

View file

@ -1,16 +1,22 @@
use std::sync::Arc;
use log::{error, info};
use futures_util::StreamExt;
use log::{error, info, warn};
use serde_json::Value;
use tokio::sync::{broadcast, Mutex};
use tokio::time::{sleep, Duration};
use tokio_tungstenite::connect_async;
use crate::binance::{client::BinanceClient, models::Timeframe};
use crate::binance::{client::BinanceClient, models::{Kline, Timeframe}};
use crate::bot::strategy::RedCandleStrategy;
use crate::storage::config::BotConfig;
use crate::storage::db::Database;
use crate::storage::history::TradeRecord;
use crate::storage::positions::OpenPosition;
const BINANCE_WS_URL: &str = "wss://stream.binance.com/ws";
const BINANCE_TESTNET_WS_URL: &str = "wss://testnet.binance.vision/ws";
#[derive(Debug, Clone, serde::Serialize)]
pub struct TradeEvent {
pub bot_id: String,
@ -35,9 +41,15 @@ impl BotRunner {
event_tx: broadcast::Sender<TradeEvent>,
) {
let client = BinanceClient::new(api_key, api_secret, config.testnet);
let mut last_traded_open_time: Option<i64> = None;
let ws_base = if config.testnet { BINANCE_TESTNET_WS_URL } else { BINANCE_WS_URL };
let stream = format!(
"{}/{}@kline_{}",
ws_base,
config.symbol.to_lowercase(),
config.timeframe.as_str()
);
info!("[{}] Bot başlatıldı.", config.symbol);
info!("[{}] Bot başlatıldı. WS: {}", config.symbol, stream);
loop {
if *shutdown.lock().await {
@ -45,28 +57,49 @@ impl BotRunner {
break;
}
let wait_secs = seconds_until_trigger(&config.timeframe);
info!("[{}] Sonraki kontrol: {} saniye sonra.", config.symbol, wait_secs);
sleep(Duration::from_secs(wait_secs as u64)).await;
match connect_async(&stream).await {
Ok((ws, _)) => {
info!("[{}] WS bağlantısı kuruldu.", config.symbol);
let (_, mut read) = ws.split();
loop {
if *shutdown.lock().await {
info!("[{}] Bot durduruldu.", config.symbol);
break;
return;
}
match read.next().await {
Some(Ok(msg)) => {
let text = match msg.into_text() {
Ok(t) => t,
Err(_) => continue,
};
let kline = match parse_kline_message(&text) {
Some(k) => k,
None => continue,
};
// Sadece kapanmış mumları işle
if !kline.is_closed {
continue;
}
info!("[{}] Mum kapandı. Açılış: {:.6} | Kapanış: {:.6}", config.symbol, kline.open, kline.close);
let kline_data = Kline::from(&kline);
match RedCandleStrategy::execute(
&client,
&config.symbol,
&config.timeframe,
&kline_data,
config.usdt_amount,
config.profit_percent,
&mut last_traded_open_time,
)
.await
{
Ok(Some(result)) => {
info!(
"[{}] ✅ İşlem | Alış: {:.4} | Satış hedefi: {:.4} | Kar: %{:.2}",
"[{}] ✅ İşlem | Alış: {:.6} | Satış hedefi: {:.6} | Kar: %{:.2}",
config.symbol, result.buy_order.price, result.sell_order.price, config.profit_percent
);
@ -103,7 +136,7 @@ impl BotRunner {
}
}
let event = TradeEvent {
event_tx.send(TradeEvent {
bot_id: config.id.clone(),
bot_name: config.name.clone(),
symbol: config.symbol.clone(),
@ -112,32 +145,84 @@ impl BotRunner {
quantity: result.buy_order.quantity,
profit_percent: config.profit_percent,
timestamp: result.timestamp,
};
event_tx.send(event).ok();
}).ok();
}
Ok(None) => {
info!("[{}] ⏭ İşlem yapılmadı.", config.symbol);
}
Err(e) => {
error!("[{}] ❌ HATA: {:?}", config.symbol, e);
error!("[{}] ❌ Strateji hatası: {:?}", config.symbol, e);
}
}
}
Some(Err(e)) => {
warn!("[{}] WS hata: {}. Yeniden bağlanılıyor...", config.symbol, e);
break;
}
None => {
warn!("[{}] WS bağlantısı kapandı. Yeniden bağlanılıyor...", config.symbol);
break;
}
}
}
}
Err(e) => {
error!("[{}] WS bağlanamadı: {}. 5sn sonra tekrar deneniyor.", config.symbol, e);
}
}
if *shutdown.lock().await {
break;
}
sleep(Duration::from_secs(5)).await;
}
}
}
const TRIGGER_BEFORE_SECS: i64 = 60;
struct KlineWs {
open: f64,
close: f64,
open_time: i64,
close_time: i64,
high: f64,
low: f64,
volume: f64,
is_closed: bool,
}
fn seconds_until_trigger(timeframe: &Timeframe) -> i64 {
let now = chrono::Utc::now().timestamp();
let duration = timeframe.duration_secs();
let candle_start = (now / duration) * duration;
let candle_close = candle_start + duration;
let trigger_at = candle_close - TRIGGER_BEFORE_SECS;
let wait = trigger_at - now;
if wait <= 0 {
candle_close + duration - TRIGGER_BEFORE_SECS - now
} else {
wait
fn parse_kline_message(text: &str) -> Option<KlineWs> {
let v: Value = serde_json::from_str(text).ok()?;
let k = v.get("k")?;
Some(KlineWs {
open_time: k["t"].as_i64()?,
close_time: k["T"].as_i64()?,
open: k["o"].as_str()?.parse().ok()?,
close: k["c"].as_str()?.parse().ok()?,
high: k["h"].as_str()?.parse().ok()?,
low: k["l"].as_str()?.parse().ok()?,
volume: k["v"].as_str()?.parse().ok()?,
is_closed: k["x"].as_bool()?,
})
}
impl KlineWs {
fn is_red(&self) -> bool {
self.close < self.open
}
}
// Kline dönüşümü strategy için
impl From<&KlineWs> for Kline {
fn from(k: &KlineWs) -> Self {
Kline {
open_time: k.open_time,
close_time: k.close_time,
open: k.open,
close: k.close,
high: k.high,
low: k.low,
volume: k.volume,
}
}
}

View file

@ -4,81 +4,55 @@ use log::info;
use crate::binance::{client::BinanceClient, models::Kline};
/// Kırmızı mum stratejisinin karar ve emir mantığı
pub struct RedCandleStrategy;
impl RedCandleStrategy {
/// Son kapanmış mumu kontrol eder.
/// Kırmızıysa market alım + limit satış emri girer.
/// Aynı mum için daha önce işlem yapıldıysa atlar (last_traded_open_time kontrolü).
/// Kapanmış kline verilir. Kırmızıysa market alım + limit satış emri girer.
pub async fn execute(
client: &BinanceClient,
symbol: &str,
timeframe: &crate::binance::models::Timeframe,
kline: &Kline,
usdt_amount: f64,
profit_percent: f64,
last_traded_open_time: &mut Option<i64>,
) -> Result<Option<TradeResult>> {
let kline = client.get_last_closed_kline(symbol, timeframe).await?;
// Aynı mum için tekrar işlem engeli
if let Some(last_time) = *last_traded_open_time {
if kline.open_time == last_time {
info!(
"[{}] Aynı mum için zaten işlem yapıldı (open_time: {}), atlanıyor.",
symbol, kline.open_time
);
return Ok(None);
}
}
if !kline.is_red() {
info!(
"[{}] Son mum kırmızı değil. Açılış: {:.4} | Kapanış: {:.4}",
"[{}] Son mum kırmızı değil. Açılış: {:.6} | Kapanış: {:.6}",
symbol, kline.open, kline.close
);
return Ok(None);
}
info!(
"[{}] KIRMIZI MUM! Açılış: {:.4} | Kapanış: {:.4} | Alım yapılıyor...",
"[{}] KIRMIZI MUM! Açılış: {:.6} | Kapanış: {:.6} | Alım yapılıyor...",
symbol, kline.open, kline.close
);
// Exchange filtrelerini al (lot size, tick size)
let filters = client.get_symbol_filters(symbol).await?;
// Alım miktarını hesapla (mevcut kapanış fiyatı üzerinden)
let raw_qty = usdt_amount / kline.close;
let quantity = filters.format_qty(raw_qty);
info!("[{}] Market alım gönderiliyor | Miktar: {} | Fiyat tahmini: {:.4}", symbol, quantity, kline.close);
// Market alım emri
info!("[{}] Market alım | Miktar: {} | Fiyat tahmini: {:.6}", symbol, quantity, kline.close);
let buy_order = client.market_buy(symbol, &quantity).await?;
info!(
"[{}] Alım gerçekleşti | Fiyat: {:.4} | Miktar: {} | Emir ID: {}",
"[{}] Alım gerçekleşti | Fiyat: {:.6} | Miktar: {} | Emir ID: {}",
symbol, buy_order.price, filters.format_qty(buy_order.quantity), buy_order.order_id
);
// Satış fiyatı = bu işlemin alış fiyatı × (1 + kar%)
let sell_price = buy_order.price * (1.0 + profit_percent / 100.0);
let sell_price_str = filters.format_price(sell_price);
let sell_quantity = filters.format_qty(buy_order.quantity);
// Limit satış emri
let sell_order = client
.limit_sell(symbol, &sell_quantity, &sell_price_str)
.await?;
let sell_order = client.limit_sell(symbol, &sell_quantity, &sell_price_str).await?;
info!(
"[{}] Limit satış emri girildi | Fiyat: {:.4} | Emir ID: {}",
"[{}] Limit satış emri | Fiyat: {:.6} | Emir ID: {}",
symbol, sell_price, sell_order.order_id
);
*last_traded_open_time = Some(kline.open_time);
Ok(Some(TradeResult {
symbol: symbol.to_string(),
kline,
kline: kline.clone(),
buy_order,
sell_order,
profit_percent,
@ -87,7 +61,6 @@ impl RedCandleStrategy {
}
}
/// İşlem sonucu
#[derive(Debug, Clone)]
pub struct TradeResult {
pub symbol: String,
@ -97,4 +70,3 @@ pub struct TradeResult {
pub profit_percent: f64,
pub timestamp: i64,
}

View file

@ -169,6 +169,7 @@
<button class="mode-btn" id="btn-testnet" onclick="switchMode('testnet')">Testnet</button>
<button class="mode-btn" id="btn-live" onclick="switchMode('live')">Canlı</button>
</div>
<button class="btn" onclick="logout()" style="font-size:11px;padding:4px 10px;color:var(--muted)">Çıkış</button>
</header>
<main>
@ -229,6 +230,15 @@ function showAuthOverlay() {
document.getElementById('auth-overlay').style.display = 'flex';
}
function logout() {
AUTH_TOKEN = '';
localStorage.removeItem('mse_token');
if (sseSource) { sseSource.close(); sseSource = null; }
document.getElementById('token-input').value = '';
document.getElementById('auth-error').style.display = 'none';
showAuthOverlay();
}
async function tryConnect() {
const res = await fetch('/api/bots', { headers: { 'Authorization': 'Bearer ' + AUTH_TOKEN } });
if (res.status === 401) {