#64 support mainframe century #66

Open. Wants to merge 2 commits into base: feature/61-add-infinity-support.
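This change adds support for a mainframe-style century digit in date and timestamp patterns: a new `c` placeholder whose single digit selects the century (0 for the 1900s, 1 for the 2000s, and so on). Below is a minimal standalone sketch of the substitution performed by the new `replaceCenturyUDF` in `TypeParser`; the helper name `replaceCentury` is illustrative only, not part of the diff.

```scala
// Illustrative sketch only: mirrors the substitution done by replaceCenturyUDF in the diff.
// The digit at the position of 'c' is replaced by (digit + 19); 'c' itself is later
// rewritten to "yy" in the pattern, so "cyy" effectively becomes a four-digit year.
def replaceCentury(input: String, pattern: String): String = {
  val idx = pattern.indexOf('c')
  val padded = ("0" * (pattern.length - input.length)) + input // left-pad short (numeric) inputs
  val century = (padded.charAt(idx).asDigit + 19).toString
  padded.substring(0, idx) + century + padded.substring(idx + 1)
}

replaceCentury("170/01/01", "cyy/dd/MM") // "2070/01/01", parsed with "yyyy/dd/MM" as 2070-01-01
replaceCentury("000/31/12", "cyy/dd/MM") // "1900/31/12", parsed as 1900-12-31
```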
Changes from all commits
@@ -22,13 +22,9 @@ import org.apache.spark.sql.types.DataType

trait InfinitySupport {
protected def infMinusSymbol: Option[String]

protected def infMinusValue: Option[String]

protected def infPlusSymbol: Option[String]

protected def infPlusValue: Option[String]

protected val origType: DataType

def replaceInfinitySymbols(column: Column): Column = {
@@ -34,6 +34,7 @@ import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchem
import za.co.absa.standardization.time.DateTimePattern
import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike}
import za.co.absa.standardization.types.TypedStructField._
import za.co.absa.standardization.types.parsers.DateTimeParser
import za.co.absa.standardization.types.{ParseOutput, TypeDefaults, TypedStructField}
import za.co.absa.standardization.udf.{UDFBuilder, UDFNames}

@@ -511,6 +512,17 @@ object TypeParser {
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
private val columnWithInfinityReplaced: Column = replaceInfinitySymbols(column)

protected val replaceCenturyUDF: UserDefinedFunction = udf((inputDate: String, centuryPattern: String) => {
val centuryIndex = centuryPattern.indexOf(DateTimePattern.patternCenturyChar)
// left-pad with zeros so the input lines up with the pattern (inputs coming from numeric columns may be shorter)
val padding = centuryPattern.length - inputDate.length
val leftPadding = "0" * padding
val paddedInput = leftPadding + inputDate

// replace the single century digit with the full two-digit century: 0 -> 19, 1 -> 20, ...
val centuryDigit = paddedInput.charAt(centuryIndex).asDigit
val century = (centuryDigit + 19).toString
paddedInput.substring(0, centuryIndex) + century + paddedInput.substring(centuryIndex + 1)
})
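// Worked example (values from the "century from int" date test below): with pattern "cyyddMM",
// the numeric value 3112 reaches this UDF as the string "3112"; it is left-padded to "0003112",
// the century digit 0 becomes 19 ("19003112"), and the pattern rewritten to "yyyyddMM"
// then parses it as 1900-12-31.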

override protected def assemblePrimitiveCastLogic: Column = {
if (pattern.isEpoch) {
castEpoch()
@@ -604,14 +616,18 @@ object TypeParser {
}

override protected def castStringColumn(stringColumn: Column): Column = {
val columnWithCenturyReplaced: Column = if (pattern.isCentury) {
replaceCenturyUDF(stringColumn, lit(pattern.originalPattern.get))
} else { stringColumn }

if (pattern.containsSecondFractions) {
// date doesn't need to care about second fractions
applyPatternToStringColumn(
stringColumn.removeSections(
columnWithCenturyReplaced.removeSections(
Seq(pattern.millisecondsPosition, pattern.microsecondsPosition, pattern.nanosecondsPosition).flatten
), pattern.patternWithoutSecondFractions)
} else {
applyPatternToStringColumn(stringColumn, pattern)
applyPatternToStringColumn(columnWithCenturyReplaced, pattern)
}
}

@@ -651,28 +667,32 @@ object TypeParser {
}

override protected def castStringColumn(stringColumn: Column): Column = {
val columnWithCenturyReplaced: Column = if (pattern.isCentury) {
replaceCenturyUDF(stringColumn, lit(pattern.originalPattern.get))
} else { stringColumn }

if (pattern.containsSecondFractions) {
//this is a trick how to enforce fractions of seconds into the timestamp
// - turn into timestamp up to seconds precision and that into unix_timestamp,
// - the second fractions turn into numeric fractions
// - add both together and convert to timestamp
val colSeconds = unix_timestamp(applyPatternToStringColumn(
stringColumn.removeSections(
columnWithCenturyReplaced.removeSections(
Seq(pattern.millisecondsPosition, pattern.microsecondsPosition, pattern.nanosecondsPosition).flatten
), pattern.patternWithoutSecondFractions))

val colMilliseconds: Option[Column] =
pattern.millisecondsPosition.map(stringColumn.zeroBasedSubstr(_).cast(decimalType) / MillisecondsPerSecond)
pattern.millisecondsPosition.map(columnWithCenturyReplaced.zeroBasedSubstr(_).cast(decimalType) / MillisecondsPerSecond)
val colMicroseconds: Option[Column] =
pattern.microsecondsPosition.map(stringColumn.zeroBasedSubstr(_).cast(decimalType) / MicrosecondsPerSecond)
pattern.microsecondsPosition.map(columnWithCenturyReplaced.zeroBasedSubstr(_).cast(decimalType) / MicrosecondsPerSecond)
val colNanoseconds: Option[Column] =
pattern.nanosecondsPosition.map(stringColumn.zeroBasedSubstr(_).cast(decimalType) / NanosecondsPerSecond)
pattern.nanosecondsPosition.map(columnWithCenturyReplaced.zeroBasedSubstr(_).cast(decimalType) / NanosecondsPerSecond)
val colFractions: Column =
(colMilliseconds ++ colMicroseconds ++ colNanoseconds).reduceOption(_ + _).getOrElse(lit(0))

(colSeconds + colFractions).cast(TimestampType)
} else {
applyPatternToStringColumn(stringColumn, pattern)
applyPatternToStringColumn(columnWithCenturyReplaced, pattern)
}
}

@@ -17,6 +17,7 @@
package za.co.absa.standardization.time

import za.co.absa.standardization.implicits.StringImplicits.StringEnhancements
import za.co.absa.standardization.time.DateTimePattern.{patternMicroSecondChar, patternMilliSecondChar, patternNanoSecondChat}
import za.co.absa.standardization.types.{Section, TypePattern}

/**
@@ -29,10 +30,12 @@ abstract sealed class DateTimePattern(pattern: String, isDefault: Boolean = fals
extends TypePattern(pattern, isDefault){

val isEpoch: Boolean
val isCentury: Boolean
val epochFactor: Long

val timeZoneInPattern: Boolean
val defaultTimeZone: Option[String]
val originalPattern: Option[String]
val isTimeZoned: Boolean

val millisecondsPosition: Option[Section]
@@ -47,7 +50,6 @@ abstract sealed class DateTimePattern(pattern: String, isDefault: Boolean = fals
val q = "\""
s"pattern: $q$pattern$q" + defaultTimeZone.map(x => s" (default time zone: $q$x$q)").getOrElse("")
}

}

object DateTimePattern {
@@ -57,6 +59,8 @@ object DateTimePattern {
val EpochMicroKeyword = "epochmicro"
val EpochNanoKeyword = "epochnano"

val patternCenturyChar = "c"

private val epochUnitFactor = 1
private val epoch1kFactor = 1000
private val epoch1MFactor = 1000000
@@ -81,10 +85,12 @@
extends DateTimePattern(pattern, isDefault) {

override val isEpoch: Boolean = true
override val isCentury: Boolean = false
override val epochFactor: Long = DateTimePattern.epochFactor(pattern)

override val timeZoneInPattern: Boolean = true
override val defaultTimeZone: Option[String] = None
override val originalPattern: Option[String] = None
override val isTimeZoned: Boolean = true

override val millisecondsPosition: Option[Section] = pattern match {
@@ -111,9 +117,9 @@
override val patternWithoutSecondFractions: String = EpochKeyword
}

private final case class StandardDTPattern(override val pattern: String,
assignedDefaultTimeZone: Option[String] = None,
override val isDefault: Boolean = false)
private abstract class StandardDTPatternBase(override val pattern: String,
assignedDefaultTimeZone: Option[String],
override val isDefault: Boolean = false)
extends DateTimePattern(pattern, isDefault) {

override val isEpoch: Boolean = false
@@ -143,9 +149,30 @@
}
}

private final case class StandardDTPattern(override val pattern: String,
assignedDefaultTimeZone: Option[String] = None,
override val isDefault: Boolean = false)
extends StandardDTPatternBase(pattern, assignedDefaultTimeZone, isDefault) {

override val isCentury: Boolean = false
override val originalPattern: Option[String] = None
}

private final case class CenturyDTPattern(override val pattern: String,
override val originalPattern: Option[String],
assignedDefaultTimeZone: Option[String] = None,
override val isDefault: Boolean = false)
extends StandardDTPatternBase(pattern, assignedDefaultTimeZone, isDefault) {

override val isCentury: Boolean = true
}

private def create(pattern: String, assignedDefaultTimeZone: Option[String], isDefault: Boolean): DateTimePattern = {
if (isEpoch(pattern)) {
EpochDTPattern(pattern, isDefault)
} else if (isCentury(pattern)) {
val patternWithoutCentury = pattern.replaceAll(patternCenturyChar, "yy")
CenturyDTPattern(patternWithoutCentury, Some(pattern), assignedDefaultTimeZone, isDefault)
} else {
StandardDTPattern(pattern, assignedDefaultTimeZone, isDefault)
}
@@ -168,6 +195,10 @@
}
}

def isCentury(pattern: String): Boolean = {
pattern.contains(s"${patternCenturyChar}yy")
}

def epochFactor(pattern: String): Long = {
pattern.toLowerCase match {
case EpochKeyword => epochUnitFactor
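For orientation, a sketch of how the reworked `DateTimePattern` factory is expected to behave for a century pattern, assuming the public `apply` used below delegates to the private `create` shown above:

```scala
val dtp = DateTimePattern("cyy/dd/MM", None)
dtp.isCentury       // true: the pattern contains "cyy"
dtp.pattern         // "yyyy/dd/MM": 'c' is rewritten to "yy" before parsing
dtp.originalPattern // Some("cyy/dd/MM"): kept so replaceCenturyUDF can locate the century digit
```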
@@ -415,6 +415,7 @@ object TypedStructField {
private def readDateTimePattern: DateTimePattern = {
structField.metadata.getOptString(MetadataKeys.Pattern).map { pattern =>
val timeZoneOpt = structField.metadata.getOptString(MetadataKeys.DefaultTimeZone)
val centuryIndex = pattern.indexOf(DateTimePattern.patternCenturyChar) // TODO
DateTimePattern(pattern, timeZoneOpt)
}.getOrElse(
DateTimePattern.asDefault(defaults.getStringPattern(structField.dataType), None)
@@ -196,6 +196,62 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
assertResult(exp)(std.as[DateRow].collect().toList)
}

test("date pattern with century from string") {
val seq: Seq[String] = Seq(
"170/01/01",
"170/02/01",
"000/31/12",
"019/16/07"
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder()
.putString(MetadataKeys.Pattern, "cyy/dd/MM")
.build)
))
val exp: Seq[DateRow] = Seq(
DateRow(Date.valueOf("2070-01-01")),
DateRow(Date.valueOf("2070-01-02")),
DateRow(Date.valueOf("1900-12-31")),
DateRow(Date.valueOf("1919-07-16"))
)

val src = seq.toDF(fieldName)

val std = Standardization.standardize(src, desiredSchema).cacheIfNotCachedYet()
logDataFrameContent(std)

assertResult(exp)(std.as[DateRow].collect().toList)
}

test("date pattern with century from int") {
val seq: Seq[Int] = Seq(
1700101,
1700201,
3112,
191607
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder()
.putString(MetadataKeys.Pattern, "cyyddMM")
.build)
))
val exp: Seq[DateRow] = Seq(
DateRow(Date.valueOf("2070-01-01")),
DateRow(Date.valueOf("2070-01-02")),
DateRow(Date.valueOf("1900-12-31")),
DateRow(Date.valueOf("1919-07-16"))
)

val src = seq.toDF(fieldName)

val std = Standardization.standardize(src, desiredSchema).cacheIfNotCachedYet()
logDataFrameContent(std)

assertResult(exp)(std.as[DateRow].collect().toList)
}

test("date + time pattern and named time zone") {
val seq = Seq(
"01-00-00 01.01.1970 CET",
@@ -205,6 +205,62 @@ class StandardizationInterpreter_TimestampSuite extends AnyFunSuite with SparkTe
assertResult(exp)(std.as[TimestampRow].collect().toList)
}

test("pattern up to seconds precision with century pattern from string") {
val seq = Seq(
"01.01.070 00-00-00",
"02.01.070 00-00-00",
"31.12.100 23-59-59",
"16.07.119 14-41-43"
)
val desiredSchema = StructType(Seq(
StructField(fieldName, TimestampType, nullable = false,
new MetadataBuilder()
.putString("pattern", "dd.MM.cyy HH-mm-ss")
.build)
))
val exp = Seq(
TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00")),
TimestampRow(Timestamp.valueOf("1970-01-02 00:00:00")),
TimestampRow(Timestamp.valueOf("2000-12-31 23:59:59")),
TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43"))
)

val src = seq.toDF(fieldName)

val std = Standardization.standardize(src, desiredSchema).cacheIfNotCachedYet()
logDataFrameContent(std)

assertResult(exp)(std.as[TimestampRow].collect().toList)
}

test("pattern up to seconds precision with century pattern fron int") {
val seq: Seq[Long] = Seq(
101070000000L,
201070000000L,
3112100235959L,
1607119144143L
)
val desiredSchema = StructType(Seq(
StructField(fieldName, TimestampType, nullable = false,
new MetadataBuilder()
.putString("pattern", "ddMMcyyHHmmss")
.build)
))
val exp = Seq(
TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00")),
TimestampRow(Timestamp.valueOf("1970-01-02 00:00:00")),
TimestampRow(Timestamp.valueOf("2000-12-31 23:59:59")),
TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43"))
)

val src = seq.toDF(fieldName)

val std = Standardization.standardize(src, desiredSchema).cacheIfNotCachedYet()
logDataFrameContent(std)

assertResult(exp)(std.as[TimestampRow].collect().toList)
}

test("pattern up to seconds precision with default time zone") {
val seq = Seq(
"31.12.1969 19-00-00",