Commit cdae44ba authored by nanamicat's avatar nanamicat

syn

parent c0124156
use crate::api::create_app; use crate::api::create_app;
use crate::data::RouterData; use crate::data::{ConnectionData, RouterData};
use crate::protocol::{Change, Report}; use crate::protocol::{Change, Report};
use crate::router::Router; use crate::router::Router;
use crate::settings::{INTERVAL, Settings, TIMEOUT}; use crate::settings::{Settings, INTERVAL, TIMEOUT};
use ::config::Config;
use anyhow::Result; use anyhow::Result;
use ::config::Config;
use config::Environment; use config::Environment;
use net::SocketAddr;
use serde::Serialize; use serde::Serialize;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::sync::Arc; use std::sync::Arc;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::select; use tokio::select;
...@@ -55,7 +58,7 @@ async fn main() -> Result<()> { ...@@ -55,7 +58,7 @@ async fn main() -> Result<()> {
let mut updating: UpdatingState = UpdatingState::default(); let mut updating: UpdatingState = UpdatingState::default();
let listener = tokio::net::TcpListener::bind(config.http_bind).await?; let listener = tokio::net::TcpListener::bind(config.http_bind).await?;
let app = create_app(routers_data, connections.clone(), routers.clone()); let app = create_app(routers_data.clone(), connections.clone(), routers.clone());
tokio::spawn(async move { tokio::spawn(async move {
println!("HTTP listening on {}", &listener.local_addr().unwrap()); println!("HTTP listening on {}", &listener.local_addr().unwrap());
...@@ -67,6 +70,7 @@ async fn main() -> Result<()> { ...@@ -67,6 +70,7 @@ async fn main() -> Result<()> {
let mut buf = [0u8; u16::MAX as usize]; // Max UDP size let mut buf = [0u8; u16::MAX as usize]; // Max UDP size
let mut interval = interval(INTERVAL); let mut interval = interval(INTERVAL);
let mut cursor = 0;
loop { loop {
select! { select! {
...@@ -75,14 +79,15 @@ async fn main() -> Result<()> { ...@@ -75,14 +79,15 @@ async fn main() -> Result<()> {
result = socket.recv_from(&mut buf) => { result = socket.recv_from(&mut buf) => {
let (len, addr) = result?; let (len, addr) = result?;
if let Ok((report, _)) = bincode::decode_from_slice::<Report, _>(&buf[..len], bincode::config::standard()) { if let Ok((report, _)) = bincode::decode_from_slice::<Report, _>(&buf[..len], bincode::config::standard()) {
// println!("{}", serde_json::to_string(&report)?);
let mut routers = routers.write().await; let mut routers = routers.write().await;
if let Some(router) = routers.get_mut(&report.id) { if let Some(router) = routers.get_mut(&report.id) {
let old_updating_exists = updating.router_id != 0;
if let Some(change) = router.on_message(report, addr, &mut updating) { if let Some(change) = router.on_message(report, addr, &mut updating) {
// println!("2.{} {}", router.id, serde_json::to_string(&change)?); // 新上线的客户端,发送 rst,seq 不增加
let len = bincode::encode_into_slice(change, &mut buf, bincode::config::standard())?; send(&change, &mut buf, &socket, &router.addr.unwrap()).await?;
let _ = socket.send_to(&buf[..len], addr).await; }
router.seq += 1; if old_updating_exists && updating.router_id == 0 { // an updating just resolved
tick(&mut routers, &mut cursor, &socket, &mut buf, &mut updating, &connections).await?
} }
} }
} }
...@@ -93,11 +98,8 @@ async fn main() -> Result<()> { ...@@ -93,11 +98,8 @@ async fn main() -> Result<()> {
if updating.router_id != 0 { if updating.router_id != 0 {
let router = routers.get_mut(&updating.router_id).expect("updating router_id should exist"); let router = routers.get_mut(&updating.router_id).expect("updating router_id should exist");
if now.duration_since(router.time) < TIMEOUT { if now.duration_since(router.time) < TIMEOUT {
// println!("3.{}", serde_json::to_string(&updating)?); send(&updating.change, &mut buf, &socket, &router.addr.unwrap()).await?;
let len = bincode::encode_into_slice(&updating.change, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], router.addr.unwrap()).await;
} else { } else {
// println!("1");
router.offline(); router.offline();
router.finish(&mut updating); router.finish(&mut updating);
} }
...@@ -105,24 +107,42 @@ async fn main() -> Result<()> { ...@@ -105,24 +107,42 @@ async fn main() -> Result<()> {
if updating.router_id == 0 { if updating.router_id == 0 {
for router in routers.values_mut() { for router in routers.values_mut() {
if router.addr != None && now.duration_since(router.time) > TIMEOUT { if router.addr != None && now.duration_since(router.time) > TIMEOUT {
// println!("2");
router.offline(); router.offline();
} }
} }
if let Some((router, change)) = routers.values().find_map(|r|r.update(&routers, &connections).map(|change|(r,change))) { tick(&mut routers, &mut cursor, &socket, &mut buf, &mut updating, &connections).await?
updating.router_id = router.id;
updating.change = change;
// println!("1.{}", serde_json::to_string(&updating)?);
let len = bincode::encode_into_slice(&updating.change, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], router.addr.unwrap()).await;
}
if updating.router_id != 0 {
let router = routers.get_mut(&updating.router_id).unwrap();
router.seq += 1;
}
} }
} }
} }
} }
async fn tick(
routers: &mut BTreeMap<u8, Router>,
cursor: &mut u8,
socket: &UdpSocket,
buf: &mut [u8],
updating: &mut UpdatingState,
connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>,
) -> Result<()> {
for (_, router) in routers.range((Excluded(*cursor), Unbounded)).chain(routers.range((Unbounded, Included(*cursor)))) {
if let Some(change) = router.update(&routers, &connections) {
*cursor = router.id;
updating.router_id = router.id;
updating.change = change;
send(&updating.change, buf, socket, &router.addr.unwrap()).await?;
break;
}
}
if updating.router_id != 0 {
let router = routers.get_mut(&updating.router_id).unwrap();
router.seq += 1;
}
Ok(())
}
async fn send(change: &Change, buf: &mut [u8], socket: &UdpSocket, addr: &SocketAddr) -> Result<()> {
let len = bincode::encode_into_slice(change, buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], addr).await;
Ok(())
}
} }
...@@ -62,7 +62,9 @@ impl Router { ...@@ -62,7 +62,9 @@ impl Router {
} }
pub fn offline(&mut self) { pub fn offline(&mut self) {
println!("router {} offline", self.id); if self.addr != None {
println!("router {} offline", self.id);
}
self.addr = None; self.addr = None;
for peer in self.peers.values_mut() { for peer in self.peers.values_mut() {
peer.reliability = 0; peer.reliability = 0;
...@@ -77,58 +79,30 @@ impl Router { ...@@ -77,58 +79,30 @@ impl Router {
} }
pub fn on_message(&mut self, data: Report, addr: SocketAddr, updating: &mut UpdatingState) -> Option<Change> { pub fn on_message(&mut self, data: Report, addr: SocketAddr, updating: &mut UpdatingState) -> Option<Change> {
// self.addr None (+不满足上线条件) => 客户端不认为自己新上线,但是服务端不认识,立即发送 rst 当前状态,暂不承认上线 // 不带 syn, 序号正确:已经在线的,或者是上线流程走完
// syn + peer => 新上线的客户端,立即发送 rst 当前状态,暂不承认上线,有 updating 解除 updating, addr=none // 有 syn:客户端新上线;无 addr, 服务端新上线新上线,先把客户端标记为离线来走登录流程
// syn + no peer + 正确的 ack 号 => 确认了关于 rst 的指令,上线 if data.syn || self.addr == None {
// self.addr exist + no syn + peer + 正确的 ack 号 => 客户端汇报测量信息 self.offline();
// self.addr exist + no syn + no peer + 正确的 ack 号 => 确认指令,解除 updating self.finish(updating);
Some(Change {
match (self.addr != None, data.syn, data.peers.len() != 0, data.ack == self.seq) { seq: self.seq,
(false, true, false, true) => { via: self.via.clone(),
self.online(addr); plan: self.plan.clone(),
None rst: true,
} })
(false, _, _, _) | (true, true, true, _) => { } else if data.ack == self.seq.wrapping_add(1) {
// if self.addr == None { self.online(addr);
// println!("3.0"); for (current, new) in self.peers.values_mut().zip(data.peers) {
// } else { *current = new
// println!("3.1");
// }
self.offline();
self.finish(updating);
Some(Change {
seq: self.seq,
via: self.via.clone(),
plan: self.plan.clone(),
rst: true,
})
}
(true, false, true, true) => {
self.online(addr);
for (current, new) in self.peers.values_mut().zip(data.peers) {
*current = new
}
None
} }
(true, false, false, true) => { if (updating.router_id == self.id) {
self.online(addr);
self.via.append(&mut updating.change.via); self.via.append(&mut updating.change.via);
self.finish(updating); self.finish(updating);
None
}
_ => {
// println!(
// "???, {}, {}, {}, {}, {} != {}",
// self.addr != None,
// data.syn,
// data.peers.len() != 0,
// data.ack == self.seq.wrapping_add(1),
// data.ack,
// self.seq
// );
None
} }
None
} else {
// 序号不对的,忽略
None
} }
} }
...@@ -144,7 +118,7 @@ impl Router { ...@@ -144,7 +118,7 @@ impl Router {
} }
let mut changed_via: BTreeMap<u8, u8> = BTreeMap::new(); let mut changed_via: BTreeMap<u8, u8> = BTreeMap::new();
let mut metric: BTreeMap<u8, i32> = BTreeMap::new(); // let mut metric: BTreeMap<u8, i32> = BTreeMap::new();
for to in routers.values().filter(|&r| r != self) { for to in routers.values().filter(|&r| r != self) {
let candidate: Vec<(&Router, i32)> = connections[&self.id] let candidate: Vec<(&Router, i32)> = connections[&self.id]
...@@ -154,16 +128,15 @@ impl Router { ...@@ -154,16 +128,15 @@ impl Router {
.collect(); .collect();
let current = candidate.iter().find(|(v, _)| v.id == *self.via.get(&to.id).expect("")); let current = candidate.iter().find(|(v, _)| v.id == *self.via.get(&to.id).expect(""));
let (best_router, best_metric) = candidate.iter().min_by_key(|(_, m)| m).unwrap(); let (best_router, best_metric) = candidate.iter().min_by_key(|(_, m)| m).unwrap();
if current.is_none_or(|(current_router, current_metric)| current_router != best_router && *best_metric != i32::MAX && *best_metric + THROTTLE < *current_metric) { if current.is_none_or(|(current_router, current_metric)| current_router != best_router && *best_metric != i32::MAX && *best_metric + THROTTLE < *current_metric) {
changed_via.insert(to.id, best_router.id); changed_via.insert(to.id, best_router.id);
metric.insert(to.id, *best_metric); // metric.insert(to.id, *best_metric);
} }
} }
if changed_via.len() > 0 { if changed_via.len() > 0 {
Some(Change { Some(Change {
seq: self.seq, seq: self.seq + 1,
rst: false, rst: false,
via: changed_via, via: changed_via,
plan: BTreeMap::new(), plan: BTreeMap::new(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment