Skip to content

Commit

Permalink
Several fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ausias-armesto committed Feb 29, 2024
1 parent fd580a9 commit f5414f0
Show file tree
Hide file tree
Showing 22 changed files with 236 additions and 224 deletions.
258 changes: 128 additions & 130 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion charts/cluster-hoprd/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

apiVersion: v2
name: cluster-hoprd
version: 0.3.0-rc.1
version: 0.3.0
description: A Helm chart to deploy ClusterHoprd
type: application
icon: "https://hoprnet.org/assets/icons/logo.svg"
2 changes: 1 addition & 1 deletion charts/cluster-hoprd/templates/cluster-hoprd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
name: {{ include "cluster-hoprd.name" . }}
namespace: {{ .Release.Namespace }}
annotations:
argocd.argoproj.io/sync-wave: 4
argocd.argoproj.io/sync-wave: "4"
spec:
identityPoolName: {{ include "cluster-hoprd.name" . }}
replicas: {{ .Values.replicas }}
Expand Down
2 changes: 1 addition & 1 deletion charts/cluster-hoprd/templates/identity-hoprd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
name: {{ $key }}
namespace: {{ $release.Namespace | quote }}
annotations:
argocd.argoproj.io/sync-wave: 3
argocd.argoproj.io/sync-wave: "3"
spec:
identityPoolName: {{ $releaseName }}
identityFile: {{ $value.identityFile | quote }}
Expand Down
2 changes: 1 addition & 1 deletion charts/cluster-hoprd/templates/identity-pool.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
name: {{ include "cluster-hoprd.name" . }}
namespace: {{ .Release.Namespace }}
annotations:
argocd.argoproj.io/sync-wave: 2
argocd.argoproj.io/sync-wave: "2"
spec:
network: {{ .Values.network }}
secretName: {{ include "cluster-hoprd.name" . }}-wallet
Expand Down
2 changes: 1 addition & 1 deletion charts/cluster-hoprd/templates/secret-wallet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
name: {{ include "cluster-hoprd.name" . }}-wallet
namespace: {{ .Release.Namespace }}
annotations:
argocd.argoproj.io/sync-wave: 1
argocd.argoproj.io/sync-wave: "1"
type: Opaque
data:
DEPLOYER_PRIVATE_KEY: {{ .Values.wallet.deployerPrivateKey | b64enc }}
Expand Down
2 changes: 1 addition & 1 deletion charts/hoprd-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

apiVersion: v2
name: hoprd-operator
version: 0.2.6-rc.4
version: 0.2.6
appVersion: 0.2.8
description: A Helm chart operator for managing Hopr nodes
type: application
Expand Down
2 changes: 1 addition & 1 deletion charts/hoprd-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ operator:
persistence:
## @param operator.persistence.size Size of the persistence Volume
##
size: 100Mi
size: 500Mi
## @param operator.persistence.storageClassName Name of the storage class
##
storageClassName: ""
Expand Down
5 changes: 2 additions & 3 deletions src/bootstrap_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use json_patch::{PatchOperation, ReplaceOperation};
use k8s_openapi::{
api::{
apps::v1::Deployment,
core::v1::{ContainerPort, Service, ServicePort},
core::v1::ContainerPort,
},
apimachinery::pkg::util::intstr::IntOrString,
serde_value::Value,
serde_value::Value
};
use kube::{
api::{ListParams, Patch, PatchParams},
Expand Down
6 changes: 3 additions & 3 deletions src/cluster/cluster_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn reconciler(
ClusterHoprdAction::Rescale => cluster_hoprd.rescale(context.clone()).await,
// The resource is already in desired state, do nothing and re-check after 10 seconds
ClusterHoprdAction::NoOp => Ok(Action::requeue(Duration::from_secs(
constants::RECONCILE_FREQUENCY,
constants::RECONCILE_SHORT_FREQUENCY,
))),
}
}
Expand All @@ -98,7 +98,7 @@ pub fn on_error(
_context: Arc<ContextData>,
) -> Action {
error!("[ClusterHoprd] Reconciliation error:\n{:?}.\n{:?}",error, cluster_hoprd);
Action::requeue(Duration::from_secs(constants::RECONCILE_FREQUENCY))
Action::requeue(Duration::from_secs(constants::RECONCILE_SHORT_FREQUENCY))
}

