Commit 9d847083 authored by nanamicat's avatar nanamicat

db

parent 5093db1d
......@@ -238,6 +238,12 @@ dependencies = [
"const-random",
]
[[package]]
name = "either"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "encoding_rs"
version = "0.8.35"
......@@ -445,6 +451,15 @@ dependencies = [
"tower-service",
]
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.15"
......@@ -674,9 +689,11 @@ dependencies = [
"axum",
"bincode",
"config",
"itertools",
"rand",
"serde",
"serde_json",
"string-interner",
"tokio",
"tower-http",
"tracing",
......@@ -889,6 +906,16 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "string-interner"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23de088478b31c349c9ba67816fa55d9355232d63c3afea8bf513e31f0f1d2c0"
dependencies = [
"hashbrown 0.15.5",
"serde",
]
[[package]]
name = "syn"
version = "2.0.111"
......
......@@ -17,3 +17,5 @@ tower-http = { version = "0.6.8", features = ["cors"] }
tracing = "0.1.44"
tracing-subscriber = "0.3.22"
rand = "0.9.2"
string-interner = "0.19.0"
itertools = "0.14.0"
......@@ -4,19 +4,27 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::data::{self, RouterID};
use crate::data::{self, DATABASE, RouterID};
use crate::router::Router;
use tower_http::cors::CorsLayer;
#[derive(Serialize, Clone)]
pub struct Info {
pub routers: Vec<data::Router>,
pub connections: BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>,
pub struct Info<'a> {
pub routers: &'a Vec<data::Router>,
pub connections: &'a BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>,
}
pub fn create_app(routers: Vec<data::Router>, connections: BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>, routers2: Arc<RwLock<BTreeMap<RouterID, Router>>>) -> axum::Router {
pub fn create_app(routers: Arc<RwLock<BTreeMap<RouterID, Router>>>) -> axum::Router {
axum::Router::new()
.route("/info", get(|| async move { Json(Info { routers, connections }) }))
.route("/metrics", get(|| async move { Json(routers2.read().await.clone()) }))
.route(
"/info",
get(|| async move {
Json(Info {
routers: &DATABASE.routers,
connections: &DATABASE.connections,
})
}),
)
.route("/metrics", get(|| async move { Json(routers.read().await.clone()) }))
.layer(CorsLayer::permissive())
}
use bincode::{Decode, Encode};
use itertools::{EitherOrBoth, Itertools};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, sync::OnceLock};
use string_interner::{StringInterner, Symbol, backend::StringBackend};
#[derive(Serialize, Deserialize, Clone)]
pub struct Router {
......@@ -14,7 +17,8 @@ pub struct Router {
#[derive(Serialize, Deserialize, Clone)]
pub struct Gateway {
pub id: GatewayID,
pub router: String,
#[serde(deserialize_with = "deserialize_router_id", serialize_with = "serialize_router_id")]
pub router: RouterID,
pub cost_outbound: i32,
pub metrics: Vec<i32>,
}
......@@ -25,8 +29,8 @@ pub struct GatewayGroup {
pub id: GatewayGroupID,
pub name: String,
pub location_prefix: Vec<String>,
pub include_routers: Vec<String>,
pub exclude_routers: Vec<String>,
pub include_routers: Vec<RouterID>,
pub exclude_routers: Vec<RouterID>,
pub children: Vec<String>,
pub desk_mark: u16,
}
......@@ -51,6 +55,16 @@ pub struct Region {}
#[derive(Serialize, Deserialize, Encode, Decode, Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct RouterID(pub u8);
impl Symbol for RouterID {
fn try_from_usize(index: usize) -> Option<Self> {
if index <= u8::MAX as usize { Some(Self(index as u8)) } else { None }
}
fn to_usize(self) -> usize {
self.0 as usize
}
}
#[derive(Serialize, Deserialize, Encode, Decode, Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct GatewayID(pub u8);
......@@ -71,3 +85,80 @@ impl From<u16> for GatewayGroupID {
#[derive(Serialize, Deserialize, Encode, Decode, Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct RegionID(pub u8);
#[derive(Default)]
pub struct Database {
pub routers: Vec<Router>,
pub gateways: Vec<Gateway>,
pub gateway_groups: Vec<GatewayGroup>,
pub regions: Vec<Region>,
pub connections: BTreeMap<RouterID, BTreeMap<RouterID, Connection>>,
}
pub static DATABASE: std::sync::LazyLock<Database> = std::sync::LazyLock::new(|| Database::load());
static ROUTER_ID_REGISTRY: OnceLock<StringInterner<StringBackend<RouterID>>> = OnceLock::new();
impl Database {
pub fn load() -> Self {
Self {
routers: register(Self::load_file("import/data/Router.json"), |r| r.id.0, |r| r.name.clone(), &ROUTER_ID_REGISTRY),
gateways: Self::load_file("import/data/Gateway.json"),
gateway_groups: Self::load_file("import/data/GatewayGroup.json"),
regions: Self::load_file("import/data/Region.json"),
connections: Self::load_file("import/connections.json"),
}
}
fn load_file<T: serde::de::DeserializeOwned>(path: &str) -> T {
serde_json::from_str(&std::fs::read_to_string(path).unwrap()).unwrap()
}
}
pub fn register<T, I, N, S, Sym>(mut data: Vec<T>, num: N, str: S, registry: &OnceLock<StringInterner<StringBackend<Sym>>>) -> Vec<T>
where
T: serde::de::DeserializeOwned,
N: Fn(&T) -> I,
S: Fn(&T) -> String,
I: Ord + Copy + std::fmt::Display + From<u8>,
std::ops::RangeInclusive<I>: Iterator<Item = I>,
Sym: Symbol + std::fmt::Debug,
{
data.sort_by_key(&num);
let mut interner = StringInterner::<StringBackend<Sym>>::new();
if let Some(max_id) = data.last().map(&num) {
for item in (I::from(0)..=max_id).merge_join_by(data.iter(), |i, r| i.cmp(&num(r))) {
let name = match item {
EitherOrBoth::Both(_, r) => str(r),
EitherOrBoth::Left(i) => i.to_string(),
EitherOrBoth::Right(_) => unreachable!(),
};
let symbol = interner.get_or_intern(name);
println!("{:?} = {}", symbol, item.left().unwrap());
}
}
interner.shrink_to_fit();
registry.set(interner).unwrap();
data
}
fn deserialize_router_id<'de, D>(deserializer: D) -> Result<RouterID, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
ROUTER_ID_REGISTRY.get().unwrap().get(&s).ok_or_else(|| serde::de::Error::custom(format!("Unknown router: {s}")))
}
fn serialize_router_id<S>(id: &RouterID, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = ROUTER_ID_REGISTRY
.get()
.unwrap()
.resolve(*id)
.ok_or_else(|| serde::ser::Error::custom(format!("Unknown id: {}", id.0)))?;
serializer.serialize_str(s)
}
use crate::data::{self, GatewayGroup};
use crate::data::{self, GatewayGroup, RouterID};
use std::collections::BTreeSet;
impl GatewayGroup {
pub fn search_routers(&self, routers_data: &[data::Router], groups_data: &[data::GatewayGroup]) -> BTreeSet<String> {
let mut routers: BTreeSet<String> = self
pub fn search_routers(&self, routers_data: &[data::Router], groups_data: &[data::GatewayGroup]) -> BTreeSet<RouterID> {
let mut routers: BTreeSet<RouterID> = self
.children
.iter()
.flat_map(|c| groups_data.iter().find(|g| &g.name == c))
.flat_map(|g| g.search_routers(routers_data, groups_data))
.chain(routers_data.iter().filter(|r| self.location_prefix.iter().any(|p| r.location.starts_with(p))).map(|r| r.name.clone()))
.chain(routers_data.iter().filter(|r| self.location_prefix.iter().any(|p| r.location.starts_with(p))).map(|r| r.id))
.chain(self.include_routers.iter().cloned())
.collect();
for r in &self.exclude_routers {
......
use crate::api::create_app;
use crate::data::{GatewayGroupID, GatewayID, RegionID, RouterID};
use crate::data::{DATABASE, GatewayGroupID, RegionID, RouterID};
use crate::protocol::{Downlink, Uplink};
use crate::router::Router;
use crate::settings::{Settings, TIMEOUT};
......@@ -33,30 +33,21 @@ async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let config: Settings = Config::builder().add_source(Environment::default()).build()?.try_deserialize()?;
let routers_data: Vec<data::Router> = serde_json::from_str(&std::fs::read_to_string("import/data/Router.json")?)?;
let gateways_data: Vec<data::Gateway> = serde_json::from_str(&std::fs::read_to_string("import/data/Gateway.json")?)?;
let gateways_groups_data: Vec<data::GatewayGroup> = serde_json::from_str(&std::fs::read_to_string("import/data/GatewayGroup.json")?)?;
let regions_data: Vec<data::Region> = serde_json::from_str(&std::fs::read_to_string("import/data/Region.json")?)?;
let connections_data: BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>> = serde_json::from_str(&std::fs::read_to_string("import/connections.json")?)?;
let gateways_group: BTreeMap<GatewayGroupID, Vec<&data::Gateway>> = gateways_groups_data
let regions: Vec<RegionID> = DATABASE.regions.iter().enumerate().map(|(i, _)| RegionID(i as u8)).collect();
let gateways_group: BTreeMap<GatewayGroupID, Vec<&data::Gateway>> = DATABASE
.gateway_groups
.iter()
.map(|g| {
let routers = g.search_routers(&routers_data, &gateways_groups_data);
(g.id, gateways_data.iter().filter(|gw| routers.contains(&gw.router)).collect())
let routers = g.search_routers(&DATABASE.routers, &DATABASE.gateway_groups);
(g.id, DATABASE.gateways.iter().filter(|gw| routers.contains(&gw.router)).collect())
})
.collect();
let gateway_routers: BTreeMap<GatewayID, RouterID> = gateways_data.iter().map(|gw| (gw.id, routers_data.iter().find(|r| r.name == gw.router).unwrap().id)).collect();
let regions: Vec<RegionID> = regions_data.iter().enumerate().map(|(i, _)| RegionID(i as u8)).collect();
let routers: BTreeMap<RouterID, Router> = routers_data
.iter()
.map(|c| (c.id, Router::new(c, &routers_data, &connections_data, &regions, &gateways_group, &gateway_routers)))
.collect();
let routers: BTreeMap<RouterID, Router> = DATABASE.routers.iter().map(|c| (c.id, Router::new(c, &regions, &gateways_group))).collect();
let routers = Arc::new(RwLock::new(routers));
let listener = tokio::net::TcpListener::bind(config.http_bind).await?;
let app = create_app(routers_data.clone(), connections_data.clone(), routers.clone());
let app = create_app(routers.clone());
tokio::spawn(async move {
println!("HTTP listening on {}", &listener.local_addr().unwrap());
......@@ -97,7 +88,7 @@ async fn main() -> Result<()> {
&& let Some(router) = routers.get(&uplink.id)
&& router.is_online()
&& router.last_update != now
&& let Some(downlink) = router.update(now, &routers, &connections_data, &regions, &gateways_group, &gateway_routers)
&& let Some(downlink) = router.update(now, &routers, &regions, &gateways_group)
{
updating.router_id = router.id;
updating.message = downlink;
......
use crate::data::{GatewayGroupID, GatewayID, RegionID, RouterID};
use crate::data::{DATABASE, GatewayGroupID, GatewayID, RegionID, RouterID};
use crate::protocol::{Downlink, MessageType, PeerQuality, Uplink};
use crate::quality::Quality;
use crate::settings::{HALF_LIFE, PENALTY, PENALTY_MIN};
......@@ -34,19 +34,17 @@ impl PartialEq<Self> for Router {
impl Eq for Router {}
impl Router {
pub fn new(
data: &data::Router,
routers: &Vec<data::Router>,
connections: &BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>,
regions: &Vec<RegionID>,
gateway_groups: &BTreeMap<GatewayGroupID, Vec<&data::Gateway>>,
gateway_router: &BTreeMap<GatewayID, RouterID>,
) -> Self {
pub fn new(data: &data::Router, regions: &Vec<RegionID>, gateway_groups: &BTreeMap<GatewayGroupID, Vec<&data::Gateway>>) -> Self {
Self {
id: data.id,
version: rand::random(),
peers: connections.iter().filter(|(_, to)| to.contains_key(&data.id)).map(|(&from, _)| (from, Default::default())).collect(),
via: routers.iter().filter(|r| r.id != data.id).map(|r| (r.id, r.id)).collect(),
peers: DATABASE
.connections
.iter()
.filter(|(_, to)| to.contains_key(&data.id))
.map(|(&from, _)| (from, Default::default()))
.collect(),
via: DATABASE.routers.iter().filter(|r| r.id != data.id).map(|r| (r.id, r.id)).collect(),
plan: regions
.iter()
.map(|&region| {
......@@ -54,7 +52,7 @@ impl Router {
region,
gateway_groups
.iter()
.map(|(&gid, gws)| (gid, gws.iter().min_by_key(|gw| Self::guess_metric(data, gw, gateway_router, &region)).unwrap().id))
.map(|(&gid, gws)| (gid, gws.iter().min_by_key(|gw| Self::guess_metric(data, gw, &region)).unwrap().id))
.collect(),
)
})
......@@ -65,10 +63,10 @@ impl Router {
}
}
fn guess_metric(data: &data::Router, gw: &data::Gateway, gateway_router: &BTreeMap<GatewayID, RouterID>, region: &RegionID) -> i32 {
fn guess_metric(data: &data::Router, gw: &data::Gateway, region: &RegionID) -> i32 {
gw.metrics[region.0 as usize]
.saturating_add(gw.cost_outbound)
.saturating_add(if gateway_router[&gw.id] == data.id { 0 } else { 100 })
.saturating_add(if gw.router == data.id { 0 } else { 100 })
}
pub fn online(&mut self, addr: SocketAddr, now: Instant) {
......@@ -146,15 +144,7 @@ impl Router {
}
}
pub fn update(
&self,
now: Instant,
routers: &BTreeMap<RouterID, Router>,
connections: &BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>,
regions: &Vec<RegionID>,
gateway_groups: &BTreeMap<GatewayGroupID, Vec<&data::Gateway>>,
gateway_router: &BTreeMap<GatewayID, RouterID>,
) -> Option<Downlink> {
pub fn update(&self, now: Instant, routers: &BTreeMap<RouterID, Router>, regions: &Vec<RegionID>, gateway_groups: &BTreeMap<GatewayGroupID, Vec<&data::Gateway>>) -> Option<Downlink> {
let penalty = PENALTY_MIN + (PENALTY as f32 * f32::exp2(-now.duration_since(self.last_update).div_duration_f32(HALF_LIFE))) as i32;
let mut changed_via = BTreeMap::new();
let mut changed_plan = BTreeMap::new();
......@@ -165,11 +155,11 @@ impl Router {
// Route updates
for to in routers.values().filter(|&r| r != self) {
let current_via = &routers[&self.via[&to.id]];
let current_metric = self.route_metric(to, current_via, routers, connections);
let (best_via, best_metric) = match connections[&self.id]
let current_metric = self.route_metric(to, current_via, routers);
let (best_via, best_metric) = match DATABASE.connections[&self.id]
.keys()
.map(|id| &routers[id])
.map(|r| (r, self.route_metric(to, r, routers, connections)))
.map(|r| (r, self.route_metric(to, r, routers)))
.min_by_key(|(_, m)| *m)
.unwrap()
{
......@@ -192,11 +182,11 @@ impl Router {
for &region in regions {
for (&gid, gateways) in gateway_groups {
let current_gw = self.plan[&region][&gid];
let current_metric = metrics[&gateway_router[&current_gw]];
let current_metric = metrics[&DATABASE.gateways.iter().find(|f| f.id == current_gw).unwrap().router];
let (best_gw, best_metric) = gateways
.iter()
.map(|g| (g, metrics[&gateway_router[&g.id]].saturating_add(g.cost_outbound).saturating_add(g.metrics[region.0 as usize])))
.map(|g| (g, metrics[&g.router].saturating_add(g.cost_outbound).saturating_add(g.metrics[region.0 as usize])))
.min_by_key(|(_, m)| *m)
.unwrap();
......@@ -222,7 +212,7 @@ impl Router {
}
}
pub fn route_metric(&self, to: &Router, via: &Router, routers: &BTreeMap<RouterID, Router>, connections: &BTreeMap<RouterID, BTreeMap<RouterID, data::Connection>>) -> i32 {
pub fn route_metric(&self, to: &Router, via: &Router, routers: &BTreeMap<RouterID, Router>) -> i32 {
assert!(self != to);
assert!(self != via);
......@@ -236,7 +226,7 @@ impl Router {
if quality.is_none() || quality.unwrap().reliability == 0 || !next.is_online() || route.contains(&next) {
return i32::MAX;
}
result.concat(quality.unwrap(), connections[&current.id][&next.id].metric);
result.concat(quality.unwrap(), DATABASE.connections[&current.id][&next.id].metric);
route.push(next);
current = next;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment