Commit 35667ab5 authored by nanamicat's avatar nanamicat

smooth

parent 5f37e261
Pipeline #42432 passed with stages
in 56 seconds
......@@ -47,8 +47,6 @@ async fn main() -> anyhow::Result<()> {
let resolver = Resolver::builder_tokio()?.build();
let mut hello = Hello { seq: rand::random(), time: 0 };
loop {
let server_addr = config.server.to_socket_addrs(&resolver).await?;
......@@ -74,7 +72,8 @@ async fn main() -> anyhow::Result<()> {
now = timer.tick() => {
// to clients
hello.time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32;
let hello = Hello { time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32 };
let len = bincode::encode_into_slice(&hello, &mut buf, bincode::config::standard())?;
for id in connections[&config.id].keys() {
let router = &routers[id];
......@@ -89,15 +88,13 @@ async fn main() -> anyhow::Result<()> {
peers: if now.duration_since(start) < TIMEOUT { Default::default() } else { connections
.iter()
.filter(|(_, to)| to.contains_key(&config.id))
.map(|(from,_)|routers.get_mut(from).unwrap().update(now))
.map(|(from,_)|routers.get_mut(from).unwrap().update(now, start))
.collect()},
via: Default::default(),
plan: Default::default()
};
let len = bincode::encode_into_slice(&uplink, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], server_addr).await;
hello.seq += 1;
}
}
}
......
......@@ -4,7 +4,6 @@ use std::collections::BTreeMap;
#[derive(Encode, Decode)]
pub struct Hello {
pub seq: u32,
pub time: u32,
}
......
use crate::data::Router as RouterData;
use crate::protocol::{Hello, PeerQuality};
use crate::settings::{INTERVAL, Settings, TIMEOUT};
use crate::settings::{Settings, INTERVAL, TIMEOUT};
use saturating_cast::SaturatingCast;
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
......@@ -9,12 +9,12 @@ use tokio::time::Instant;
pub struct Router {
pub link_address: SocketAddr,
seq: u32,
delay: i32, // 平滑后的单向延迟 (包含时钟偏差)
prev_delay: i32, // 上一次的传输耗时 (local - remote)
jitter: i32,
receive: u64,
time: Instant,
remote_time: u32,
local_time: Instant,
}
impl Router {
......@@ -33,48 +33,49 @@ impl Router {
pub fn new(data: &RouterData, config: &Settings) -> Router {
Router {
link_address: SocketAddr::new(IpAddr::V4(Router::link_address(config.id, data.id)), config.bind.port()),
seq: rand::random(),
remote_time: rand::random(),
receive: 0,
jitter: 0,
prev_delay: 0,
delay: 0,
time: Instant::now(),
local_time: Instant::now(),
}
}
pub fn on_message(&mut self, data: &Hello) {
let delay = (SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u32).wrapping_sub(data.time) as i32;
self.delay += (delay - self.delay) / 4;
let d = (delay - self.prev_delay).abs();
self.jitter += (d - self.jitter) / 4;
self.prev_delay = delay;
let delta = data.seq.wrapping_sub(self.seq) as i32;
let delta = (data.time.wrapping_sub(self.remote_time) as i32 as f32 / INTERVAL.as_millis() as f32).round() as i32;
match delta {
..=-64 | 64.. => {
self.receive = 1;
self.seq = data.seq;
self.prev_delay = delay;
self.delay = delay;
-63..=-1 => {
self.receive |= 1 << (-delta);
}
0 => return,
1..=63 => {
self.receive = (self.receive << delta) | 1;
self.seq = data.seq;
self.time = Instant::now();
self.remote_time = data.time;
self.local_time = Instant::now();
}
-63..=-1 => {
self.receive |= 1 << (-delta);
_ => {
self.remote_time = data.time;
return;
}
0 => {}
}
let delay = (SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u32).wrapping_sub(data.time) as i32;
self.delay = delay; //+= (delay - self.delay) / 4;
let d = (delay - self.prev_delay).abs();
self.jitter += (d - self.jitter) / 4;
self.prev_delay = delay;
}
pub(crate) fn update(&mut self, now: Instant) -> PeerQuality {
pub(crate) fn update(&mut self, now: Instant, start: Instant) -> PeerQuality {
let reliability = self.receive.count_ones() as u8;
if reliability > 0 {
let duration = now.duration_since(self.time);
let max = now.duration_since(start).div_duration_f32(INTERVAL) as u32;
let reliability = if max < 64 { (reliability as u32 * 64 / max) as u8 } else { reliability };
let duration = now.duration_since(self.local_time);
if duration > TIMEOUT {
self.receive = 0;
Default::default()
......
......@@ -59,5 +59,4 @@ impl Endpoint {
pub const INTERVAL: Duration = Duration::from_secs(1);
pub const TIMEOUT: Duration = Duration::from_secs(60);
pub const HISTORY: u8 = 100;
pub const ROUTE_PROTOCOL: RouteProtocol = RouteProtocol::Other(252);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment