Commit 2bea4472 authored by nanamicat's avatar nanamicat

plan

parent 7e86e199
// import { GatewayGroup as GatewayGroupData } from '../import/scripts/GatewayGroup';
// import _ from 'lodash';
// import routers from '../import/data/Router.json';
// import gatewayGroups from '../import/data/GatewayGroup.json';
//
// export interface GatewayGroup extends GatewayGroupData {}
//
// export class GatewayGroup {
// static all: Record<number, GatewayGroup> = _.keyBy(
// gatewayGroups.map((g) => new this(g)),
// 'id'
// );
// public routers: number[];
//
// constructor(public data: GatewayGroupData) {
// Object.assign(this, data);
// this.routers = GatewayGroup.groupRouters(data);
// }
//
// static groupRouters(g: GatewayGroupData): number[] {
// return _.uniq(
// g.locationPrefix
// .flatMap((p) => routers.filter((r) => r.location.startsWith(p)))
// .concat(routers.filter((r) => g.includeRouters.includes(r.name)))
// .filter((r) => !g.excludeRouters.includes(r.name))
// .map((r) => r.id)
// .concat(gatewayGroups.filter((g1) => g.children.includes(g1.name)).flatMap((c) => this.groupRouters(c)))
// );
// }
// }
// import { RemoteInfo, Socket } from 'dgram';
// import { Change, PeerQuality, Report } from './protocol.rs';
// import routers from '../import/data/Router.json';
// import assert from 'assert';
// import { Quality } from './Quality';
// import _ from 'lodash';
// import config from '../config/config.json';
//
// import _connections from '../import/connections.json';
// import { GatewayGroup } from './GatewayGroup';
//
// const connections: Record<number, Record<number, { metric: number; protocol: string }>> = _connections;
//
// export class Router {
// static all: Router[] = routers.map((s) => new Router(s.id));
// static updating?: {
// router: Router;
// message: Change;
// };
//
// seq = 0;
// peers: Record<number, PeerQuality> = {};
// via: Map<Router, Router> = new Map();
// plan: Record<number, number> = {};
//
// time: number = 0;
// rinfo?: RemoteInfo;
//
// constructor(public id: number) {}
//
// static update(socket: Socket) {
// for (const from of Router.all) from.update(socket);
// }
//
// reset() {
// this.seq = 0;
// this.peers = {};
// for (const router of Router.all.filter((r) => r.id !== this.id)) {
// this.via.set(router, router);
// }
// if (Router.updating?.router == this) Router.updating = undefined;
// for (const plan of Object.values(GatewayGroup.all).filter((group) => !group.routers.includes(this.id))) {
// this.plan[plan.id] = this.id;
// }
// }
//
// onMessage(socket: Socket, data: Report) {
// // console.log(`recv: ${this.id} ${JSON.stringify(data)} this.seq:${this.seq}`);
// if (data.ack == this.seq + 1) {
// this.time = Date.now();
// if (data.peers) this.peers = data.peers;
// if (Router.updating?.router === this) {
// Router.updating = undefined;
// Router.update(socket);
// }
// } else if (data.ack === 0) {
// // 客户端重启
// this.reset();
// this.time = Date.now();
// this.send(socket, { seq: this.seq, via: {}, plan: {} });
// } else if (this.seq == 0) {
// // 服务器重启或客户端下线
// this.time = Date.now();
// this.send(socket, { seq: this.seq, via: {}, plan: {} });
// } else {
// console.log(`ignoring packet from ${data.id}, packet ack=${data.ack}, server seq=${this.seq}`);
// }
// }
//
// update(socket: Socket) {
// if (!this.rinfo) return;
// const timeout = Router.updating?.router == this ? config.timeout : config.timeout2;
// if (Date.now() - this.time > timeout * config.interval) {
// console.log(`router ${this.id} lost connection.`);
// this.rinfo = undefined;
// this.reset();
// return;
// }
// if (Router.updating) {
// if (Router.updating.router == this) this.send(socket, Router.updating.message);
// return;
// }
//
// const changedVia: Record<number, number> = {};
// const metric: Record<number, number> = {};
// for (const to of Router.all.filter((r) => r.id !== this.id)) {
// // 计算最优下一跳
//
// const items: [Router, number][] = Router.all.filter((r) => r.id !== this.id).map((r) => [r, this.route_quality(to, r).metric()]);
// const [currentRoute, currentMetric] = items.find(([v, m]) => v === this.via.get(to))!;
// const [bestRoute, bestMetric] = _.minBy(items, ([v, m]) => m)!;
//
// // 变更
// if (currentRoute !== bestRoute && bestMetric + config.throttle < currentMetric) {
// changedVia[to.id] = bestRoute.id;
// this.via.set(to, bestRoute);
// metric[to.id] = bestMetric;
// // console.log(`change: this ${this.id} to ${to.id} current ${this.via.get(to)!.id} quality ${JSON.stringify(this.route_quality(to, this.via.get(to)!))} metric ${this.route_quality(to, this.via.get(to)!).metric()} best ${best_route.id} quality ${JSON.stringify(this.route_quality(to, best_route))} metric ${this.route_quality(to, best_route).metric()}`);
// } else {
// metric[to.id] = currentMetric;
// }
// }
//
// // 计算 route plan
// // 凡是自己可以作为那个 plan 出口的,是不会计算直接跳过的,所以这里有 plan 到自己的意思是,没有找到任何一个通的可以出的地方,所以只好从自己出了
// const changedPlan: Record<number, number> = {};
// for (const group of Object.values(GatewayGroup.all).filter((group) => !group.routers.includes(this.id))) {
// const currentPlan = this.plan[group.id];
// const currentMetric = currentPlan === this.id ? Infinity : metric[currentPlan];
// const items = group.routers.map((toId) => [toId, metric[toId]]);
// const [bestPlan, bestMetric] = _.minBy(items, ([_, m]) => m)!;
//
// if (currentPlan !== this.id && bestMetric === Infinity) {
// // 原来通的,现在不通了
// this.plan[group.id] = changedPlan[group.id] = this.id;
// } else if (currentPlan !== bestPlan && bestMetric + config.throttle < currentMetric) {
// this.plan[group.id] = changedPlan[group.id] = bestPlan;
// }
// }
//
// if (!_.isEmpty(changedVia) || !_.isEmpty(changedPlan)) {
// this.seq++;
// const message: Change = { seq: this.seq, via: changedVia, plan: changedPlan };
// Router.updating = { router: this, message };
// this.send(socket, message);
// }
// }
//
// send(socket: Socket, message: Change) {
// console.log(`send: ${this.id} ${JSON.stringify(message)}`);
// return socket.send(JSON.stringify(message), this.rinfo!.port, this.rinfo!.address);
// }
//
// route_quality(to: Router, via: Router = this.via.get(to)!) {
// assert(via !== undefined);
// assert(this !== via);
// assert(this !== to);
//
// const result = new Quality();
//
// let current: Router = this;
// let next = via;
// const route = [current, next];
//
// while (true) {
// const quality = next.peers[current.id];
// if (!quality || quality.reliability <= 0) return Quality.unreachable; // 不通
// result.concat(quality.delay, quality.jitter, quality.reliability, connections[current.id][next.id].metric);
// if (next === to) return result; // 到达
//
// // 寻找下一跳
// current = next;
// next = current.via.get(to)!;
// assert(next); //to server_id 型路由,由于 server 两两相连,下一跳一定是存在的,至少能直达
// if (route.includes(next)) {
// // 环路
// return Quality.unreachable;
// } else {
// route.push(next);
// }
// }
// }
// }
//
// for (const router of Router.all) router.reset();
......@@ -4,17 +4,17 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::data::{ConnectionData, RouterData};
use crate::data::{self, RouterID};
use crate::router::Router;
use tower_http::cors::CorsLayer;
#[derive(Serialize, Clone)]
pub struct Info {
pub routers: Arc<Vec<RouterData>>,
pub connections: Arc<BTreeMap<u8, BTreeMap<u8, ConnectionData>>>,
pub routers: Vec<data::Router>,
pub connections: BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>,
}
pub fn create_app(routers: Arc<Vec<RouterData>>, connections: Arc<BTreeMap<u8, BTreeMap<u8, ConnectionData>>>, routers2: Arc<RwLock<BTreeMap<u8, Router>>>) -> axum::Router {
pub fn create_app(routers: Vec<data::Router>, connections: BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>, routers2: Arc<RwLock<BTreeMap<RouterID, Router>>>) -> axum::Router {
axum::Router::new()
.route("/info", get(|| async move { Json(Info { routers, connections }) }))
.route("/metrics", get(|| async move { Json(routers2.read().await.clone()) }))
......
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Serialize, Deserialize, Encode, Decode, Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct RouterID(pub u8);
#[derive(Serialize, Deserialize, Encode, Decode, Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct GatewayID(pub u8);
// 为了节约流量,GatewayGroupID 在网络上使用 u8 格式,比表里配的值少 20000
#[derive(Serialize, Deserialize, Encode, Decode, Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq, Debug)]
#[serde(from = "u16", into = "u16")]
pub struct GatewayGroupID(pub u8);
impl From<GatewayGroupID> for u16 {
fn from(val: GatewayGroupID) -> Self {
val.0 as u16 + 20000
}
}
impl From<u16> for GatewayGroupID {
fn from(value: u16) -> Self {
Self((value - 20000) as u8)
}
}
#[derive(Serialize, Deserialize, Encode, Decode, Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct RegionID(pub u8);
impl fmt::Display for RegionID {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct RouterData {
pub id: u8,
pub struct Router {
pub id: RouterID,
pub name: String,
pub location: String,
// pub host: String,
......@@ -10,9 +42,37 @@ pub struct RouterData {
// pub ssh_port: u16,
// pub user: String,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct Gateway {
pub id: GatewayID,
pub router: String,
pub r#type: GatewayType,
pub cost_outbound: i32,
pub metrics: Vec<i32>,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Copy)]
#[serde(rename_all = "lowercase")]
pub enum GatewayType {
Common,
VPC,
Virtual,
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GatewayGroup {
pub id: GatewayGroupID,
pub name: String,
pub location_prefix: Vec<String>,
pub include_routers: Vec<String>,
pub exclude_routers: Vec<String>,
pub children: Vec<String>,
pub desk_mark: u16,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct ConnectionData {
pub struct Connection {
pub protocol: Schema,
pub metric: u32,
}
......@@ -24,3 +84,6 @@ pub enum Schema {
UDP,
TCP,
}
#[derive(Serialize, Deserialize, PartialEq, Clone)]
pub struct Region {}
// use crate::router::RouterData;
// use serde::{Deserialize, Serialize}; // We will define this in router.rs
//
// #[derive(Debug, Clone, Serialize, Deserialize)]
// pub struct GatewayGroup {
// pub id: u8,
// pub name: String,
// pub location_prefix: Vec<String>, // "locationPrefix" in JSON
// pub include_routers: Vec<String>, // "includeRouters"
// pub exclude_routers: Vec<String>, // "excludeRouters"
// pub children: Vec<String>,
// #[serde(skip)]
// pub routers: Vec<u8>, // Calculated field
// }
//
// impl GatewayGroup {
// pub fn new(
// data: GatewayGroup,
// all_groups: &[GatewayGroup],
// all_routers: &[RouterData],
// ) -> Self {
// let mut group = data;
// group.routers = Self::group_routers(&group, all_groups, all_routers);
// group
// }
//
// pub fn group_routers(
// group: &GatewayGroup,
// all_groups: &[GatewayGroup],
// all_routers: &[RouterData],
// ) -> Vec<u8> {
// let mut router_ids = Vec::new();
//
// // 1. Filter by location prefix
// for router in all_routers {
// for prefix in &group.location_prefix {
// if router.location.starts_with(prefix) {
// router_ids.push(router.id);
// }
// }
// }
//
// // 2. Include routers
// for router in all_routers {
// if group.include_routers.contains(&router.name) {
// router_ids.push(router.id);
// }
// }
//
// // 3. Exclude routers
// router_ids.retain(|id| {
// let name = all_routers.iter().find(|r| r.id == *id).map(|r| &r.name);
// if let Some(name) = name {
// !group.exclude_routers.contains(name)
// } else {
// true
// }
// });
//
// // 4. Recursive children
// for child_name in &group.children {
// if let Some(child_group) = all_groups.iter().find(|g| g.name == *child_name) {
// let child_routers = Self::group_routers(child_group, all_groups, all_routers);
// router_ids.extend(child_routers);
// }
// }
//
// router_ids.sort_unstable();
// router_ids.dedup();
// router_ids
// }
// }
use crate::api::create_app;
use crate::data::RouterData;
use crate::data::{GatewayGroupID, GatewayID, GatewayType, RegionID, RouterID};
use crate::protocol::{Downlink, Uplink};
use crate::router::Router;
use crate::settings::{Settings, TIMEOUT};
use anyhow::{Context, Result};
use ::config::Config;
use anyhow::{Context, Result};
use config::Environment;
use net::SocketAddr;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::net;
use std::sync::Arc;
use tokio::net::UdpSocket;
......@@ -16,7 +16,6 @@ use tokio::time::Instant;
mod api;
mod data;
mod gateway_group;
mod protocol;
mod quality;
mod router;
......@@ -24,26 +23,52 @@ mod settings;
#[derive(Default, Debug)]
pub struct UpdatingState {
router_id: u8,
router_id: RouterID,
message: Downlink,
}
fn search_gateway_group(data: &data::GatewayGroup, routers_data: &Vec<data::Router>, gateways_groups_data: &Vec<data::GatewayGroup>) -> BTreeSet<String> {
let mut routers: BTreeSet<String> = data
.children
.iter()
.flat_map(|c| gateways_groups_data.iter().find(|g| &g.name == c))
.flat_map(|g| search_gateway_group(g, routers_data, gateways_groups_data))
.chain(routers_data.iter().filter(|r| data.location_prefix.iter().any(|p| r.location.starts_with(p))).map(|r| r.name.clone()))
.chain(data.include_routers.iter().cloned())
.collect();
data.exclude_routers.iter().for_each(|r| {
routers.remove(r);
});
routers
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let config: Settings = Config::builder().add_source(Environment::default()).build()?.try_deserialize()?;
let routers_data: Arc<Vec<RouterData>> = Arc::new(serde_json::from_str(&std::fs::read_to_string("import/data/Router.json")?)?);
let connections = Arc::new(serde_json::from_str(&std::fs::read_to_string("import/connections.json")?)?);
let routers_data: Vec<data::Router> = serde_json::from_str(&std::fs::read_to_string("import/data/Router.json")?)?;
let gateways_data: Vec<data::Gateway> = serde_json::from_str(&std::fs::read_to_string("import/data/Gateway.json")?)?;
let gateways_groups_data: Vec<data::GatewayGroup> = serde_json::from_str(&std::fs::read_to_string("import/data/GatewayGroup.json")?)?;
let regions_data: Vec<data::Region> = serde_json::from_str(&std::fs::read_to_string("import/data/Region.json")?)?;
let connections_data: BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>> = serde_json::from_str(&std::fs::read_to_string("import/connections.json")?)?;
// let routers_map: BTreeMap<u8, RouterData> = routers_data.iter().map(|r| (r.id, r.clone())).collect();
// let routers_map = Arc::new(routers_map);
let gateways_group: BTreeMap<GatewayGroupID, Vec<&data::Gateway>> = gateways_groups_data
.iter()
.map(|g| {
let routers = search_gateway_group(g, &routers_data, &gateways_groups_data);
(g.id, gateways_data.iter().filter(|gw| gw.r#type != GatewayType::VPC && routers.contains(&gw.router)).collect())
})
.collect();
let gateway_routers: BTreeMap<GatewayID, RouterID> = gateways_data.iter().map(|gw| (gw.id, routers_data.iter().find(|r| r.name == gw.router).unwrap().id)).collect();
let regions: Vec<RegionID> = regions_data.iter().enumerate().map(|(i, _)| RegionID(i as u8)).collect();
let routers: BTreeMap<u8, Router> = routers_data.iter().map(|c| (c.id, Router::new(c, &routers_data, &connections))).collect();
let routers: BTreeMap<RouterID, Router> = routers_data
.iter()
.map(|c| (c.id, Router::new(c, &routers_data, &connections_data, &regions, &gateways_group, &gateway_routers)))
.collect();
let routers = Arc::new(RwLock::new(routers));
// let gateway_groups_data: Vec<GatewayGroup> = serde_json::from_str(&std::fs::read_to_string(
// "../import/data/GatewayGroup.json",
// )?)?;
// 3. Initialize State
// let all_router_ids: Vec<u8> = routers_data.iter().map(|r| r.id).collect();
// let mut gateway_groups_map: BTreeMap<u8, GatewayGroup> = BTreeMap::new();
......@@ -54,7 +79,7 @@ async fn main() -> Result<()> {
// }
let listener = tokio::net::TcpListener::bind(config.http_bind).await?;
let app = create_app(routers_data.clone(), connections.clone(), routers.clone());
let app = create_app(routers_data.clone(), connections_data.clone(), routers.clone());
tokio::spawn(async move {
println!("HTTP listening on {}", &listener.local_addr().unwrap());
......@@ -79,26 +104,26 @@ async fn main() -> Result<()> {
router.offline();
}
}
if updating.router_id != 0 && !routers.get(&updating.router_id).context("router not found")?.is_online() {
updating.router_id = 0;
if updating.router_id != Default::default() && !routers.get(&updating.router_id).context("router not found")?.is_online() {
updating.router_id = Default::default();
}
tracing::debug!("recv {:?}", uplink);
// 处理收到的消息
if let Some(router) = routers.get_mut(&uplink.id)
&& let Some(downlink) = router.on_message(&mut uplink, addr, &mut updating, now)
{
tracing::debug!("sync to {}: {:?}", router.id, downlink);
tracing::debug!("sync to {:?}: {:?}", router.id, downlink);
send(&downlink, &mut buf, &socket, &addr).await?;
} else if updating.router_id == 0
} else if updating.router_id == Default::default()
&& now.duration_since(start) >= TIMEOUT // 刚启动时静默,先学习
&& let Some(router) = routers.get(&uplink.id)
&& router.is_online()
&& router.last_update != now
&& let Some(downlink) = router.update(now, &routers, &connections)
&& let Some(downlink) = router.update(now, &routers, &connections_data, &regions, &gateways_group, &gateway_routers)
{
updating.router_id = router.id;
updating.message = downlink;
tracing::info!("command to {}: {:?}", router.id, updating.message);
tracing::info!("command to {:?}: {:?}", router.id, updating.message);
send(&updating.message, &mut buf, &socket, &addr).await?;
routers.get_mut(&uplink.id).context("router not found")?.version += 1;
}
......
......@@ -2,6 +2,8 @@ use bincode::{Decode, Encode};
use serde::Serialize;
use std::collections::BTreeMap;
use crate::data::{GatewayGroupID, GatewayID, RegionID, RouterID};
#[derive(Encode, Decode, Default, Debug, Eq, PartialEq, Copy, Clone)]
pub enum MessageType {
#[default]
......@@ -12,12 +14,12 @@ pub enum MessageType {
#[derive(Encode, Decode, Default, Debug, Clone)]
pub struct Uplink {
pub id: u8,
pub id: RouterID,
pub action: MessageType,
pub version: u32,
pub peers: Vec<PeerQuality>,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
pub via: BTreeMap<RouterID, RouterID>,
pub plan: BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>>,
}
#[derive(Encode, Decode, Default, Debug, Clone)]
......@@ -25,8 +27,8 @@ pub struct Downlink {
pub action: MessageType,
pub version: u32,
pub ack: u32,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
pub via: BTreeMap<RouterID, RouterID>,
pub plan: BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>>,
}
#[derive(Encode, Decode, Serialize, Copy, Clone, Debug, Default)]
......
use crate::data::{ConnectionData, RouterData};
use crate::data::{GatewayGroupID, GatewayID, RegionID, RouterID};
use crate::protocol::{Downlink, MessageType, PeerQuality, Uplink};
use crate::quality::Quality;
use crate::settings::{HALF_LIFE, PENALTY, PENALTY_MIN};
use crate::UpdatingState;
use crate::{UpdatingState, data};
use serde::Serialize;
use std::collections::BTreeMap;
use std::i32;
use std::net::SocketAddr;
use tokio::time::Instant;
#[derive(Serialize, Clone)]
pub struct Router {
pub id: u8,
pub id: RouterID,
#[serde(skip)]
pub version: u32,
// quality from peer to self. HashMap 的 key 是 from.
pub peers: BTreeMap<u8, PeerQuality>,
pub via: BTreeMap<u8, u8>, // dst router_id -> next hop router_id
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>, // group id -> region id -> gateway_id
pub peers: BTreeMap<RouterID, PeerQuality>, // quality from peer to self. HashMap 的 key 是 from.
pub via: BTreeMap<RouterID, RouterID>, // dst router_id -> next hop router_id
pub plan: BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>>, // group id -> region id -> gateway_id
#[serde(skip)]
pub last_seen: Instant,
#[serde(skip)]
pub last_update: Instant,
#[serde(skip)]
pub addr: Option<SocketAddr>,
// Static config
// pub config: RouterData,
}
impl PartialEq<Self> for Router {
......@@ -36,26 +34,46 @@ impl PartialEq<Self> for Router {
impl Eq for Router {}
impl Router {
pub fn new(data: &RouterData, routers: &Vec<RouterData>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Self {
pub fn new(
data: &data::Router,
routers: &Vec<data::Router>,
connections: &BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>,
regions: &Vec<RegionID>,
gateway_groups: &BTreeMap<GatewayGroupID, Vec<&data::Gateway>>,
gateway_router: &BTreeMap<GatewayID, RouterID>,
) -> Self {
Self {
id: data.id,
version: rand::random(),
peers: connections
peers: connections.iter().filter(|(_, to)| to.contains_key(&data.id)).map(|(&from, _)| (from, Default::default())).collect(),
via: routers.iter().filter(|r| r.id != data.id).map(|r| (r.id, r.id)).collect(),
plan: regions
.iter()
.map(|&region| {
(
region,
gateway_groups
.iter()
.filter(|(_, to)| to.contains_key(&data.id))
.map(|(&from, _)| (from, Default::default()))
.map(|(&group_id, gateways)| (group_id, gateways.iter().min_by_key(|gw| Self::guess_metric(data, gw, &region, gateway_router)).unwrap().id))
.collect(),
)
})
.collect(),
via: routers.iter().filter(|r| r.id != data.id).map(|r| (r.id, r.id)).collect(),
plan: BTreeMap::new(),
last_seen: Instant::now(),
last_update: Instant::now(),
addr: None,
}
}
pub fn guess_metric(data: &data::Router, gw: &data::Gateway, region: &RegionID, gateway_router: &BTreeMap<GatewayID, RouterID>) -> i32 {
gw.metrics[region.0 as usize]
.saturating_add(gw.cost_outbound)
.saturating_add(if gateway_router[&gw.id] == data.id { 0 } else { 100 })
}
pub fn online(&mut self, addr: SocketAddr, now: Instant) {
if self.addr.is_none() {
tracing::info!("router {} online", self.id);
tracing::info!("router {:?} online", self.id);
}
self.addr = Some(addr);
self.last_seen = now;
......@@ -63,7 +81,7 @@ impl Router {
pub fn offline(&mut self) {
if self.addr.is_some() {
tracing::info!("router {} offline", self.id);
tracing::info!("router {:?} offline", self.id);
}
self.addr = None;
}
......@@ -78,7 +96,7 @@ impl Router {
*current = *new
}
} else if uplink.peers.len() != 0 {
tracing::error!("router {} peers count wrong. local {} remote {}", self.id, self.peers.len(), uplink.peers.len());
tracing::error!("router {:?} peers count wrong. local {} remote {}", self.id, self.peers.len(), uplink.peers.len());
}
match uplink.action {
......@@ -104,7 +122,7 @@ impl Router {
if uplink.version == self.version {
self.online(addr, now);
if updating.router_id == self.id {
updating.router_id = 0;
updating.router_id = Default::default();
self.via.append(&mut updating.message.via);
self.plan.append(&mut updating.message.plan);
self.last_update = now;
......@@ -121,59 +139,90 @@ impl Router {
} else if updating.router_id == self.id && uplink.version == self.version - 1 {
Some(updating.message.clone()) // 重传
} else {
tracing::info!("router {} wrong ack ack={}, seq={}", self.id, uplink.version, self.version);
tracing::info!("router {:?} wrong ack ack={}, seq={}", self.id, uplink.version, self.version);
None
}
}
}
}
pub fn update(&self, now: Instant, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Option<Downlink> {
pub fn update(
&self,
now: Instant,
routers: &BTreeMap<RouterID, Router>,
connections: &BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>,
regions: &Vec<RegionID>,
gateway_groups: &BTreeMap<GatewayGroupID, Vec<&data::Gateway>>,
gateway_router: &BTreeMap<GatewayID, RouterID>,
) -> Option<Downlink> {
let penalty = PENALTY_MIN + (PENALTY as f32 * f32::exp2(-now.duration_since(self.last_update).div_duration_f32(HALF_LIFE))) as i32;
let mut changed_via: BTreeMap<u8, u8> = BTreeMap::new();
// let mut metric: BTreeMap<u8, i32> = BTreeMap::new();
let mut changed_via: BTreeMap<RouterID, RouterID> = BTreeMap::new();
let mut changed_plan: BTreeMap<RegionID, BTreeMap<GatewayGroupID, GatewayID>> = BTreeMap::new();
let mut metric: BTreeMap<RouterID, i32> = BTreeMap::new();
metric.insert(self.id, 0);
let mut overcome = false;
for to in routers.values().filter(|&r| r != self) {
let current_router = routers.get(self.via.get(&to.id).unwrap()).unwrap();
let current_metric = self.route_quality(to, current_router, routers, connections).map_or(i32::MAX, |r| r.metric());
let candidate: Vec<(&Router, i32)> = connections[&self.id]
match connections[&self.id]
.keys()
.map(|id| routers.get(id).unwrap())
.filter_map(|r| self.route_quality(to, r, routers, connections).map(|q| (r, q.metric())))
.collect();
match candidate.iter().min_by_key(|(_, m)| m) {
.min_by_key(|(_, m)| *m)
{
None if current_router != to => {
// 无论如何都不可达就标记为直连
overcome = true;
changed_via.insert(to.id, to.id);
metric.insert(to.id, i32::MAX);
}
Some((best_router, best_metric)) if current_router != *best_router && *best_metric < current_metric => {
if *best_metric + penalty < current_metric {
Some((best_router, best_metric)) if current_router != best_router && best_metric < current_metric => {
if best_metric + penalty < current_metric {
overcome = true
}
changed_via.insert(to.id, best_router.id);
metric.insert(to.id, best_metric);
}
_ => {}
}
}
for region in regions {
for (group_id, gateways) in gateway_groups.iter() {
let current_gateway = self.plan[&region][group_id];
let current_metric = metric[&gateway_router[&current_gateway]];
let (best_gateway, best_metric) = gateways
.iter()
.map(|g| (g, metric[&gateway_router[&g.id]].saturating_add(g.cost_outbound).saturating_add(g.metrics[region.0 as usize])))
.min_by_key(|(_, m)| *m)
.unwrap();
if current_gateway != best_gateway.id && best_metric < current_metric {
if best_metric.saturating_add(penalty) < current_metric {
overcome = true;
}
changed_plan.entry(*region).or_default().insert(*group_id, best_gateway.id);
}
}
}
if overcome {
Some(Downlink {
action: MessageType::Update,
version: self.version + 1,
ack: self.version,
via: changed_via,
plan: Default::default(),
plan: changed_plan,
})
} else {
None
}
}
pub fn route_quality(&self, to: &Router, via: &Router, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Option<Quality> {
pub fn route_quality(&self, to: &Router, via: &Router, routers: &BTreeMap<RouterID, Router>, connections: &BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>) -> Option<Quality> {
assert!(self != to);
assert!(self != via);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment