Skip to content

Commit

Permalink
feat: fix rotation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
functor-flow committed Nov 22, 2024
1 parent ab0492c commit c7a8a8a
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 21 deletions.
4 changes: 3 additions & 1 deletion pallets/offworker/src/dispatches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ pub mod dispatches {
);

// TODO: make a periodical irrationality delta reseter that subnet owner can control
IrrationalityDelta::<T>::set(subnet_id, delta);
IrrationalityDelta::<T>::mutate(subnet_id, |current| {
*current = current.saturating_add(delta)
});
pallet_subnet_emission::Pallet::<T>::handle_decrypted_weights(
subnet_id,
decrypted_weights,
Expand Down
6 changes: 5 additions & 1 deletion pallets/offworker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ pub mod pallet {
}
};

#[cfg(feature = "testing-offworker")]
let acc_id =
T::AccountId::decode(&mut sp_runtime::traits::TrailingZeroInput::zeroes()).unwrap();

// The valid subnets are ones that have encrypted weights, with matching key of the
// offchain worker. The hanging, are potential subnets that have turned off the
// encryption in the middle of offchain worker process
Expand All @@ -215,7 +219,7 @@ pub mod pallet {
});

log::info!("Valid subnets: {:?}", valid_subnets);
let deregistered_subnets = Self::process_subnets(valid_subnets, block_number);
let deregistered_subnets = Self::process_subnets(valid_subnets, acc_id, block_number);
deregistered_subnets.iter().for_each(|subnet_id| {
log::info!("Deregistered subnet: {}", subnet_id);
Self::delete_subnet_state(subnet_id);
Expand Down
16 changes: 12 additions & 4 deletions pallets/offworker/src/process.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use super::*;

impl<T: Config> Pallet<T> {
pub fn process_subnets(subnets: Vec<u16>, current_block: u64) -> Vec<u16> {
pub fn process_subnets(
subnets: Vec<u16>,
acc_id: T::AccountId,
current_block: u64,
) -> Vec<u16> {
let mut deregistered_subnets = Vec::new();

subnets.into_iter().for_each(|subnet_id| {
Expand Down Expand Up @@ -48,10 +52,14 @@ impl<T: Config> Pallet<T> {
.filter(|(block, _)| *block > last_processed_block)
.collect::<Vec<_>>();

let (epochs, result) =
process_consensus_params::<T>(subnet_id, new_params, simulation_result);
let (send_weights, result) = process_consensus_params::<T>(
subnet_id,
acc_id.clone(),
new_params,
simulation_result,
);

if epochs.is_empty() {
if !send_weights {
Self::save_subnet_state(subnet_id, max_block, result.simulation_result);
} else if let Err(err) = Self::do_send_weights(subnet_id, result.delta) {
log::error!(
Expand Down
55 changes: 40 additions & 15 deletions pallets/offworker/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,43 @@ use super::*;
use crate::profitability::{get_copier_stake, is_copying_irrational};
use types::SimulationYumaParams;

fn should_send_rotation_weights<T>(subnet_id: u16, acc_id: &T::AccountId) -> bool
where
T: pallet_subspace::Config + pallet_subnet_emission::Config + pallet::Config,
{
// Get subnet decryption data and return early if None
let decryption_info = match SubnetDecryptionData::<T>::get(subnet_id) {
Some(data) => data,
None => return false,
};

// If there's rotation info and we're the one rotating from, return true
if let Some((rotating_from_id, _block)) = decryption_info.rotating_from {
if &rotating_from_id == acc_id {
return true;
}
}

false
}

pub fn process_consensus_params<T>(
subnet_id: u16,
acc_id: T::AccountId,
consensus_params: Vec<(u64, ConsensusParams<T>)>,
mut simulation_result: ConsensusSimulationResult<T>,
) -> (Vec<BlockWeights>, ShouldDecryptResult<T>)
simulation_result: ConsensusSimulationResult<T>,
) -> (bool, ShouldDecryptResult<T>)
where
T: pallet_subspace::Config + pallet_subnet_emission::Config + pallet::Config,
{
let mut epochs = Vec::new();
let mut result = ShouldDecryptResult::<T> {
should_decrypt: false,
simulation_result: simulation_result.clone(),
delta: I64F64::from_num(0),
};

let mut final_should_send = false;

log::info!("Processing consensus params for subnet {}", subnet_id);
log::info!(
"Number of consensus params entries: {}",
Expand Down Expand Up @@ -112,7 +134,7 @@ where
weights_for_should_decrypt.len()
);

let should_decrypt_result = should_decrypt_weights::<T>(
let mut should_decrypt_result = should_decrypt_weights::<T>(
&weights_for_should_decrypt,
params.clone(),
subnet_id,
Expand All @@ -126,25 +148,28 @@ where
should_decrypt_result.delta
);

simulation_result = should_decrypt_result.simulation_result.clone();
let current_should_send = if !should_decrypt_result.should_decrypt {
let rotation_should_send = should_send_rotation_weights::<T>(subnet_id, &acc_id);
if rotation_should_send {
should_decrypt_result.delta = I64F64::from_num(0);
}
rotation_should_send
} else {
true
};

if should_decrypt_result.should_decrypt {
if current_should_send {
log::info!(
"Adding decrypted weights for block {} to epochs",
param_block
);
epochs.push((*param_block, decrypted_weights));
result = should_decrypt_result;
}
}

log::info!(
"Final processing result: {} epochs processed, should_decrypt: {}",
epochs.len(),
result.should_decrypt
);

(epochs, result)
final_should_send = current_should_send;
}
log::info!("should_decrypt: {}", result.should_decrypt);
(final_should_send, result)
}

/// Returns
Expand Down
6 changes: 6 additions & 0 deletions pallets/subnet_emission/src/decryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<T: Config> Pallet<T> {
activation_block: None, /* will be set based on the first encrypted
* weight
* occurrence */
rotating_from: None,
last_keep_alive: block,
}),
);
Expand Down Expand Up @@ -390,6 +391,7 @@ impl<T: Config> Pallet<T> {
node_public_key: public_key,
last_keep_alive: current_block,
activation_block: None,
rotating_from: None,
});
}
}
Expand Down Expand Up @@ -573,6 +575,7 @@ impl<T: Config> Pallet<T> {
}
}

/// TODO: delete the wc state
pub(crate) fn rotate_decryption_node_if_needed(subnet_id: u16, info: SubnetDecryptionInfo<T>) {
let block_number = pallet_subspace::Pallet::<T>::get_current_block_number();
let activation_block = match info.activation_block {
Expand All @@ -591,6 +594,8 @@ impl<T: Config> Pallet<T> {
None => return,
};

let previous_node_id = SubnetDecryptionData::<T>::get(subnet_id).unwrap_or(info).node_id;

let new_node =
active_nodes.get(current.checked_rem(active_nodes.len()).unwrap_or(0)).cloned();

Expand All @@ -602,6 +607,7 @@ impl<T: Config> Pallet<T> {
node_public_key: new_node.node_public_key,
activation_block: None, /* This will get updated based on the first encrypted
* weights */
rotating_from: Some((previous_node_id, block_number)),
last_keep_alive: block_number,
}),
);
Expand Down
3 changes: 3 additions & 0 deletions pallets/subnet_emission/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pub type PublicKey = (Vec<u8>, Vec<u8>);
pub type BlockWeights = (u64, Vec<(u16, Vec<(u16, u16)>, Vec<u8>)>);
pub type KeylessBlockWeights = (u64, Vec<(u16, Vec<(u16, u16)>)>);

/// Information about a subnet decryption node, including its identity, activation status,
/// and rotation details.
#[derive(Clone, Encode, Decode, TypeInfo, Debug)]
pub struct SubnetDecryptionInfo<T>
where
Expand All @@ -14,4 +16,5 @@ where
// gets assigned when first encrypted weights appear on the subnet
pub activation_block: Option<u64>,
pub last_keep_alive: u64,
pub rotating_from: Option<(T::AccountId, u64)>, // rotating from at block
}
1 change: 1 addition & 0 deletions tests/src/offworker/offworker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ fn test_offchain_worker_behavior() {

for (block_number_str, block_weights) in &data.weights {
let block_number: u64 = block_number_str.parse().unwrap();

dbg!(block_number);

PendingEmission::<Test>::set(TEST_SUBNET_ID, PENDING_EMISSION);
Expand Down
1 change: 1 addition & 0 deletions tests/src/offworker/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub fn initialize_authorities(
node_public_key: public_key,
last_keep_alive: first_block,
activation_block: None,
rotating_from: None,
};
let decryption_nodes = vec![decryption_info.clone()];
DecryptionNodes::<Test>::set(decryption_nodes);
Expand Down
4 changes: 4 additions & 0 deletions tests/src/subnet_emission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ fn decrypted_weight_run_result_is_applied_and_cleaned_up() {
node_id: 1001,
node_public_key: key.clone(),
last_keep_alive: pallet_subspace::Tempo::<Test>::get(netuid) as u64,
rotating_from: None,
};

pallet_subnet_emission::DecryptionNodes::<Test>::set(vec![subnet_decryption_data.clone()]);
Expand Down Expand Up @@ -1537,6 +1538,7 @@ fn rotate_decryption_node() {
node_id: dn_1,
node_public_key: key_1,
last_keep_alive: decryption_node_interval,
rotating_from: None,
}),
);

Expand All @@ -1547,6 +1549,7 @@ fn rotate_decryption_node() {
node_id: dn_2,
node_public_key: key_2,
last_keep_alive: decryption_node_interval,
rotating_from: None,
}),
);

Expand Down Expand Up @@ -1582,6 +1585,7 @@ fn ban_decryption_node() {
node_id: dn_1,
node_public_key: key_1,
last_keep_alive: 0,
rotating_from: None,
}),
);

Expand Down

0 comments on commit c7a8a8a

Please sign in to comment.