Commit 7b63775b authored by nanamicat's avatar nanamicat

fix

parent 2550c678
Pipeline #42427 passed with stages
in 3 minutes and 35 seconds
......@@ -31,17 +31,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "average"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75aacce61912644dbb732271789748142bc4bf7ce6382d67bb26850516b7a391"
dependencies = [
"easy-cast",
"float-ord",
"num-traits",
]
[[package]]
name = "bincode"
version = "2.0.1"
......@@ -232,9 +221,9 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "digest"
version = "0.10.6"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
......@@ -260,15 +249,6 @@ dependencies = [
"const-random",
]
[[package]]
name = "easy-cast"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23f40539c229fc2e4674bdecdf24bfcc2cb83631ca911c78a035fa9f2381c32b"
dependencies = [
"libm",
]
[[package]]
name = "encoding_rs"
version = "0.8.35"
......@@ -313,12 +293,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
[[package]]
name = "float-ord"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce81f49ae8a0482e4c55ea62ebbd7e5a686af544c00b9d090bba3ff9be97b3d"
[[package]]
name = "foldhash"
version = "0.1.5"
......@@ -691,12 +665,6 @@ version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
[[package]]
name = "libm"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "litemap"
version = "0.8.1"
......@@ -822,16 +790,6 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-traits"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
"libm",
]
[[package]]
name = "once_cell"
version = "1.21.3"
......@@ -899,7 +857,7 @@ version = "2.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1403e8401ad5dedea73c626b99758535b342502f8d1e361f4a2dd952749122"
dependencies = [
"thiserror 1.0.40",
"thiserror 1.0.69",
"ucd-trie",
]
......@@ -1002,7 +960,6 @@ name = "railgun-routing-client"
version = "0.1.0"
dependencies = [
"anyhow",
"average",
"bincode",
"config",
"hickory-resolver",
......@@ -1010,6 +967,7 @@ dependencies = [
"netlink-sys",
"rand",
"rtnetlink",
"saturating_cast",
"serde",
"serde_derive",
"serde_json",
......@@ -1104,7 +1062,7 @@ dependencies = [
"netlink-proto",
"netlink-sys",
"nix",
"thiserror 1.0.40",
"thiserror 1.0.69",
"tokio",
]
......@@ -1130,6 +1088,12 @@ version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "saturating_cast"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fc4972f129a0ea378b69fa7c186d63255606e362ad00795f00b869dea5265eb"
[[package]]
name = "scopeguard"
version = "1.1.0"
......@@ -1202,9 +1166,9 @@ dependencies = [
[[package]]
name = "sha2"
version = "0.10.6"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
......@@ -1306,11 +1270,11 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "thiserror"
version = "1.0.40"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.40",
"thiserror-impl 1.0.69",
]
[[package]]
......@@ -1324,9 +1288,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "1.0.40"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
......
......@@ -7,7 +7,6 @@ edition = "2024"
[dependencies]
config = "0.15.19"
average = "0.16.0"
serde_json = "1.0.145"
serde = { version = "1.0.228", features = ["derive"] }
serde_derive = "1.0"
......@@ -21,3 +20,4 @@ netlink-sys = { version = "0.8.7", features = ["tokio"] }
tracing = "0.1.44"
tracing-subscriber = "0.3.22"
rand = "0.9.2"
saturating_cast = "0.1.0"
\ No newline at end of file
......@@ -47,6 +47,8 @@ async fn main() -> anyhow::Result<()> {
let resolver = Resolver::builder_tokio()?.build();
let mut hello = Hello { seq: rand::random(), time: 0 };
loop {
let server_addr = config.server.to_socket_addrs(&resolver).await?;
......@@ -72,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
now = timer.tick() => {
// to clients
let hello = Hello { time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32 };
hello.time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32;
let len = bincode::encode_into_slice(&hello, &mut buf, bincode::config::standard())?;
for id in connections[&config.id].keys() {
let router = &routers[id];
......@@ -87,13 +89,15 @@ async fn main() -> anyhow::Result<()> {
peers: if now.duration_since(start) < TIMEOUT { Default::default() } else { connections
.iter()
.filter(|(_, to)| to.contains_key(&config.id))
.map(|(from,_)|routers.get_mut(from).unwrap().update(hello.time))
.map(|(from,_)|routers.get_mut(from).unwrap().update(now))
.collect()},
via: Default::default(),
plan: Default::default()
};
let len = bincode::encode_into_slice(&uplink, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], server_addr).await;
hello.seq += 1;
}
}
}
......
......@@ -4,6 +4,7 @@ use std::collections::BTreeMap;
#[derive(Encode, Decode)]
pub struct Hello {
pub seq: u32,
pub time: u32,
}
......@@ -34,7 +35,7 @@ pub struct Downlink {
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
}
#[derive(Encode, Decode, Serialize, Copy, Clone, Debug)]
#[derive(Encode, Decode, Serialize, Copy, Clone, Debug, Default)]
pub struct PeerQuality {
pub delay: i16,
pub reliability: u8,
......
use crate::data::Router as RouterData;
use crate::protocol::{Hello, PeerQuality};
use crate::settings::{Settings, HISTORY, INTERVAL, TIMEOUT};
use average::Mean;
use crate::settings::{INTERVAL, Settings, TIMEOUT};
use saturating_cast::SaturatingCast;
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::SystemTime;
use tokio::time::Instant;
const WINDOW_SIZE: u32 = 64; // 用于计算丢包率的窗口大小(通常64或128)
pub struct Router {
pub link_address: SocketAddr,
quality: PeerQuality,
remote_time: u32,
local_time: u32,
history: Vec<Option<i32>>,
seq: u32,
delay: i32, // 平滑后的单向延迟 (包含时钟偏差)
prev_delay: i32, // 上一次的传输耗时 (local - remote)
jitter: i32,
receive: u64,
time: Instant,
}
impl Router {
......@@ -30,78 +35,60 @@ impl Router {
pub fn new(data: &RouterData, config: &Settings) -> Router {
Router {
link_address: SocketAddr::new(IpAddr::V4(Router::link_address(config.id, data.id)), config.bind.port()),
quality: PeerQuality {
reliability: 0,
jitter: 0,
delay: 0,
},
local_time: 0,
remote_time: 0,
history: Vec::new(),
}
}
pub fn reset(&mut self) {
self.quality = PeerQuality {
reliability: 0,
seq: rand::random(),
receive: 0,
jitter: 0,
prev_delay: 0,
delay: 0,
};
self.history.clear();
time: Instant::now(),
}
}
pub fn on_message(&mut self, data: &Hello) {
// 这个包发出距离上一个包
let diff = data.time.wrapping_sub(self.remote_time) as i32;
let step = (diff as f64 / INTERVAL.as_millis() as f64).round() as isize;
let delay = (SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u32).wrapping_sub(data.time) as i32;
self.delay += (delay - self.delay) / 16;
self.local_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u32;
let delay = self.local_time.wrapping_sub(data.time) as i32;
let d = (delay - self.prev_delay).abs();
self.jitter += (d - self.jitter) / 16;
self.prev_delay = delay;
// 收到时间略小于或相等的,可能是网络乱序或重包
if -(TIMEOUT.as_millis() as i32) < diff && diff <= 0 {
if let Some(index) = self.history.len().checked_sub((1 - step) as usize)
&& self.history[index].is_none()
{
self.history[index] = Some(delay);
let delta = data.seq.wrapping_sub(self.seq) as i32;
match delta {
..=-64 | 64.. => {
self.receive = 1;
self.seq = data.seq;
self.prev_delay = delay;
self.delay = delay;
}
} else if 0 < diff && diff <= (TIMEOUT.as_millis() as i32) {
// 差距较小,补上中间丢的包
for _ in 0..step - 1 {
self.history.push(None);
1..=63 => {
self.receive = (self.receive << delta) | 1;
self.seq = data.seq;
self.time = Instant::now();
}
self.history.push(Some(delay));
if self.history.len() > HISTORY as usize {
self.history.drain(0..self.history.len() - HISTORY as usize);
-63..=-1 => {
self.receive |= 1 << (-delta);
}
self.remote_time = data.time;
} else {
// 差距较大,就 reset
self.reset();
self.history.push(Some(delay));
self.remote_time = data.time;
0 => {}
}
let received: Vec<i32> = self.history.iter().filter_map(|&s| s).collect();
assert!(!received.is_empty()); // 因为走到这里一定刚放过一个进去
self.quality.reliability = (received.len() as f64 * 255.0 / self.history.len() as f64).round() as u8;
self.quality.delay = received.iter().map(|&x| f64::from(x)).collect::<Mean>().mean() as i16;
self.quality.jitter = (1..received.len()).map(|i| f64::from(received[i].abs_diff(received[i - 1]))).collect::<Mean>().mean() as u8;
}
pub(crate) fn update(&mut self, local_time: u32) -> PeerQuality {
if self.quality.reliability > 0 {
let diff = local_time.wrapping_sub(self.local_time);
// 有几个包没到
if diff > TIMEOUT.as_millis() as u32 {
self.reset();
} else if diff >= (INTERVAL.as_millis() * 2) as u32 {
self.quality.reliability = self.quality.reliability.saturating_sub(255 / HISTORY);
pub(crate) fn update(&mut self, now: Instant) -> PeerQuality {
let reliability = self.receive.count_ones() as u8;
if reliability > 0 {
let duration = now.duration_since(self.time);
if duration > TIMEOUT {
self.receive = 0;
Default::default()
} else {
PeerQuality {
delay: self.delay.saturating_cast(),
jitter: self.jitter.saturating_cast(),
reliability: reliability.saturating_sub((duration.div_duration_f64(INTERVAL) as u8).saturating_sub(1)),
}
}
} else {
Default::default()
}
self.quality
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment