Commit 2550c678 authored by nanamicat's avatar nanamicat

learn

parent 0fbd3a9d
Pipeline #42422 passed with stages
in 2 minutes and 56 seconds
...@@ -3,7 +3,7 @@ use crate::data::Router as RouterData; ...@@ -3,7 +3,7 @@ use crate::data::Router as RouterData;
use crate::protocol::{Hello, MessageType, Uplink}; use crate::protocol::{Hello, MessageType, Uplink};
use crate::router::Router; use crate::router::Router;
use crate::server::Server; use crate::server::Server;
use crate::settings::{Settings, INTERVAL}; use crate::settings::{Settings, INTERVAL, TIMEOUT};
use config::Config; use config::Config;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fs; use std::fs;
...@@ -19,6 +19,7 @@ mod router; ...@@ -19,6 +19,7 @@ mod router;
mod server; mod server;
mod settings; mod settings;
use hickory_resolver::Resolver; use hickory_resolver::Resolver;
use tokio::time::Instant;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
...@@ -42,6 +43,7 @@ async fn main() -> anyhow::Result<()> { ...@@ -42,6 +43,7 @@ async fn main() -> anyhow::Result<()> {
let mut timer = time::interval(INTERVAL); let mut timer = time::interval(INTERVAL);
let mut buf = [0; 1500]; let mut buf = [0; 1500];
let start = Instant::now();
let resolver = Resolver::builder_tokio()?.build(); let resolver = Resolver::builder_tokio()?.build();
...@@ -68,7 +70,7 @@ async fn main() -> anyhow::Result<()> { ...@@ -68,7 +70,7 @@ async fn main() -> anyhow::Result<()> {
} }
} }
_ = timer.tick() => { now = timer.tick() => {
// to clients // to clients
let hello = Hello { time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32 }; let hello = Hello { time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32 };
let len = bincode::encode_into_slice(&hello, &mut buf, bincode::config::standard())?; let len = bincode::encode_into_slice(&hello, &mut buf, bincode::config::standard())?;
...@@ -82,11 +84,11 @@ async fn main() -> anyhow::Result<()> { ...@@ -82,11 +84,11 @@ async fn main() -> anyhow::Result<()> {
id: config.id, id: config.id,
action: if server.online {MessageType::Update} else {MessageType::Query}, action: if server.online {MessageType::Update} else {MessageType::Query},
version: server.version, version: server.version,
peers: connections peers: if now.duration_since(start) < TIMEOUT { Default::default() } else { connections
.iter() .iter()
.filter(|(_, to)| to.contains_key(&config.id)) .filter(|(_, to)| to.contains_key(&config.id))
.map(|(from,_)|routers.get_mut(from).unwrap().update(hello.time)) .map(|(from,_)|routers.get_mut(from).unwrap().update(hello.time))
.collect(), .collect()},
via: Default::default(), via: Default::default(),
plan: Default::default() plan: Default::default()
}; };
......
use crate::data::Router as RouterData; use crate::data::Router as RouterData;
use crate::protocol::{Hello, PeerQuality}; use crate::protocol::{Hello, PeerQuality};
use crate::settings::{HISTORY, INTERVAL, Settings, TIMEOUT}; use crate::settings::{Settings, HISTORY, INTERVAL, TIMEOUT};
use average::Mean; use average::Mean;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
...@@ -18,10 +18,7 @@ impl Router { ...@@ -18,10 +18,7 @@ impl Router {
pub fn link_address(from: u8, to: u8) -> Ipv4Addr { pub fn link_address(from: u8, to: u8) -> Ipv4Addr {
Ipv4Addr::from([10, 200, to, from]) Ipv4Addr::from([10, 200, to, from])
} }
pub fn get( pub fn get(routers: &mut BTreeMap<u8, Router>, link_address: SocketAddr) -> Option<&mut Router> {
routers: &mut BTreeMap<u8, Router>,
link_address: SocketAddr,
) -> Option<&mut Router> {
match link_address { match link_address {
SocketAddr::V4(addr) => { SocketAddr::V4(addr) => {
let id = addr.ip().octets()[2]; let id = addr.ip().octets()[2];
...@@ -32,10 +29,7 @@ impl Router { ...@@ -32,10 +29,7 @@ impl Router {
} }
pub fn new(data: &RouterData, config: &Settings) -> Router { pub fn new(data: &RouterData, config: &Settings) -> Router {
Router { Router {
link_address: SocketAddr::new( link_address: SocketAddr::new(IpAddr::V4(Router::link_address(config.id, data.id)), config.bind.port()),
IpAddr::V4(Router::link_address(config.id, data.id)),
config.bind.port(),
),
quality: PeerQuality { quality: PeerQuality {
reliability: 0, reliability: 0,
jitter: 0, jitter: 0,
...@@ -59,49 +53,41 @@ impl Router { ...@@ -59,49 +53,41 @@ impl Router {
pub fn on_message(&mut self, data: &Hello) { pub fn on_message(&mut self, data: &Hello) {
// 这个包发出距离上一个包 // 这个包发出距离上一个包
let diff = data.time.wrapping_sub(self.remote_time) as i32; let diff = data.time.wrapping_sub(self.remote_time) as i32;
let step = (diff as f64 / INTERVAL.as_millis() as f64).round() as isize;
// 收到时间略小于或相等的,可能是网络乱序或重包,忽略 self.local_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u32;
let delay = self.local_time.wrapping_sub(data.time) as i32;
// 收到时间略小于或相等的,可能是网络乱序或重包
if -(TIMEOUT.as_millis() as i32) < diff && diff <= 0 { if -(TIMEOUT.as_millis() as i32) < diff && diff <= 0 {
return; if let Some(index) = self.history.len().checked_sub((1 - step) as usize)
} && self.history[index].is_none()
if 0 < diff && diff <= (TIMEOUT.as_millis() as i32) { {
self.history[index] = Some(delay);
}
} else if 0 < diff && diff <= (TIMEOUT.as_millis() as i32) {
// 差距较小,补上中间丢的包 // 差距较小,补上中间丢的包
let step = (diff as f64 / INTERVAL.as_millis() as f64).round() as u8;
for _ in 0..step - 1 { for _ in 0..step - 1 {
self.history.push(None); self.history.push(None);
} }
self.history.push(Some(delay));
if self.history.len() > HISTORY as usize {
self.history.drain(0..self.history.len() - HISTORY as usize);
}
self.remote_time = data.time;
} else { } else {
// 差距较大,就 reset // 差距较大,就 reset
self.reset(); self.reset();
} self.history.push(Some(delay));
self.remote_time = data.time;
self.remote_time = data.time;
self.local_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u32;
let delay = self.local_time.wrapping_sub(self.remote_time) as i32;
self.history.push(Some(delay));
if self.history.len() > HISTORY as usize {
self.history.drain(0..self.history.len() - HISTORY as usize);
} }
let received: Vec<i32> = self.history.iter().filter_map(|&s| s).collect(); let received: Vec<i32> = self.history.iter().filter_map(|&s| s).collect();
assert!(!received.is_empty()); // 因为走到这里一定刚放过一个进去 assert!(!received.is_empty()); // 因为走到这里一定刚放过一个进去
self.quality.reliability = (received.len() * 255 / HISTORY as usize) as u8; self.quality.reliability = (received.len() as f64 * 255.0 / self.history.len() as f64).round() as u8;
self.quality.delay = received self.quality.delay = received.iter().map(|&x| f64::from(x)).collect::<Mean>().mean() as i16;
.iter() self.quality.jitter = (1..received.len()).map(|i| f64::from(received[i].abs_diff(received[i - 1]))).collect::<Mean>().mean() as u8;
.map(|&x| f64::from(x))
.collect::<Mean>()
.mean() as i16;
self.quality.jitter = (1..received.len())
.map(|i| f64::from(received[i].abs_diff(received[i - 1])))
.collect::<Mean>()
.mean() as u8;
} }
pub(crate) fn update(&mut self, local_time: u32) -> PeerQuality { pub(crate) fn update(&mut self, local_time: u32) -> PeerQuality {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment