remove lease manager, add network class to infoq, other config stuff

This commit is contained in:
John Smith
2022-04-03 12:58:06 -04:00
parent 53cd521ba8
commit a33473d8ea
20 changed files with 332 additions and 455 deletions
+141 -90
View File
@@ -3,7 +3,6 @@ use connection_manager::*;
use dht::*;
use hashlink::LruCache;
use intf::*;
use lease_manager::*;
use receipt_manager::*;
use routing_table::*;
use rpc_processor::RPCProcessor;
@@ -15,32 +14,6 @@ pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum NetworkClass {
Server = 0, // S = Device with public IP and no UDP firewall
Mapped = 1, // M = Device with portmap behind any NAT
FullNAT = 2, // F = Device without portmap behind full-cone NAT
AddressRestrictedNAT = 3, // R1 = Device without portmap behind address-only restricted NAT
PortRestrictedNAT = 4, // R2 = Device without portmap behind address-and-port restricted NAT
OutboundOnly = 5, // O = Outbound only
WebApp = 6, // W = PWA in normal web browser
TorWebApp = 7, // T = PWA in Tor browser
Invalid = 8, // I = Invalid network class, unreachable or can not send packets
}
impl NetworkClass {
pub fn inbound_capable(&self) -> bool {
matches!(
self,
Self::Server
| Self::Mapped
| Self::FullNAT
| Self::AddressRestrictedNAT
| Self::PortRestrictedNAT
)
}
}
#[derive(Copy, Clone, Debug, Default)]
pub struct ProtocolConfig {
pub udp_enabled: bool,
@@ -78,7 +51,6 @@ struct NetworkComponents {
net: Network,
connection_manager: ConnectionManager,
rpc_processor: RPCProcessor,
lease_manager: LeaseManager,
receipt_manager: ReceiptManager,
}
@@ -114,12 +86,17 @@ impl Default for NetworkManagerStats {
}
}
}
struct ClientWhitelistEntry {
last_seen: u64,
}
// The mutable state of the network manager
struct NetworkManagerInner {
routing_table: Option<RoutingTable>,
components: Option<NetworkComponents>,
network_class: Option<NetworkClass>,
stats: NetworkManagerStats,
client_whitelist: LruCache<key::DHTKey, ClientWhitelistEntry>,
}
struct NetworkManagerUnlockedInner {
@@ -143,6 +120,7 @@ impl NetworkManager {
components: None,
network_class: None,
stats: NetworkManagerStats::default(),
client_whitelist: LruCache::new_unbounded(),
}
}
fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner {
@@ -195,15 +173,6 @@ impl NetworkManager {
.rpc_processor
.clone()
}
pub fn lease_manager(&self) -> LeaseManager {
self.inner
.lock()
.components
.as_ref()
.unwrap()
.lease_manager
.clone()
}
pub fn receipt_manager(&self) -> ReceiptManager {
self.inner
.lock()
@@ -250,19 +219,16 @@ impl NetworkManager {
let net = Network::new(self.clone());
let connection_manager = ConnectionManager::new(self.clone());
let rpc_processor = RPCProcessor::new(self.clone());
let lease_manager = LeaseManager::new(self.clone());
let receipt_manager = ReceiptManager::new(self.clone());
self.inner.lock().components = Some(NetworkComponents {
net: net.clone(),
connection_manager: connection_manager.clone(),
rpc_processor: rpc_processor.clone(),
lease_manager: lease_manager.clone(),
receipt_manager: receipt_manager.clone(),
});
// Start network components
rpc_processor.startup().await?;
lease_manager.startup().await?;
receipt_manager.startup().await?;
net.startup().await?;
connection_manager.startup().await;
@@ -289,7 +255,6 @@ impl NetworkManager {
components.connection_manager.shutdown().await;
components.net.shutdown().await;
components.receipt_manager.shutdown().await;
components.lease_manager.shutdown().await;
components.rpc_processor.shutdown().await;
}
@@ -301,14 +266,54 @@ impl NetworkManager {
trace!("NetworkManager::shutdown end");
}
pub fn update_client_whitelist(&self, client: key::DHTKey) {
let mut inner = self.inner.lock();
match inner.client_whitelist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen = intf::get_timestamp()
}
hashlink::lru_cache::Entry::Vacant(entry) => {
entry.insert(ClientWhitelistEntry {
last_seen: intf::get_timestamp(),
});
}
}
}
pub fn check_client_whitelist(&self, client: key::DHTKey) -> bool {
let mut inner = self.inner.lock();
match inner.client_whitelist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen = intf::get_timestamp();
true
}
hashlink::lru_cache::Entry::Vacant(_) => false,
}
}
pub fn purge_client_whitelist(&self) {
let timeout_ms = self.config.get().network.client_whitelist_timeout_ms;
let mut inner = self.inner.lock();
let cutoff_timestamp = intf::get_timestamp() - ((timeout_ms as u64) * 1000u64);
// Remove clients from the whitelist that haven't been since since our whitelist timeout
while inner
.client_whitelist
.peek_lru()
.map(|v| v.1.last_seen < cutoff_timestamp)
.unwrap_or_default()
{
inner.client_whitelist.remove_lru();
}
}
pub async fn tick(&self) -> Result<(), String> {
let (routing_table, net, lease_manager, receipt_manager) = {
let (routing_table, net, receipt_manager) = {
let inner = self.inner.lock();
let components = inner.components.as_ref().unwrap();
(
inner.routing_table.as_ref().unwrap().clone(),
components.net.clone(),
components.lease_manager.clone(),
components.receipt_manager.clone(),
)
};
@@ -326,12 +331,12 @@ impl NetworkManager {
// Run the low level network tick
net.tick().await?;
// Run the lease manager tick
lease_manager.tick().await?;
// Run the receipt manager tick
receipt_manager.tick().await?;
// Purge the client whitelist
self.purge_client_whitelist();
Ok(())
}
@@ -344,6 +349,26 @@ impl NetworkManager {
}
}
// Get our node's capabilities
pub fn generate_node_info(&self) -> NodeInfo {
let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
let will_route = network_class.can_relay(); // xxx: eventually this may have more criteria added
let will_tunnel = network_class.can_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point
let will_signal = network_class.can_signal();
let will_relay = network_class.can_relay();
let will_validate_dial_info = network_class.can_validate_dial_info();
NodeInfo {
network_class,
will_route,
will_tunnel,
will_signal,
will_relay,
will_validate_dial_info,
}
}
// Return what protocols we have enabled
pub fn get_protocol_config(&self) -> Option<ProtocolConfig> {
if let Some(components) = &self.inner.lock().components {
@@ -514,61 +539,19 @@ impl NetworkManager {
return Ok(true);
}
// Decode envelope header
// Decode envelope header (may fail signature validation)
let envelope =
Envelope::from_data(data).map_err(|_| "envelope failed to decode".to_owned())?;
// Get routing table and rpc processor
let (routing_table, lease_manager, rpc) = {
let (routing_table, rpc) = {
let inner = self.inner.lock();
(
inner.routing_table.as_ref().unwrap().clone(),
inner.components.as_ref().unwrap().lease_manager.clone(),
inner.components.as_ref().unwrap().rpc_processor.clone(),
)
};
// Peek at header and see if we need to send this to a relay lease
// If the recipient id is not our node id, then it needs relaying
let sender_id = envelope.get_sender_id();
let recipient_id = envelope.get_recipient_id();
if recipient_id != routing_table.node_id() {
// Ensure a lease exists for this node before we relay it
let relay_nr = if let Some(lease_nr) =
lease_manager.server_has_valid_relay_lease(&recipient_id)
{
// Inbound lease
lease_nr
} else if let Some(lease_nr) = lease_manager.server_has_valid_relay_lease(&sender_id) {
// Resolve the node to send this to
rpc.resolve_node(recipient_id, Some(lease_nr.clone())).await.map_err(|e| {
format!(
"failed to resolve recipient node for relay, dropping outbound relayed packet...: {:?}",
e
)
})?
} else {
return Err("received envelope not intended for this node".to_owned());
};
// Re-send the packet to the leased node
self.net()
.send_data(relay_nr, data.to_vec())
.await
.map_err(|e| format!("failed to forward envelope: {}", e))?;
// Inform caller that we dealt with the envelope, but did not process it locally
return Ok(false);
}
// DH to get decryption key (cached)
let node_id_secret = routing_table.node_id_secret();
// Decrypt the envelope body
// xxx: punish nodes that send messages that fail to decrypt eventually
let body = envelope
.decrypt_body(self.crypto(), data, &node_id_secret)
.map_err(|_| "failed to decrypt envelope body".to_owned())?;
// Get timestamp range
let (tsbehind, tsahead) = {
let c = self.config.get();
@@ -598,6 +581,74 @@ impl NetworkManager {
}
}
// Peek at header and see if we need to relay this
// If the recipient id is not our node id, then it needs relaying
let sender_id = envelope.get_sender_id();
let recipient_id = envelope.get_recipient_id();
if recipient_id != routing_table.node_id() {
// See if the source node is allowed to resolve nodes
// This is a costly operation, so only outbound-relay permitted
// nodes are allowed to do this, for example PWA users
let relay_nr = if self.check_client_whitelist(sender_id) {
// Cache the envelope information in the routing table
// let source_noderef = routing_table
// .register_node_with_existing_connection(envelope.get_sender_id(), descriptor, ts)
// .map_err(|e| format!("node id registration failed: {}", e))?;
// source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version()));
// If the sender is in the client whitelist, allow a full resolve_node,
// which effectively lets the client use our routing table
rpc.resolve_node(recipient_id).await.map_err(|e| {
format!(
"failed to resolve recipient node for relay, dropping outbound relayed packet...: {:?}",
e
)
}).map_err(logthru_net!())?
} else {
// If this is not a node in the client whitelist, only allow inbound relay
// which only performs a lightweight lookup before passing the packet back out
// See if we have the node in our routing table
// We should, because relays are chosen by nodes that have established connectivity and
// should be mutually in each others routing tables. The node needing the relay will be
// pinging this node regularly to keep itself in the routing table
if let Some(nr) = routing_table.lookup_node_ref(recipient_id) {
// ensure we have dial_info for the entry already,
if !nr.operate(|e| e.dial_infos().is_empty()) {
nr
} else {
return Err(format!(
"Inbound relay asked for recipient with no dial info: {}",
recipient_id
));
}
} else {
return Err(format!(
"Inbound relay asked for recipient not in routing table: {}",
recipient_id
));
}
};
// Re-send the packet to the leased node
self.net()
.send_data(relay_nr, data.to_vec())
.await
.map_err(|e| format!("failed to forward envelope: {}", e))?;
// Inform caller that we dealt with the envelope, but did not process it locally
return Ok(false);
}
// DH to get decryption key (cached)
let node_id_secret = routing_table.node_id_secret();
// Decrypt the envelope body
// xxx: punish nodes that send messages that fail to decrypt eventually
let body = envelope
.decrypt_body(self.crypto(), data, &node_id_secret)
.map_err(|_| "failed to decrypt envelope body".to_owned())?;
// Cache the envelope information in the routing table
let source_noderef = routing_table
.register_node_with_existing_connection(envelope.get_sender_id(), descriptor, ts)