Commit ec9a95bf authored by nanamicat's avatar nanamicat

new

parent 18326b58
Pipeline #42413 passed with stages
in 7 minutes and 16 seconds
...@@ -621,6 +621,15 @@ version = "0.1.0" ...@@ -621,6 +621,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.103" version = "1.0.103"
...@@ -647,6 +656,7 @@ dependencies = [ ...@@ -647,6 +656,7 @@ dependencies = [
"axum", "axum",
"bincode", "bincode",
"config", "config",
"rand",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
...@@ -655,6 +665,36 @@ dependencies = [ ...@@ -655,6 +665,36 @@ dependencies = [
"tracing-subscriber", "tracing-subscriber",
] ]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.18" version = "0.5.18"
...@@ -1194,3 +1234,23 @@ dependencies = [ ...@@ -1194,3 +1234,23 @@ dependencies = [
"encoding_rs", "encoding_rs",
"hashlink", "hashlink",
] ]
[[package]]
name = "zerocopy"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
...@@ -16,3 +16,4 @@ bincode = "2.0.1" ...@@ -16,3 +16,4 @@ bincode = "2.0.1"
tower-http = { version = "0.6.8", features = ["cors"] } tower-http = { version = "0.6.8", features = ["cors"] }
tracing = "0.1.44" tracing = "0.1.44"
tracing-subscriber = "0.3.22" tracing-subscriber = "0.3.22"
rand = "0.8"
# Railgun Routing Protocol
Fast & Simple routing protocol, for overlay network.
- 通过中心化部署,简单的正面算出数学最优解
- 环路避免
- 支持三角路由
## 客户端的行为:
### 每秒
向直接相连的邻居发送 Hello 报文,报内容为:
```typescript
interface Hello {
id: number // 自己的id
seq: number // 这个报文的顺序号,每次发送自增
time: number // 自己的系统时间,EPOCH 以来的毫秒数
}
```
### 收到邻居信息时
若顺序号跟上次收到的差距较大,重置状态
若顺序号比上次收到的略小,忽略
根据 seq 计算可靠性,根据 time 计算单向延迟
单向延迟为:自己的系统时间 - hello报文写的对方的系统时间
双方时钟**不**需要同步
### 每秒
检查邻居是否超时,超时标记为下线
向服务器发送 Report 报文汇报邻居到自己的质量,报文内容为
```typescript
interface Report {
id: number // 自己的 id
ack: number // 顺序号,每次收到 Change 后自增
peers?: { id: number, reliability: number, delay: number }[]
}
```
### 收到服务器 Change 报文时
```typescript
interface Change {
seq: number // 顺序号,每次下发新的 Change 指令时自增
via: Record<number, number> // 有变更的下一跳路由
}
```
如果报文的 seq 为 0,重置状态和路由表
如果报文的 seq 跟自己的 ack 不同,忽略。
按照要求修改路由表,seq 自增, 额外发送一次 Report 报文 (可以不带 peers)
## 服务器行为
每秒:
检查客户端是否超时,若超时标记为下线。若这个路由器正在变更,退出正在变更状态
- 非 正在变更 状态:
为每个路由器检查,目标到每个路由器,下一跳哪个路由器最优。
只假设这台路由器的下一跳会变化,其他路由表用旧的不考虑改变。
如果发现有比现在更优的,就为这个路由器 seq++ 并 下发 Change 报文。之后进入正在变更状态,跳过其他路由器的检查
- 正在变更 状态:
向路由器重新发送的发送 Change 报文
### 收到客户端 Report 报文时
如果报文 ack 为 0,重置状态并立即发送 Change
如果报文 ack 不为 0 而服务器 seq 为 0,立即发送 Change
如果报文 ack 不为 change seq +1,忽略。
如果携带了 peers 信息,存下来。
如果这个路由器正在变更,退出正在变更状态,额外执行一次每秒更新
## 关于顺序号,断线、重启的一些解释:
### 客户端跟客户端之间
- 重启
客户端收到跟之前差距较大的顺序号时,意味着客户端可能发生了重启或者断网一段时间,应当视为新上线的节点。
客户端跟客户端之间的通讯不需要可靠,即使对方启动后不久又立刻重启,这会导致顺序号略微减少不触发重置,并且忽略掉报文视为丢包。但是这没关系,过几秒钟他就又跟上来了。
- 断网
超时收不到信息就标记为下线 (可靠性=0)
### 客户端跟服务器之间
服务器跟客户端之间的通讯比较需要可靠,因此客户端的seq=0服务器必须要回应,服务器的seq=0客户端也必须要回应,如果发生程序异常seq错乱的事情,会忽略掉所有不正确报文让他超时下限重置,然后从seq=0开始。
任何一个顺序号一定要确认过才会自增,不然会反复发送这一个:
- 客户端重启
客户端 ack=0 那个包服务器会立即回复一个 change,如果丢包就会下个包再来
- 服务端重启
客户端 ack!=0,服务器 seq=0,服务器会回复一个 seq=0 的 change,丢包一样
- 服务端断网
客户端没有收到任何新指令,继续维持现有路由表跑
- 客户端断网
服务器超时后把客户端标记为下线,其他人因为unreachable会进行路由表更新
import { GatewayGroup as GatewayGroupData } from '../import/scripts/GatewayGroup'; // import { GatewayGroup as GatewayGroupData } from '../import/scripts/GatewayGroup';
import _ from 'lodash'; // import _ from 'lodash';
import routers from '../import/data/Router.json'; // import routers from '../import/data/Router.json';
import gatewayGroups from '../import/data/GatewayGroup.json'; // import gatewayGroups from '../import/data/GatewayGroup.json';
//
export interface GatewayGroup extends GatewayGroupData {} // export interface GatewayGroup extends GatewayGroupData {}
//
export class GatewayGroup { // export class GatewayGroup {
static all: Record<number, GatewayGroup> = _.keyBy( // static all: Record<number, GatewayGroup> = _.keyBy(
gatewayGroups.map((g) => new this(g)), // gatewayGroups.map((g) => new this(g)),
'id' // 'id'
); // );
public routers: number[]; // public routers: number[];
//
constructor(public data: GatewayGroupData) { // constructor(public data: GatewayGroupData) {
Object.assign(this, data); // Object.assign(this, data);
this.routers = GatewayGroup.groupRouters(data); // this.routers = GatewayGroup.groupRouters(data);
} // }
//
static groupRouters(g: GatewayGroupData): number[] { // static groupRouters(g: GatewayGroupData): number[] {
return _.uniq( // return _.uniq(
g.locationPrefix // g.locationPrefix
.flatMap((p) => routers.filter((r) => r.location.startsWith(p))) // .flatMap((p) => routers.filter((r) => r.location.startsWith(p)))
.concat(routers.filter((r) => g.includeRouters.includes(r.name))) // .concat(routers.filter((r) => g.includeRouters.includes(r.name)))
.filter((r) => !g.excludeRouters.includes(r.name)) // .filter((r) => !g.excludeRouters.includes(r.name))
.map((r) => r.id) // .map((r) => r.id)
.concat(gatewayGroups.filter((g1) => g.children.includes(g1.name)).flatMap((c) => this.groupRouters(c))) // .concat(gatewayGroups.filter((g1) => g.children.includes(g1.name)).flatMap((c) => this.groupRouters(c)))
); // );
} // }
} // }
This diff is collapsed.
use crate::api::create_app; use crate::api::create_app;
use crate::data::{ConnectionData, RouterData}; use crate::data::RouterData;
use crate::protocol::{Change, Report}; use crate::protocol::{Downlink, Uplink};
use crate::router::Router; use crate::router::Router;
use crate::settings::{Settings, INTERVAL, TIMEOUT}; use crate::settings::{Settings, TIMEOUT};
use anyhow::Result;
use ::config::Config; use ::config::Config;
use anyhow::{Context, Result};
use config::Environment; use config::Environment;
use net::SocketAddr; use net::SocketAddr;
use serde::Serialize;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net; use std::net;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::sync::Arc; use std::sync::Arc;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::select;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::time::interval; use tokio::time::Instant;
mod api; mod api;
mod data; mod data;
...@@ -25,10 +22,10 @@ mod quality; ...@@ -25,10 +22,10 @@ mod quality;
mod router; mod router;
mod settings; mod settings;
#[derive(Default, Serialize)] #[derive(Default, Debug)]
pub struct UpdatingState { pub struct UpdatingState {
router_id: u8, router_id: u8,
change: Change, message: Downlink,
} }
#[tokio::main] #[tokio::main]
...@@ -56,8 +53,6 @@ async fn main() -> Result<()> { ...@@ -56,8 +53,6 @@ async fn main() -> Result<()> {
// gateway_groups_map.insert(g.id, g); // gateway_groups_map.insert(g.id, g);
// } // }
let mut updating: UpdatingState = UpdatingState::default();
let listener = tokio::net::TcpListener::bind(config.http_bind).await?; let listener = tokio::net::TcpListener::bind(config.http_bind).await?;
let app = create_app(routers_data.clone(), connections.clone(), routers.clone()); let app = create_app(routers_data.clone(), connections.clone(), routers.clone());
...@@ -66,81 +61,46 @@ async fn main() -> Result<()> { ...@@ -66,81 +61,46 @@ async fn main() -> Result<()> {
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
}); });
let mut updating: UpdatingState = UpdatingState::default();
let socket = UdpSocket::bind(config.udp_bind).await?; let socket = UdpSocket::bind(config.udp_bind).await?;
println!("UDP listening on {}", config.udp_bind); println!("UDP listening on {}", config.udp_bind);
let mut buf = [0u8; u16::MAX as usize]; // Max UDP size let mut buf = [0u8; u16::MAX as usize]; // Max UDP size
let mut interval = interval(INTERVAL);
let mut cursor = 0;
loop { loop {
select! { let (len, addr) = socket.recv_from(&mut buf).await?;
biased; if let Ok((mut uplink, _)) = bincode::decode_from_slice::<Uplink, _>(&buf[..len], bincode::config::standard()) {
let mut routers = routers.write().await;
result = socket.recv_from(&mut buf) => { let now = Instant::now();
let (len, addr) = result?; // 将超时的路由器下线
if let Ok((report, _)) = bincode::decode_from_slice::<Report, _>(&buf[..len], bincode::config::standard()) { for router in routers.values_mut() {
let mut routers = routers.write().await; if router.is_online() && router.time.duration_since(now) >= TIMEOUT {
if let Some(router) = routers.get_mut(&report.id) { router.offline();
let old_updating_exists = updating.router_id != 0;
if let Some(change) = router.on_message(report, addr, &mut updating) {
// 新上线的客户端,发送 rst,seq 不增加
send(&change, &mut buf, &socket, &addr).await?;
}
if old_updating_exists && updating.router_id == 0 { // an updating just resolved
tick(&mut routers, &mut cursor, &socket, &mut buf, &mut updating, &connections).await?
}
}
} }
} }
if updating.router_id != 0 && !routers.get(&updating.router_id).context("router not found")?.is_online() {
now = interval.tick() => { updating.router_id = 0;
let mut routers = routers.write().await;
for router in routers.values_mut() {
if router.addr != None && now.duration_since(router.time) > TIMEOUT {
router.offline();
router.finish(&mut updating);
}
}
if updating.router_id != 0 {
let router = routers.get_mut(&updating.router_id).expect("updating router_id should exist");
send(&updating.change, &mut buf, &socket, &router.addr.unwrap()).await?;
}
if updating.router_id == 0 {
tick(&mut routers, &mut cursor, &socket, &mut buf, &mut updating, &connections).await?
}
} }
} // 处理收到的消息
} if let Some(router) = routers.get_mut(&uplink.id)
&& let Some(downlink) = router.on_message(&mut uplink, addr, &mut updating, now)
async fn tick( {
routers: &mut BTreeMap<u8, Router>, send(&downlink, &mut buf, &socket, &addr).await?;
cursor: &mut u8, } else if updating.router_id == 0
socket: &UdpSocket, && let Some(router) = routers.get(&uplink.id)
buf: &mut [u8], && let Some(downlink) = router.update(&routers, &connections)
updating: &mut UpdatingState, {
connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>,
) -> Result<()> {
for (_, router) in routers.range((Excluded(*cursor), Unbounded)).chain(routers.range((Unbounded, Included(*cursor)))) {
if let Some(change) = router.update(&routers, &connections) {
*cursor = router.id;
updating.router_id = router.id; updating.router_id = router.id;
updating.change = change; updating.message = downlink;
send(&updating.change, buf, socket, &router.addr.unwrap()).await?; send(&updating.message, &mut buf, &socket, &addr).await?;
break; routers.get_mut(&uplink.id).expect("").version += 1;
} }
} }
if updating.router_id != 0 {
let router = routers.get_mut(&updating.router_id).unwrap();
router.seq += 1;
}
Ok(())
} }
}
async fn send(change: &Change, buf: &mut [u8], socket: &UdpSocket, addr: &SocketAddr) -> Result<()> { async fn send(downlink: &Downlink, buf: &mut [u8], socket: &UdpSocket, addr: &SocketAddr) -> Result<()> {
let len = bincode::encode_into_slice(change, buf, bincode::config::standard())?; let len = bincode::encode_into_slice(downlink, buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], addr).await; let _ = socket.send_to(&buf[..len], addr).await;
Ok(()) Ok(())
}
} }
...@@ -2,25 +2,36 @@ use bincode::{Decode, Encode}; ...@@ -2,25 +2,36 @@ use bincode::{Decode, Encode};
use serde::Serialize; use serde::Serialize;
use std::collections::BTreeMap; use std::collections::BTreeMap;
#[derive(Encode, Decode, Serialize)] #[derive(Encode, Decode, Default, Debug, Eq, PartialEq, Copy, Clone)]
pub struct Report { pub enum MessageType {
#[default]
Query,
Full,
Update,
}
#[derive(Encode, Decode, Default, Debug, Clone)]
pub struct Uplink {
pub id: u8, pub id: u8,
pub ack: u8, pub action: MessageType,
pub syn: bool, pub version: u32,
pub peers: Vec<PeerQuality>, pub peers: Vec<PeerQuality>,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
}
#[derive(Encode, Decode, Default, Debug, Clone)]
pub struct Downlink {
pub action: MessageType,
pub version: u32,
pub ack: u32,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
} }
#[derive(Encode, Decode, Copy, Clone, Serialize)] #[derive(Encode, Decode, Serialize, Copy, Clone, Debug)]
pub struct PeerQuality { pub struct PeerQuality {
pub delay: i16, pub delay: i16,
pub reliability: u8, pub reliability: u8,
pub jitter: u8, pub jitter: u8,
} }
#[derive(Encode, Decode, Default, Serialize)]
pub struct Change {
pub seq: u8,
pub rst: bool,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
}
...@@ -17,12 +17,11 @@ impl Quality { ...@@ -17,12 +17,11 @@ impl Quality {
cost: 0, cost: 0,
}; };
pub fn concat(&mut self, next: &PeerQuality, cost: u32) -> &mut Self { pub fn concat(&mut self, next: &PeerQuality, cost: u32) {
self.delay += next.delay as i32; self.delay += next.delay as i32;
self.jitter += next.jitter as u32; self.jitter += next.jitter as u32;
self.reliability *= next.reliability as f32 / 255.0; self.reliability *= next.reliability as f32 / 255.0;
self.cost += cost; self.cost += cost;
self
} }
pub fn metric(&self) -> i32 { pub fn metric(&self) -> i32 {
...@@ -43,4 +42,13 @@ impl Quality { ...@@ -43,4 +42,13 @@ impl Quality {
cost: 0, cost: 0,
} }
} }
pub fn from(other: PeerQuality, cost: u32) -> Quality {
Quality {
delay: other.delay as i32,
jitter: other.jitter as u32,
reliability: other.reliability as f32 / 255.0,
cost,
}
}
} }
use crate::data::{ConnectionData, RouterData}; use crate::data::{ConnectionData, RouterData};
use crate::protocol::{Change, PeerQuality, Report}; use crate::protocol::{Downlink, MessageType, PeerQuality, Uplink};
use crate::quality::Quality; use crate::quality::Quality;
use crate::settings::THROTTLE; use crate::settings::THROTTLE;
use crate::UpdatingState; use crate::UpdatingState;
...@@ -12,7 +12,7 @@ use tokio::time::Instant; ...@@ -12,7 +12,7 @@ use tokio::time::Instant;
pub struct Router { pub struct Router {
pub id: u8, pub id: u8,
#[serde(skip)] #[serde(skip)]
pub seq: u8, pub version: u32,
// quality from peer to self. HashMap 的 key 是 from. // quality from peer to self. HashMap 的 key 是 from.
pub peers: BTreeMap<u8, PeerQuality>, pub peers: BTreeMap<u8, PeerQuality>,
pub via: BTreeMap<u8, u8>, // dst router_id -> next hop router_id pub via: BTreeMap<u8, u8>, // dst router_id -> next hop router_id
...@@ -37,7 +37,7 @@ impl Router { ...@@ -37,7 +37,7 @@ impl Router {
pub fn new(data: &RouterData, routers: &Vec<RouterData>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Self { pub fn new(data: &RouterData, routers: &Vec<RouterData>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Self {
Self { Self {
id: data.id, id: data.id,
seq: 0, version: rand::random(),
peers: connections peers: connections
.iter() .iter()
.filter(|(_, to)| to.contains_key(&data.id)) .filter(|(_, to)| to.contains_key(&data.id))
...@@ -61,88 +61,109 @@ impl Router { ...@@ -61,88 +61,109 @@ impl Router {
// router // router
} }
pub fn offline(&mut self) { pub fn reset(&mut self) {
if self.addr != None { for (to, via) in self.via.iter_mut() {
tracing::info!("router {} offline", self.id); *via = *to;
} else {
tracing::info!("router {} offline to offline", self.id);
}
self.addr = None;
for peer in self.peers.values_mut() {
peer.reliability = 0;
} }
} }
pub fn online(&mut self, addr: SocketAddr) {
pub fn online(&mut self, addr: SocketAddr, now: Instant) {
if self.addr == None { if self.addr == None {
tracing::info!("router {} online", self.id); tracing::info!("router {} online", self.id);
} }
self.addr = Some(addr); self.addr = Some(addr);
self.time = Instant::now(); self.time = now;
} }
pub fn on_message(&mut self, data: Report, addr: SocketAddr, updating: &mut UpdatingState) -> Option<Change> { pub fn offline(&mut self) {
// 不带 syn, 序号正确:已经在线的,或者是上线流程走完 self.addr = None;
// 有 syn:客户端新上线;无 addr, 服务端新上线新上线,先把客户端标记为离线来走登录流程
if !data.syn && data.ack == self.seq.wrapping_add(1) {
self.online(addr);
for (current, new) in self.peers.values_mut().zip(data.peers) {
*current = new
}
if updating.router_id == self.id {
self.via.append(&mut updating.change.via);
self.finish(updating);
}
None
} else if data.syn || self.addr == None {
self.offline();
self.finish(updating);
Some(Change {
seq: self.seq,
via: self.via.clone(),
plan: self.plan.clone(),
rst: true,
})
} else {
tracing::info!("router {} wrong ack ack={}, seq={}", self.id, data.ack, self.seq);
// 序号不对的,忽略
None
}
} }
pub fn finish(&self, updating: &mut UpdatingState) { pub fn is_online(&self) -> bool {
if updating.router_id == self.id { self.addr.is_some()
updating.router_id = 0;
}
} }
pub fn update(&self, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Option<Change> { pub fn on_message(&mut self, uplink: &mut Uplink, addr: SocketAddr, updating: &mut UpdatingState, now: Instant) -> Option<Downlink> {
if self.addr.is_none() { match uplink.action {
return None; MessageType::Query => {
for (current, new) in self.peers.values_mut().zip(&mut uplink.peers) {
*current = *new
}
Some(Downlink {
action: MessageType::Full,
version: self.version,
ack: uplink.version,
via: self.via.clone(),
plan: self.plan.clone(),
})
}
MessageType::Full => {
if uplink.version == self.version {
self.reset();
self.via.append(&mut uplink.via);
self.plan.append(&mut uplink.plan);
self.online(addr, now);
}
None
}
MessageType::Update => {
for (current, new) in self.peers.values_mut().zip(&mut uplink.peers) {
*current = *new
}
if uplink.version == self.version {
self.online(addr, now);
if updating.router_id == self.id {
updating.router_id = 0;
}
None
} else if !self.is_online() {
Some(Downlink {
action: MessageType::Query,
version: self.version,
ack: uplink.version,
via: Default::default(),
plan: Default::default(),
})
} else if updating.router_id == self.id && uplink.version == self.version - 1 {
Some(updating.message.clone()) // 重传
} else {
tracing::info!("router {} wrong ack ack={}, seq={}", self.id, uplink.version, self.version);
None
}
}
} }
}
pub fn update(&self, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Option<Downlink> {
let mut changed_via: BTreeMap<u8, u8> = BTreeMap::new(); let mut changed_via: BTreeMap<u8, u8> = BTreeMap::new();
// let mut metric: BTreeMap<u8, i32> = BTreeMap::new(); // let mut metric: BTreeMap<u8, i32> = BTreeMap::new();
for to in routers.values().filter(|&r| r != self) { for to in routers.values().filter(|&r| r != self) {
let current_router = routers.get(self.via.get(&to.id).unwrap()).unwrap();
let current_metric = self.route_quality(to, current_router, routers, connections).metric();
let candidate: Vec<(&Router, i32)> = connections[&self.id] let candidate: Vec<(&Router, i32)> = connections[&self.id]
.keys() .keys()
.map(|id| routers.get(id).unwrap()) .map(|id| routers.get(id).unwrap())
.map(|r| (r, self.route_quality(to, r, routers, connections).metric())) .map(|r| (r, self.route_quality(to, r, routers, connections).metric()))
.collect(); .collect();
let current = candidate.iter().find(|(v, _)| v.id == *self.via.get(&to.id).expect(""));
let (best_router, best_metric) = candidate.iter().min_by_key(|(_, m)| m).unwrap(); let (best_router, best_metric) = candidate.iter().min_by_key(|(_, m)| m).unwrap();
if current.is_none_or(|(current_router, current_metric)| current_router != best_router && *best_metric != i32::MAX && *best_metric + THROTTLE < *current_metric) {
if *best_metric == i32::MAX && current_router != to {
// 无论如何都不可达就标记为直连
changed_via.insert(to.id, to.id);
} else if current_router != *best_router && *best_metric + THROTTLE < current_metric {
changed_via.insert(to.id, best_router.id); changed_via.insert(to.id, best_router.id);
// metric.insert(to.id, *best_metric);
} }
} }
if changed_via.len() > 0 { if changed_via.len() > 0 {
Some(Change { Some(Downlink {
seq: self.seq + 1, action: MessageType::Update,
rst: false, version: self.version + 1,
ack: self.version,
via: changed_via, via: changed_via,
plan: BTreeMap::new(), plan: Default::default(),
}) })
} else { } else {
None None
...@@ -159,32 +180,24 @@ impl Router { ...@@ -159,32 +180,24 @@ impl Router {
let mut current = self; let mut current = self;
let mut next = via; let mut next = via;
loop { loop {
if !next.is_online() {
return Quality::UNREACHABLE;
}
// 不通的情况 via 会标记为直连,实际不可达 // 不通的情况 via 会标记为直连,实际不可达
match next.peers.get(&current.id) { match next.peers.get(&current.id) {
None => return Quality::UNREACHABLE, None => return Quality::UNREACHABLE,
Some(quality) => { Some(quality) => result.concat(quality, connections[&current.id][&next.id].metric),
if quality.reliability == 0 { }
// 不通
return Quality::UNREACHABLE;
}
result.concat(quality, connections[&current.id][&next.id].metric);
if next == to {
// 到达
return result;
}
// Next hop // Next hop
current = next; current = next;
next = &routers[&current.via[&to.id]]; next = &routers[&current.via[&to.id]];
// 检查环路 // 检查环路
if route.contains(&next) { if route.contains(&next) {
return Quality::UNREACHABLE; return Quality::UNREACHABLE;
}
route.push(next);
}
} }
route.push(next);
} }
} }
} }
...@@ -8,6 +8,5 @@ pub struct Settings { ...@@ -8,6 +8,5 @@ pub struct Settings {
pub http_bind: SocketAddr, pub http_bind: SocketAddr,
} }
pub const INTERVAL: Duration = Duration::from_secs(1);
pub const TIMEOUT: Duration = Duration::from_secs(10); pub const TIMEOUT: Duration = Duration::from_secs(10);
pub const THROTTLE: i32 = 10; pub const THROTTLE: i32 = 10;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment