Commit e6975c2c authored by nanamicat's avatar nanamicat

new

parent b655b289
Pipeline #42412 passed with stages
in 8 minutes and 11 seconds
......@@ -94,9 +94,9 @@ checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
[[package]]
name = "bytes"
version = "1.4.0"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
[[package]]
name = "cc"
......@@ -502,7 +502,7 @@ dependencies = [
"idna",
"ipnet",
"once_cell",
"rand",
"rand 0.9.2",
"ring",
"thiserror 2.0.17",
"tinyvec",
......@@ -524,7 +524,7 @@ dependencies = [
"moka",
"once_cell",
"parking_lot",
"rand",
"rand 0.9.2",
"resolv-conf",
"smallvec",
"thiserror 2.0.17",
......@@ -784,9 +784,9 @@ dependencies = [
[[package]]
name = "netlink-sys"
version = "0.8.6"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "416060d346fbaf1f23f9512963e3e878f1a78e707cb699ba9215761754244307"
checksum = "16c903aa70590cb93691bf97a767c8d1d6122d2cc9070433deb3bbf36ce8bd23"
dependencies = [
"bytes",
"futures",
......@@ -993,6 +993,8 @@ dependencies = [
"futures",
"hickory-resolver",
"netlink-packet-route",
"netlink-sys",
"rand 0.8.5",
"rtnetlink",
"serde",
"serde_derive",
......@@ -1000,14 +1002,35 @@ dependencies = [
"tokio",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha",
"rand_core",
"rand_chacha 0.9.0",
"rand_core 0.9.3",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.4",
]
[[package]]
......@@ -1017,7 +1040,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.9.3",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.16",
]
[[package]]
......
......@@ -18,3 +18,7 @@ hickory-resolver = "0.26.0-alpha.1"
rtnetlink = "0.19.0"
netlink-packet-route = "0.26.0"
futures = "0.3.31"
netlink-sys = { version = "0.8.7", features = ["tokio"] }
rand = "0.8"
use std::net::Ipv4Addr;
use serde::Deserialize;
#[derive(Deserialize)]
......@@ -5,7 +7,7 @@ use serde::Deserialize;
pub struct Router {
pub id: u8,
// pub name: String,
// pub address: String,
pub address: Ipv4Addr,
// pub location: String,
// pub user: String,
// pub host: String,
......
use crate::connection::Connection;
use crate::data::Router as RouterData;
use crate::protocol::{Change, Hello, Report};
use crate::protocol::{Downlink, Hello, MessageType, Uplink};
use crate::router::Router;
use crate::server::Server;
use crate::settings::{INTERVAL, Settings};
use crate::settings::{Settings, INTERVAL};
use config::Config;
use std::collections::BTreeMap;
use std::fs;
......@@ -36,15 +36,11 @@ async fn main() -> anyhow::Result<()> {
// .collect::<BTreeMap<u32, HashSet<u8>>>(),
);
let mut hello = Hello { time: 0 };
let socket = UdpSocket::bind(config.bind).await?;
let mut timer = time::interval(INTERVAL);
let mut buf = [0; 1500];
let mut syn: bool = true;
let resolver = Resolver::builder_tokio()?.build();
loop {
......@@ -57,17 +53,11 @@ async fn main() -> anyhow::Result<()> {
let (len, addr) = result?;
if addr == server_addr {
// from server
let (message, _): (Change, usize) = bincode::decode_from_slice(&buf[..len], bincode::config::standard())?;
server.on_message(&message, &routers_data, &connections[&config.id]).await;
syn = false;
let report = Report {
id: config.id,
syn,
ack: server.ack,
peers: Vec::new()
};
let message = bincode::encode_to_vec(&report, bincode::config::standard())?;
let _ = socket.send_to(message.as_slice(), server_addr).await;
let (downlink, _): (Downlink, usize) = bincode::decode_from_slice(&buf[..len], bincode::config::standard())?;
if let Some(uplink)= server.on_message(&downlink, &routers_data, &connections[&config.id], &config).await {
let len = bincode::encode_into_slice(uplink, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], addr).await;
}
} else if let Some(peer) = Router::get(&mut routers, addr){
// from client
let (message, _): (Hello, usize) = bincode::decode_from_slice(&buf[..len], bincode::config::standard())?;
......@@ -77,7 +67,7 @@ async fn main() -> anyhow::Result<()> {
_ = timer.tick() => {
// to clients
hello.time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32;
let hello = Hello { time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as u32 };
let len = bincode::encode_into_slice(&hello, &mut buf, bincode::config::standard())?;
for id in connections[&config.id].keys() {
let router = &routers[id];
......@@ -85,17 +75,19 @@ async fn main() -> anyhow::Result<()> {
}
// to server
let report = Report {
let uplink = Uplink {
id: config.id,
syn,
ack: server.ack,
action: if server.online {MessageType::Update} else {MessageType::Query},
version: server.version,
peers: connections
.iter()
.filter(|(_, to)| to.contains_key(&config.id))
.map(|(from,_)|routers.get_mut(from).unwrap().update(hello.time))
.collect(),
via: Default::default(),
plan: Default::default()
};
let len = bincode::encode_into_slice(&report, &mut buf, bincode::config::standard())?;
let len = bincode::encode_into_slice(&uplink, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], server_addr).await;
}
}
......
use bincode::{Decode, Encode};
use std::collections::BTreeMap;
use serde_derive::Serialize;
#[derive(Encode, Decode)]
pub struct Hello {
pub time: u32,
}
#[derive(Encode, Decode)]
pub struct Report {
#[derive(Encode, Decode, Default, Debug, Eq, PartialEq, Copy, Clone)]
pub enum MessageType {
#[default]
Query,
Full,
Update,
}
#[derive(Encode, Decode, Default, Debug, Clone)]
pub struct Uplink {
pub id: u8,
pub ack: u8,
pub syn: bool,
pub action: MessageType,
pub version: u32,
pub peers: Vec<PeerQuality>,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
}
#[derive(Encode, Decode, Default, Debug, Clone)]
pub struct Downlink {
pub action: MessageType,
pub version: u32,
pub ack: u32,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
}
#[derive(Encode, Decode, Copy, Clone)]
#[derive(Encode, Decode, Serialize, Copy, Clone, Debug)]
pub struct PeerQuality {
pub delay: i16,
pub reliability: u8,
pub jitter: u8,
}
#[derive(Encode, Decode)]
pub struct Change {
pub seq: u8,
pub rst: bool,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
}
use crate::data::Router as RouterData;
use crate::protocol::{Hello, PeerQuality};
use crate::settings::{Settings, HISTORY, INTERVAL, TIMEOUT};
use crate::settings::{HISTORY, INTERVAL, Settings, TIMEOUT};
use average::Mean;
use std::collections::BTreeMap;
use std::net::{IpAddr, SocketAddr};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::SystemTime;
pub struct Router {
......@@ -15,8 +15,8 @@ pub struct Router {
}
impl Router {
pub fn link_address(from: u8, to: u8) -> IpAddr {
IpAddr::from([10, 200, to, from])
pub fn link_address(from: u8, to: u8) -> Ipv4Addr {
Ipv4Addr::from([10, 200, to, from])
}
pub fn get(routers: &mut BTreeMap<u8, Router>, link_address: SocketAddr) -> Option<&mut Router> {
match link_address {
......@@ -29,7 +29,7 @@ impl Router {
}
pub fn new(data: &RouterData, config: &Settings) -> Router {
Router {
link_address: SocketAddr::new(Router::link_address(config.id, data.id), config.bind.port()),
link_address: SocketAddr::new(IpAddr::V4(Router::link_address(config.id, data.id)), config.bind.port()),
quality: PeerQuality {
reliability: 0,
jitter: 0,
......
use crate::settings::ROUTE_PROTOCOL;
use crate::{connection::Connection, data, protocol::Change};
use futures::TryStreamExt;
use rtnetlink::RouteMessageBuilder;
use crate::protocol::{Downlink, MessageType, Uplink};
use crate::settings::Settings;
use crate::{connection::Connection, data};
use netlink_sys::{protocols::NETLINK_ROUTE, Socket, TokioSocket};
use std::collections::BTreeMap;
use std::net::Ipv4Addr;
use std::os::unix::io::{AsRawFd, FromRawFd};
pub struct Server {
pub(crate) ack: u8,
pub(crate) online: bool,
pub(crate) version: u32,
pub(crate) via: BTreeMap<u8, u8>,
pub(crate) plan: BTreeMap<u8, BTreeMap<u8, u8>>,
pub(crate) handle: rtnetlink::Handle,
}
impl Server {
pub fn new() -> Self {
let (connection, handle, _) = rtnetlink::new_connection().unwrap();
let socket = std::mem::ManuallyDrop::new(Socket::new(NETLINK_ROUTE).unwrap());
socket.set_netlink_get_strict_chk(true).unwrap();
let socket = unsafe { TokioSocket::from_raw_fd(socket.as_raw_fd()) };
let (connection, handle, _) = rtnetlink::from_socket(socket);
tokio::spawn(connection);
Server { ack: 0, handle }
Server {
online: false,
version: rand::random(),
via: Default::default(),
plan: Default::default(),
handle,
}
}
pub async fn on_message(
&mut self,
message: &Change,
message: &Downlink,
routers: &Vec<data::Router>,
connections: &BTreeMap<u8, Connection>,
// routers: &mut HashMap<u8, Router>,
// self_peer: &Hello,
) {
if message.rst {
self.ack = message.seq;
self.reset(routers, connections).await;
config: &Settings, // routers: &mut HashMap<u8, Router>,
// self_peer: &Hello,
) -> Option<Uplink> {
if message.ack != self.version {
return None;
}
self.version = message.version;
if self.ack != message.seq {
println!("server seq={}, local ack={}", message.seq, self.ack);
return;
match (self.online, message.action) {
(false, MessageType::Full) => {
self.reset(routers, connections, config).await;
self.online = true;
// TODO: apply via and plan
Some(Uplink {
id: config.id,
action: MessageType::Update,
version: self.version,
peers: Default::default(),
via: Default::default(),
plan: Default::default(),
})
}
(true, MessageType::Query) => Some(Uplink {
id: config.id,
action: MessageType::Full,
version: self.version,
peers: Default::default(),
via: self.via.clone(),
plan: self.plan.clone(),
}),
(true, MessageType::Update) => Some(Uplink {
id: config.id,
action: MessageType::Update,
version: self.version,
peers: Default::default(),
via: Default::default(),
plan: Default::default(),
// TODO: apply via and plan
}),
_ => None,
}
// for (to, via) in message.via.iter() {
// RouteWriter::set_via(*to, *via);
// }
// if let Some(plan) = message.plan.as_ref() {
// for (group_id, to) in message.plan.iter() {
// RouteWriter::set_plan(&GatewayGroup::all()[*group_id], *to);
// }
// }
// RouteWriter::commit();
self.ack += 1;
// self.update(socket, routers, self_peer);
}
// pub fn update(&mut self, socket: &UdpSocket, routers: &mut HashMap<u8, Router>, hello: &Hello) {
// }
pub async fn reset(&self, routers: &Vec<data::Router>, connections: &BTreeMap<u8, Connection>) {
let mut routes = self.handle.route().get(RouteMessageBuilder::<Ipv4Addr>::new().protocol(ROUTE_PROTOCOL).build()).execute();
while let Some(route) = routes.try_next().await.unwrap() {
self.handle.route().del(route).execute().await.unwrap();
}
pub async fn reset(&self, routers: &Vec<data::Router>, connections: &BTreeMap<u8, Connection>, config: &Settings) {
println!("reset");
// let mut routes = self.handle.route().get(RouteMessageBuilder::<Ipv4Addr>::new().protocol(ROUTE_PROTOCOL).build()).execute();
// while let Some(route) = routes.try_next().await.unwrap() {
// assert_eq!(route.header.protocol, ROUTE_PROTOCOL);
// if let Err(e) = self.handle.route().del(route).execute().await {
// panic!("Failed to delete route: {}", e);
// }
// }
// for router in routers.iter() {
// if()
// if connections.contains_key(&router.id) {
// self.handle
// .route()
// .add(
// RouteMessageBuilder::<Ipv4Addr>::new()
// .destination_prefix(router.address, 32)
// .gateway(Router::link_address(config.id, router.id))
// .protocol(ROUTE_PROTOCOL)
// .build(),
// )
// .execute()
// .await
// .unwrap();
// }
// }
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment