bug(flink): Incorrect SQL generated for over window aggregation #10932

Open
1 task done
romaingd opened this issue Mar 3, 2025 · 0 comments
Labels
bug Incorrect behavior inside of ibis

romaingd commented Mar 3, 2025

What happened?

Hi! While running the Flink + Kafka tutorial, I hit what looks like incorrect SQL being generated when aggregating over a window. Below is a simplified version of the tutorial that reproduces the bug: with copy=False, each successive window expression gains an extra (3) precision suffix after MINUTE. I can't tell whether the issue is in Ibis or sqlglot, but the copy=False option seems to play a role.

from pyflink.table import EnvironmentSettings, TableEnvironment

import ibis

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.get_config().set("parallelism.default", "1")
connection = ibis.flink.connect(table_env)

import ibis.expr.datatypes as dt
import ibis.expr.schema as sch

source_schema = sch.Schema(
    {
        "user_id": dt.int64,
        "time_col": dt.timestamp(scale=3),
        "amt": dt.float64,
    }
)

# Configure the source table with Kafka connector properties.
source_configs = {
    "connector": "kafka",
    "topic": "transaction",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "consumer_group_0",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
}

# Create the source table using the defined schema, Kafka connector properties,
# and set watermarking for real-time processing with a 15-second allowed
# lateness.
source_table = connection.create_table(
    "transaction",
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="time_col", allowed_delay=ibis.interval(seconds=15)
    ),
)

window = ibis.window(
    group_by=source_table.user_id,
    order_by=source_table.time_col,
    range=(-ibis.interval(minutes=360), 0),
)

faulty = source_table.select(
    source_table.amt.mean().over(window).name("a"),
    source_table.amt.mean().over(window).name("b"),
    source_table.amt.mean().over(window).name("c"),
)

# Running `connection.insert("faulty", faulty)` raises a SQL parse error (see the log output).
# Print the generated SQL under each code path to see where it goes wrong.
print("sqlglot, copy=False (i.e. connection.insert)")
print(
    connection.compiler.to_sqlglot(faulty).sql(
        dialect=connection.dialect, pretty=True, copy=False
    ),
)
print("----")
print("sqlglot, copy=True")
print(
    connection.compiler.to_sqlglot(faulty).sql(
        dialect=connection.dialect, pretty=True, copy=True
    ),
)

print("----")
print("ibis.to_sql")
print(ibis.to_sql(faulty, dialect="flink"))

Results

sqlglot, copy=False (i.e. connection.insert)
SELECT
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3) preceding AND CURRENT ROW) AS `a`,
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3)(3) preceding AND CURRENT ROW) AS `b`,
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3)(3)(3) preceding AND CURRENT ROW) AS `c`
FROM `transaction` AS `t0`
----
sqlglot, copy=True
SELECT
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3) preceding AND CURRENT ROW) AS `a`,
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3) preceding AND CURRENT ROW) AS `b`,
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3) preceding AND CURRENT ROW) AS `c`
FROM `transaction` AS `t0`
----
ibis.to_sql
SELECT
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3) preceding AND CURRENT ROW) AS `a`,
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3) preceding AND CURRENT ROW) AS `b`,
  AVG(`t0`.`amt`) OVER (PARTITION BY `t0`.`user_id` ORDER BY `t0`.`time_col` ASC NULLS LAST RANGE BETWEEN INTERVAL '360' MINUTE(3) preceding AND CURRENT ROW) AS `c`
FROM `transaction` AS `t0`
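
One hypothesis consistent with the output above, not verified against the sqlglot or Ibis source: the three expressions share a single interval-unit node, and the copy=False path rewrites that node in place each time it is rendered. The toy model below uses only the standard library; `Unit` and `render_interval` are hypothetical names invented for illustration and are not sqlglot or Ibis APIs.

```python
import copy
from dataclasses import dataclass


@dataclass
class Unit:
    """Hypothetical stand-in for a shared AST node holding an interval unit."""

    name: str


def render_interval(unit: Unit, precision: int, *, copy_node: bool) -> str:
    # With copy_node=False the caller's node is rewritten in place, so the
    # precision suffix survives into the next render of the same node.
    node = copy.copy(unit) if copy_node else unit
    node.name = f"{node.name}({precision})"
    return f"INTERVAL '360' {node.name}"


# One node shared by three expressions, rendered without copying.
shared = Unit("MINUTE")
in_place = [render_interval(shared, 3, copy_node=False) for _ in range(3)]

# Same setup, but each render works on its own copy.
shared = Unit("MINUTE")
copied = [render_interval(shared, 3, copy_node=True) for _ in range(3)]

for label, rendered in (("copy=False", in_place), ("copy=True", copied)):
    print(label)
    for sql in rendered:
        print("  " + sql)
```

In this model the in-place renders end in MINUTE(3), MINUTE(3)(3) and MINUTE(3)(3)(3), while the copied renders all end in MINUTE(3), which matches the pattern in the results. It only shows that shared mutable state could explain the symptom; it does not identify where the mutation actually happens.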

Expected

I expected connection.insert("faulty", faulty) to work instead of raising an error.

What version of ibis are you using?

main, i.e. 10.2.0

What backend(s) are you using, if any?

Flink

Relevant log output

Traceback (most recent call last):
  File "/home/romain/perso/ibis/tmp.py", line 55, in <module>
    connection.insert("faulty", faulty)
  File "/home/romain/perso/ibis/ibis/backends/flink/__init__.py", line 945, in insert
    return self.raw_sql(statement.compile())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/romain/perso/ibis/ibis/backends/flink/__init__.py", line 97, in raw_sql
    return self._table_env.execute_sql(query)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/romain/perso/ibis/.venv/lib/python3.11/site-packages/pyflink/table/table_environment.py", line 837, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/romain/perso/ibis/.venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/home/romain/perso/ibis/.venv/lib/python3.11/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/home/romain/perso/ibis/.venv/lib/python3.11/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "(" at line 2, column 300.
Was expecting one of:
    "FOLLOWING" ...
    "PRECEDING" ...
    "." ...
    "NOT" ...
    "IN" ...
    "<" ...
    "<=" ...
    ">" ...
    ">=" ...
    "=" ...
    "<>" ...
    "!=" ...
    "BETWEEN" ...
    "LIKE" ...
    "ILIKE" ...
    "RLIKE" ...
    "SIMILAR" ...
    "+" ...
    "-" ...
    "*" ...
    "/" ...
    "%" ...
    "||" ...
    "AND" ...
    "OR" ...
    "IS" ...
    "MEMBER" ...
    "SUBMULTISET" ...
    "CONTAINS" ...
    "OVERLAPS" ...
    "EQUALS" ...
    "PRECEDES" ...
    "SUCCEEDS" ...
    "IMMEDIATELY" ...
    "MULTISET" ...
    "[" ...
    "FORMAT" ...
    "TO" ...
    
        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "(" at line 2, column 300.
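
The parser stops at the first "(" that follows an already complete MINUTE(3). To spot this in generated SQL without going through Flink, a small standard-library check like the sketch below may help. The helper name `find_repeated_precisions` and the sample string are made up for illustration, and the pattern only assumes that a unit keyword followed by two or more parenthesized numbers is never valid.

```python
import re

# An uppercase keyword followed by two or more parenthesized integers,
# e.g. MINUTE(3)(3). A single precision such as MINUTE(3) is not flagged.
_REPEATED_PRECISION = re.compile(r"\b[A-Z]+(?:\(\d+\)){2,}")


def find_repeated_precisions(sql: str) -> list[tuple[int, str]]:
    """Return (1-based line number, offending token) pairs found in sql."""
    hits = []
    for lineno, line in enumerate(sql.splitlines(), start=1):
        for match in _REPEATED_PRECISION.finditer(line):
            hits.append((lineno, match.group(0)))
    return hits


# Shortened sample modelled on the intervals in the copy=False output.
sample = (
    "SELECT\n"
    "  INTERVAL '360' MINUTE(3) preceding,\n"
    "  INTERVAL '360' MINUTE(3)(3) preceding,\n"
    "  INTERVAL '360' MINUTE(3)(3)(3) preceding\n"
)

for lineno, token in find_repeated_precisions(sample):
    print(f"line {lineno}: {token}")
```

Passing the string returned by `.sql(..., copy=False)` from the reproduction script to this helper should flag the `b` and `c` expressions.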

Code of Conduct

  • I agree to follow this project's Code of Conduct
romaingd added the bug label (Incorrect behavior inside of ibis) on Mar 3, 2025