refactor checkpoint
This commit is contained in:
+254
-122
@@ -5,7 +5,7 @@ use hashlink::LruCache;
|
||||
use intf::*;
|
||||
use receipt_manager::*;
|
||||
use routing_table::*;
|
||||
use rpc_processor::RPCProcessor;
|
||||
use rpc_processor::*;
|
||||
use xx::*;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
@@ -69,11 +69,13 @@ struct ClientWhitelistEntry {
|
||||
}
|
||||
|
||||
// Mechanism required to contact another node
|
||||
enum InboundMethod {
|
||||
Direct, // Contact the node directly
|
||||
SignalReverse, // Request via signal the node connect back directly
|
||||
SignalHolePunch, // Request via signal the node negotiate a hole punch
|
||||
Relay, // Must use a third party relay to reach the node
|
||||
enum ContactMethod {
|
||||
Unreachable, // Node is not reachable by any means
|
||||
Direct(DialInfo), // Contact the node directly
|
||||
SignalReverse(NodeRef), // Request via signal the node connect back directly
|
||||
SignalHolePunch(NodeRef), // Request via signal the node negotiate a hole punch
|
||||
InboundRelay(NodeRef), // Must use an inbound relay to reach the node
|
||||
OutboundRelay(NodeRef), // Must use outbound relay to reach the node
|
||||
}
|
||||
|
||||
// The mutable state of the network manager
|
||||
@@ -430,11 +432,40 @@ impl NetworkManager {
|
||||
}
|
||||
|
||||
// Process a received out-of-band receipt
|
||||
pub async fn process_receipt<R: AsRef<[u8]>>(&self, receipt_data: R) -> Result<(), String> {
|
||||
pub async fn process_out_of_band_receipt<R: AsRef<[u8]>>(
|
||||
&self,
|
||||
receipt_data: R,
|
||||
descriptor: ConnectionDescriptor,
|
||||
) -> Result<(), String> {
|
||||
let routing_table = self.routing_table();
|
||||
let receipt_manager = self.receipt_manager();
|
||||
let ts = intf::get_timestamp();
|
||||
|
||||
let receipt = Receipt::from_signed_data(receipt_data.as_ref())
|
||||
.map_err(|_| "failed to parse signed receipt".to_owned())?;
|
||||
receipt_manager.handle_receipt(receipt).await
|
||||
|
||||
// Cache the receipt information in the routing table
|
||||
let source_noderef = routing_table
|
||||
.register_node_with_existing_connection(receipt.get_sender_id(), descriptor, ts)
|
||||
.map_err(|e| format!("node id registration from receipt failed: {}", e))?;
|
||||
|
||||
receipt_manager
|
||||
.handle_receipt(source_noderef, receipt)
|
||||
.await
|
||||
}
|
||||
|
||||
// Process a received in-band receipt
|
||||
pub async fn process_in_band_receipt<R: AsRef<[u8]>>(
|
||||
&self,
|
||||
receipt_data: R,
|
||||
inbound_nr: NodeRef,
|
||||
) -> Result<(), String> {
|
||||
let receipt_manager = self.receipt_manager();
|
||||
|
||||
let receipt = Receipt::from_signed_data(receipt_data.as_ref())
|
||||
.map_err(|_| "failed to parse signed receipt".to_owned())?;
|
||||
|
||||
receipt_manager.handle_receipt(inbound_nr, receipt).await
|
||||
}
|
||||
|
||||
// Builds an envelope for sending over the network
|
||||
@@ -527,124 +558,252 @@ impl NetworkManager {
|
||||
}
|
||||
|
||||
// Figure out how to reach a node
|
||||
// Node info here must be the filtered kind, with only
|
||||
fn get_inbound_method(&self, node_info: &NodeInfo) -> Result<InboundMethod, String> {
|
||||
// Get our network class
|
||||
let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
|
||||
fn get_contact_method(&self, node_ref: &NodeRef) -> Result<ContactMethod, String> {
|
||||
// Get our network class and protocol config
|
||||
let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
|
||||
let our_protocol_config = self.get_protocol_config().unwrap();
|
||||
|
||||
// If we don't have a network class yet (no public dial info or haven't finished detection)
|
||||
// then we just need to try to send to the best direct dial info because we won't
|
||||
// know how to use relays effectively yet
|
||||
if matches!(network_class, NetworkClass::Invalid) {
|
||||
return Ok(InboundMethod::Direct);
|
||||
// See if this is a local node reachable directly
|
||||
let local_node_info = node_ref.local_node_info();
|
||||
if let Some(local_direct_dial_info) = local_node_info
|
||||
.first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di))
|
||||
{
|
||||
return Ok(ContactMethod::Direct(local_direct_dial_info));
|
||||
}
|
||||
|
||||
// Get the protocol of the best matching direct dial info
|
||||
let protocol_type = node_info.dial_info_list.first().map(|d| d.protocol_type());
|
||||
// Get the best matching direct dial info if we have it
|
||||
let target_node_info = node_ref.node_info();
|
||||
let opt_direct_dial_info = target_node_info
|
||||
.first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di));
|
||||
|
||||
// Can the target node do inbound?
|
||||
if node_info.network_class.inbound_capable() {
|
||||
if target_node_info.network_class.inbound_capable() {
|
||||
// Do we need to signal before going inbound?
|
||||
if node_info.network_class.inbound_requires_signal() {
|
||||
// Can we receive a direct reverse connection?
|
||||
if network_class.inbound_capable() && !network_class.inbound_requires_signal() {
|
||||
Ok(InboundMethod::SignalReverse)
|
||||
}
|
||||
// Is this a hole-punch capable protocol?
|
||||
else if protocol_type == Some(ProtocolType::UDP) {
|
||||
Ok(InboundMethod::SignalHolePunch)
|
||||
}
|
||||
// Otherwise we have to relay
|
||||
else {
|
||||
Ok(InboundMethod::Relay)
|
||||
if target_node_info.network_class.inbound_requires_signal() {
|
||||
// Get the target's inbound relay, it must have one or it is not reachable
|
||||
if let Some(target_rpi) = target_node_info.relay_peer_info {
|
||||
// Can we reach the inbound relay?
|
||||
if target_rpi
|
||||
.node_info
|
||||
.first_filtered_dial_info(|di| {
|
||||
our_protocol_config.outbound.filter_dial_info(di)
|
||||
})
|
||||
.is_some()
|
||||
{
|
||||
let target_inbound_relay_nr =
|
||||
self.routing_table().register_node_with_node_info(
|
||||
target_rpi.node_id.key,
|
||||
target_rpi.node_info,
|
||||
)?;
|
||||
|
||||
// Can we receive anything inbound ever?
|
||||
if our_network_class.inbound_capable() {
|
||||
// Can we receive a direct reverse connection?
|
||||
if !our_network_class.inbound_requires_signal() {
|
||||
return Ok(ContactMethod::SignalReverse(target_inbound_relay_nr));
|
||||
}
|
||||
// Can we hole-punch?
|
||||
else if our_protocol_config.inbound.udp
|
||||
&& target_node_info.outbound_protocols.udp
|
||||
{
|
||||
return Ok(ContactMethod::SignalHolePunch(target_inbound_relay_nr));
|
||||
}
|
||||
// Otherwise we have to inbound relay
|
||||
}
|
||||
|
||||
return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr));
|
||||
}
|
||||
}
|
||||
}
|
||||
// Can go direct
|
||||
// Go direct without signaling
|
||||
else {
|
||||
Ok(InboundMethod::Direct)
|
||||
// If we have direct dial info we can use, do it
|
||||
if let Some(ddi) = opt_direct_dial_info {
|
||||
return Ok(ContactMethod::Direct(ddi));
|
||||
}
|
||||
}
|
||||
// If the other node is not inbound capable at all, it requires a relay
|
||||
} else {
|
||||
Ok(InboundMethod::Relay)
|
||||
// If the other node is not inbound capable at all, it is using a full relay
|
||||
if let Some(target_rpi) = target_node_info.relay_peer_info {
|
||||
// Can we reach the full relay?
|
||||
if target_rpi
|
||||
.node_info
|
||||
.first_filtered_dial_info(|di| {
|
||||
our_protocol_config.outbound.filter_dial_info(di)
|
||||
})
|
||||
.is_some()
|
||||
{
|
||||
let target_inbound_relay_nr =
|
||||
self.routing_table().register_node_with_node_info(
|
||||
target_rpi.node_id.key,
|
||||
target_rpi.node_info,
|
||||
)?;
|
||||
return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr));
|
||||
}
|
||||
}
|
||||
}
|
||||
// If we can't reach the node by other means, try our outbound relay if we have one
|
||||
if let Some(relay_node) = self.relay_node() {
|
||||
return Ok(ContactMethod::OutboundRelay(relay_node));
|
||||
}
|
||||
// Otherwise, we can't reach this node
|
||||
Ok(ContactMethod::Unreachable)
|
||||
}
|
||||
|
||||
// Send a reverse connection signal and wait for the return receipt over it
|
||||
// Then send the data across the new connection
|
||||
pub async fn do_reverse_connect(
|
||||
&self,
|
||||
best_node_info: &NodeInfo,
|
||||
relay_nr: NodeRef,
|
||||
target_nr: NodeRef,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(), String> {
|
||||
|
||||
// Get relay to signal from
|
||||
let relay_nr = if let Some(rpi) = best_node_info.relay_peer_info {
|
||||
// Get the noderef for this inbound relay
|
||||
self.routing_table().register_node_with_node_info(rpi.node_id.key, rpi.node_info)?;
|
||||
} else {
|
||||
// If we don't have a relay dial info that matches our protocol configuration
|
||||
// then we can't send to this node!
|
||||
return Err("Can't send to this relay".to_owned())
|
||||
}
|
||||
|
||||
|
||||
// Get the receipt timeout
|
||||
let receipt_time = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms);
|
||||
|
||||
// Build a return receipt for the signal
|
||||
let (rcpt_data, eventual_value) = self
|
||||
.generate_single_shot_receipt(receipt_time, [])
|
||||
.map_err(map_error_string!())?;
|
||||
let receipt_timeout =
|
||||
ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms);
|
||||
let (receipt, eventual_value) = self
|
||||
.generate_single_shot_receipt(receipt_timeout, [])
|
||||
.map_err(map_to_string)?;
|
||||
|
||||
// Get our peer info
|
||||
let peer_info = self.routing_table().get_own_peer_info();
|
||||
|
||||
// Issue the signal
|
||||
let rpc = self.rpc_processor();
|
||||
rpc.rpc_call_signal(dest, )
|
||||
rpc.rpc_call_signal(
|
||||
Destination::Relay(relay_nr.clone(), target_nr.node_id()),
|
||||
None,
|
||||
SignalInfo::ReverseConnect { receipt, peer_info },
|
||||
)
|
||||
.await
|
||||
.map_err(logthru_net!("failed to send signal to {:?}", relay_nr))
|
||||
.map_err(map_to_string)?;
|
||||
|
||||
// Wait for the return receipt
|
||||
match eventual_value.await {
|
||||
ReceiptEvent::Returned => (),
|
||||
let inbound_nr = match eventual_value.await {
|
||||
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
|
||||
ReceiptEvent::Expired => {
|
||||
return Err("receipt was dropped before expiration".to_owned());
|
||||
return Err(format!(
|
||||
"reverse connect receipt expired from {:?}",
|
||||
target_nr
|
||||
));
|
||||
}
|
||||
ReceiptEvent::Cancelled => {
|
||||
return Err("receipt was dropped before expiration".to_owned());
|
||||
return Err(format!(
|
||||
"reverse connect receipt cancelled from {:?}",
|
||||
target_nr
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
// We expect the inbound noderef to be the same as the target noderef
|
||||
// if they aren't the same, we should error on this and figure out what then hell is up
|
||||
if target_nr != inbound_nr {
|
||||
error!("unexpected noderef mismatch on reverse connect");
|
||||
}
|
||||
|
||||
// And now use the existing connection to send over
|
||||
if let Some(descriptor) = node_ref.last_connection() {
|
||||
if let Some(descriptor) = inbound_nr.last_connection() {
|
||||
match self
|
||||
.net()
|
||||
.send_data_to_existing_connection(descriptor, data)
|
||||
.await
|
||||
.map_err(logthru_net!())?
|
||||
{
|
||||
None => {
|
||||
return Ok(());
|
||||
}
|
||||
Some(d) => d,
|
||||
None => Ok(()),
|
||||
Some(_) => Err("unable to send over reverse connection".to_owned()),
|
||||
}
|
||||
} else {
|
||||
Err("no reverse connection available".to_owned())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Send a hole punch signal and do a negotiating ping and wait for the return receipt
|
||||
// Then send the data across the new connection
|
||||
pub async fn do_hole_punch(&self, best_node_info: &NodeInfo, data: Vec<u8>) -> Result<(), String> {
|
||||
if let Some(relay_dial_info) = node_info.relay_dial_info_list.first() {
|
||||
self.net()
|
||||
.do_hole_punch(relay_dial_info.clone(), data)
|
||||
.await
|
||||
.map_err(logthru_net!())
|
||||
pub async fn do_hole_punch(
|
||||
&self,
|
||||
relay_nr: NodeRef,
|
||||
target_nr: NodeRef,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(), String> {
|
||||
// Build a return receipt for the signal
|
||||
let receipt_timeout =
|
||||
ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms);
|
||||
let (receipt, eventual_value) = self
|
||||
.generate_single_shot_receipt(receipt_timeout, [])
|
||||
.map_err(map_to_string)?;
|
||||
|
||||
// Get our peer info
|
||||
let peer_info = self.routing_table().get_own_peer_info();
|
||||
|
||||
// Get the udp direct dialinfo for the hole punch
|
||||
let hole_punch_dial_info = if let Some(hpdi) = target_nr
|
||||
.node_info()
|
||||
.first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP))
|
||||
{
|
||||
hpdi
|
||||
} else {
|
||||
// If we don't have a relay dial info that matches our protocol configuration
|
||||
// then we can't send to this node!
|
||||
Err("Can't send to this node yet".to_owned())
|
||||
return Err("No hole punch capable dialinfo found for node".to_owned());
|
||||
};
|
||||
|
||||
// Do our half of the hole punch by sending an empty packet
|
||||
// Both sides will do this and then the receipt will get sent over the punched hole
|
||||
self.net()
|
||||
.send_data_to_dial_info(hole_punch_dial_info, Vec::new())
|
||||
.await?;
|
||||
|
||||
// Issue the signal
|
||||
let rpc = self.rpc_processor();
|
||||
rpc.rpc_call_signal(
|
||||
Destination::Relay(relay_nr.clone(), target_nr.node_id()),
|
||||
None,
|
||||
SignalInfo::HolePunch { receipt, peer_info },
|
||||
)
|
||||
.await
|
||||
.map_err(logthru_net!("failed to send signal to {:?}", relay_nr))
|
||||
.map_err(map_to_string)?;
|
||||
|
||||
// Wait for the return receipt
|
||||
let inbound_nr = match eventual_value.await {
|
||||
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
|
||||
ReceiptEvent::Expired => {
|
||||
return Err(format!(
|
||||
"reverse connect receipt expired from {:?}",
|
||||
target_nr
|
||||
));
|
||||
}
|
||||
ReceiptEvent::Cancelled => {
|
||||
return Err(format!(
|
||||
"reverse connect receipt cancelled from {:?}",
|
||||
target_nr
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
// We expect the inbound noderef to be the same as the target noderef
|
||||
// if they aren't the same, we should error on this and figure out what then hell is up
|
||||
if target_nr != inbound_nr {
|
||||
error!("unexpected noderef mismatch on reverse connect");
|
||||
}
|
||||
|
||||
// And now use the existing connection to send over
|
||||
if let Some(descriptor) = inbound_nr.last_connection() {
|
||||
match self
|
||||
.net()
|
||||
.send_data_to_existing_connection(descriptor, data)
|
||||
.await
|
||||
.map_err(logthru_net!())?
|
||||
{
|
||||
None => Ok(()),
|
||||
Some(_) => Err("unable to send over reverse connection".to_owned()),
|
||||
}
|
||||
} else {
|
||||
Err("no reverse connection available".to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
// Send raw data to a node
|
||||
//
|
||||
//
|
||||
// We may not have dial info for a node, but have an existing connection for it
|
||||
// because an inbound connection happened first, and no FindNodeQ has happened to that
|
||||
// node yet to discover its dial info. The existing connection should be tried first
|
||||
@@ -652,7 +811,11 @@ impl NetworkManager {
|
||||
//
|
||||
// Sending to a node requires determining a NetworkClass compatible mechanism
|
||||
//
|
||||
pub fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
|
||||
pub fn send_data(
|
||||
&self,
|
||||
node_ref: NodeRef,
|
||||
data: Vec<u8>,
|
||||
) -> SystemPinBoxFuture<Result<(), String>> {
|
||||
let this = self.clone();
|
||||
Box::pin(async move {
|
||||
// First try to send data to the last socket we've seen this peer on
|
||||
@@ -673,53 +836,22 @@ impl NetworkManager {
|
||||
};
|
||||
|
||||
// If we don't have last_connection, try to reach out to the peer via its dial info
|
||||
let best_node_info = match node_ref
|
||||
.best_node_info() {
|
||||
Some(ni) => ni,
|
||||
None => {
|
||||
// If neither this node nor its relays would never ever be
|
||||
// reachable by any of our protocols
|
||||
// then we need to go through the outbound relay
|
||||
if let Some(relay_node) = this.relay_node() {
|
||||
// We have an outbound relay, lets use it
|
||||
return this.send_data(relay_node, data).await;
|
||||
}
|
||||
else {
|
||||
// We have no way to reach the node nor an outbound relay to use
|
||||
return Err("Can't reach this node".to_owned());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// If we aren't using an outbound relay to reach this node, what inbound method do we use?
|
||||
match this.get_inbound_method(&best_node_info)? {
|
||||
InboundMethod::Direct => {
|
||||
if let Some(dial_info) = best_node_info.dial_info_list.first() {
|
||||
this.net()
|
||||
.send_data_to_dial_info(dial_info.clone(), data)
|
||||
.await
|
||||
.map_err(logthru_net!())
|
||||
} else {
|
||||
// If we don't have a direct dial info that matches our protocol configuration
|
||||
// then we can't send to this node!
|
||||
Err("Can't send to this node yet".to_owned())
|
||||
}
|
||||
match this.get_contact_method(&node_ref).map_err(logthru_net!())? {
|
||||
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
|
||||
this.send_data(relay_nr, data).await
|
||||
}
|
||||
InboundMethod::SignalReverse => this.do_reverse_connect(&best_node_info, data).await,
|
||||
InboundMethod::SignalHolePunch => this.do_hole_punch(&best_node_info, data).await,
|
||||
InboundMethod::Relay => {
|
||||
if let Some(rpi) = best_node_info.relay_peer_info {
|
||||
// Get the noderef for this inbound relay
|
||||
let inbound_relay_noderef = this.routing_table().register_node_with_node_info(rpi.node_id.key, rpi.node_info)?;
|
||||
// Send to the inbound relay
|
||||
this.send_data(inbound_relay_noderef, data).await
|
||||
} else {
|
||||
// If we don't have a relay dial info that matches our protocol configuration
|
||||
// then we can't send to this node!
|
||||
Err("Can't send to this relay".to_owned())
|
||||
}
|
||||
ContactMethod::Direct(dial_info) => {
|
||||
this.net().send_data_to_dial_info(dial_info, data).await
|
||||
}
|
||||
ContactMethod::SignalReverse(relay_nr) => {
|
||||
this.do_reverse_connect(relay_nr, node_ref, data).await
|
||||
}
|
||||
ContactMethod::SignalHolePunch(relay_nr) => {
|
||||
this.do_hole_punch(relay_nr, node_ref, data).await
|
||||
}
|
||||
ContactMethod::Unreachable => Err("Can't send to this relay".to_owned()),
|
||||
}
|
||||
.map_err(logthru_net!())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -742,7 +874,7 @@ impl NetworkManager {
|
||||
|
||||
// Is this an out-of-band receipt instead of an envelope?
|
||||
if data[0..4] == *RECEIPT_MAGIC {
|
||||
self.process_receipt(data).await?;
|
||||
self.process_out_of_band_receipt(data, descriptor).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user