simplify tracing

This commit is contained in:
John Smith
2023-06-24 22:59:51 -04:00
parent c8fdded5a7
commit 234f048241
38 changed files with 197 additions and 114 deletions
@@ -34,14 +34,14 @@ impl ConnectionHandle {
self.descriptor.clone()
}
#[instrument(level="trace", skip(self, message), fields(message.len = message.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))]
pub fn send(&self, message: Vec<u8>) -> ConnectionHandleSendResult {
match self.channel.send((Span::current().id(), message)) {
Ok(()) => ConnectionHandleSendResult::Sent,
Err(e) => ConnectionHandleSendResult::NotSent(e.0 .1),
}
}
#[instrument(level="trace", skip(self, message), fields(message.len = message.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))]
pub async fn send_async(&self, message: Vec<u8>) -> ConnectionHandleSendResult {
match self
.channel
+3 -3
View File
@@ -775,7 +775,7 @@ impl NetworkManager {
}
/// Builds an envelope for sending over the network
#[instrument(level = "trace", skip(self, body), err)]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, body), err))]
fn build_envelope<B: AsRef<[u8]>>(
&self,
dest_node_id: TypedKey,
@@ -806,7 +806,7 @@ impl NetworkManager {
/// node_ref is the direct destination to which the envelope will be sent
/// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to
/// which will cause the envelope to be relayed
#[instrument(level = "trace", skip(self, body), ret, err)]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, body), ret, err))]
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
@@ -872,7 +872,7 @@ impl NetworkManager {
// Called when a packet potentially containing an RPC envelope is received by a low-level
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler
#[instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len())))]
async fn on_recv_envelope(
&self,
data: &mut [u8],
@@ -358,7 +358,7 @@ impl Network {
// This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection.
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
@@ -416,7 +416,7 @@ impl Network {
// This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection.
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_recv_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
@@ -496,7 +496,7 @@ impl Network {
}
}
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_existing_connection(
&self,
descriptor: ConnectionDescriptor,
@@ -556,7 +556,7 @@ impl Network {
// Send data directly to a dial info, possibly without knowing which node it is going to
// Returns a descriptor for the connection used to send the data
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_dial_info(
&self,
dial_info: DialInfo,
@@ -56,11 +56,12 @@ impl RawTcpNetworkConnection {
stream.flush().await.into_network_result()
}
//#[instrument(level="trace", err, skip(self, message), fields(network_result, message.len = message.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, message), fields(network_result, message.len = message.len())))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
let mut stream = self.stream.clone();
let out = Self::send_internal(&mut stream, message).await?;
//tracing::Span::current().record("network_result", &tracing::field::display(&out));
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
Ok(out)
}
@@ -87,11 +88,15 @@ impl RawTcpNetworkConnection {
Ok(NetworkResult::Value(out))
}
// #[instrument(level = "trace", err, skip(self), fields(network_result))]
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", err, skip(self), fields(network_result))
)]
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
let mut stream = self.stream.clone();
let out = Self::recv_internal(&mut stream).await?;
//tracing::Span::current().record("network_result", &tracing::field::display(&out));
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
Ok(out)
}
}
@@ -15,7 +15,7 @@ impl RawUdpProtocolHandler {
}
}
// #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor)))]
pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, ConnectionDescriptor)> {
let (message_len, descriptor) = loop {
// Get a packet
@@ -49,12 +49,14 @@ impl RawUdpProtocolHandler {
break (message.len(), descriptor);
};
// tracing::Span::current().record("ret.len", &message_len);
// tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.len", &size);
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
Ok((message_len, descriptor))
}
//#[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.descriptor))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor)))]
pub async fn send_message(
&self,
data: Vec<u8>,
@@ -95,7 +97,10 @@ impl RawUdpProtocolHandler {
SocketAddress::from_socket_addr(local_socket_addr),
);
// tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.len", &len);
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
Ok(NetworkResult::value(descriptor))
}
@@ -72,7 +72,7 @@ where
// .map_err(to_io_error_other)
// }
//#[instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("received too large WS message");
@@ -82,6 +82,7 @@ where
Err(e) => err_to_network_result(e),
};
if !out.is_value() {
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
return Ok(out);
}
@@ -89,11 +90,13 @@ where
Ok(v) => NetworkResult::value(v),
Err(e) => err_to_network_result(e),
};
//tracing::Span::current().record("network_result", &tracing::field::display(&out));
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
Ok(out)
}
// #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self), fields(network_result, ret.len)))]
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
let out = match self.stream.clone().next().await {
Some(Ok(Message::Binary(v))) => {
@@ -120,7 +123,8 @@ where
)),
};
// tracing::Span::current().record("network_result", &tracing::field::display(&out));
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
Ok(out)
}
}
@@ -179,7 +179,7 @@ impl NetworkConnection {
}
}
#[instrument(level="trace", skip(message, stats), fields(message.len = message.len()), ret)]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(message, stats), fields(message.len = message.len()), ret))]
async fn send_internal(
protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>,
@@ -194,7 +194,7 @@ impl NetworkConnection {
Ok(NetworkResult::Value(out))
}
#[instrument(level="trace", skip(stats), fields(ret.len))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(stats), fields(ret.len)))]
async fn recv_internal(
protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>,
@@ -205,6 +205,7 @@ impl NetworkConnection {
let mut stats = stats.lock();
stats.last_message_recv_time.max_assign(Some(ts));
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.len", out.len());
Ok(NetworkResult::Value(out))
+8 -2
View File
@@ -389,7 +389,10 @@ impl NetworkManager {
/// Send a reverse connection signal and wait for the return receipt over it
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[instrument(level = "trace", skip(self, data), err)]
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, data), err)
)]
async fn do_reverse_connect(
&self,
relay_nr: NodeRef,
@@ -475,7 +478,10 @@ impl NetworkManager {
/// Send a hole punch signal and do a negotiating ping and wait for the return receipt
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[instrument(level = "trace", skip(self, data), err)]
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, data), err)
)]
async fn do_hole_punch(
&self,
relay_nr: NodeRef,
+4 -4
View File
@@ -79,7 +79,7 @@ impl Network {
/////////////////////////////////////////////////////////////////
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
@@ -119,7 +119,7 @@ impl Network {
// This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection.
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_recv_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
@@ -167,7 +167,7 @@ impl Network {
}
}
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_existing_connection(
&self,
descriptor: ConnectionDescriptor,
@@ -212,7 +212,7 @@ impl Network {
Ok(Some(data))
}
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_dial_info(
&self,
dial_info: DialInfo,
@@ -64,7 +64,7 @@ impl WebsocketNetworkConnection {
// self.inner.ws_meta.close().await.map_err(to_io).map(drop)
// }
//#[instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len()))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("sending too large WS message");
@@ -79,11 +79,12 @@ impl WebsocketNetworkConnection {
.map_err(to_io)
.into_network_result()?;
//tracing::Span::current().record("network_result", &tracing::field::display(&out));
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
Ok(out)
}
// #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self), fields(network_result, ret.len)))]
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
let out = match SendWrapper::new(self.inner.ws_stream.clone().next()).await {
Some(WsMessage::Binary(v)) => {
@@ -103,7 +104,8 @@ impl WebsocketNetworkConnection {
)));
}
};
// tracing::Span::current().record("network_result", &tracing::field::display(&out));
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out));
Ok(out)
}
}