From c86020825deb775dff1fa0292ab232f57b199229 Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz>
Date: Mon, 15 May 2023 21:30:54 -0700
Subject: [PATCH 1/4] Fix sequential join when expansion feature data is missing

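When ADD_DEFAULT_COL_FOR_MISSING_DATA is enabled and the expansion
feature of a sequential join could not be loaded because of missing
data, the derivation now short-circuits: it copies the expansion
feature column (plus a key-tagged copy of it) into the produced feature
column instead of running the join and aggregation, logs a warning, and
records the feature in SuppressedExceptionHandlerUtils. Anchored
features whose data is missing are likewise tracked in a new
missingFeatures set.

Illustrative sketch of turning the flag on when building the Spark
session. The prefix, flag name and getFeathrJobParam lookup come from
FeathrUtils; the session settings are placeholders, and the snippet
assumes FeathrUtils is visible from the calling code:

    import com.linkedin.feathr.offline.util.FeathrUtils
    import org.apache.spark.sql.SparkSession

    val ss = SparkSession
      .builder()
      .master("local[*]")
      .appName("feathr-offline-join")
      // getFeathrJobParam reads the conf key FEATHR_PARAMS_PREFIX + param name.
      .config(FeathrUtils.FEATHR_PARAMS_PREFIX + FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA, "true")
      .getOrCreate()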
---
 .../SequentialJoinAsDerivation.scala          | 21 ++++++++++++++++---
 .../workflow/AnchoredFeatureJoinStep.scala    |  7 +++++--
 .../AnchorToDataSourceMapper.scala            |  1 -
 .../SuppressedExceptionHandlerUtils.scala     |  5 +++++
 .../offline/AnchoredFeaturesIntegTest.scala   | 12 ++++++++++-
 5 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala
index d35fc5486..e00fded12 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala
@@ -7,6 +7,7 @@ import com.linkedin.feathr.common.{FeatureAggregationType, FeatureValue}
 import com.linkedin.feathr.offline.PostTransformationUtil
 import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
 import com.linkedin.feathr.offline.client.DataFrameColName
+import com.linkedin.feathr.offline.client.DataFrameColName.genFeatureColumnName
 import com.linkedin.feathr.offline.derived.DerivedFeature
 import com.linkedin.feathr.offline.derived.functions.SeqJoinDerivationFunction
 import com.linkedin.feathr.offline.derived.strategies.SequentialJoinAsDerivation._
@@ -18,7 +19,7 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
 import com.linkedin.feathr.offline.source.accessor.DataPathHandler
 import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults
 import com.linkedin.feathr.offline.transformation.{AnchorToDataSourceMapper, MvelDefinition}
-import com.linkedin.feathr.offline.util.{CoercionUtilsScala, DataFrameSplitterMerger, FeathrUtils, FeaturizedDatasetUtils}
+import com.linkedin.feathr.offline.util.{CoercionUtilsScala, DataFrameSplitterMerger, FeathrUtils, FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils}
 import com.linkedin.feathr.sparkcommon.{ComplexAggregation, SeqJoinCustomAggregation}
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.functions._
@@ -52,6 +53,22 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession,
     val seqJoinDerivationFunction = derivationFunction
     val baseFeatureName = seqJoinDerivationFunction.left.feature
     val expansionFeatureName = seqJoinDerivationFunction.right.feature
+    val shouldAddDefault = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
+
+    // If the expansion feature is missing because of data issues, copy the expansion feature column as the final feature value.
+    if (shouldAddDefault && SuppressedExceptionHandlerUtils.missingFeatures.contains(expansionFeatureName)) {
+      val seqJoinedFeatureResult = df.withColumn(genFeatureColumnName(FEATURE_NAME_PREFIX + derivedFeature.producedFeatureNames.head),
+        col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName)))
+
+      // Add an additional column with the key tags to mimic the exact behavior of the seq join flow; it will be dropped later.
+      val seqJoinFeatureResultWithRenamed = seqJoinedFeatureResult.withColumn(genFeatureColumnName(FEATURE_NAME_PREFIX
+        + derivedFeature.producedFeatureNames.head, Some(keyTags.map(keyTagList).toList)),
+        col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName)))
+      val missingFeature = derivedFeature.producedFeatureNames.head
+      log.warn(s"Missing data for feature ${missingFeature}. Default values will be populated for this column.")
+      SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeature
+      return seqJoinFeatureResultWithRenamed
+    }
     val aggregationFunction = seqJoinDerivationFunction.aggregation
     val tagStrList = Some(keyTags.map(keyTagList).toList)
     val outputKey = seqJoinDerivationFunction.left.outputKey
@@ -219,8 +236,6 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession,
     val anchorDFMap1 = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(ss, Seq(featureAnchor), failOnMissingPartition)
     val updatedAnchorDFMap = anchorDFMap1.filter(anchorEntry => anchorEntry._2.isDefined)
       .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get)
-    // We dont need to check if the anchored feature's dataframes are missing (due to skip missing feature) as such
-    // seq join features have already been removed in the FeatureGroupsUpdater#getUpdatedFeatureGroupsWithoutInvalidPaths.
     val featureInfo = FeatureTransformation.directCalculate(
       anchorGroup: AnchorFeatureGroups,
       updatedAnchorDFMap(featureAnchor),
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
index 51d97ebe6..fe770a7c7 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
@@ -17,7 +17,7 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
 import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
 import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults
 import com.linkedin.feathr.offline.transformation.DataFrameExt._
-import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils}
+import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils}
 import com.linkedin.feathr.offline.util.FeathrUtils.shouldCheckPoint
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -113,10 +113,13 @@ private[offline] class AnchoredFeatureJoinStep(
         val containsFeature: Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq
         !containsFeature.contains(true)
       })
+      log.warn(s"Missing data for features ${missingFeatures.mkString(", ")}. Default values will be populated for these columns.")
+      SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeatures.mkString
+      SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures
       val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1))
       substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
         missingAnchoredFeatures)
-    }else observationDF
+    } else observationDF
 
     val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures
     val joinStages = ctx.logicalPlan.joinStages
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala
index cd18e512a..eb0cc2804 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala
@@ -86,7 +86,6 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
         } catch {
           case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e
         }
-
         anchorsWithDate.map(anchor => (anchor, timeSeriesSource))
     })
   }
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala
index 95e1fe90c..a4aaf6699 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala
@@ -1,9 +1,14 @@
 package com.linkedin.feathr.offline.util
 
+import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
+
 /**
  * Util classes and methods to handle suppressed exceptions.
  */
 object SuppressedExceptionHandlerUtils {
   val MISSING_DATA_EXCEPTION = "missing_data_exception"
   var missingDataSuppressedExceptionMsgs = ""
+
+  // Set of features that may be missing because of missing data.
+  var missingFeatures = scala.collection.mutable.Set.empty[String]
 }
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
index 071db5034..4d97667f2 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
@@ -409,7 +409,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
           |   key: a_id
           |   featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3", "featureWithNull4",
           |    "featureWithNull5", "derived_featureWithNull2", "featureWithNull6", "featureWithNull7", "derived_featureWithNull7"
-          |    "aEmbedding", "memberEmbeddingAutoTZ", "aEmbedding", "featureWithNullSql"]
+          |    "aEmbedding", "memberEmbeddingAutoTZ", "aEmbedding", "featureWithNullSql", "seqJoin_featureWithNull"]
           | }
       """.stripMargin,
       featureDefAsString =
@@ -529,6 +529,14 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
           | derived_featureWithNull: "featureWithNull * 2"
           | derived_featureWithNull2: "featureWithNull2 * 2"
           | derived_featureWithNull7: "featureWithNull7 * 2"
+          | seqJoin_featureWithNull: {
+          |   key: x
+          |   join: {
+          |     base: {key: x, feature: featureWithNull2}
+          |     expansion: {key: y, feature: featureWithNull5}
+          |   }
+          |   aggregation: "SUM"
+          | }
           |}
         """.stripMargin,
       observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv")
@@ -550,6 +558,8 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
     assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"),
       Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f))))
     assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f)
+    assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"),
+      Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f))))
     setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")
   }
 

From 559fbcaca0d25727674d885f44a5fe79509d2cad Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz>
Date: Tue, 16 May 2023 12:09:50 -0700
Subject: [PATCH 2/4] Address review comments: report missing features as a set

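Per review feedback, callers now receive the set of data-missing
features (SuppressedExceptionHandlerUtils.missingFeatures) through the
MISSING_DATA_EXCEPTION entry of the suppressed-exception map; the
free-form missingDataSuppressedExceptionMsgs string is removed. A
derived feature is also marked missing when any of its consumed
features is missing, and the sliding-window aggregation joiner records
empty features in the same set.

Caller-side sketch of reading the reported features. The map key and
client method are from this codebase; client, joinConfig and obsData
are placeholders:

    val (result, suppressed) =
      client.joinFeaturesWithSuppressedExceptions(joinConfig, obsData)
    suppressed
      .get(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION)
      .filter(_.nonEmpty)
      // The reported value is missingFeatures.mkString, i.e. the names concatenated.
      .foreach(names => println(s"Features defaulted due to missing data: $names"))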
---
 .../com/linkedin/feathr/offline/client/FeathrClient.scala    | 4 ++--
 .../feathr/offline/derived/DerivedFeatureEvaluator.scala     | 5 ++++-
 .../derived/strategies/SequentialJoinAsDerivation.scala      | 2 +-
 .../offline/join/workflow/AnchoredFeatureJoinStep.scala      | 1 -
 .../feathr/offline/swa/SlidingWindowAggregationJoiner.scala  | 2 +-
 .../offline/util/SuppressedExceptionHandlerUtils.scala       | 1 -
 .../linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala  | 5 ++++-
 7 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala
index d7cf748fe..2bb7bccce 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala
@@ -83,7 +83,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
   def joinFeaturesWithSuppressedExceptions(joinConfig: FeatureJoinConfig, obsData: SparkFeaturizedDataset,
     jobContext: JoinJobContext = JoinJobContext()): (SparkFeaturizedDataset, Map[String, String]) = {
     (joinFeatures(joinConfig, obsData, jobContext), Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION
-      -> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs))
+      -> SuppressedExceptionHandlerUtils.missingFeatures.mkString))
   }
 
   /**
@@ -231,7 +231,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
 
     val (joinedDF, header) = doJoinObsAndFeatures(joinConfig, jobContext, obsData)
     (joinedDF, header, Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION
-      -> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs))
+      -> SuppressedExceptionHandlerUtils.missingFeatures.mkString))
   }
 
   /**
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala
index 4d9f45af5..61bc48c4a 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala
@@ -10,7 +10,7 @@ import com.linkedin.feathr.offline.join.algorithms.{SequentialJoinConditionBuild
 import com.linkedin.feathr.offline.logical.FeatureGroups
 import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
 import com.linkedin.feathr.offline.source.accessor.DataPathHandler
-import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils
+import com.linkedin.feathr.offline.util.{FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils}
 import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureDataFrame}
 import com.linkedin.feathr.sparkcommon.FeatureDerivationFunctionSpark
 import com.linkedin.feathr.{common, offline}
@@ -37,6 +37,9 @@ private[offline] class DerivedFeatureEvaluator(derivationStrategies: DerivationS
   def evaluate(keyTag: Seq[Int], keyTagList: Seq[String], contextDF: DataFrame, derivedFeature: DerivedFeature): FeatureDataFrame = {
     val tags = Some(keyTag.map(keyTagList).toList)
     val producedFeatureColName = DataFrameColName.genFeatureColumnName(derivedFeature.producedFeatureNames.head, tags)
+    if (derivedFeature.consumedFeatureNames.exists(x => SuppressedExceptionHandlerUtils.missingFeatures.contains(x.getFeatureName))) {
+      SuppressedExceptionHandlerUtils.missingFeatures.add(derivedFeature.producedFeatureNames.head)
+    }
 
     derivedFeature.derivation match {
       case g: SeqJoinDerivationFunction =>
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala
index e00fded12..02b14820f 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala
@@ -66,7 +66,7 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession,
         col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName)))
       val missingFeature = derivedFeature.producedFeatureNames.head
       log.warn(s"Missing data for feature ${missingFeature}. Default values will be populated for this column.")
-      SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeature
+      SuppressedExceptionHandlerUtils.missingFeatures += missingFeature
       return seqJoinFeatureResultWithRenamed
     }
     val aggregationFunction = seqJoinDerivationFunction.aggregation
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
index fe770a7c7..8c126ca81 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
@@ -114,7 +114,6 @@ private[offline] class AnchoredFeatureJoinStep(
         !containsFeature.contains(true)
       })
       log.warn(s"Missing data for features ${missingFeatures.mkString(", ")}. Default values will be populated for these columns.")
-      SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeatures.mkString
       SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures
       val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1))
       substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
index 0683c3601..6e3f962d9 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
@@ -179,7 +179,7 @@ private[offline] class SlidingWindowAggregationJoiner(
           res.map(emptyFeatures.add)
           val exceptionMsg = emptyFeatures.mkString
           log.warn(s"Missing data for features ${emptyFeatures}. Default values will be populated for this column.")
-          SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += exceptionMsg
+          SuppressedExceptionHandlerUtils.missingFeatures ++= emptyFeatures
           anchors.map(anchor => (anchor, originalSourceDf))
         } else {
         val sourceDF: DataFrame = preprocessedDf match {
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala
index a4aaf6699..d38d33b2f 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala
@@ -7,7 +7,6 @@ import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
  */
 object SuppressedExceptionHandlerUtils {
   val MISSING_DATA_EXCEPTION = "missing_data_exception"
-  var missingDataSuppressedExceptionMsgs = ""
 
   // Set of features that may be missing because of missing data.
   var missingFeatures = scala.collection.mutable.Set.empty[String]
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
index 4d97667f2..ee9d7dd93 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
@@ -6,7 +6,7 @@ import com.linkedin.feathr.offline.config.location.SimplePath
 import com.linkedin.feathr.offline.generation.SparkIOUtils
 import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
 import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader}
-import com.linkedin.feathr.offline.util.FeathrTestUtils
+import com.linkedin.feathr.offline.util.{FeathrTestUtils, SuppressedExceptionHandlerUtils}
 import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, SKIP_MISSING_FEATURE, setFeathrJobParam}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.col
@@ -560,6 +560,9 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
     assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f)
     assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"),
       Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f))))
+    assertEquals(SuppressedExceptionHandlerUtils.missingFeatures,
+      Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7",
+        "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull"))
     setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")
   }
 

From c3fe65f2e332a1b18b721bf02735163a7f419961 Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz>
Date: Tue, 16 May 2023 12:44:42 -0700
Subject: [PATCH 3/4] Bump version to 1.0.4-rc4

---
 gradle.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gradle.properties b/gradle.properties
index bd4e6fb38..936110679 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,3 @@
-version=1.0.4-rc3
+version=1.0.4-rc4
 SONATYPE_AUTOMATIC_RELEASE=true
 POM_ARTIFACT_ID=feathr_2.12

From f23176749ca4ea727c13e58aab535bc6b8ef91a1 Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz>
Date: Tue, 16 May 2023 14:29:39 -0700
Subject: [PATCH 4/4] Fix failing seq join tests and bump version to 1.0.4-rc5

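The new ADD_DEFAULT_COL_FOR_MISSING_DATA lookup in
SequentialJoinAsDerivation goes through FeathrUtils.getFeathrJobParam,
which resolves the flag from the session's SparkConf, so the mocked
SparkSession in TestSequentialJoinAsDerivation now needs a stubbed
sparkContext.getConf chain that answers the flag's key with its default
of "false". The expected missingFeatures set in
AnchoredFeaturesIntegTest also gains derived_featureWithNull7, which is
now propagated from its missing input, and the version is bumped to
1.0.4-rc5.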
---
 .../offline/AnchoredFeaturesIntegTest.scala      |  2 +-
 .../derived/TestSequentialJoinAsDerivation.scala | 16 +++++++++++++++-
 gradle.properties                                |  2 +-
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
index ee9d7dd93..3d35101a1 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
@@ -562,7 +562,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
       Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f))))
     assertEquals(SuppressedExceptionHandlerUtils.missingFeatures,
       Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7",
-        "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull"))
+        "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull", "derived_featureWithNull7"))
     setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")
   }
 
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala
index e11a8cd2a..f061029c2 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala
@@ -11,8 +11,9 @@ import com.linkedin.feathr.offline.job.FeatureTransformation.FEATURE_NAME_PREFIX
 import com.linkedin.feathr.offline.join.algorithms.{SeqJoinExplodedJoinKeyColumnAppender, SequentialJoinConditionBuilder, SparkJoinWithJoinCondition}
 import com.linkedin.feathr.offline.logical.FeatureGroups
 import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
+import com.linkedin.feathr.offline.util.FeathrUtils
 import com.linkedin.feathr.offline.{TestFeathr, TestUtils}
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.sql.functions.{when => _, _}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
@@ -985,6 +986,12 @@ class TestSequentialJoinAsDerivation extends TestFeathr with MockitoSugar {
     val mockDerivationFunction = mock[SeqJoinDerivationFunction]
     val mockBaseTaggedDependency = mock[BaseTaggedDependency]
     val mockTaggedDependency = mock[TaggedDependency]
+    val mockSparkContext = mock[SparkContext]
+    val mockSparkConf = mock[SparkConf]
+    when(mockSparkContext.getConf).thenReturn(mockSparkConf)
+    when(mockSparkSession.sparkContext).thenReturn(mockSparkContext)
+    when(mockSparkConf.get(s"${FeathrUtils.FEATHR_PARAMS_PREFIX}${FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA}", "false"))
+      .thenReturn("false")
     // mock derivation function
     when(mockDerivedFeature.derivation.asInstanceOf[SeqJoinDerivationFunction]).thenReturn(mockDerivationFunction)
     when(mockDerivedFeature.producedFeatureNames).thenReturn(Seq("seqJoinFeature"))
@@ -1059,6 +1066,13 @@ class TestSequentialJoinAsDerivation extends TestFeathr with MockitoSugar {
     val mockDerivationFunction = mock[SeqJoinDerivationFunction]
     val mockBaseTaggedDependency = mock[BaseTaggedDependency]
     val mockTaggedDependency = mock[TaggedDependency]
+    val mockSparkConf = mock[SparkConf]
+    val mockSparkContext = mock[SparkContext]
+    when(mockSparkSession.sparkContext).thenReturn(mockSparkContext)
+    when(mockSparkContext.getConf).thenReturn(mockSparkConf)
+    when(mockSparkConf.get(s"${FeathrUtils.FEATHR_PARAMS_PREFIX}${FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA}",
+      "false"))
+      .thenReturn("false")
     // mock derivation function
     when(mockDerivedFeature.derivation.asInstanceOf[SeqJoinDerivationFunction]).thenReturn(mockDerivationFunction)
     when(mockDerivedFeature.producedFeatureNames).thenReturn(Seq("seqJoinFeature"))
diff --git a/gradle.properties b/gradle.properties
index 936110679..570596788 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,3 @@
-version=1.0.4-rc4
+version=1.0.4-rc5
 SONATYPE_AUTOMATIC_RELEASE=true
 POM_ARTIFACT_ID=feathr_2.12