Skip to content

Commit

Permalink
#957 Implement earliest and latest functions (#1018)
Browse files Browse the repository at this point in the history
Signed-off-by: currantw <[email protected]>
  • Loading branch information
currantw authored Jan 17, 2025
1 parent 5884fea commit 54248bb
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 39 deletions.
16 changes: 15 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,23 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols`
- `source = table | eval cdate = CAST('2012-08-07' as date), ctime = cast('2012-08-07T08:07:06' as timestamp) | fields cdate, ctime`
- `source = table | eval chained_cast = cast(cast("true" as boolean) as integer) | fields chained_cast`

### **Relative Time Functions**

#### **relative_timestamp**
[See additional function details](functions/ppl-datetime#RELATIVE_TIMESTAMP)
[See additional function details](functions/ppl-datetime#relative_timestamp)
- `source = table | eval one_hour_ago = relative_timestamp("-1h") | where timestamp < one_hour_ago`
- `source = table | eval start_of_today = relative_timestamp("@d") | where timestamp > start_of_today`
- `source = table | eval last_saturday = relative_timestamp("-1d@w6") | where timestamp >= last_saturday`

#### **earliest**
[See additional function details](functions/ppl-datetime#earliest)
- `source = table | where earliest("-1wk", timestamp)`
- `source = table | where earliest("@qtr", timestamp)`
- `source = table | where earliest("-2y@q", timestamp)`

#### **latest**
[See additional function details](functions/ppl-datetime#latest)
- `source = table | where latest("-60m", timestamp)`
- `source = table | where latest("@year", timestamp)`
- `source = table | where latest("-day@w1", timestamp)`
---
92 changes: 88 additions & 4 deletions docs/ppl-lang/functions/ppl-datetime.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,45 @@ Example:
+-------------------------------+


### `EARLIEST`

**Description:**

**Usage:** earliest(string, timestamp) returns whether the timestamp defined by the given relative string is earlier
than or at the same time as the given timestamp. See [RELATIVE_TIMESTAMP](#relative_timestamp)
for more details on relative timestamp strings.

Argument type: STRING, TIMESTAMP

Return type: BOOLEAN

Example:

os> source=people | eval earliest = earliest("-1s", now()) | fields earliest | head 1
fetched rows / total rows = 1/1
+----------+
| earliest |
|----------|
| True |
+----------+

os> source=people | eval earliest = earliest("now", now()) | fields earliest | head 1
fetched rows / total rows = 1/1
+----------+
| earliest |
|----------|
| True |
+----------+

os> source=people | eval earliest = earliest("+1s", now()) | fields earliest | head 1
fetched rows / total rows = 1/1
+----------+
| earliest |
|----------|
| False |
+----------+


### `FROM_UNIXTIME`

**Description:**
Expand Down Expand Up @@ -507,6 +546,45 @@ Example:
+--------------------------+


### `LATEST`

**Description:**

**Usage:** latest(string, timestamp) returns whether the timestamp defined by the given relative string is later
than or at the same time as the given timestamp. See [RELATIVE_TIMESTAMP](#relative_timestamp)
for more details on relative timestamp strings.

Argument type: STRING, TIMESTAMP

Return type: BOOLEAN

Example:

os> source=people | eval latest = latest("-1s", now()) | fields latest | head 1
fetched rows / total rows = 1/1
+--------+
| latest |
|--------|
| False |
+--------+

os> source=people | eval latest = latest("now", now()) | fields latest | head 1
fetched rows / total rows = 1/1
+--------+
| latest |
|--------|
| True |
+--------+

os> source=people | eval latest = latest("+1s", now()) | fields latest | head 1
fetched rows / total rows = 1/1
+--------+
| latest |
|--------|
| True |
+--------+


### `LOCALTIMESTAMP`

**Description:**
Expand Down Expand Up @@ -738,7 +816,7 @@ Example:
**Description:**


**Usage:** relative_timestamp(str) returns a relative timestamp corresponding to the given relative string and the
**Usage:** relative_timestamp(string) returns a relative timestamp corresponding to the given relative string and the
current timestamp at the time of query execution.

The relative timestamp string has syntax `[+|-]<offset_time_integer><offset_time_unit>@<snap_time_unit>`, and is
Expand All @@ -750,9 +828,15 @@ made up of two optional components.
specified), and rounds the time <i>down</i> to the start of the specified time unit. For example, `@wk` is the start
of the current week (Sunday is considered to be the first day of the week).

The special relative timestamp string `now`, corresponding to the current timestamp, is also supported. The current
timestamp is determined once at the start of query execution, and is used for all relative timestamp calculations for
that query.
The special relative timestamp string `now`, corresponding to the current timestamp, is also supported.

The current timestamp is determined once at the start of query execution, and is used for all relative timestamp
calculations for that query. The Spark session time zone (`spark.sql.session.timeZone`) is used for determining relative
timestamps. Offsets using time units (seconds, minutes, or hours) represent a fixed time period; adding twenty-four
hours (`+24h`) will yield a timestamp that is exactly twenty-four hours later, but which may not have the same local
time (because of daylight savings, for example). Conversely, offsets using date units (days, weeks, months, quarters, or
years) do not represent a fixed time period; adding one day (`+1d`) will yield a timestamp with the same local time,
but which may not be exactly twenty-four hours later.

The relative timestamp string is case-insensitive.

Expand Down
1 change: 1 addition & 0 deletions docs/ppl-lang/ppl-where-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ PPL query:
| where case(factor = 2, 'even', factor = 4, 'even', factor = 6, 'even', factor = 8, 'even' else 'odd') = 'even'
| stats count() by factor`
- `source = table | where timestamp >= relative_timestamp("-1d@w6")`
- `source = table | where earliest("-1d@w0", timestamp) and latest("now", timestamp)`
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
assertSameRows(Seq(Row(23)), frame)

