From 53ae04aff99d63bb24593d443d404e6389469820 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 20 Aug 2022 17:08:48 -0400 Subject: [PATCH] commit with debug code --- veilid-core/src/network_manager/mod.rs | 149 +++++++++++--------- veilid-core/src/rpc_processor/mod.rs | 6 +- veilid-core/src/rpc_processor/rpc_status.rs | 3 + 3 files changed, 93 insertions(+), 65 deletions(-) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 3ec829e0..fddc8324 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -125,6 +125,7 @@ pub(crate) enum ContactMethod { pub enum SendDataKind { Direct(ConnectionDescriptor), Indirect, + Existing(ConnectionDescriptor), } #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] @@ -1260,7 +1261,7 @@ impl NetworkManager { // Update timestamp for this last connection since we just sent to it node_ref.set_last_connection(connection_descriptor, intf::get_timestamp()); - return Ok(NetworkResult::value(SendDataKind::Direct( + return Ok(NetworkResult::value(SendDataKind::Existing( connection_descriptor, ))); } @@ -1411,7 +1412,7 @@ impl NetworkManager { // Ensure we can read the magic number if data.len() < 4 { - log_net!(debug "short packet".green()); + log_net!(debug "short packet"); return Ok(false); } @@ -1428,7 +1429,13 @@ impl NetworkManager { } // Decode envelope header (may fail signature validation) - let envelope = Envelope::from_signed_data(data).wrap_err("envelope failed to decode")?; + let envelope = match Envelope::from_signed_data(data) { + Ok(v) => v, + Err(e) => { + log_net!(debug "envelope failed to decode: {}", e); + return Ok(false); + } + }; // Get routing table and rpc processor let (routing_table, rpc) = { @@ -1453,18 +1460,20 @@ impl NetworkManager { let ets = envelope.get_timestamp(); if let Some(tsbehind) = tsbehind { if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) { - bail!( + log_net!(debug "envelope time was too far in the past: {}ms ", timestamp_to_secs(ts - ets) * 1000f64 ); + return Ok(false); } } if let Some(tsahead) = tsahead { if tsahead > 0 && (ts < ets && ets - ts > tsahead) { - bail!( + log_net!(debug "envelope time was too far in the future: {}ms", timestamp_to_secs(ets - ts) * 1000f64 ); + return Ok(false); } } @@ -1633,6 +1642,9 @@ impl NetworkManager { connection_descriptor: ConnectionDescriptor, // the connection descriptor used reporting_peer: NodeRef, // the peer's noderef reporting the socket address ) { + // xxx debug code + info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); + let key = PublicAddressCheckCacheKey( connection_descriptor.protocol_type(), connection_descriptor.address_type(), @@ -1659,71 +1671,80 @@ impl NetworkManager { let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid); // Determine if our external address has likely changed - let needs_public_address_detection = - if matches!(network_class, NetworkClass::InboundCapable) { - // Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed - let dial_info_filter = connection_descriptor.make_dial_info_filter(); + let needs_public_address_detection = if matches!( + network_class, + NetworkClass::InboundCapable + ) { + // Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed + let dial_info_filter = connection_descriptor.make_dial_info_filter(); - // Get current external ip/port from registered global dialinfo - let current_addresses: BTreeSet = routing_table - .all_filtered_dial_info_details( - Some(RoutingDomain::PublicInternet), - &dial_info_filter, - ) - .iter() - .map(|did| did.dial_info.socket_address()) - .collect(); + // Get current external ip/port from registered global dialinfo + let current_addresses: BTreeSet = routing_table + .all_filtered_dial_info_details( + Some(RoutingDomain::PublicInternet), + &dial_info_filter, + ) + .iter() + .map(|did| did.dial_info.socket_address()) + .collect(); - // If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers - // then we zap the network class and re-detect it - let mut inner = self.inner.lock(); - let mut inconsistencies = 0; - let mut changed = false; - // Iteration goes from most recent to least recent node/address pair - let pacc = inner - .public_address_check_cache - .entry(key) - .or_insert_with(|| LruCache::new(8)); - for (_, a) in pacc { - if !current_addresses.contains(a) { - inconsistencies += 1; - if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { - changed = true; + // If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers + // then we zap the network class and re-detect it + let mut inner = self.inner.lock(); + let mut inconsistencies = 0; + let mut changed = false; + // Iteration goes from most recent to least recent node/address pair + let pacc = inner + .public_address_check_cache + .entry(key) + .or_insert_with(|| LruCache::new(8)); + for (_, a) in pacc { + if !current_addresses.contains(a) { + inconsistencies += 1; + if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { + changed = true; + break; + } + } + } + + // xxx debug code + if changed { + info!("XXX\npublic_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner + .public_address_check_cache, current_addresses, inconsistencies); + } + + changed + } else { + // If we are currently outbound only, we don't have any public dial info + // but if we are starting to see consistent socket address from multiple reporting peers + // then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info + + let mut inner = self.inner.lock(); + let mut consistencies = 0; + let mut consistent = false; + let mut current_address = Option::::None; + // Iteration goes from most recent to least recent node/address pair + let pacc = inner + .public_address_check_cache + .entry(key) + .or_insert_with(|| LruCache::new(8)); + + for (_, a) in pacc { + if let Some(current_address) = current_address { + if current_address == *a { + consistencies += 1; + if consistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { + consistent = true; break; } } + } else { + current_address = Some(*a); } - changed - } else { - // If we are currently outbound only, we don't have any public dial info - // but if we are starting to see consistent socket address from multiple reporting peers - // then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info - - let mut inner = self.inner.lock(); - let mut consistencies = 0; - let mut consistent = false; - let mut current_address = Option::::None; - // Iteration goes from most recent to least recent node/address pair - let pacc = inner - .public_address_check_cache - .entry(key) - .or_insert_with(|| LruCache::new(8)); - - for (_, a) in pacc { - if let Some(current_address) = current_address { - if current_address == *a { - consistencies += 1; - if consistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { - consistent = true; - break; - } - } - } else { - current_address = Some(*a); - } - } - consistent - }; + } + consistent + }; if needs_public_address_detection { if detect_address_changes { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index ea330539..a29a88ce 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -88,6 +88,7 @@ struct RPCMessageEncoded { data: RPCMessageData, } +#[derive(Debug)] pub(crate) struct RPCMessage { header: RPCMessageHeader, operation: RPCOperation, @@ -336,7 +337,10 @@ impl RPCProcessor { inner .waiting_rpc_table .remove(&op_id) - .ok_or_else(RPCError::else_internal("Unmatched operation id"))? + .ok_or_else(RPCError::else_internal(format!( + "Unmatched operation id: {:#?}", + msg + )))? }; eventual.resolve((Span::current().id(), msg)).await; Ok(()) diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 47ca02ec..65e8f6bd 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -70,6 +70,9 @@ impl RPCProcessor { SendDataKind::Indirect => { // Do nothing in this case, as the socket address returned here would be for any node other than ours } + SendDataKind::Existing(_) => { + // Do nothing in this case, as an existing connection could not have a different public address or it would have been reset + } } }