diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index 5c08d6af..f43a7307 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -1047,6 +1047,10 @@ func validateNamespaceConfig( return nErr } + if mErr := validateMRTFields(nsConf); mErr != nil { + return mErr + } + if nsStorage, ok := nsConf["storage-engine"]; ok { if nsStorage == nil { return fmt.Errorf( @@ -1235,6 +1239,29 @@ func validateNamespaceConfig( return nil } +func validateMRTFields(nsConf map[string]interface{}) error { + mrtField := isMRTFieldSet(nsConf) + scEnabled := IsNSSCEnabled(nsConf) + + if !scEnabled && mrtField { + return fmt.Errorf("MRT fields are not allowed in non-SC namespace %v", nsConf) + } + + return nil +} + +func isMRTFieldSet(nsConf map[string]interface{}) bool { + mrtFields := []string{"mrt-duration", "disable-mrt-writes"} + + for _, field := range mrtFields { + if _, exists := nsConf[field]; exists { + return true + } + } + + return false +} + func validateNamespaceReplicationFactor( nsConf map[string]interface{}, clSize int, ) error { diff --git a/test/cluster/strong_consistency_test.go b/test/cluster/strong_consistency_test.go index 020431fa..42a3cb2d 100644 --- a/test/cluster/strong_consistency_test.go +++ b/test/cluster/strong_consistency_test.go @@ -200,6 +200,28 @@ var _ = Describe("SCMode", func() { validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace) }) + + It("Should allow MRT fields in SC namespace", func() { + aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 3) + racks := getDummyRackConf(1) + + sc1Path := "/test/dev/" + sc1Name + aeroCluster.Spec.Storage.Volumes = append( + aeroCluster.Spec.Storage.Volumes, getStorageVolumeForAerospike(sc1Name, sc1Path)) + + conf := getSCNamespaceConfig(sc1Name, sc1Path) + conf["mrt-duration"] = 15 + + racks[0].InputAerospikeConfig = &asdbv1.AerospikeConfigSpec{ + Value: map[string]interface{}{ + "namespaces": []interface{}{conf}, + }, + } + aeroCluster.Spec.RackConfig.Racks = racks + + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }) }) Context("When doing invalid operation", func() { @@ -322,6 +344,29 @@ var _ = Describe("SCMode", func() { Expect(err).To(HaveOccurred()) }) + It("Should not allow MRT fields in non-SC namespace", func() { + aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 3) + racks := getDummyRackConf(1) + + sc1Path := "/test/dev/" + sc1Name + aeroCluster.Spec.Storage.Volumes = append( + aeroCluster.Spec.Storage.Volumes, getStorageVolumeForAerospike(sc1Name, sc1Path)) + + conf := getSCNamespaceConfig(sc1Name, sc1Path) + conf["strong-consistency"] = false + conf["mrt-duration"] = 10 + + racks[0].InputAerospikeConfig = &asdbv1.AerospikeConfigSpec{ + Value: map[string]interface{}{ + "namespaces": []interface{}{conf}, + }, + } + aeroCluster.Spec.RackConfig.Racks = racks + + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }) + }) })