/// Initialize the controller
Expand All @@ -115,7 +115,7 @@ pub async fn run(client: Client, context_data: Arc<ContextData>) {
Ok(_cluster_hoprd_resource) => {}
Err(reconciliation_err) => {
let err_string = reconciliation_err.to_string();
if !err_string.contains("that was not found in local store") {
if !err_string.contains("that was not found in local store") && !err_string.contains("event queue error") {
// https://github.com/kube-rs/kube/issues/712
error!("[ClusterHoprd] Reconciliation error: {:?}",reconciliation_err)
}
Expand Down
45 changes: 25 additions & 20 deletions src/cluster/cluster_hoprd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl ClusterHoprd {
context_data.send_event(self, ClusterHoprdEventEnum::Ready, None).await;
self.update_status(context_data.clone(), ClusterHoprdPhaseEnum::Ready).await?;
}
Ok(Action::requeue(Duration::from_secs(constants::RECONCILE_FREQUENCY)))
Ok(Action::requeue(Duration::from_secs(constants::RECONCILE_SHORT_FREQUENCY)))
}

// Modifies the hoprd nodes related with ClusterHoprd
Expand Down Expand Up @@ -175,7 +175,7 @@ impl ClusterHoprd {
self.update_status(context_data.clone(), ClusterHoprdPhaseEnum::Failed).await?;
error!("Could not modify ClusterHoprd {cluster_hoprd_name} because cannot recover last configuration");
}
Ok(Action::requeue(Duration::from_secs(constants::RECONCILE_FREQUENCY)))
Ok(Action::requeue(Duration::from_secs(constants::RECONCILE_SHORT_FREQUENCY)))
}

async fn update_last_configuration(&self, client: Client) -> Result<(), Error> {
Expand Down Expand Up @@ -236,10 +236,11 @@ impl ClusterHoprd {
pub async fn rescale(&self, context_data: Arc<ContextData>) -> Result<Action, Error> {
let hoprd_namespace: String = self.namespace().unwrap();
let cluster_hoprd_name: String = self.name_any();
if self.status.as_ref().unwrap().phase.eq(&ClusterHoprdPhaseEnum::NotScaled) {
let status = self.status.as_ref().unwrap();
if status.phase.eq(&ClusterHoprdPhaseEnum::NotScaled) {
context_data.send_event(self, ClusterHoprdEventEnum::Scaling, None).await;
self.update_status(context_data.clone(), ClusterHoprdPhaseEnum::Scaling).await?;
let current_unsynched_nodes = self.spec.replicas - self.status.as_ref().unwrap().current_nodes;
let current_unsynched_nodes = self.spec.replicas - status.current_nodes;
info!("ClusterHoprd {cluster_hoprd_name} in namespace {hoprd_namespace} is not scaled");
match current_unsynched_nodes {
1..=i32::MAX => {
Expand All @@ -253,9 +254,9 @@ impl ClusterHoprd {
self.update_status(context_data.clone(), ClusterHoprdPhaseEnum::Ready).await?;
}
}
Ok(Action::requeue(Duration::from_secs(constants::RECONCILE_FREQUENCY)))
Ok(Action::requeue(Duration::from_secs(constants::RECONCILE_SHORT_FREQUENCY)))
} else {
debug!("ClusterHoprd {cluster_hoprd_name} in namespace {hoprd_namespace} is already being scaling");
debug!("ClusterHoprd {cluster_hoprd_name} in namespace {hoprd_namespace} is already being scaling, currently having {} nodes while needs {}", status.current_nodes, self.spec.replicas);
Ok(Action::await_change())
}
}
Expand Down Expand Up @@ -289,23 +290,26 @@ impl ClusterHoprd {
let client: Client = context_data.client.clone();
let cluster_hoprd_name = self.metadata.name.as_ref().unwrap().to_owned();
let hoprd_namespace = self.metadata.namespace.as_ref().unwrap().to_owned();
let mut cluster_hoprd_status = self
let api: Api<ClusterHoprd> = Api::namespaced(client.clone(), &hoprd_namespace.to_owned());
// We need to get the latest state of the ClusterHoprd as it may be updated by other thread and the values stored in self object might be obsolete
let cluster_hoprd = api.get(&cluster_hoprd_name).await.unwrap();
let mut cluster_hoprd_status = cluster_hoprd
.status
.as_ref()
.unwrap_or(&ClusterHoprdStatus::default())
.to_owned();

let api: Api<ClusterHoprd> = Api::namespaced(client.clone(), &hoprd_namespace.to_owned());

cluster_hoprd_status.update_timestamp = Utc::now().to_rfc3339();
cluster_hoprd_status.checksum = self.get_checksum();
cluster_hoprd_status.checksum = cluster_hoprd.get_checksum();
cluster_hoprd_status.phase = phase;
if phase.eq(&ClusterHoprdPhaseEnum::NodeCreated) {
cluster_hoprd_status.current_nodes += 1;
} else if phase.eq(&ClusterHoprdPhaseEnum::NodeDeleted) {
cluster_hoprd_status.current_nodes -= 1;
};
if phase.eq(&ClusterHoprdPhaseEnum::NodeCreated) || phase.eq(&ClusterHoprdPhaseEnum::NodeDeleted) {
if cluster_hoprd_status.current_nodes == self.spec.replicas {
if cluster_hoprd_status.current_nodes == cluster_hoprd.spec.replicas {
cluster_hoprd_status.phase = ClusterHoprdPhaseEnum::Ready;
} else {
cluster_hoprd_status.phase = ClusterHoprdPhaseEnum::NotScaled;
Expand All @@ -325,19 +329,19 @@ impl ClusterHoprd {
let current_nodes = self.get_my_nodes(api).await.unwrap();
debug!("ClusterHoprd {} in namespace {} has currently {} nodes", self.name_any(), self.namespace().unwrap(), current_nodes.len());
let current_node_numbers = current_nodes.iter().map(|n| {
n.name_any().replace(&format!("{}-", self.metadata.name.as_ref().unwrap()), "").parse::<i32>().unwrap()
// Casting the node numbers and removing 1 unit to align them with the index of the array, so the nodes numbering starts from value 0 instead of 1
n.name_any().replace(&format!("{}-", self.metadata.name.as_ref().unwrap()), "").parse::<i32>().unwrap() - 1
}).collect::<Vec<i32>>();
let next = current_node_numbers.iter().enumerate().find_map(|(index, &value)| {
if index + 1 < current_node_numbers.len() && (value + 1) > current_node_numbers[index + 1] {
Some(value + 1)
let index_i32: i32 = index.try_into().unwrap();
if index_i32 != value {
Some(index_i32 + 1)
} else if index == current_node_numbers.len() -1 {
Some(index_i32 + 2)
} else {
None
}
}).unwrap_or_else( || {
let current_size: i32 = current_node_numbers.len().try_into().unwrap();
if current_size == 1 && current_node_numbers[0] != 1 { 1 } else { current_size + 1 }

});
}).unwrap_or(1);
debug!("Next free node for ClusterHopr {} in namespace {} is {}", self.name_any(), self.namespace().unwrap(), next);
next
}
Expand All @@ -364,7 +368,8 @@ impl ClusterHoprd {
};
match self.create_hoprd_resource(context_data.clone(), node_name.to_owned(), hoprd_spec).await {
Ok(_) => {
context_data.send_event(self, ClusterHoprdEventEnum::NodeCreated, Some(node_name)).await;
info!("Node {} successfully created for cluster {}", node_name.to_owned(), cluster_name.to_owned());
context_data.send_event(self, ClusterHoprdEventEnum::NodeCreated, Some(node_name.to_owned())).await;
self.update_status(context_data.clone(), ClusterHoprdPhaseEnum::NodeCreated).await?;
},
Err(error) => {
Expand Down Expand Up @@ -463,7 +468,7 @@ impl ClusterHoprd {
let node_name = &node.name_any();
let uid = node.metadata.uid.unwrap();
api.delete(node_name, &DeleteParams::default()).await?;
await_condition(api.clone(), node_name, conditions::is_deleted(&uid)).await.unwrap();
await_condition(api.clone(), node_name, conditions::is_deleted(&uid)).await.expect(&format!("Could not delete the node {}", node_name));
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

// Operator Constants
pub const RECONCILE_FREQUENCY: u64 = 10;
pub const RECONCILE_FREQUENCY_ERROR: u64 = 30;
pub const RECONCILE_SHORT_FREQUENCY: u64 = 10;
pub const RECONCILE_LONG_FREQUENCY: u64 = 30;
pub const OPERATOR_ENVIRONMENT: &str = "OPERATOR_ENVIRONMENT";
pub const OPERATOR_FINALIZER: &str = "hoprds.hoprnet.org/finalizer";
pub const OPERATOR_JOB_TIMEOUT: u64 = 300;
Expand Down
6 changes: 3 additions & 3 deletions src/hoprd/hoprd_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn reconciler(hoprd: Arc<Hoprd>, context: Arc<ContextData>) -> Result<Acti
HoprdAction::Modify => hoprd.modify(context.clone()).await,
HoprdAction::Delete => hoprd.delete(context.clone()).await,
HoprdAction::NoOp => Ok(Action::requeue(Duration::from_secs(
constants::RECONCILE_FREQUENCY,
constants::RECONCILE_SHORT_FREQUENCY,
))),
}
}
Expand All @@ -89,7 +89,7 @@ async fn reconciler(hoprd: Arc<Hoprd>, context: Arc<ContextData>) -> Result<Acti
/// - `_context`: Unused argument. Context Data "injected" automatically by kube-rs.
pub fn on_error(hoprd: Arc<Hoprd>, error: &Error, _context: Arc<ContextData>) -> Action {
error!("[Hoprd] Reconciliation error:\n{:?}.\n{:?}",error, hoprd);
Action::requeue(Duration::from_secs(constants::RECONCILE_FREQUENCY))
Action::requeue(Duration::from_secs(constants::RECONCILE_SHORT_FREQUENCY))
}

/// Initialize the controller
Expand All @@ -116,7 +116,7 @@ pub async fn run(client: Client, context_data: Arc<ContextData>) {
Ok(_hoprd_resource) => {}
Err(reconciliation_err) => {
let err_string = reconciliation_err.to_string();
if !err_string.contains("that was not found in local store") {
if !err_string.contains("that was not found in local store") && !err_string.contains("event queue error") {
// https://github.com/kube-rs/kube/issues/712
error!("[Hoprd] Reconciliation error: {:?}", reconciliation_err)
}
Expand Down
4 changes: 3 additions & 1 deletion src/hoprd/hoprd_deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ pub async fn create_deployment(context_data: Arc<ContextData>, hoprd: &Hoprd, id

// Create the deployment defined above
let api: Api<Deployment> = Api::namespaced(context_data.client.clone(), &namespace);
api.create(&PostParams::default(), &deployment).await
let deployment = api.create(&PostParams::default(), &deployment).await?;
info!("Deployment {} created successfully", name.to_owned());
Ok(deployment)
}

pub async fn build_deployment_spec(labels: BTreeMap<String, String>, hoprd_spec: &HoprdSpec, identity_pool: IdentityPool, identity_hoprd: &IdentityHoprd, hoprd_host: &String) -> DeploymentSpec {
Expand Down
4 changes: 3 additions & 1 deletion src/hoprd/hoprd_ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ pub async fn create_ingress(

// Create the Ingress defined above
let api: Api<Ingress> = Api::namespaced(context.client.clone(), namespace);
api.create(&PostParams::default(), &ingress).await
let ingress = api.create(&PostParams::default(), &ingress).await?;
info!("Ingress {} created successfully", service_name.to_owned());
Ok(ingress)
}

/// Deletes an existing ingress.
Expand Down
Loading

0 comments on commit f5414f0

Please sign in to comment.