Commit 1ccde179 authored by nanamicat's avatar nanamicat

route table

parent fd7351ee
......@@ -27,7 +27,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let config: Settings = Config::builder().add_source(config::Environment::default()).build()?.try_deserialize()?;
let routers_data = serde_json::from_slice::<Vec<RouterData>>(&fs::read("import/data/Router.json")?)?;
let mut routers: BTreeMap<u8, Router> = routers_data.iter().map(|r| (r.id, Router::new(r, &config))).collect();
let mut routers: BTreeMap<u8, Router> = routers_data.into_iter().map(|r| (r.id, Router::new(r, &config))).collect();
let connections = serde_json::from_slice::<BTreeMap<u8, BTreeMap<u8, Connection>>>(&fs::read("import/connections.json")?)?;
// let groups: Vec<GatewayGroup> = serde_json::from_slice(&fs::read("import/GatewayGroup.json")?)?;
......@@ -64,7 +64,7 @@ async fn main() -> anyhow::Result<()> {
peer.on_message(&hello);
} else if addr.port() == config.server.port
&& let Ok((downlink, _)) = bincode::decode_from_slice(&buf[..len], bincode::config::standard())
&& let Some(uplink) = server.on_message(downlink, &routers_data, &connections[&config.id], &config).await
&& let Some(uplink) = server.on_message(downlink, &routers, &connections[&config.id], &config).await
{
let len = bincode::encode_into_slice(uplink, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], addr).await;
......
use crate::{
data::Router as RouterData,
protocol::{Hello, PeerQuality},
settings::{Settings, INTERVAL, WINDOW},
};
use crate::{data, data::Router as RouterData, protocol::{Hello, PeerQuality}, settings::{Settings, INTERVAL, WINDOW}};
use saturating_cast::SaturatingCast;
use std::{
collections::BTreeMap,
......@@ -19,6 +15,7 @@ pub struct Router {
receive: u64,
remote_time: u32,
local_time: Instant,
pub(crate) data: data::Router
}
impl Router {
......@@ -28,7 +25,7 @@ impl Router {
SocketAddr::V6(_) => None,
}
}
pub fn new(data: &RouterData, config: &Settings) -> Router {
pub fn new(data: RouterData, config: &Settings) -> Router {
Router {
link_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::from([10, 200, data.id, config.id])), config.bind.port()),
remote_time: rand::random(),
......@@ -37,11 +34,12 @@ impl Router {
prev_delay: 0,
delay: 0,
local_time: Instant::now(),
data
}
}
pub fn on_message(&mut self, data: &Hello) {
let delta = (data.time.wrapping_sub(self.remote_time) as i32 as f32 / INTERVAL.as_millis() as f32).round() as i32;
pub fn on_message(&mut self, message: &Hello) {
let delta = (message.time.wrapping_sub(self.remote_time) as i32 as f32 / INTERVAL.as_millis() as f32).round() as i32;
const WINDOW_MAX: i32 = WINDOW as i32;
const WINDOW_MIN: i32 = 1 - WINDOW_MAX;
match delta {
......@@ -53,18 +51,18 @@ impl Router {
0..WINDOW_MAX => {
// 0 可能是重启
self.receive = (self.receive << delta) | 1;
self.remote_time = data.time;
self.remote_time = message.time;
self.local_time = Instant::now();
}
_ => {
// 可能是下线很久之后上线,也可能是别的异常情况,先承认这个报文,但是不计算丢包和延迟
self.remote_time = data.time;
self.remote_time = message.time;
self.local_time = Instant::now();
return;
}
}
let delay = (SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u32).wrapping_sub(data.time) as i32;
let delay = (SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u32).wrapping_sub(message.time) as i32;
self.delay += (delay - self.delay) / 8;
......
use crate::connection::Connection;
use crate::protocol::{Downlink, MessageType, Uplink};
use crate::router::Router;
use crate::settings::Settings;
use crate::{connection::Connection, data};
use netlink_sys::{protocols::NETLINK_ROUTE, Socket, TokioSocket};
use crate::settings::{ROUTE_PROTOCOL, Settings};
use rtnetlink::RouteMessageBuilder;
use std::collections::BTreeMap;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::net::Ipv4Addr;
pub struct Server {
pub(crate) online: bool,
......@@ -16,10 +16,7 @@ pub struct Server {
impl Server {
pub fn new(id: u8, routers: &BTreeMap<u8, Router>) -> Self {
let socket = std::mem::ManuallyDrop::new(Socket::new(NETLINK_ROUTE).unwrap());
socket.set_netlink_get_strict_chk(true).unwrap();
let socket = unsafe { TokioSocket::from_raw_fd(socket.as_raw_fd()) };
let (connection, handle, _) = rtnetlink::from_socket(socket);
let (connection, handle) = rtnetlink::new_connection().unwrap();
tokio::spawn(connection);
Server {
online: false,
......@@ -33,7 +30,7 @@ impl Server {
pub async fn on_message(
&mut self,
mut message: Downlink,
routers: &Vec<data::Router>,
routers: &BTreeMap<u8, Router>,
connections: &BTreeMap<u8, Connection>,
config: &Settings, // routers: &mut HashMap<u8, Router>,
// self_peer: &Hello,
......@@ -45,8 +42,10 @@ impl Server {
match (self.online, message.action) {
(false, MessageType::Full) => {
self.reset(routers, connections, config).await;
self.online = true;
for (to, via) in self.via.iter_mut() {
*via = *to;
}
self.via.append(&mut message.via);
self.plan.append(&mut message.plan);
Some(Uplink {
......@@ -82,30 +81,24 @@ impl Server {
}
}
pub async fn reset(&self, routers: &Vec<data::Router>, connections: &BTreeMap<u8, Connection>, config: &Settings) {
println!("reset");
// let mut routes = self.handle.route().get(RouteMessageBuilder::<Ipv4Addr>::new().protocol(ROUTE_PROTOCOL).build()).execute();
// while let Some(route) = routes.try_next().await.unwrap() {
// assert_eq!(route.header.protocol, ROUTE_PROTOCOL);
// if let Err(e) = self.handle.route().del(route).execute().await {
// panic!("Failed to delete route: {}", e);
// }
// }
// for router in routers.iter() {
// if connections.contains_key(&router.id) {
// self.handle
// .route()
// .add(
// RouteMessageBuilder::<Ipv4Addr>::new()
// .destination_prefix(router.address, 32)
// .gateway(Router::link_address(config.id, router.id))
// .protocol(ROUTE_PROTOCOL)
// .build(),
// )
// .execute()
// .await
// .unwrap();
// }
// }
pub async fn write(&self, via: &BTreeMap<u8, u8>, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, Connection>, config: &Settings) {
for (to_id, via_id) in via.iter() {
let to = routers[to_id];
let via = routers[via_id];
if connections.contains_key(via_id) {
self.handle
.route()
.add(
RouteMessageBuilder::<Ipv4Addr>::new()
.destination_prefix(to.data.address, 32)
.gateway(via.link_address.ip())
.protocol(ROUTE_PROTOCOL)
.build(),
)
.execute()
.await
.unwrap();
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment