Commit dfdaf2c1 authored by nanamicat's avatar nanamicat

it works

parent de4c1f95
Pipeline #42519 passed with stages
in 3 minutes and 23 seconds
...@@ -751,8 +751,7 @@ dependencies = [ ...@@ -751,8 +751,7 @@ dependencies = [
[[package]] [[package]]
name = "netlink-packet-route" name = "netlink-packet-route"
version = "0.28.0" version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/nanamicat/netlink-packet-route.git?rev=dfc76d7#dfc76d732b9924c5efcacb8e29ec4d4b85becd7b"
checksum = "4ce3636fa715e988114552619582b530481fd5ef176a1e5c1bf024077c2c9445"
dependencies = [ dependencies = [
"bitflags 2.10.0", "bitflags 2.10.0",
"libc", "libc",
...@@ -980,6 +979,7 @@ dependencies = [ ...@@ -980,6 +979,7 @@ dependencies = [
"anyhow", "anyhow",
"bincode", "bincode",
"config", "config",
"futures",
"hickory-resolver", "hickory-resolver",
"ipnet", "ipnet",
"itertools", "itertools",
...@@ -1071,8 +1071,7 @@ dependencies = [ ...@@ -1071,8 +1071,7 @@ dependencies = [
[[package]] [[package]]
name = "rtnetlink" name = "rtnetlink"
version = "0.20.0" version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/nanamicat/rtnetlink.git#54b3d29cad82e94095afb6a7b8df3f8ca741eba6"
checksum = "4b960d5d873a75b5be9761b1e73b146f52dddcd27bac75263f40fba686d4d7b5"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-util", "futures-util",
......
...@@ -23,3 +23,7 @@ saturating_cast = "0.1.0" ...@@ -23,3 +23,7 @@ saturating_cast = "0.1.0"
ipnet = { version = "2.11.0", features = ["serde"] } ipnet = { version = "2.11.0", features = ["serde"] }
itertools = "0.14.0" itertools = "0.14.0"
string-interner = "0.19.0" string-interner = "0.19.0"
futures = "0.3"
[patch.crates-io]
rtnetlink = { git = "https://github.com/nanamicat/rtnetlink.git" }
netlink-packet-route = { git = "https://github.com/nanamicat/netlink-packet-route.git", rev = "dfc76d7" }
use rtnetlink::NexthopMessageBuilder;
use std::net::Ipv4Addr;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (connection, handle, _) = rtnetlink::new_connection()?;
tokio::spawn(connection);
let id = 100;
let gateway = Ipv4Addr::new(10, 200, 1, 3);
println!("Adding nexthop id {} via {}", id, gateway);
let msg = NexthopMessageBuilder::<Ipv4Addr>::new().id(id).gateway(gateway).oif(3).build();
match handle.nexthop().add(msg).replace().execute().await {
Ok(_) => println!("Successfully added nexthop"),
Err(e) => eprintln!("Failed to add nexthop: {}", e),
}
Ok(())
}
...@@ -8,7 +8,7 @@ use crate::{ ...@@ -8,7 +8,7 @@ use crate::{
server::Server, server::Server,
settings::{CONFIG, INTERVAL, WINDOW}, settings::{CONFIG, INTERVAL, WINDOW},
shared::{ shared::{
data::{self, GatewayGroupID}, data::{self},
protocol::{Hello, MessageType, Uplink}, protocol::{Hello, MessageType, Uplink},
}, },
}; };
...@@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> { ...@@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
peer.on_message(&hello); peer.on_message(&hello);
} else if addr.port() == CONFIG.server.port } else if addr.port() == CONFIG.server.port
&& let Ok((downlink, _)) = bincode::decode_from_slice(&buf[..len], bincode::config::standard()) && let Ok((downlink, _)) = bincode::decode_from_slice(&buf[..len], bincode::config::standard())
&& let Some(uplink) = server.on_message(downlink, &routers).await && let Some(uplink) = server.on_message(downlink).await
{ {
let len = bincode::encode_into_slice(uplink, &mut buf, bincode::config::standard())?; let len = bincode::encode_into_slice(uplink, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], addr).await; let _ = socket.send_to(&buf[..len], addr).await;
......
use crate::{ use crate::{
settings::{CONFIG, INTERVAL, Settings, WINDOW}, settings::{CONFIG, INTERVAL, WINDOW},
shared::{ shared::{
data::{self, DATABASE, RouterID}, data::{self, RouterID},
protocol::{Hello, PeerQuality}, protocol::{Hello, PeerQuality},
}, },
}; };
use ipnet::Ipv4Net;
use saturating_cast::SaturatingCast; use saturating_cast::SaturatingCast;
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
...@@ -22,7 +21,6 @@ pub struct Router { ...@@ -22,7 +21,6 @@ pub struct Router {
receive: u64, receive: u64,
remote_time: u32, remote_time: u32,
local_time: Instant, local_time: Instant,
pub addresses: Vec<Ipv4Net>,
} }
impl Router { impl Router {
...@@ -44,9 +42,6 @@ impl Router { ...@@ -44,9 +42,6 @@ impl Router {
prev_delay: 0, prev_delay: 0,
delay: 0, delay: 0,
local_time: Instant::now(), local_time: Instant::now(),
addresses: std::iter::once(Ipv4Net::from(data.address))
.chain(DATABASE.subnets.iter().filter(|s| s.router == data.id).map(|s| s.subnet))
.collect(),
} }
} }
......
...@@ -7,6 +7,10 @@ use crate::{ ...@@ -7,6 +7,10 @@ use crate::{
protocol::{Downlink, MessageType, Uplink}, protocol::{Downlink, MessageType, Uplink},
}, },
}; };
use futures::stream::StreamExt;
use netlink_packet_route::route::{RouteHeader, RouteProtocol, RouteType};
use netlink_sys::AsyncSocket;
use rtnetlink::RouteMessageBuilder; use rtnetlink::RouteMessageBuilder;
use std::{collections::BTreeMap, net::Ipv4Addr}; use std::{collections::BTreeMap, net::Ipv4Addr};
...@@ -20,7 +24,8 @@ pub struct Server { ...@@ -20,7 +24,8 @@ pub struct Server {
impl Server { impl Server {
pub fn new() -> Self { pub fn new() -> Self {
let (connection, handle, _) = rtnetlink::new_connection().unwrap(); let (mut connection, handle, _) = rtnetlink::new_connection().unwrap();
connection.socket_mut().socket_ref().set_netlink_get_strict_chk(true).unwrap();
tokio::spawn(connection); tokio::spawn(connection);
let id = CONFIG.id; let id = CONFIG.id;
Server { Server {
...@@ -32,11 +37,7 @@ impl Server { ...@@ -32,11 +37,7 @@ impl Server {
} }
} }
fn guess(id: RouterID, gw: &data::Gateway, region: usize) -> i32 { pub async fn on_message(&mut self, mut message: Downlink) -> Option<Uplink> {
gw.metrics[region].saturating_add(gw.cost_outbound).saturating_add(if gw.router == id { 0 } else { 100 })
}
pub async fn on_message(&mut self, mut message: Downlink, routers: &BTreeMap<RouterID, Router>) -> Option<Uplink> {
if message.ack != self.version { if message.ack != self.version {
return None; return None;
} }
...@@ -48,7 +49,9 @@ impl Server { ...@@ -48,7 +49,9 @@ impl Server {
for (to, via) in self.via.iter_mut() { for (to, via) in self.via.iter_mut() {
*via = *to; *via = *to;
} }
self.apply(&mut message.via, &mut message.plan, routers).await; data::GatewayGroup::apply(&mut self.via, &mut message.via, &mut self.plan, &mut message.plan);
self.write_nexthop().await;
self.write_route().await;
Some(Uplink { Some(Uplink {
id: CONFIG.id, id: CONFIG.id,
action: MessageType::Update, action: MessageType::Update,
...@@ -67,7 +70,8 @@ impl Server { ...@@ -67,7 +70,8 @@ impl Server {
plan: self.plan.clone(), plan: self.plan.clone(),
}), }),
(true, MessageType::Update) => { (true, MessageType::Update) => {
self.apply(&mut message.via, &mut message.plan, routers).await; data::GatewayGroup::apply(&mut self.via, &mut message.via, &mut self.plan, &mut message.plan);
self.write_nexthop().await;
Some(Uplink { Some(Uplink {
id: CONFIG.id, id: CONFIG.id,
action: MessageType::Update, action: MessageType::Update,
...@@ -80,52 +84,86 @@ impl Server { ...@@ -80,52 +84,86 @@ impl Server {
_ => None, _ => None,
} }
} }
pub async fn apply(&mut self, via: &mut BTreeMap<RouterID, RouterID>, plan: &mut BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>>, routers: &BTreeMap<RouterID, Router>) {
self.via.append(via); pub async fn write_nexthop(&self) {
for (region, mut plan) in std::mem::take(plan) { for (to_id, via_id) in self.via.iter() {
self.plan.entry(region).or_default().append(&mut plan); let builder = rtnetlink::NexthopMessageBuilder::<std::net::Ipv4Addr>::new().id(to_id.0 as u32);
let msg = if DATABASE.connections[&CONFIG.id].contains_key(via_id) {
let gateway_ip = Router::link_address(CONFIG.id, *via_id);
let route_msg = RouteMessageBuilder::<std::net::Ipv4Addr>::new()
.destination_prefix(gateway_ip, 32)
// 下面这几个默认被设置了值,get 时必须清除掉
.table_id(RouteHeader::RT_TABLE_UNSPEC as u32)
.protocol(RouteProtocol::Unspec)
.kind(RouteType::Unspec)
.build();
match self.handle.route().get(route_msg).execute().next().await.unwrap() {
Ok(route) => {
let oif = route.attributes.into_iter().find_map(|nla| match nla {
netlink_packet_route::route::RouteAttribute::Oif(o) => Some(o),
_ => None,
});
builder.gateway(gateway_ip).oif(oif.unwrap()).build()
}
Err(e) => {
panic!("Error getting route: {}", e);
}
}
} else {
builder.blackhole().build()
};
tracing::info!("{:?}", msg);
if let Err(e) = self.handle.nexthop().add(msg).replace().execute().await {
eprintln!("Error adding nexthop: {}", e);
}
} }
self.write(&self.via, &self.plan, routers).await;
} }
pub async fn write(&self, via: &BTreeMap<RouterID, RouterID>, plan: &BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>>, routers: &BTreeMap<RouterID, Router>) {
for (to_id, via_id) in via.iter() { pub async fn write_route(&self) {
let to = &routers[to_id]; for router in DATABASE.routers.iter().filter(|router| router.id != CONFIG.id) {
for address in to.addresses.iter() { for (destination, prefix) in std::iter::once((router.address, 32)).chain(DATABASE.subnets.iter().filter(|s| s.router == router.id).map(|s| (s.subnet.addr(), s.subnet.prefix_len()))) {
let builder = RouteMessageBuilder::<Ipv4Addr>::new().destination_prefix(address.addr(), address.prefix_len()).protocol(ROUTE_PROTOCOL); let msg = RouteMessageBuilder::<Ipv4Addr>::new()
let msg = if DATABASE.connections.contains_key(via_id) { .destination_prefix(destination, prefix)
builder.gateway(Router::link_address(CONFIG.id, *via_id)).build() .protocol(ROUTE_PROTOCOL)
} else { .nexthop_id(router.id.0 as u32)
builder.kind(netlink_packet_route::route::RouteType::Unreachable).build() .build();
};
tracing::info!("{:?}", msg); tracing::info!("{:?}", msg);
if let Err(e) = self.handle.route().add(msg).replace().execute().await { if let Err(e) = self.handle.route().add(msg).replace().execute().await {
eprintln!("{}", e); eprintln!("{}", e);
} }
} }
let msg = RouteMessageBuilder::<Ipv4Addr>::new()
.destination_prefix(Ipv4Addr::UNSPECIFIED, 0)
.table_id(router.dest_mark as u32)
.protocol(ROUTE_PROTOCOL)
.nexthop_id(router.id.0 as u32)
.build();
tracing::info!("{:?}", msg);
if let Err(e) = self.handle.route().add(msg).replace().execute().await {
eprintln!("{}", e);
}
} }
// if let Some(global) = plan.get(&RegionID(0)) {
// for (group_id, &gateway_id) in global.iter() {
// if GATEWAYGROUPINDEX[group_id].iter().any(|g| g.router == CONFIG.id) {
// continue;
// }
// let group = DATABASE.gateway_groups.iter().find(|g| g.id == *group_id).unwrap();
// let gateway = GATEWAYGROUPINDEX[group_id].iter().find(|g| g.id == gateway_id).unwrap();
// if let Some(&via_id) = self.via.get(&gateway.router) { for (group_id, &gateway_id) in self.plan[&RegionID(0)].iter() {
// let gateway_ip = Router::link_address(CONFIG.id, via_id); // 自己在的组强制直连
// let msg = RouteMessageBuilder::<Ipv4Addr>::new() if GATEWAYGROUPINDEX[group_id].iter().any(|g| g.router == CONFIG.id) {
// .destination_prefix(Ipv4Addr::UNSPECIFIED, 0) continue;
// .table_id(group.dest_mark as u32) }
// .protocol(ROUTE_PROTOCOL) let group = DATABASE.gateway_groups.iter().find(|g| g.id == *group_id).unwrap();
// .gateway(gateway_ip) let gateway = GATEWAYGROUPINDEX[group_id].iter().find(|g| g.id == gateway_id).unwrap();
// .build();
let msg = RouteMessageBuilder::<Ipv4Addr>::new()
.destination_prefix(Ipv4Addr::UNSPECIFIED, 0)
.table_id(group.dest_mark as u32)
.protocol(ROUTE_PROTOCOL)
.nexthop_id(gateway.router.0 as u32)
.build();
// tracing::info!("Default route: {:?}", msg); tracing::info!("Default route: {:?}", msg);
// if let Err(e) = self.handle.route().add(msg).replace().execute().await { if let Err(e) = self.handle.route().add(msg).replace().execute().await {
// eprintln!("Error adding default route: {}", e); eprintln!("Error adding default route: {}", e);
// } }
// } }
// }
// }
} }
} }
...@@ -19,6 +19,7 @@ pub struct Router { ...@@ -19,6 +19,7 @@ pub struct Router {
pub name: String, pub name: String,
pub location: String, pub location: String,
pub address: Ipv4Addr, pub address: Ipv4Addr,
pub dest_mark: u16,
} }
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
......
...@@ -47,6 +47,20 @@ impl GatewayGroup { ...@@ -47,6 +47,20 @@ impl GatewayGroup {
.collect() .collect()
} }
pub fn apply(
self_via: &mut BTreeMap<RouterID, RouterID>,
new_via: &mut BTreeMap<RouterID, RouterID>,
self_plan: &mut BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>>,
new_plan: &mut BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>>,
) {
self_via.append(new_via);
for (region, inner) in new_plan {
if let Some(p) = self_plan.get_mut(region) {
p.append(inner);
}
}
}
fn guess(id: RouterID, gw: &data::Gateway, region: usize) -> i32 { fn guess(id: RouterID, gw: &data::Gateway, region: usize) -> i32 {
gw.metrics[region].saturating_add(gw.cost_outbound).saturating_add(if gw.router == id { 0 } else { 100 }) gw.metrics[region].saturating_add(gw.cost_outbound).saturating_add(if gw.router == id { 0 } else { 100 })
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment