Commit eec03bfb authored by root's avatar root

v3 start

parent 35cf976f
/target
/import
\ No newline at end of file
/.idea/
/target/
/import/
\ No newline at end of file
This diff is collapsed.
[package]
name = "railgun-routing-client"
version = "0.1.0"
edition = "2021"
edition = "2024"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
config = "0.13.1"
lazy_static = "1.4.0"
average = "0.13.1"
serde_json = "1.0"
serde = "1.0.159"
tokio = { version = "1", features = ["full"] }
rtnetlink = "0.12.0"
config = "0.15.19"
average = "0.16.0"
serde_json = "1.0.145"
serde = { version = "1.0.228", features = ["derive"] }
serde_derive = "1.0"
tokio = { version = "1.48", features = ["full"] }
rtnetlink = "0.19.0"
anyhow = "1.0.100"
bincode = { version = "2.0.1", features = ["derive"] }
......@@ -3,8 +3,8 @@ use std::collections::{HashMap, HashSet};
use crate::data::{GatewayGroup, Router};
impl GatewayGroup {
pub fn routers(&self, groups: &Vec<GatewayGroup>, routers: &Vec<Router>) -> HashSet<u8> {
return routers
pub fn routers(&self, groups: &[GatewayGroup], routers: &[Router]) -> HashSet<u8> {
routers
.iter()
.filter(|r| self.include_routers.contains(&r.name))
.chain(
......@@ -20,6 +20,6 @@ impl GatewayGroup {
.filter(|g| self.children.contains(&g.name))
.flat_map(|g1| g1.routers(groups, routers)),
)
.collect();
.collect()
}
}
use std::collections::{HashMap, HashSet};
use crate::data::Router as RouterData;
use crate::protocol::{Change, Hello, Report};
use crate::router::Router;
use crate::server::Server;
use crate::settings::{Settings, INTERVAL};
use anyhow::ensure;
use config::Config;
use std::collections::HashMap;
use std::fs;
use std::net::{IpAddr, SocketAddr};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::time::SystemTime;
use tokio::net::UdpSocket;
use tokio::time;
use crate::data::{GatewayGroup, Router as RouterData};
use crate::protocol::{Change, Hello};
use crate::router::Router;
use crate::server::Server;
use crate::settings::CONFIG;
mod data;
mod gateway_group;
mod protocol;
......@@ -21,70 +21,78 @@ mod server;
mod settings;
#[tokio::main]
async fn main() {
let routers_data =
serde_json::from_slice::<Vec<RouterData>>(&fs::read("import/Router.json").unwrap())
.unwrap();
let groups: Vec<GatewayGroup> =
serde_json::from_slice(&fs::read("import/GatewayGroup.json").unwrap()).unwrap();
let mut routers: HashMap<u8, Router> =
serde_json::from_slice::<Vec<RouterData>>(&fs::read("import/Router.json").unwrap())
.unwrap()
async fn main() -> anyhow::Result<()> {
let config: Settings = Config::builder()
.add_source(config::File::with_name("config/config.json"))
.add_source(config::Environment::with_prefix("RAILGUN"))
.build()?
.try_deserialize()?;
let routers_data = serde_json::from_slice::<Vec<RouterData>>(&fs::read("import/Router.json")?)?;
let mut routers: HashMap<u8, Router> = routers_data
.iter()
.map(|r| (r.id, Router::new(r)))
.map(|r| (r.id, Router::new(r, &config)))
.collect();
// let groups: Vec<GatewayGroup> = serde_json::from_slice(&fs::read("import/GatewayGroup.json")?)?;
let mut server = Server::new(
&routers,
groups
.iter()
.map(|g| (g.id, g.routers(&groups, &routers_data)))
.collect::<HashMap<u16, HashSet<u8>>>(),
// &routers,
// groups
// .iter()
// .map(|g| (g.id, g.routers(&groups, &routers_data)))
// .collect::<HashMap<u16, HashSet<u8>>>(),
);
let mut self_peer = Hello {
id: CONFIG.id,
seq: 0,
time: 0,
};
let mut hello = Hello { time: 0 };
let socket = UdpSocket::bind(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), CONFIG.port))
.await
.unwrap();
let socket = UdpSocket::bind(SocketAddr::new(
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
config.port,
))
.await?;
let mut timer = time::interval(time::Duration::from_millis(CONFIG.interval));
let mut timer = time::interval(INTERVAL);
let mut buf = [0; 1500];
loop {
tokio::select! {
result = socket.recv_from(&mut buf) => {
let (_, src) = result.unwrap();
if src == CONFIG.server {
Ok((len, src)) = socket.recv_from(&mut buf) => {
if src == config.server {
// from server
let message: Change = serde_json::from_slice(&buf).unwrap();
server.on_message(&socket, &message, &mut routers, &self_peer);
} else {
let (message, _): (Change, usize) = bincode::decode_from_slice(&buf[..len], bincode::config::standard())?;
server.on_message(&socket, &message, &mut routers, &hello);
let report = Report {
id: config.id,
ack: server.ack,
peers: Vec::new()
};
let message = bincode::encode_to_vec(&report, bincode::config::standard())?;
let _ = socket.send_to(message.as_slice(), config.server);
} else if let Some(peer) = Router::get(&mut routers, src){
// from client
let message: Hello = serde_json::from_slice(&buf).unwrap();
let peer = routers.get_mut(&message.id).unwrap();
assert_eq!(src, peer.link_address);
let (message, _): (Hello, usize) = bincode::decode_from_slice(&buf[..len], bincode::config::standard())?;
peer.on_message(&message);
}
}
_ = timer.tick() => {
self_peer.time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let message = serde_json::to_vec(&self_peer).unwrap();
// to clients
hello.time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u16;
let message = bincode::encode_to_vec(&hello, bincode::config::standard())?;
for peer in routers.values() {
let _ = socket.send_to(message.as_slice(), peer.link_address);
}
let _ = server.update(&socket, &mut routers, &self_peer);
self_peer.seq += 1;
// to server
let report = Report {
id: config.id,
ack: server.ack,
peers: routers
.values_mut()
.map(|peer| peer.update(hello.time))
.collect(),
};
let message = bincode::encode_to_vec(&report, bincode::config::standard())?;
let _ = socket.send_to(message.as_slice(), config.server);
}
}
}
......
use std::collections::HashMap;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[derive(Encode, Decode)]
pub struct Hello {
pub id: u8,
pub seq: u32,
pub time: u64,
pub time: u16,
}
#[derive(Serialize, Deserialize)]
pub struct Change {
pub seq: u32,
pub via: HashMap<u8, u8>,
pub plan: HashMap<u8, u8>,
#[derive(Encode, Decode)]
pub struct Report {
pub id: u8,
pub ack: u8,
pub peers: Vec<PeerQuality>,
}
#[derive(Serialize, Deserialize)]
#[derive(Encode, Decode, Copy, Clone)]
pub struct PeerQuality {
pub delay: f64,
pub jitter: f64,
pub reliability: f64,
pub delay: i16,
pub reliability: u8,
pub jitter: u8,
}
#[derive(Serialize, Deserialize)]
pub struct Report {
pub id: u8,
pub ack: u32,
pub peers: Option<HashMap<u8, PeerQuality>>,
#[derive(Encode, Decode)]
pub struct Change {
pub seq: u8,
pub via: Vec<(u8, u8)>,
pub plan: Vec<(u8, u8)>,
}
use rtnetlink::{new_connection, Error, Handle};
use std::collections::HashMap;
struct RouteWriter {
via: HashMap<u8, u8>,
plan: HashMap<u16, u8>,
}
impl RouteWriter {
fn new() -> Self {
let (connection, handle, _) = new_connection().unwrap();
tokio::spawn(connection);
Self {
via: HashMap::new(),
plan: HashMap::new(),
};
}
fn set_via(&mut self, to: u8, via: u8) {
self.via.insert(to, via);
}
fn set_plan(&mut self, group: u16, to: u8) {
self.plan.insert(group, to);
}
fn commit(&mut self) {
// self.via.clear();
// self.plan.clear();
}
}
// struct RouteWriter {
// via: HashMap<u8, u8>,
// plan: HashMap<u16, u8>,
// }
//
// impl RouteWriter {
// fn new() -> Self {
// let (connection, handle, _) = new_connection().unwrap();
// tokio::spawn(connection);
//
// Self {
// via: HashMap::new(),
// plan: HashMap::new(),
// };
// }
//
// fn set_via(&mut self, to: u8, via: u8) {
// self.via.insert(to, via);
// }
//
// fn set_plan(&mut self, group: u16, to: u8) {
// self.plan.insert(group, to);
// }
//
// fn commit(&mut self) {
// // self.via.clear();
// // self.plan.clear();
// }
// }
use std::net::{IpAddr, SocketAddr};
use std::time::SystemTime;
use average::Mean;
use crate::data::Router as RouterData;
use crate::protocol::{Hello, PeerQuality};
use crate::settings::CONFIG;
use crate::settings::{Settings, HISTORY, INTERVAL, TIMEOUT};
use average::Mean;
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::time::SystemTime;
pub struct Router {
address: String,
pub link_address: SocketAddr,
subnets: Vec<String>,
delay: f64,
jitter: f64,
reliability: f64,
seq: u32,
time: u64,
history: Vec<Option<u64>>,
quality: PeerQuality,
seq: u8,
remote_time: u16,
local_time: u16,
history: Vec<Option<i16>>,
}
impl Router {
pub fn new(data: &RouterData) -> Router {
pub fn get(routers: &mut HashMap<u8, Router>, link_address: SocketAddr) -> Option<&mut Router> {
match link_address {
SocketAddr::V4(addr) => {
let id = addr.ip().octets()[3];
routers.get_mut(&id)
}
SocketAddr::V6(_) => None,
}
}
pub fn new(data: &RouterData, config: &Settings) -> Router {
Router {
address: String::from(""),
link_address: SocketAddr::new(IpAddr::from([10, 200, data.id, CONFIG.id]), CONFIG.port),
subnets: Vec::new(),
delay: 0.0,
jitter: 0.0,
reliability: 0.0,
link_address: SocketAddr::new(
IpAddr::from([169, 254, data.id, config.id]),
config.port,
),
quality: PeerQuality {
reliability: 0,
jitter: 0,
delay: 0,
},
seq: 0,
time: 0,
local_time: 0,
remote_time: 0,
history: Vec::new(),
}
}
pub fn reset(&mut self) {
self.delay = 0.0;
self.jitter = 0.0;
self.reliability = 0.0;
self.seq = 0;
self.time = 0;
self.quality = PeerQuality {
reliability: 0,
jitter: 0,
delay: 0,
};
self.history.clear();
}
pub fn on_message(&mut self, data: &Hello) {
if data.seq == 0
|| data.seq < self.seq - CONFIG.timeout
|| data.seq > self.seq + CONFIG.timeout
{
// 收到 seq = 0 或 seq 与之前差距较大,就 reset
self.reset();
self.seq = data.seq - 1;
} else if data.seq <= self.seq {
// 收到 seq 比已知略小的,忽略
// 这个包发出距离上一个包
let diff = data.time.wrapping_sub(self.remote_time) as i16;
// 收到时间略小于或相等的,可能是网络乱序或重包,忽略
if -(TIMEOUT.as_millis() as i16) < diff && diff <= 0 {
return;
}
if 0 < diff && diff <= (TIMEOUT.as_millis() as i16) {
// 差距较小,补上中间丢的包
let step = (diff as f64 / INTERVAL.as_millis() as f64).round() as u8;
for _ in 0..step - 1 {
self.history.push(None);
}
} else {
// 差距较大,就 reset
self.reset();
}
let time = SystemTime::now()
self.remote_time = data.time;
self.local_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let step = data.seq - self.seq;
let delay = time - data.time;
.as_millis() as u16;
let delay = self.local_time.wrapping_sub(self.remote_time) as i16;
for _ in 0..step - 1 {
self.history.push(None);
}
self.history.push(Some(delay));
self.history
.splice(0..self.history.len() - CONFIG.history, []);
let history: Vec<u64> = self
.history
.iter()
.filter(|s| s.is_some())
.map(|s| s.unwrap())
.collect();
if (self.history.len() > HISTORY as usize) {
self.history.drain(0..self.history.len() - HISTORY as usize);
}
self.reliability = history.len() as f64 / CONFIG.history as f64;
self.delay = history.iter().sum::<u64>() as f64 / history.len() as f64;
self.jitter = (0..history.len() - 1)
.map(|i| (history[i] - history[i + 1]) as f64)
let received: Vec<i16> = self.history.iter().filter_map(|&s| s).collect();
assert!(!received.is_empty()); // 因为走到这里一定刚放过一个进去
self.quality.reliability = (received.len() * 255 / HISTORY as usize) as u8;
self.quality.delay = received
.iter()
.map(|&x| f64::from(x))
.collect::<Mean>()
.mean();
self.seq = data.seq;
self.time = time;
.mean() as i16;
self.quality.jitter = (1..received.len())
.map(|i| f64::from(received[i].abs_diff(received[i - 1])))
.collect::<Mean>()
.mean() as u8;
}
pub(crate) fn update(&mut self, time: u64) -> PeerQuality {
if self.reliability > 0.0 {
pub(crate) fn update(&mut self, local_time: u16) -> PeerQuality {
if self.quality.reliability > 0 {
let diff = (local_time.wrapping_sub(self.local_time) as i16 as f64
/ INTERVAL.as_millis() as f64)
.round() as i16;
// 有几个包没到
let step = ((time - self.time) / CONFIG.interval) as u32;
if step > CONFIG.timeout {
if diff > TIMEOUT.as_millis() as i16 {
self.reset();
} else if diff >= (INTERVAL.as_millis() * 2) as i16 {
self.quality.reliability = self.quality.reliability.saturating_sub(255 / HISTORY);
}
}
let lost = (time - self.time) / CONFIG.interval - 2;
let reliability = f64::max(0.0, self.reliability - lost as f64 / CONFIG.history as f64);
PeerQuality {
delay: self.delay,
jitter: self.jitter,
reliability,
}
self.quality
}
}
......@@ -2,19 +2,16 @@ use std::collections::{HashMap, HashSet};
use tokio::net::UdpSocket;
use crate::data::GatewayGroup;
use crate::protocol::{Change, Hello, Report};
use crate::router::Router;
use crate::settings::CONFIG;
pub struct Server {
ack: u32,
groups: HashMap<u16, HashSet<u8>>,
pub(crate) ack: u8,
}
impl Server {
pub fn new(routers: &HashMap<u8, Router>, groups: HashMap<u16, HashSet<u8>>) -> Self {
Server { ack: 0, groups }
pub fn new() -> Self {
Server { ack: 0 }
}
pub fn on_message(
......@@ -56,20 +53,8 @@ impl Server {
&mut self,
socket: &UdpSocket,
routers: &mut HashMap<u8, Router>,
self_peer: &Hello,
hello: &Hello,
) {
let data = Report {
id: self_peer.id,
ack: self.ack,
peers: Some(
routers
.into_iter()
.map(|(&id, peer)| (id, peer.update(self_peer.time)))
.collect(),
),
};
let message = serde_json::to_vec(&data).unwrap();
let _ = socket.send_to(message.as_slice(), CONFIG.server);
}
}
use std::net::SocketAddr;
use std::time::Duration;
use config::Config;
use lazy_static::lazy_static;
use serde::Deserialize;
use tokio::time;
#[derive(Deserialize)]
pub struct Settings {
pub id: u8,
pub server: SocketAddr,
pub port: u16,
pub timeout: u32,
pub history: usize,
pub interval: u64,
pub table: u16,
pub proto: u16,
// pub TIMEOUT: u32,
// pub history: usize,
// pub INTERVAL: u64,
// pub table: u16,
// pub proto: u16,
}
lazy_static! {
pub static ref CONFIG: Settings = Config::builder()
.add_source(config::File::with_name("config/config.json"))
.add_source(config::Environment::with_prefix("RAILGUN"))
.build()
.unwrap()
.try_deserialize()
.unwrap();
}
pub const INTERVAL: Duration = Duration::from_secs(1);
pub const TIMEOUT: Duration = Duration::from_secs(10);
pub const HISTORY: u8 = 100;
// pub static ref CONFIG: Settings = Config::builder()
// .add_source(config::File::with_name("config/config.json"))
// .add_source(config::Environment::with_prefix("RAILGUN"))
// .build()
// .unwrap()
// .try_deserialize()
// .unwrap();
//
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment