Commit 66048b4c authored by nanamicat's avatar nanamicat

http

parent 7cf5ec91
/node_modules/
/import/
/target/
\ No newline at end of file
This diff is collapsed.
// use crate::protocol::PeerQuality;
// use crate::router::GlobalState;
// use axum::{Router as AxumRouter, extract::State, response::Json, routing::get};
// use serde::Serialize;
// use std::sync::Arc;
//
//
// struct Status {
// tick: u64,
// last_update_ms: u64,
// msg: String,
// }
//
// // Response structs
// #[derive(Serialize)]
// struct RouterStateResponse {
// id: u8,
// peers: std::collections::HashMap<u8, PeerQuality>,
// via: std::collections::HashMap<u8, u8>,
// }
//
// #[derive(Serialize)]
// struct RouterConfigResponse {
// id: u8,
// name: String,
// location: String,
// host: String,
// #[serde(rename = "sshPort")]
// ssh_port: u16,
// user: String,
// }
//
// =
// // serde_json::from_str(&std::fs::read_to_string("../import/connections.json")?)?;
// // let mut routers: BTreeMap<u8, Router>
//
// pub fn app(state: Arc<(&BTreeMap<u8, BTreeMap<u8, ConnectionData>>>)) -> AxumRouter {
// AxumRouter::new()
// .route("/api/state", get(get_state))
// .route("/api/routers", get(get_routers))
// .route("/api/connections", get(get_connections))
// .with_state(state)
// }
//
// async fn get_state(State(state): State<Arc<GlobalState>>) -> Json<Vec<RouterStateResponse>> {
// let mut result = Vec::new();
// // iterate routers
// for router_lock in &state.routers {
// let router = router_lock.read().await;
// result.push(RouterStateResponse {
// id: router.id,
// peers: router.peers.clone(),
// via: router.via.clone(),
// });
// }
// Json(result)
// }
//
// async fn get_routers(State(state): State<Arc<GlobalState>>) -> Json<Vec<RouterConfigResponse>> {
// let mut result = Vec::new();
// for router_lock in &state.routers {
// let router = router_lock.read().await;
// result.push(RouterConfigResponse {
// id: router.config.id,
// name: router.config.name.clone(),
// location: router.config.location.clone(),
// host: router.config.host.clone(),
// ssh_port: router.config.ssh_port,
// user: router.config.user.clone(),
// });
// }
// Json(result)
// }
//
// async fn get_connections(
// State(state): State<Arc<GlobalState>>,
// ) -> Json<std::collections::HashMap<u8, std::collections::HashMap<u8, crate::router::ConnectionData>>>
// {
// Json(state.connections.clone())
// }
// use crate::router::RouterData;
// use serde::{Deserialize, Serialize}; // We will define this in router.rs
//
// #[derive(Debug, Clone, Serialize, Deserialize)]
// pub struct GatewayGroup {
// pub id: u8,
// pub name: String,
// pub location_prefix: Vec<String>, // "locationPrefix" in JSON
// pub include_routers: Vec<String>, // "includeRouters"
// pub exclude_routers: Vec<String>, // "excludeRouters"
// pub children: Vec<String>,
// #[serde(skip)]
// pub routers: Vec<u8>, // Calculated field
// }
//
// impl GatewayGroup {
// pub fn new(
// data: GatewayGroup,
// all_groups: &[GatewayGroup],
// all_routers: &[RouterData],
// ) -> Self {
// let mut group = data;
// group.routers = Self::group_routers(&group, all_groups, all_routers);
// group
// }
//
// pub fn group_routers(
// group: &GatewayGroup,
// all_groups: &[GatewayGroup],
// all_routers: &[RouterData],
// ) -> Vec<u8> {
// let mut router_ids = Vec::new();
//
// // 1. Filter by location prefix
// for router in all_routers {
// for prefix in &group.location_prefix {
// if router.location.starts_with(prefix) {
// router_ids.push(router.id);
// }
// }
// }
//
// // 2. Include routers
// for router in all_routers {
// if group.include_routers.contains(&router.name) {
// router_ids.push(router.id);
// }
// }
//
// // 3. Exclude routers
// router_ids.retain(|id| {
// let name = all_routers.iter().find(|r| r.id == *id).map(|r| &r.name);
// if let Some(name) = name {
// !group.exclude_routers.contains(name)
// } else {
// true
// }
// });
//
// // 4. Recursive children
// for child_name in &group.children {
// if let Some(child_group) = all_groups.iter().find(|g| g.name == *child_name) {
// let child_routers = Self::group_routers(child_group, all_groups, all_routers);
// router_ids.extend(child_routers);
// }
// }
//
// router_ids.sort_unstable();
// router_ids.dedup();
// router_ids
// }
// }
use crate::data::{ConnectionData, RouterData};
use crate::http::HttpMonitor;
use crate::protocol::{Change, Report};
use crate::router::Router;
use crate::settings::{Settings, TIMEOUT};
use ::config::Config;
use ::http::Response;
use anyhow::Result;
use config::{Environment, File};
use futures::SinkExt;
use std::collections::BTreeMap;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::time::{Duration, interval};
mod api;
mod data;
mod gateway_group;
mod http;
mod protocol;
mod quality;
mod router;
mod settings;
#[derive(Default)]
pub struct UpdatingState {
router_id: u8,
change: Change,
}
#[tokio::main]
async fn main() -> Result<()> {
let config: Settings = Config::builder()
.add_source(File::with_name("config/config.json"))
.add_source(Environment::with_prefix("RAILGUN"))
.build()?
.try_deserialize()?;
let routers_data: Vec<RouterData> = serde_json::from_str(&std::fs::read_to_string("../import/data/Router.json")?)?;
let connections: BTreeMap<u8, BTreeMap<u8, ConnectionData>> = serde_json::from_str(&std::fs::read_to_string("../import/connections.json")?)?;
let mut routers: BTreeMap<u8, Router> = routers_data.into_iter().map(|c| (c.id, Router::new(c, &connections))).collect();
// let gateway_groups_data: Vec<GatewayGroup> = serde_json::from_str(&std::fs::read_to_string(
// "../import/data/GatewayGroup.json",
// )?)?;
// 3. Initialize State
// let all_router_ids: Vec<u8> = routers_data.iter().map(|r| r.id).collect();
// let mut gateway_groups_map: BTreeMap<u8, GatewayGroup> = BTreeMap::new();
// let raw_groups = gateway_groups_data.clone();
// for g_data in gateway_groups_data {
// let g = GatewayGroup::new(g_data, &raw_groups, &routers_data);
// gateway_groups_map.insert(g.id, g);
// }
let mut updating: UpdatingState = UpdatingState::default();
// let (status_tx, status_rx) = watch::channel(Status {
// tick: 0,
// last_update_ms: now_ms(),
// msg: "starting".to_string(),
// });
// let app = axum::Router::new()
// .with_state();
// let listener = tokio::net::TcpListener::bind().await?;
// let _ = axum::serve(listener, app);
let socket = UdpSocket::bind(config.udp_bind).await?;
println!("UDP listening on {}", config.udp_bind);
let mut buf = [0u8; 65535]; // Max UDP size
let mut http = HttpMonitor::bind(config.http_bind).await?;
println!("HTTP listening on {}", config.http_bind);
let mut interval = interval(Duration::from_secs(1));
loop {
select! {
biased;
result = socket.recv_from(&mut buf) => {
let (len, addr) = result?;
if let Ok((report, _)) =
bincode::decode_from_slice::<Report, _>(&buf[..len], bincode::config::standard())
&& let Some(router) = routers.get_mut(&report.id)
{
router.on_message(report,&socket, addr, &mut updating);
}
}
now = interval.tick() => {
if updating.router_id != 0 {
let router = routers.get_mut(&updating.router_id).expect("updating router_id should exist");
if now.duration_since(router.time) < TIMEOUT {
let _ = router.send(&updating.change, &socket, router.addr.unwrap()); // updating 期间 addr 一定存在
} else {
router.offline();
router.finish(&mut updating);
}
}
if updating.router_id == 0 {
for router in routers.values_mut() {
if now.duration_since(router.time) > TIMEOUT {
router.offline();
}
}
if let Some(result) = routers.values().find_map(|r|r.update(&routers, &connections)) {
updating = result;
let router = routers.get_mut(&updating.router_id).expect("updating router_id should exist");
let _ = router.send(&updating.change, &socket, router.addr.unwrap());
}
}
}
result = http.next_request() => {
if let Ok((req, mut connection)) = result {
if req.uri().path() == "/status" {
let json = serde_json::to_string(&connections)?;
let response = Response::builder()
.header("Content-Type", "application/json")
.body(json)?;
tokio::spawn(async move {
connection.send(response).await
});
}
}
}
}
}
}
use crate::protocol::PeerQuality;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub struct Quality {
pub delay: i32,
pub jitter: u32,
pub reliability: f32,
pub cost: u32,
}
impl Quality {
pub const UNREACHABLE: Quality = Quality {
delay: 0,
jitter: 0,
reliability: 0.0,
cost: 0,
};
pub fn concat(&mut self, next: &PeerQuality, cost: u32) -> &mut Self {
self.delay += next.delay as i32;
self.jitter += next.jitter as u32;
self.reliability *= next.reliability as f32 / 255.0;
self.cost += cost;
self
}
pub fn metric(&self) -> i32 {
assert!(0.0 <= self.reliability && self.reliability <= 1.0);
if self.reliability == 0.0 {
i32::MAX
} else {
self.delay + ((1.0 - self.reliability) * 1000.0).round() as i32 + self.cost as i32 / 10
}
}
fn default() -> Self {
Self {
delay: 0,
jitter: 0,
reliability: 1.0,
cost: 0,
}
}
}
use crate::data::{ConnectionData, RouterData};
use crate::protocol::{Change, PeerQuality, Report};
use crate::quality::Quality;
use crate::settings::THROTTLE;
use crate::UpdatingState;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use tokio::net::UdpSocket;
use tokio::time::Instant;
pub struct Router {
pub id: u8,
pub seq: u8,
// quality from peer to self. HashMap 的 key 是 from.
pub peers: BTreeMap<u8, PeerQuality>,
pub via: BTreeMap<u8, u8>, // dst router_id -> next hop router_id
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>, // group id -> region id -> gateway_id
pub time: Instant, // ms
pub addr: Option<SocketAddr>,
// Static config
// pub config: RouterData,
}
impl PartialEq<Self> for Router {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for Router {}
impl Router {
pub fn new(data: RouterData, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Self {
Self {
id: data.id,
seq: 0,
peers: connections
.iter()
.filter(|(_, to)| to.contains_key(&data.id))
.map(|(&from, _)| {
(
from,
PeerQuality {
delay: 0,
reliability: 0,
jitter: 0,
},
)
})
.collect(),
via: BTreeMap::new(),
plan: BTreeMap::new(),
time: Instant::now(),
addr: None,
}
// router.reset(all_router_ids);
// router
}
pub fn offline(&mut self) {
self.addr = None;
for peer in self.peers.values_mut() {
peer.reliability = 0;
}
}
pub fn online(&mut self, addr: SocketAddr) {
self.addr = Some(addr);
self.time = Instant::now();
}
pub fn on_message(&mut self, data: Report, socket: &UdpSocket, addr: SocketAddr, updating: &mut UpdatingState) {
// self.addr None (+不满足上线条件) => 客户端不认为自己新上线,但是服务端不认识,立即发送 rst 当前状态,暂不承认上线
// syn + peer => 新上线的客户端,立即发送 rst 当前状态,暂不承认上线,有 updating 解除 updating, addr=none
// syn + no peer + 正确的 ack 号 => 确认了关于 rst 的指令,上线
// self.addr exist + no syn + peer + 正确的 ack 号 => 客户端汇报测量信息
// self.addr exist + no syn + no peer + 正确的 ack 号 => 确认指令,解除 updating
match (self.addr != None, data.syn, data.peers.len() != 0, data.ack == self.seq.wrapping_add(1)) {
(false, true, false, true) => {
self.online(addr);
}
(false, _, _, _) | (_, true, true, _) => {
self.offline();
self.finish(updating);
let change = Change {
seq: self.seq,
via: self.via.clone(),
plan: self.plan.clone(),
rst: true,
};
self.send(&change, socket, addr);
}
(true, false, true, true) => {
self.online(addr);
for (current, new) in self.peers.values_mut().zip(data.peers) {
*current = new
}
}
(true, false, false, true) => {
self.online(addr);
self.finish(updating);
}
_ => {}
}
}
pub async fn change(&self, message: Change, updating: &mut UpdatingState, socket: &UdpSocket) {
updating.router_id = self.id;
updating.change = message;
self.send(&updating.change, socket, self.addr.unwrap()).await;
}
pub fn finish(&self, updating: &mut UpdatingState) {
if updating.router_id == self.id {
updating.router_id = 0;
}
}
pub async fn send(&self, message: &Change, socket: &UdpSocket, addr: SocketAddr) {
let buf = bincode::encode_to_vec(message, bincode::config::standard()).unwrap();
socket.send_to(&buf, addr).await;
}
pub fn update(&self, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Option<UpdatingState> {
if self.addr.is_none() {
return None;
}
let mut changed_via: BTreeMap<u8, u8> = BTreeMap::new();
let mut metric: BTreeMap<u8, i32> = BTreeMap::new();
for to in routers.values().filter(|&r| r != self) {
let candidate: Vec<(&Router, i32)> = connections[&self.id]
.keys()
.map(|id| routers.get(id).unwrap())
.map(|r| (r, self.route_quality(to, r, routers, connections).metric()))
.collect();
let current = candidate.iter().find(|(v, _)| v.id == *self.via.get(&to.id).expect(""));
let (best_router, best_metric) = candidate.iter().min_by_key(|(_, m)| m).unwrap();
if current.is_none_or(|(current_router, current_metric)| current_router != best_router && best_metric + THROTTLE < *current_metric) {
changed_via.insert(to.id, best_router.id);
metric.insert(to.id, *best_metric);
}
}
if changed_via.len() > 0 {
Some(UpdatingState {
router_id: self.id,
change: Change {
seq: self.seq,
rst: false,
via: changed_via,
plan: BTreeMap::new(),
},
})
} else {
None
}
}
pub fn route_quality(&self, to: &Router, via: &Router, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Quality {
assert!(self != to);
assert!(self != via);
let mut result = Quality::default();
let mut route = vec![self, via];
let mut current = self;
let mut next = via;
loop {
match next.peers.get(&current.id) {
None => return Quality::UNREACHABLE,
Some(quality) => {
if quality.reliability == 0 {
// 不通
return Quality::UNREACHABLE;
}
result.concat(quality, connections[&current.id][&next.id].metric);
if next == to {
// 到达
return result;
}
// Next hop
current = next;
next = &routers[&current.via[&to.id]];
// via 如果不可达会指向自己
// 并且也检查环路
if route.contains(&next) {
return Quality::UNREACHABLE;
}
route.push(next);
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment