From c86020825deb775dff1fa0292ab232f57b199229 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz> Date: Mon, 15 May 2023 21:30:54 -0700 Subject: [PATCH 1/4] Seq join bug fix --- .../SequentialJoinAsDerivation.scala | 21 ++++++++++++++++--- .../workflow/AnchoredFeatureJoinStep.scala | 7 +++++-- .../AnchorToDataSourceMapper.scala | 1 - .../SuppressedExceptionHandlerUtils.scala | 5 +++++ .../offline/AnchoredFeaturesIntegTest.scala | 12 ++++++++++- 5 files changed, 39 insertions(+), 7 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala index d35fc5486..e00fded12 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala @@ -7,6 +7,7 @@ import com.linkedin.feathr.common.{FeatureAggregationType, FeatureValue} import com.linkedin.feathr.offline.PostTransformationUtil import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource import com.linkedin.feathr.offline.client.DataFrameColName +import com.linkedin.feathr.offline.client.DataFrameColName.genFeatureColumnName import com.linkedin.feathr.offline.derived.DerivedFeature import com.linkedin.feathr.offline.derived.functions.SeqJoinDerivationFunction import com.linkedin.feathr.offline.derived.strategies.SequentialJoinAsDerivation._ @@ -18,7 +19,7 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults import com.linkedin.feathr.offline.transformation.{AnchorToDataSourceMapper, MvelDefinition} -import com.linkedin.feathr.offline.util.{CoercionUtilsScala, DataFrameSplitterMerger, FeathrUtils, FeaturizedDatasetUtils} +import com.linkedin.feathr.offline.util.{CoercionUtilsScala, DataFrameSplitterMerger, FeathrUtils, FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.sparkcommon.{ComplexAggregation, SeqJoinCustomAggregation} import org.apache.logging.log4j.LogManager import org.apache.spark.sql.functions._ @@ -52,6 +53,22 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, val seqJoinDerivationFunction = derivationFunction val baseFeatureName = seqJoinDerivationFunction.left.feature val expansionFeatureName = seqJoinDerivationFunction.right.feature + val shouldAddDefault = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean + + // If expansion feature is missing because of data issues, then we just copy the expansion feature column to be the final feature value. + if (shouldAddDefault && SuppressedExceptionHandlerUtils.missingFeatures.contains(expansionFeatureName)) { + val seqJoinedFeatureResult = df.withColumn(genFeatureColumnName(FEATURE_NAME_PREFIX + derivedFeature.producedFeatureNames.head), + col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName))) + + // Add the additional column with the keytags to mimic the exact behavior of the seq join flow, this will get dropped later. + val seqJoinFeatureResultWithRenamed = seqJoinedFeatureResult.withColumn(genFeatureColumnName(FEATURE_NAME_PREFIX + + derivedFeature.producedFeatureNames.head, Some(keyTags.map(keyTagList).toList)), + col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName))) + val missingFeature = derivedFeature.producedFeatureNames.head + log.warn(s"Missing data for features ${missingFeature}. Default values will be populated for this column.") + SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeature + return seqJoinFeatureResultWithRenamed + } val aggregationFunction = seqJoinDerivationFunction.aggregation val tagStrList = Some(keyTags.map(keyTagList).toList) val outputKey = seqJoinDerivationFunction.left.outputKey @@ -219,8 +236,6 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, val anchorDFMap1 = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(ss, Seq(featureAnchor), failOnMissingPartition) val updatedAnchorDFMap = anchorDFMap1.filter(anchorEntry => anchorEntry._2.isDefined) .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) - // We dont need to check if the anchored feature's dataframes are missing (due to skip missing feature) as such - // seq join features have already been removed in the FeatureGroupsUpdater#getUpdatedFeatureGroupsWithoutInvalidPaths. val featureInfo = FeatureTransformation.directCalculate( anchorGroup: AnchorFeatureGroups, updatedAnchorDFMap(featureAnchor), diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala index 51d97ebe6..fe770a7c7 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala @@ -17,7 +17,7 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults import com.linkedin.feathr.offline.transformation.DataFrameExt._ -import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils} +import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.offline.util.FeathrUtils.shouldCheckPoint import org.apache.logging.log4j.LogManager import org.apache.spark.sql.{DataFrame, SparkSession} @@ -113,10 +113,13 @@ private[offline] class AnchoredFeatureJoinStep( val containsFeature: Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq !containsFeature.contains(true) }) + log.warn(s"Missing data for features ${missingFeatures.mkString}. Default values will be populated for this column.") + SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeatures.mkString + SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1)) substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan, missingAnchoredFeatures) - }else observationDF + } else observationDF val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures val joinStages = ctx.logicalPlan.joinStages diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index cd18e512a..eb0cc2804 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -86,7 +86,6 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } catch { case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e } - anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) }) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala index 95e1fe90c..a4aaf6699 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala @@ -1,9 +1,14 @@ package com.linkedin.feathr.offline.util +import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource + /** * Util classes and methods to handle suppressed exceptions. */ object SuppressedExceptionHandlerUtils { val MISSING_DATA_EXCEPTION = "missing_data_exception" var missingDataSuppressedExceptionMsgs = "" + + // Set of features that may be missing because of missing data. + var missingFeatures = scala.collection.mutable.Set.empty[String] } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index 071db5034..4d97667f2 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -409,7 +409,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { | key: a_id | featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3", "featureWithNull4", | "featureWithNull5", "derived_featureWithNull2", "featureWithNull6", "featureWithNull7", "derived_featureWithNull7" - | "aEmbedding", "memberEmbeddingAutoTZ", "aEmbedding", "featureWithNullSql"] + | "aEmbedding", "memberEmbeddingAutoTZ", "aEmbedding", "featureWithNullSql", "seqJoin_featureWithNull"] | } """.stripMargin, featureDefAsString = @@ -529,6 +529,14 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { | derived_featureWithNull: "featureWithNull * 2" | derived_featureWithNull2: "featureWithNull2 * 2" | derived_featureWithNull7: "featureWithNull7 * 2" + | seqJoin_featureWithNull: { + | key: x + | join: { + | base: {key: x, feature: featureWithNull2} + | expansion: {key: y, feature: featureWithNull5} + | } + | aggregation: "SUM" + | } |} """.stripMargin, observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv") @@ -550,6 +558,8 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f) + assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f)))) setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false") } From 559fbcaca0d25727674d885f44a5fe79509d2cad Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz> Date: Tue, 16 May 2023 12:09:50 -0700 Subject: [PATCH 2/4] Address comments --- .../com/linkedin/feathr/offline/client/FeathrClient.scala | 4 ++-- .../feathr/offline/derived/DerivedFeatureEvaluator.scala | 5 ++++- .../derived/strategies/SequentialJoinAsDerivation.scala | 2 +- .../offline/join/workflow/AnchoredFeatureJoinStep.scala | 1 - .../feathr/offline/swa/SlidingWindowAggregationJoiner.scala | 2 +- .../offline/util/SuppressedExceptionHandlerUtils.scala | 1 - .../linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala | 5 ++++- 7 files changed, 12 insertions(+), 8 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index d7cf748fe..2bb7bccce 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -83,7 +83,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: def joinFeaturesWithSuppressedExceptions(joinConfig: FeatureJoinConfig, obsData: SparkFeaturizedDataset, jobContext: JoinJobContext = JoinJobContext()): (SparkFeaturizedDataset, Map[String, String]) = { (joinFeatures(joinConfig, obsData, jobContext), Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION - -> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs)) + -> SuppressedExceptionHandlerUtils.missingFeatures.mkString)) } /** @@ -231,7 +231,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: val (joinedDF, header) = doJoinObsAndFeatures(joinConfig, jobContext, obsData) (joinedDF, header, Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION - -> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs)) + -> SuppressedExceptionHandlerUtils.missingFeatures.mkString)) } /** diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala index 4d9f45af5..61bc48c4a 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala @@ -10,7 +10,7 @@ import com.linkedin.feathr.offline.join.algorithms.{SequentialJoinConditionBuild import com.linkedin.feathr.offline.logical.FeatureGroups import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler -import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils +import com.linkedin.feathr.offline.util.{FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureDataFrame} import com.linkedin.feathr.sparkcommon.FeatureDerivationFunctionSpark import com.linkedin.feathr.{common, offline} @@ -37,6 +37,9 @@ private[offline] class DerivedFeatureEvaluator(derivationStrategies: DerivationS def evaluate(keyTag: Seq[Int], keyTagList: Seq[String], contextDF: DataFrame, derivedFeature: DerivedFeature): FeatureDataFrame = { val tags = Some(keyTag.map(keyTagList).toList) val producedFeatureColName = DataFrameColName.genFeatureColumnName(derivedFeature.producedFeatureNames.head, tags) + if (derivedFeature.consumedFeatureNames.exists(x => SuppressedExceptionHandlerUtils.missingFeatures.contains(x.getFeatureName))) { + SuppressedExceptionHandlerUtils.missingFeatures.add(derivedFeature.producedFeatureNames.head) + } derivedFeature.derivation match { case g: SeqJoinDerivationFunction => diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala index e00fded12..02b14820f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala @@ -66,7 +66,7 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName))) val missingFeature = derivedFeature.producedFeatureNames.head log.warn(s"Missing data for features ${missingFeature}. Default values will be populated for this column.") - SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeature + SuppressedExceptionHandlerUtils.missingFeatures += missingFeature return seqJoinFeatureResultWithRenamed } val aggregationFunction = seqJoinDerivationFunction.aggregation diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala index fe770a7c7..8c126ca81 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala @@ -114,7 +114,6 @@ private[offline] class AnchoredFeatureJoinStep( !containsFeature.contains(true) }) log.warn(s"Missing data for features ${missingFeatures.mkString}. Default values will be populated for this column.") - SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeatures.mkString SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1)) substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan, diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala index 0683c3601..6e3f962d9 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala @@ -179,7 +179,7 @@ private[offline] class SlidingWindowAggregationJoiner( res.map(emptyFeatures.add) val exceptionMsg = emptyFeatures.mkString log.warn(s"Missing data for features ${emptyFeatures}. Default values will be populated for this column.") - SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += exceptionMsg + SuppressedExceptionHandlerUtils.missingFeatures ++= emptyFeatures anchors.map(anchor => (anchor, originalSourceDf)) } else { val sourceDF: DataFrame = preprocessedDf match { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala index a4aaf6699..d38d33b2f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala @@ -7,7 +7,6 @@ import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource */ object SuppressedExceptionHandlerUtils { val MISSING_DATA_EXCEPTION = "missing_data_exception" - var missingDataSuppressedExceptionMsgs = "" // Set of features that may be missing because of missing data. var missingFeatures = scala.collection.mutable.Set.empty[String] diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index 4d97667f2..ee9d7dd93 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -6,7 +6,7 @@ import com.linkedin.feathr.offline.config.location.SimplePath import com.linkedin.feathr.offline.generation.SparkIOUtils import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader} -import com.linkedin.feathr.offline.util.FeathrTestUtils +import com.linkedin.feathr.offline.util.{FeathrTestUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, SKIP_MISSING_FEATURE, setFeathrJobParam} import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col @@ -560,6 +560,9 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f) assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f)))) + assertEquals(SuppressedExceptionHandlerUtils.missingFeatures, + Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7", + "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull")) setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false") } From c3fe65f2e332a1b18b721bf02735163a7f419961 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz> Date: Tue, 16 May 2023 12:44:42 -0700 Subject: [PATCH 3/4] version bump --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index bd4e6fb38..936110679 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=1.0.4-rc3 +version=1.0.4-rc4 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12 From f23176749ca4ea727c13e58aab535bc6b8ef91a1 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz> Date: Tue, 16 May 2023 14:29:39 -0700 Subject: [PATCH 4/4] Fix failing tests --- .../offline/AnchoredFeaturesIntegTest.scala | 2 +- .../derived/TestSequentialJoinAsDerivation.scala | 16 +++++++++++++++- gradle.properties | 2 +- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index ee9d7dd93..3d35101a1 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -562,7 +562,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f)))) assertEquals(SuppressedExceptionHandlerUtils.missingFeatures, Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7", - "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull")) + "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull", "derived_featureWithNull7")) setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false") } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala index e11a8cd2a..f061029c2 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala @@ -11,8 +11,9 @@ import com.linkedin.feathr.offline.job.FeatureTransformation.FEATURE_NAME_PREFIX import com.linkedin.feathr.offline.join.algorithms.{SeqJoinExplodedJoinKeyColumnAppender, SequentialJoinConditionBuilder, SparkJoinWithJoinCondition} import com.linkedin.feathr.offline.logical.FeatureGroups import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext +import com.linkedin.feathr.offline.util.FeathrUtils import com.linkedin.feathr.offline.{TestFeathr, TestUtils} -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.sql.functions.{when => _, _} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession} @@ -985,6 +986,12 @@ class TestSequentialJoinAsDerivation extends TestFeathr with MockitoSugar { val mockDerivationFunction = mock[SeqJoinDerivationFunction] val mockBaseTaggedDependency = mock[BaseTaggedDependency] val mockTaggedDependency = mock[TaggedDependency] + val mockSparkContext = mock[SparkContext] + val mockSparkConf = mock[SparkConf] + when(mockSparkContext.getConf).thenReturn(mockSparkConf) + when(mockSparkSession.sparkContext).thenReturn(mockSparkContext) + when(mockSparkConf.get(s"${FeathrUtils.FEATHR_PARAMS_PREFIX}${FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA}", "false")) + .thenReturn("false") // mock derivation function when(mockDerivedFeature.derivation.asInstanceOf[SeqJoinDerivationFunction]).thenReturn(mockDerivationFunction) when(mockDerivedFeature.producedFeatureNames).thenReturn(Seq("seqJoinFeature")) @@ -1059,6 +1066,13 @@ class TestSequentialJoinAsDerivation extends TestFeathr with MockitoSugar { val mockDerivationFunction = mock[SeqJoinDerivationFunction] val mockBaseTaggedDependency = mock[BaseTaggedDependency] val mockTaggedDependency = mock[TaggedDependency] + val mockSparkConf = mock[SparkConf] + val mockSparkContext = mock[SparkContext] + when(mockSparkSession.sparkContext).thenReturn(mockSparkContext) + when(mockSparkContext.getConf).thenReturn(mockSparkConf) + when(mockSparkConf.get(s"${FeathrUtils.FEATHR_PARAMS_PREFIX}${FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA}", + "false")) + .thenReturn("false") // mock derivation function when(mockDerivedFeature.derivation.asInstanceOf[SeqJoinDerivationFunction]).thenReturn(mockDerivationFunction) when(mockDerivedFeature.producedFeatureNames).thenReturn(Seq("seqJoinFeature")) diff --git a/gradle.properties b/gradle.properties index 936110679..570596788 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=1.0.4-rc4 +version=1.0.4-rc5 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12