This commit is contained in:
John Smith
2022-11-26 21:37:23 -05:00
parent 5df46aecae
commit b1bdf76ae8
80 changed files with 865 additions and 700 deletions
@@ -78,7 +78,7 @@ impl ConnectionLimits {
pub fn add(&mut self, addr: IpAddr) -> Result<(), AddressFilterError> {
let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr);
let ts = intf::get_timestamp();
let ts = get_timestamp();
self.purge_old_timestamps(ts);
@@ -134,7 +134,7 @@ impl ConnectionLimits {
pub fn remove(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> {
let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr);
let ts = intf::get_timestamp();
let ts = get_timestamp();
self.purge_old_timestamps(ts);
match ipblock {
@@ -319,7 +319,7 @@ impl ConnectionManager {
};
log_net!(debug "get_or_create_connection retries left: {}", retry_count);
retry_count -= 1;
intf::sleep(500).await;
sleep(500).await;
});
// Add to the connection table
+16 -17
View File
@@ -1,5 +1,5 @@
use crate::*;
use crate::xx::*;
use crate::*;
#[cfg(not(target_arch = "wasm32"))]
mod native;
@@ -403,11 +403,11 @@ impl NetworkManager {
let mut inner = self.inner.lock();
match inner.client_whitelist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen_ts = intf::get_timestamp()
entry.get_mut().last_seen_ts = get_timestamp()
}
hashlink::lru_cache::Entry::Vacant(entry) => {
entry.insert(ClientWhitelistEntry {
last_seen_ts: intf::get_timestamp(),
last_seen_ts: get_timestamp(),
});
}
}
@@ -419,7 +419,7 @@ impl NetworkManager {
match inner.client_whitelist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen_ts = intf::get_timestamp();
entry.get_mut().last_seen_ts = get_timestamp();
true
}
hashlink::lru_cache::Entry::Vacant(_) => false,
@@ -429,7 +429,7 @@ impl NetworkManager {
pub fn purge_client_whitelist(&self) {
let timeout_ms = self.with_config(|c| c.network.client_whitelist_timeout_ms);
let mut inner = self.inner.lock();
let cutoff_timestamp = intf::get_timestamp() - ((timeout_ms as u64) * 1000u64);
let cutoff_timestamp = get_timestamp() - ((timeout_ms as u64) * 1000u64);
// Remove clients from the whitelist that haven't been seen since our whitelist timeout
while inner
.client_whitelist
@@ -516,7 +516,7 @@ impl NetworkManager {
.wrap_err("failed to generate signed receipt")?;
// Record the receipt for later
let exp_ts = intf::get_timestamp() + expiration_us;
let exp_ts = get_timestamp() + expiration_us;
receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback);
Ok(out)
@@ -540,7 +540,7 @@ impl NetworkManager {
.wrap_err("failed to generate signed receipt")?;
// Record the receipt for later
let exp_ts = intf::get_timestamp() + expiration_us;
let exp_ts = get_timestamp() + expiration_us;
let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled));
let instance = eventual.instance();
receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual);
@@ -707,7 +707,7 @@ impl NetworkManager {
// XXX: do we need a delay here? or another hole punch packet?
// Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch
peer_nr.set_last_connection(connection_descriptor, intf::get_timestamp());
peer_nr.set_last_connection(connection_descriptor, get_timestamp());
// Return the receipt using the same dial info send the receipt to it
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
@@ -731,7 +731,7 @@ impl NetworkManager {
let node_id_secret = routing_table.node_id_secret();
// Get timestamp, nonce
let ts = intf::get_timestamp();
let ts = get_timestamp();
let nonce = Crypto::get_random_nonce();
// Encode envelope
@@ -1116,8 +1116,7 @@ impl NetworkManager {
// );
// Update timestamp for this last connection since we just sent to it
node_ref
.set_last_connection(connection_descriptor, intf::get_timestamp());
node_ref.set_last_connection(connection_descriptor, get_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing(
connection_descriptor,
@@ -1149,7 +1148,7 @@ impl NetworkManager {
this.net().send_data_to_dial_info(dial_info, data).await?
);
// If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_connection(connection_descriptor, intf::get_timestamp());
node_ref.set_last_connection(connection_descriptor, get_timestamp());
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
@@ -1324,7 +1323,7 @@ impl NetworkManager {
});
// Validate timestamp isn't too old
let ts = intf::get_timestamp();
let ts = get_timestamp();
let ets = envelope.get_timestamp();
if let Some(tsbehind) = tsbehind {
if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) {
@@ -1631,7 +1630,7 @@ impl NetworkManager {
// public dialinfo
let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT
{
let exp_ts = intf::get_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
let exp_ts = get_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
for i in &inconsistencies {
pait.insert(*i, exp_ts);
}
@@ -1644,8 +1643,8 @@ impl NetworkManager {
.public_address_inconsistencies_table
.entry(key)
.or_insert_with(|| HashMap::new());
let exp_ts = intf::get_timestamp()
+ PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
let exp_ts =
get_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
for i in inconsistencies {
pait.insert(i, exp_ts);
}
@@ -1733,7 +1732,7 @@ impl NetworkManager {
}
// Get the list of refs to all nodes to update
let cur_ts = intf::get_timestamp();
let cur_ts = get_timestamp();
let node_refs =
this.routing_table()
.get_nodes_needing_updates(routing_domain, cur_ts, all);
@@ -176,7 +176,7 @@ impl IGDManager {
mapped_port: u16,
) -> Option<()> {
let this = self.clone();
intf::blocking_wrapper(move || {
blocking_wrapper(move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
@@ -215,7 +215,7 @@ impl IGDManager {
expected_external_address: Option<IpAddr>,
) -> Option<SocketAddr> {
let this = self.clone();
intf::blocking_wrapper(move || {
blocking_wrapper(move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
@@ -275,7 +275,7 @@ impl IGDManager {
};
// Add to mapping list to keep alive
let timestamp = intf::get_timestamp();
let timestamp = get_timestamp();
inner.port_maps.insert(PortMapKey {
llpt,
at,
@@ -301,7 +301,7 @@ impl IGDManager {
let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
{
let inner = self.inner.lock();
let now = intf::get_timestamp();
let now = get_timestamp();
for (k, v) in &inner.port_maps {
let mapping_lifetime = now.saturating_sub(v.timestamp);
@@ -323,7 +323,7 @@ impl IGDManager {
}
let this = self.clone();
intf::blocking_wrapper(move || {
blocking_wrapper(move || {
let mut inner = this.inner.lock();
// Process full renewals
@@ -356,7 +356,7 @@ impl IGDManager {
inner.port_maps.insert(k, PortMapValue {
ext_ip: v.ext_ip,
mapped_port,
timestamp: intf::get_timestamp(),
timestamp: get_timestamp(),
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64,
renewal_attempts: 0,
});
@@ -397,7 +397,7 @@ impl IGDManager {
inner.port_maps.insert(k, PortMapValue {
ext_ip: v.ext_ip,
mapped_port: v.mapped_port,
timestamp: intf::get_timestamp(),
timestamp: get_timestamp(),
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64,
renewal_attempts: 0,
});
+2 -39
View File
@@ -1,5 +1,4 @@
mod igd_manager;
mod natpmp_manager;
mod network_class_discovery;
mod network_tcp;
mod network_udp;
@@ -94,11 +93,9 @@ struct NetworkUnlockedInner {
update_network_class_task: TickTask<EyreReport>,
network_interfaces_task: TickTask<EyreReport>,
upnp_task: TickTask<EyreReport>,
natpmp_task: TickTask<EyreReport>,
// Managers
igd_manager: igd_manager::IGDManager,
natpmp_manager: natpmp_manager::NATPMPManager,
}
#[derive(Clone)]
@@ -150,9 +147,7 @@ impl Network {
update_network_class_task: TickTask::new(1),
network_interfaces_task: TickTask::new(5),
upnp_task: TickTask::new(1),
natpmp_task: TickTask::new(1),
igd_manager: igd_manager::IGDManager::new(config.clone()),
natpmp_manager: natpmp_manager::NATPMPManager::new(config),
}
}
@@ -196,13 +191,6 @@ impl Network {
.upnp_task
.set_routine(move |s, l, t| Box::pin(this2.clone().upnp_task_routine(s, l, t)));
}
// Set natpmp tick task
{
let this2 = this.clone();
this.unlocked_inner
.natpmp_task
.set_routine(move |s, l, t| Box::pin(this2.clone().natpmp_task_routine(s, l, t)));
}
this
}
@@ -904,31 +892,11 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", skip(self), err)]
pub async fn natpmp_task_routine(
self,
stop_token: StopToken,
_l: u64,
_t: u64,
) -> EyreResult<()> {
if !self.unlocked_inner.natpmp_manager.tick().await? {
info!("natpmp failed, restarting local network");
let mut inner = self.inner.lock();
inner.network_needs_restart = true;
}
Ok(())
}
pub async fn tick(&self) -> EyreResult<()> {
let (detect_address_changes, upnp, natpmp) = {
let (detect_address_changes, upnp) = {
let config = self.network_manager().config();
let c = config.get();
(
c.network.detect_address_changes,
c.network.upnp,
c.network.natpmp,
)
(c.network.detect_address_changes, c.network.upnp)
};
// If we need to figure out our network class, tick the task for it
@@ -962,11 +930,6 @@ impl Network {
self.unlocked_inner.upnp_task.tick().await?;
}
// If we need to tick natpmp, do it
if natpmp && !self.needs_restart() {
self.unlocked_inner.natpmp_task.tick().await?;
}
Ok(())
}
}
@@ -1,18 +0,0 @@
use super::*;
pub struct NATPMPManager {
config: VeilidConfig,
}
impl NATPMPManager {
//
pub fn new(config: VeilidConfig) -> Self {
Self { config }
}
pub async fn tick(&self) -> EyreResult<bool> {
// xxx
Ok(true)
}
}
@@ -275,7 +275,7 @@ impl DiscoveryContext {
LowLevelProtocolType::UDP => "udp",
LowLevelProtocolType::TCP => "tcp",
});
intf::sleep(PORT_MAP_VALIDATE_DELAY_MS).await
sleep(PORT_MAP_VALIDATE_DELAY_MS).await
} else {
break;
}
@@ -304,9 +304,9 @@ impl DiscoveryContext {
#[instrument(level = "trace", skip(self), ret)]
async fn try_port_mapping(&self) -> Option<DialInfo> {
let (enable_upnp, _enable_natpmp) = {
let enable_upnp = {
let c = self.net.config.get();
(c.network.upnp, c.network.natpmp)
c.network.upnp
};
if enable_upnp {
@@ -58,7 +58,7 @@ impl Network {
// Don't waste more than N seconds getting it though, in case someone
// is trying to DoS us with a bunch of connections or something
// read a chunk of the stream
intf::timeout(
timeout(
tls_connection_initial_timeout_ms,
ps.peek_exact(&mut first_packet),
)
@@ -10,7 +10,7 @@ impl Network {
c.network.protocol.udp.socket_pool_size
};
if task_count == 0 {
task_count = intf::get_concurrency() / 2;
task_count = get_concurrency() / 2;
if task_count == 0 {
task_count = 1;
}
@@ -196,7 +196,7 @@ pub async fn nonblocking_connect(
let async_stream = Async::new(std::net::TcpStream::from(socket))?;
// The stream becomes writable when connected
timeout_or_try!(intf::timeout(timeout_ms, async_stream.writable())
timeout_or_try!(timeout(timeout_ms, async_stream.writable())
.await
.into_timeout_or()
.into_result()?);
@@ -99,13 +99,13 @@ pub struct NetworkConnection {
impl NetworkConnection {
pub(super) fn dummy(id: NetworkConnectionId, descriptor: ConnectionDescriptor) -> Self {
// Create handle for sending (dummy is immediately disconnected)
let (sender, _receiver) = flume::bounded(intf::get_concurrency() as usize);
let (sender, _receiver) = flume::bounded(get_concurrency() as usize);
Self {
connection_id: id,
descriptor,
processor: None,
established_time: intf::get_timestamp(),
established_time: get_timestamp(),
stats: Arc::new(Mutex::new(NetworkConnectionStats {
last_message_sent_time: None,
last_message_recv_time: None,
@@ -125,7 +125,7 @@ impl NetworkConnection {
let descriptor = protocol_connection.descriptor();
// Create handle for sending
let (sender, receiver) = flume::bounded(intf::get_concurrency() as usize);
let (sender, receiver) = flume::bounded(get_concurrency() as usize);
// Create stats
let stats = Arc::new(Mutex::new(NetworkConnectionStats {
@@ -137,7 +137,7 @@ impl NetworkConnection {
let local_stop_token = stop_source.token();
// Spawn connection processor and pass in protocol connection
let processor = intf::spawn(Self::process_connection(
let processor = spawn(Self::process_connection(
connection_manager,
local_stop_token,
manager_stop_token,
@@ -153,7 +153,7 @@ impl NetworkConnection {
connection_id,
descriptor,
processor: Some(processor),
established_time: intf::get_timestamp(),
established_time: get_timestamp(),
stats,
sender,
stop_source: Some(stop_source),
@@ -185,7 +185,7 @@ impl NetworkConnection {
stats: Arc<Mutex<NetworkConnectionStats>>,
message: Vec<u8>,
) -> io::Result<NetworkResult<()>> {
let ts = intf::get_timestamp();
let ts = get_timestamp();
let out = network_result_try!(protocol_connection.send(message).await?);
let mut stats = stats.lock();
@@ -199,7 +199,7 @@ impl NetworkConnection {
protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>,
) -> io::Result<NetworkResult<Vec<u8>>> {
let ts = intf::get_timestamp();
let ts = get_timestamp();
let out = network_result_try!(protocol_connection.recv().await?);
let mut stats = stats.lock();
@@ -246,7 +246,7 @@ impl NetworkConnection {
// Push mutable timer so we can reset it
// Normally we would use an io::timeout here, but WASM won't support that, so we use a mutable sleep future
let new_timer = || {
intf::sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async {
sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async {
// timeout
log_net!("== Connection timeout on {:?}", descriptor.green());
RecvLoopAction::Timeout