frame = sql(s"""
| source =$testTable
| source=$testTable
| | eval day = day_of_week(relative_timestamp("@w0"))
| | fields day
| | head 1
Expand All @@ -406,30 +406,28 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
assertSameRows(Seq(Row(true)), frame)
}

// TODO #957: Support earliest
ignore("test EARLIEST") {
test("test EARLIEST") {
var frame = sql(s"""
| source=$testTable
| | eval earliest_hour_before = earliest(now(), "-1h")
| | eval earliest_now = earliest(now(), "now")
| | eval earliest_hour_after = earliest(now(), "+1h")
| | fields earliest_hour_before, earliest_now, earliest_hour_after
| | eval earliest_second_before = earliest("-1s", now())
| | eval earliest_now = earliest("now", now())
| | eval earliest_second_after = earliest("+1s", now())
| | fields earliest_second_before, earliest_now, earliest_second_after
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(true), Row(true), Row(false)), frame)
assertSameRows(Seq(Row(true, true, false)), frame)
}

// TODO #957: Support latest
ignore("test LATEST") {
test("test LATEST") {
var frame = sql(s"""
| source=$testTable
| | eval latest_hour_before = latest(now(), "-1h")
| | eval latest_now = latest(now(), "now")
| | eval latest_hour_after = latest(now(), "+1h")
| | fields latest_hour_before, latest_now, latest_hour_after
| | eval latest_second_before = latest("-1s", now())
| | eval latest_now = latest("now", now())
| | eval latest_second_after = latest("+1s", now())
| | fields latest_second_before, latest_now, latest_second_after
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(false), Row(true), Row(true)), frame)
assertSameRows(Seq(Row(false, true, true)), frame)
}

test("test CURRENT_TIME is not supported") {
Expand Down
10 changes: 5 additions & 5 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ FIRST: 'FIRST';
LAST: 'LAST';
LIST: 'LIST';
VALUES: 'VALUES';
EARLIEST: 'EARLIEST';
EARLIEST_TIME: 'EARLIEST_TIME';
LATEST: 'LATEST';
LATEST_TIME: 'LATEST_TIME';
PER_DAY: 'PER_DAY';
PER_HOUR: 'PER_HOUR';
PER_MINUTE: 'PER_MINUTE';
Expand Down Expand Up @@ -338,7 +334,6 @@ MONTHNAME: 'MONTHNAME';
NOW: 'NOW';
PERIOD_ADD: 'PERIOD_ADD';
PERIOD_DIFF: 'PERIOD_DIFF';
RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP';
SEC_TO_TIME: 'SEC_TO_TIME';
STR_TO_DATE: 'STR_TO_DATE';
SUBDATE: 'SUBDATE';
Expand All @@ -360,6 +355,11 @@ UTC_TIMESTAMP: 'UTC_TIMESTAMP';
WEEKDAY: 'WEEKDAY';
YEARWEEK: 'YEARWEEK';

// RELATIVE TIME FUNCTIONS
RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP';
EARLIEST: 'EARLIEST';
LATEST: 'LATEST';

// TEXT FUNCTIONS
SUBSTR: 'SUBSTR';
SUBSTRING: 'SUBSTRING';
Expand Down
12 changes: 7 additions & 5 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,6 @@ dateTimeFunctionName
| NOW
| PERIOD_ADD
| PERIOD_DIFF
| RELATIVE_TIMESTAMP
| QUARTER
| SECOND
| SECOND_OF_MINUTE
Expand All @@ -780,6 +779,13 @@ dateTimeFunctionName
| WEEK_OF_YEAR
| YEAR
| YEARWEEK
| relativeTimeFunctionName
;

relativeTimeFunctionName
: RELATIVE_TIMESTAMP
| EARLIEST
| LATEST
;

getFormatFunction
Expand Down Expand Up @@ -1171,10 +1177,6 @@ keywordsCanBeId
| LAST
| LIST
| VALUES
| EARLIEST
| EARLIEST_TIME
| LATEST
| LATEST_TIME
| PER_DAY
| PER_HOUR
| PER_MINUTE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ public enum BuiltinFunctionName {
LOCALTIMESTAMP(FunctionName.of("localtimestamp")),
SYSDATE(FunctionName.of("sysdate")),

// Relative timestamp functions
// Relative time functions
RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")),
EARLIEST(FunctionName.of("earliest")),
LATEST(FunctionName.of("latest")),

/** Text Functions. */
TOSTRING(FunctionName.of("tostring")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,7 @@ public Instant apply(String relativeString, Object currentTimestamp, String zone
? ((Timestamp) currentTimestamp).toInstant()
: (Instant) currentTimestamp;

/// The Spark session time zone (`spark.sql.session.timeZone`)
/// is used, which may be different from the system time zone.
ZoneId zoneId = ZoneId.of(zoneIdString);

/// Relative time calculations are performed using [ZonedDateTime] because offsets (e.g. one hour ago)
/// need to account for changes in the time zone offset (e.g. daylight savings time), while snaps (e.g.
/// start of previous Wednesday) need to account for the local date time.
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(currentInstant, zoneId);
ZonedDateTime relativeDateTime = TimeUtils.getRelativeZonedDateTime(relativeString, currentDateTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp$;
import org.apache.spark.sql.catalyst.expressions.DateAddInterval$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual$;
import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual$;
import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.catalyst.expressions.ScalaUDF;
import org.apache.spark.sql.catalyst.expressions.TimestampAdd$;
Expand All @@ -37,13 +39,15 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DATE_SUB;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DAY_OF_MONTH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.COALESCE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.EARLIEST;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_ARRAY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_ARRAY_LENGTH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_EXTRACT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_KEYS;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_OBJECT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_VALID;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LATEST;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBTRACT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DIVIDE;
Expand Down Expand Up @@ -174,9 +178,25 @@ public interface BuiltinFunctionTransformer {
args -> {
return ToUTCTimestamp$.MODULE$.apply(CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply());
})

// Relative time functions
.put(
RELATIVE_TIMESTAMP,
args -> buildRelativeTimestamp(args.get(0)))
.put(
EARLIEST,
args -> {
Expression relativeTimestamp = buildRelativeTimestamp(args.get(0));
Expression timestamp = args.get(1);
return LessThanOrEqual$.MODULE$.apply(relativeTimestamp, timestamp);
})
.put(
RELATIVE_TIMESTAMP,
args -> SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply())))
LATEST,
args -> {
Expression relativeTimestamp = buildRelativeTimestamp(args.get(0));
Expression timestamp = args.get(1);
return GreaterThanOrEqual$.MODULE$.apply(relativeTimestamp, timestamp);
})
.build();

static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List<Expression> args) {
Expand Down Expand Up @@ -218,4 +238,10 @@ static Expression[] createIntervalArgs(IntervalUnit unit, Expression value) {
}
return args;
}

private static Expression buildRelativeTimestamp(Expression relativeStringExpression) {
return SerializableUdf.visit(
RELATIVE_TIMESTAMP.getName().getFunctionName(),
List.of(relativeStringExpression, CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()));
}
}

0 comments on commit 54248bb

Please sign in to comment.