move tasks to network manager

John Smith
2022-07-22 13:05:28 -04:00
parent 47fc9ec75c
commit 6f6ec298cf
10 changed files with 761 additions and 687 deletions


@@ -10,6 +10,7 @@ mod connection_limits;
mod connection_manager;
mod connection_table;
mod network_connection;
mod tasks;
pub mod tests;
@@ -22,6 +23,7 @@ use connection_handle::*;
use connection_limits::*;
use connection_manager::*;
use dht::*;
use futures_util::stream::{FuturesUnordered, StreamExt};
use hashlink::LruCache;
use intf::*;
#[cfg(not(target_arch = "wasm32"))]
@@ -42,6 +44,16 @@ pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes
pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
pub const BOOTSTRAP_TXT_VERSION: u8 = 0;
#[derive(Clone, Debug)]
pub struct BootstrapRecord {
min_version: u8,
max_version: u8,
dial_info_details: Vec<DialInfoDetail>,
}
pub type BootstrapRecordMap = BTreeMap<DHTKey, BootstrapRecord>;
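For orientation, a minimal self-contained sketch of how entries in a BootstrapRecordMap could be merged as bootstrap responses arrive. The stand-in DHTKey and DialInfoDetail types and the merge_bootstrap_record helper are assumptions for illustration only; the real types live elsewhere in veilid-core. Merging widens the supported version range and unions the dial info:

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

// Stand-ins for this sketch only; not the real veilid-core types.
type DHTKey = [u8; 32];
type DialInfoDetail = String;

#[derive(Clone, Debug)]
pub struct BootstrapRecord {
    min_version: u8,
    max_version: u8,
    dial_info_details: Vec<DialInfoDetail>,
}

pub type BootstrapRecordMap = BTreeMap<DHTKey, BootstrapRecord>;

// Hypothetical helper: widen the version range and union the dial info
// when the same node id shows up more than once.
fn merge_bootstrap_record(map: &mut BootstrapRecordMap, key: DHTKey, rec: BootstrapRecord) {
    match map.entry(key) {
        Entry::Occupied(mut o) => {
            let existing = o.get_mut();
            existing.min_version = existing.min_version.min(rec.min_version);
            existing.max_version = existing.max_version.max(rec.max_version);
            for did in rec.dial_info_details {
                if !existing.dial_info_details.contains(&did) {
                    existing.dial_info_details.push(did);
                }
            }
        }
        Entry::Vacant(v) => {
            v.insert(rec);
        }
    }
}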
#[derive(Copy, Clone, Debug, Default)]
pub struct ProtocolConfig {
pub outbound: ProtocolSet,
@@ -129,6 +141,10 @@ struct NetworkManagerUnlockedInner {
// Background processes
rolling_transfers_task: TickTask<EyreReport>,
relay_management_task: TickTask<EyreReport>,
bootstrap_task: TickTask<EyreReport>,
peer_minimum_refresh_task: TickTask<EyreReport>,
ping_validator_task: TickTask<EyreReport>,
node_info_update_single_future: MustJoinSingleFuture<()>,
}
#[derive(Clone)]
@@ -152,11 +168,15 @@ impl NetworkManager {
public_address_check_cache: LruCache::new(8),
}
}
- fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner {
- //let c = config.get();
+ fn new_unlocked_inner(config: VeilidConfig) -> NetworkManagerUnlockedInner {
+ let c = config.get();
NetworkManagerUnlockedInner {
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
bootstrap_task: TickTask::new(1),
peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
ping_validator_task: TickTask::new(1),
node_info_update_single_future: MustJoinSingleFuture::new(),
}
}
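TickTask::new takes its period in whole seconds (the bootstrap and ping validator tasks tick every second), while new_ms takes milliseconds, here driven by the min_peer_refresh_time_ms config value. A rough stand-in for just that interval gating, assuming a synchronous routine; the real TickTask is async and stop-token aware:

use std::time::{Duration, Instant};

struct SimpleTickTask {
    period: Duration,
    last_run: Option<Instant>,
}

impl SimpleTickTask {
    fn new(period_secs: u64) -> Self {
        Self { period: Duration::from_secs(period_secs), last_run: None }
    }
    fn new_ms(period_ms: u64) -> Self {
        Self { period: Duration::from_millis(period_ms), last_run: None }
    }
    // Cheap to call on every pass; runs the routine at most once per period.
    fn tick(&mut self, routine: impl FnOnce()) {
        let now = Instant::now();
        if self.last_run.map_or(true, |t| now.duration_since(t) >= self.period) {
            routine();
            self.last_run = Some(now);
        }
    }
}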
@@ -186,6 +206,31 @@ impl NetworkManager {
Box::pin(this2.clone().relay_management_task_routine(s, l, t))
});
}
// Set bootstrap tick task
{
let this2 = this.clone();
this.unlocked_inner
.bootstrap_task
.set_routine(move |s, _l, _t| Box::pin(this2.clone().bootstrap_task_routine(s)));
}
// Set peer minimum refresh tick task
{
let this2 = this.clone();
this.unlocked_inner
.peer_minimum_refresh_task
.set_routine(move |s, _l, _t| {
Box::pin(this2.clone().peer_minimum_refresh_task_routine(s))
});
}
// Set ping validator tick task
{
let this2 = this.clone();
this.unlocked_inner
.ping_validator_task
.set_routine(move |s, l, t| {
Box::pin(this2.clone().ping_validator_task_routine(s, l, t))
});
}
this
}
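Each registration above clones the NetworkManager handle once outside the closure and again per invocation, so every pinned, boxed future owns its own handle. A condensed, self-contained sketch of that pattern, with a hypothetical Manager type standing in for NetworkManager:

use std::future::Future;
use std::pin::Pin;

type PinBoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

struct Task {
    routine: Option<Box<dyn Fn(u64, u64) -> PinBoxFuture + Send + Sync>>,
}

impl Task {
    fn set_routine(&mut self, f: impl Fn(u64, u64) -> PinBoxFuture + Send + Sync + 'static) {
        self.routine = Some(Box::new(f));
    }
}

#[derive(Clone)]
struct Manager;

impl Manager {
    async fn ping_validator_task_routine(self, _last_ts: u64, _cur_ts: u64) {
        // ... ping some nodes here ...
    }
}

fn wire_up(this: Manager, task: &mut Task) {
    // this2 is moved into the closure; cloning it per call lets each
    // returned future own a handle while the closure stays reusable.
    let this2 = this.clone();
    task.set_routine(move |l, t| Box::pin(this2.clone().ping_validator_task_routine(l, t)));
}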
pub fn config(&self) -> VeilidConfig {
@@ -298,6 +343,10 @@ impl NetworkManager {
return Err(e);
}
// Inform routing table entries that our dial info has changed
self.send_node_info_updates(true).await;
// Inform api clients that things have changed
self.send_network_update();
Ok(())
@@ -312,10 +361,32 @@ impl NetworkManager {
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
warn!("rolling_transfers_task not stopped: {}", e);
}
debug!("stopping relay management task task");
debug!("stopping relay management task");
if let Err(e) = self.unlocked_inner.relay_management_task.stop().await {
warn!("relay_management_task not stopped: {}", e);
}
debug!("stopping bootstrap task");
if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await {
error!("bootstrap_task not stopped: {}", e);
}
debug!("stopping peer minimum refresh task");
if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await {
error!("peer_minimum_refresh_task not stopped: {}", e);
}
debug!("stopping ping_validator task");
if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await {
error!("ping_validator_task not stopped: {}", e);
}
debug!("stopping node info update singlefuture");
if self
.unlocked_inner
.node_info_update_single_future
.join()
.await
.is_err()
{
error!("node_info_update_single_future not stopped");
}
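Note the shutdown order: every background tick task is stopped, and the node info update single-future joined, before the network components themselves go down, so no routine races the teardown. A rough thread-based analogue of that stop-then-join contract, offered as an assumption for illustration; the real TickTask is async and driven by a StopToken:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

struct StoppableTask {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl StoppableTask {
    fn spawn(mut routine: impl FnMut() + Send + 'static, period: Duration) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let stop2 = stop.clone();
        let handle = thread::spawn(move || {
            while !stop2.load(Ordering::Relaxed) {
                routine();
                thread::sleep(period);
            }
        });
        Self { stop, handle: Some(handle) }
    }
    // Signal, then join: stop() does not return until any in-flight run of
    // the routine has finished, which is what makes later teardown safe.
    fn stop(&mut self) -> thread::Result<()> {
        self.stop.store(true, Ordering::Relaxed);
        match self.handle.take() {
            Some(h) => h.join(),
            None => Ok(()),
        }
    }
}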
// Shutdown network components if they started up
debug!("shutting down network components");
@@ -386,13 +457,9 @@ impl NetworkManager {
}
}
#[instrument(level = "debug", skip_all, err)]
async fn restart_net(&self, net: Network) -> EyreResult<()> {
net.shutdown().await;
self.send_network_update();
net.startup().await?;
self.send_network_update();
Ok(())
}
pub fn needs_restart(&self) -> bool {
let net = self.net();
net.needs_restart()
}
pub async fn tick(&self) -> EyreResult<()> {
@@ -406,18 +473,30 @@ impl NetworkManager {
)
};
// If the network needs to be reset, do it;
// if things can't restart, then we fail out of the attachment manager
if net.needs_restart() {
self.restart_net(net.clone()).await?;
}
// Run the rolling transfers task
self.unlocked_inner.rolling_transfers_task.tick().await?;
// Run the relay management task
self.unlocked_inner.relay_management_task.tick().await?;
// If routing table has no live entries, then add the bootstrap nodes to it
let live_entry_count = routing_table.get_entry_count(BucketEntryState::Unreliable);
if live_entry_count == 0 {
self.unlocked_inner.bootstrap_task.tick().await?;
}
// If we still don't have enough peers, find nodes until we do
let min_peer_count = {
let c = self.config.get();
c.network.dht.min_peer_count as usize
};
if live_entry_count < min_peer_count {
self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
}
// Ping validate some nodes to groom the table
self.unlocked_inner.ping_validator_task.tick().await?;
// Run the routing table tick
routing_table.tick().await?;
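The gating above is worth making explicit: bootstrap fires only when the routing table has no live entries at all, peer minimum refresh fires whenever the live count is below min_peer_count, and ping validation runs unconditionally, so on an empty table the first two fire on the same tick. A pure restatement of that decision, with the cases checked as assertions:

// Returns (run_bootstrap, run_peer_minimum_refresh) for a table state.
fn maintenance_to_run(live_entry_count: usize, min_peer_count: usize) -> (bool, bool) {
    (live_entry_count == 0, live_entry_count < min_peer_count)
}

fn main() {
    // Empty table: bootstrap and refresh both fire.
    assert_eq!(maintenance_to_run(0, 8), (true, true));
    // Some peers, but under the minimum: only refresh fires.
    assert_eq!(maintenance_to_run(3, 8), (false, true));
    // At or above the minimum: neither fires.
    assert_eq!(maintenance_to_run(8, 8), (false, false));
}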
@@ -1313,135 +1392,6 @@ impl NetworkManager {
Ok(true)
}
// Keep relays assigned and accessible
#[instrument(level = "trace", skip(self), err)]
async fn relay_management_task_routine(
self,
stop_token: StopToken,
_last_ts: u64,
cur_ts: u64,
) -> EyreResult<()> {
// Get our node's current node info and network class and do the right thing
let routing_table = self.routing_table();
let node_info = routing_table.get_own_node_info();
let network_class = self.get_network_class();
let mut node_info_changed = false;
// Do we know our network class yet?
if let Some(network_class) = network_class {
// If we already have a relay, see if it is dead, or if we don't need it any more
let has_relay = {
let mut inner = self.inner.lock();
if let Some(relay_node) = inner.relay_node.clone() {
let state = relay_node.operate(|e| e.state(cur_ts));
// Relay node is dead or no longer needed
if matches!(state, BucketEntryState::Dead) {
info!("Relay node died, dropping relay {}", relay_node);
inner.relay_node = None;
node_info_changed = true;
false
} else if !node_info.requires_relay() {
info!(
"Relay node no longer required, dropping relay {}",
relay_node
);
inner.relay_node = None;
node_info_changed = true;
false
} else {
true
}
} else {
false
}
};
// Do we need a relay?
if !has_relay && node_info.requires_relay() {
// Do we need an outbound relay?
if network_class.outbound_wants_relay() {
// The outbound relay is the host of the PWA
if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await {
let mut inner = self.inner.lock();
// Register new outbound relay
if let Some(nr) = routing_table.register_node_with_signed_node_info(
outbound_relay_peerinfo.node_id.key,
outbound_relay_peerinfo.signed_node_info,
) {
info!("Outbound relay node selected: {}", nr);
inner.relay_node = Some(nr);
node_info_changed = true;
}
}
// Otherwise we must need an inbound relay
} else {
// Find a node in our routing table that is an acceptable inbound relay
if let Some(nr) = routing_table.find_inbound_relay(cur_ts) {
let mut inner = self.inner.lock();
info!("Inbound relay node selected: {}", nr);
inner.relay_node = Some(nr);
node_info_changed = true;
}
}
}
}
// Re-send our node info if we selected a relay
if node_info_changed {
self.routing_table().send_node_info_updates(true).await;
}
Ok(())
}
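The routine above is a single decision tree evaluated each tick, and because has_relay is recomputed after a drop, dropping a dead or no-longer-needed relay falls through to reselection on the same pass. A pure sketch of the tree, with plain booleans standing in for BucketEntryState and NetworkClass:

#[derive(Debug, PartialEq)]
enum RelayDecision {
    Keep,
    Drop,           // existing relay is dead or no longer required
    SelectOutbound, // network class wants an outbound relay
    SelectInbound,  // otherwise find an inbound relay in the routing table
    Nothing,
}

fn relay_decision(
    has_relay: bool,
    relay_is_dead: bool,
    requires_relay: bool,
    outbound_wants_relay: bool,
) -> RelayDecision {
    if has_relay {
        if relay_is_dead || !requires_relay {
            RelayDecision::Drop
        } else {
            RelayDecision::Keep
        }
    } else if requires_relay {
        if outbound_wants_relay {
            RelayDecision::SelectOutbound
        } else {
            RelayDecision::SelectInbound
        }
    } else {
        RelayDecision::Nothing
    }
}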
// Compute transfer statistics for the low level network
#[instrument(level = "trace", skip(self), err)]
async fn rolling_transfers_task_routine(
self,
stop_token: StopToken,
last_ts: u64,
cur_ts: u64,
) -> EyreResult<()> {
// log_net!("--- network manager rolling_transfers task");
{
let inner = &mut *self.inner.lock();
// Roll the low level network transfer stats for our address
inner
.stats
.self_stats
.transfer_stats_accounting
.roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats);
// Roll all per-address transfers
let mut dead_addrs: HashSet<PerAddressStatsKey> = HashSet::new();
for (addr, stats) in &mut inner.stats.per_address_stats {
stats.transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut stats.transfer_stats,
);
// While we're here, let's see if this address has timed out
if cur_ts - stats.last_seen_ts >= IPADDR_MAX_INACTIVE_DURATION_US {
// it's dead, put it in the dead list
dead_addrs.insert(*addr);
}
}
// Remove the dead addresses from our tables
for da in &dead_addrs {
inner.stats.per_address_stats.remove(da);
}
}
// Send update
self.send_network_update();
Ok(())
}
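The per-address cleanup uses a collect-then-remove pattern: dead keys are gathered while iterating and only removed afterwards, since removing during iteration would invalidate the map iterator. A self-contained sketch of that pattern, assuming plain IpAddr keys in place of the real PerAddressStatsKey:

use std::collections::{HashMap, HashSet};
use std::net::IpAddr;

const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes

struct PerAddressStats {
    last_seen_ts: u64,
}

fn reap_idle_addresses(stats: &mut HashMap<IpAddr, PerAddressStats>, cur_ts: u64) {
    // saturating_sub guards the subtraction if a timestamp is in the future.
    let dead_addrs: HashSet<IpAddr> = stats
        .iter()
        .filter(|(_, s)| cur_ts.saturating_sub(s.last_seen_ts) >= IPADDR_MAX_INACTIVE_DURATION_US)
        .map(|(addr, _)| *addr)
        .collect();
    // Remove the dead addresses outside the iteration.
    for da in &dead_addrs {
        stats.remove(da);
    }
}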
// Callbacks from low level network for statistics gathering
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: u64) {
let inner = &mut *self.inner.lock();
@@ -1612,4 +1562,58 @@ impl NetworkManager {
net.reset_network_class();
}
}
// Inform routing table entries that our dial info has changed
pub async fn send_node_info_updates(&self, all: bool) {
let this = self.clone();
// Run in background only once
let _ = self
.clone()
.unlocked_inner
.node_info_update_single_future
.single_spawn(async move {
// Only update if we actually have a valid network class
if matches!(
this.get_network_class().unwrap_or(NetworkClass::Invalid),
NetworkClass::Invalid
) {
trace!(
"not sending node info update because our network class is not yet valid"
);
return;
}
// Get the list of refs to all nodes to update
let cur_ts = intf::get_timestamp();
let node_refs = this.routing_table().get_nodes_needing_updates(cur_ts, all);
// Send the updates
log_net!(debug "Sending node info updates to {} nodes", node_refs.len());
let mut unord = FuturesUnordered::new();
for nr in node_refs {
let rpc = this.rpc_processor();
unord.push(async move {
// Update the node
if let Err(e) = rpc
.rpc_call_node_info_update(Destination::Direct(nr.clone()), None)
.await
{
// Not fatal, but we should be able to see if this is happening
trace!("failed to send node info update to {:?}: {}", nr, e);
return;
}
// Mark the node as updated
nr.set_seen_our_node_info();
});
}
// Wait for futures to complete
while unord.next().await.is_some() {}
log_rtab!(debug "Finished sending node updates");
})
.await;
}
}
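Two pieces do the work in send_node_info_updates: single_spawn guarantees at most one update pass runs at a time, and FuturesUnordered lets the per-node RPCs make progress concurrently while the drain loop consumes completions in whatever order they finish. A minimal sketch of that fan-out/drain shape, with a println! standing in for the RPC call; drive it with any async executor:

use futures_util::stream::{FuturesUnordered, StreamExt};

async fn update_all_nodes(node_ids: Vec<u64>) {
    let mut unord = FuturesUnordered::new();
    for id in node_ids {
        unord.push(async move {
            // Placeholder for the real rpc_call_node_info_update(..) call.
            println!("updating node {}", id);
        });
    }
    // Poll to completion; the futures run concurrently, not sequentially.
    while unord.next().await.is_some() {}
}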