refactor
This commit is contained in:
@@ -1,14 +1,12 @@
|
||||
use super::*;
|
||||
use core::sync::atomic::Ordering;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Bucket {
|
||||
routing_table: RoutingTable,
|
||||
entries: BTreeMap<DHTKey, BucketEntry>,
|
||||
entries: BTreeMap<DHTKey, Arc<BucketEntry>>,
|
||||
newest_entry: Option<DHTKey>,
|
||||
}
|
||||
pub(super) type EntriesIterMut<'a> =
|
||||
alloc::collections::btree_map::IterMut<'a, DHTKey, BucketEntry>;
|
||||
pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, BucketEntry>;
|
||||
pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, Arc<BucketEntry>>;
|
||||
|
||||
fn state_ordering(state: BucketEntryState) -> usize {
|
||||
match state {
|
||||
@@ -31,14 +29,14 @@ impl Bucket {
|
||||
log_rtab!("Node added: {}", node_id.encode());
|
||||
|
||||
// Add new entry
|
||||
self.entries.insert(node_id, BucketEntry::new());
|
||||
self.entries.insert(node_id, Arc::new(BucketEntry::new()));
|
||||
|
||||
// This is now the newest bucket entry
|
||||
self.newest_entry = Some(node_id);
|
||||
|
||||
// Get a node ref to return
|
||||
let entry_ref = self.entries.get_mut(&node_id).unwrap();
|
||||
NodeRef::new(self.routing_table.clone(), node_id, entry_ref, None)
|
||||
let entry = self.entries.get(&node_id).unwrap().clone();
|
||||
NodeRef::new(self.routing_table.clone(), node_id, entry, None)
|
||||
}
|
||||
|
||||
pub(super) fn remove_entry(&mut self, node_id: &DHTKey) {
|
||||
@@ -50,25 +48,21 @@ impl Bucket {
|
||||
// newest_entry is updated by kick_bucket()
|
||||
}
|
||||
|
||||
pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
|
||||
pub(super) fn roll_transfers(&self, last_ts: u64, cur_ts: u64) {
|
||||
// Called every ROLLING_TRANSFERS_INTERVAL_SECS
|
||||
for entry in &mut self.entries {
|
||||
entry.1.roll_transfers(last_ts, cur_ts);
|
||||
for (_k, v) in &self.entries {
|
||||
v.with_mut(|e| e.roll_transfers(last_ts, cur_ts));
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn entry_mut(&mut self, key: &DHTKey) -> Option<&mut BucketEntry> {
|
||||
self.entries.get_mut(key)
|
||||
pub(super) fn entry(&self, key: &DHTKey) -> Option<Arc<BucketEntry>> {
|
||||
self.entries.get(key).cloned()
|
||||
}
|
||||
|
||||
pub(super) fn entries(&self) -> EntriesIter {
|
||||
self.entries.iter()
|
||||
}
|
||||
|
||||
pub(super) fn entries_mut(&mut self) -> EntriesIterMut {
|
||||
self.entries.iter_mut()
|
||||
}
|
||||
|
||||
pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<DHTKey>> {
|
||||
// Get number of entries to attempt to purge from bucket
|
||||
let bucket_len = self.entries.len();
|
||||
@@ -83,27 +77,34 @@ impl Bucket {
|
||||
let mut extra_entries = bucket_len - bucket_depth;
|
||||
|
||||
// Get the sorted list of entries by their kick order
|
||||
let mut sorted_entries: Vec<(&_, &_)> = self.entries.iter().collect();
|
||||
let mut sorted_entries: Vec<(DHTKey, Arc<BucketEntry>)> = self
|
||||
.entries
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
let cur_ts = get_timestamp();
|
||||
sorted_entries.sort_by(
|
||||
|a: &(&DHTKey, &BucketEntry), b: &(&DHTKey, &BucketEntry)| -> core::cmp::Ordering {
|
||||
let ea = a.1;
|
||||
let eb = b.1;
|
||||
let astate = state_ordering(ea.state(cur_ts));
|
||||
let bstate = state_ordering(eb.state(cur_ts));
|
||||
// first kick dead nodes, then unreliable nodes
|
||||
if astate < bstate {
|
||||
return core::cmp::Ordering::Less;
|
||||
}
|
||||
if astate > bstate {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
// then kick by time added, most recent nodes are kicked first
|
||||
let ata = ea.peer_stats().time_added;
|
||||
let bta = eb.peer_stats().time_added;
|
||||
bta.cmp(&ata)
|
||||
},
|
||||
);
|
||||
sorted_entries.sort_by(|a, b| -> core::cmp::Ordering {
|
||||
if a.0 == b.0 {
|
||||
return core::cmp::Ordering::Equal;
|
||||
}
|
||||
a.1.with(|ea| {
|
||||
b.1.with(|eb| {
|
||||
let astate = state_ordering(ea.state(cur_ts));
|
||||
let bstate = state_ordering(eb.state(cur_ts));
|
||||
// first kick dead nodes, then unreliable nodes
|
||||
if astate < bstate {
|
||||
return core::cmp::Ordering::Less;
|
||||
}
|
||||
if astate > bstate {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
// then kick by time added, most recent nodes are kicked first
|
||||
let ata = ea.peer_stats().time_added;
|
||||
let bta = eb.peer_stats().time_added;
|
||||
bta.cmp(&ata)
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
self.newest_entry = None;
|
||||
for entry in sorted_entries {
|
||||
@@ -111,23 +112,23 @@ impl Bucket {
|
||||
if extra_entries == 0 {
|
||||
// The first 'live' entry we find is our newest entry
|
||||
if self.newest_entry.is_none() {
|
||||
self.newest_entry = Some(*entry.0);
|
||||
self.newest_entry = Some(entry.0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
extra_entries -= 1;
|
||||
|
||||
// if this entry has references we can't drop it yet
|
||||
if entry.1.ref_count > 0 {
|
||||
if entry.1.ref_count.load(Ordering::Acquire) > 0 {
|
||||
// The first 'live' entry we fine is our newest entry
|
||||
if self.newest_entry.is_none() {
|
||||
self.newest_entry = Some(*entry.0);
|
||||
self.newest_entry = Some(entry.0);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// if no references, lets evict it
|
||||
dead_node_ids.insert(*entry.0);
|
||||
dead_node_ids.insert(entry.0);
|
||||
}
|
||||
|
||||
// Now purge the dead node ids
|
||||
|
||||
Reference in New Issue
Block a user