From 09869e35e51f4a00a013fcd81e849838d12a4773 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Fri, 3 Jan 2025 15:47:24 +0100 Subject: [PATCH] Implement a custom SELECT with SAMPLE BY support TODO: - tests, including both sqlalchemy 1.4 and 2.0 and in combination with other clauses (GROUP BY etc) - documentation Example usage: ```python from sqlalchemy import create_engine, MetaData, Table, Column from questdb_connect import ( Timestamp, Double, Symbol, ) from sqlalchemy import func from questdb_connect import select engine = create_engine('questdb://admin:quest@localhost:8812/main') metadata = MetaData() # Define a table for sensor readings sensors = Table( 'sensors', metadata, Column('ts', Timestamp), Column('temperature', Double), Column('humidity', Double), Column('location', Symbol), ) def main(): metadata.create_all(engine) location_samples = select( sensors.c.ts, func.avg(sensors.c.temperature).label('avg_temp'), func.min(sensors.c.temperature).label('min_temp'), func.max(sensors.c.temperature).label('max_temp') ).where( sensors.c.location == 'warehouse' ).sample_by(1, 'd'); with engine.connect() as conn: for row in conn.execute(location_samples).fetchall(): print(f"Time: {row.ts}, Average Temp: {row.avg_temp}, Minimal Temp: {row.min_temp}, Maximal Temp: {row.max_temp}") if __name__ == '__main__': main() ``` --- src/questdb_connect/__init__.py | 6 ++ src/questdb_connect/compilers.py | 33 ++++++++++ src/questdb_connect/dml.py | 105 +++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 src/questdb_connect/dml.py diff --git a/src/questdb_connect/__init__.py b/src/questdb_connect/__init__.py index 82042ef..7d5b886 100644 --- a/src/questdb_connect/__init__.py +++ b/src/questdb_connect/__init__.py @@ -11,6 +11,7 @@ create_engine, create_superset_engine, ) +from questdb_connect.dml import select, QDBSelect from questdb_connect.identifier_preparer import QDBIdentifierPreparer from questdb_connect.inspector import QDBInspector from questdb_connect.keywords_functions import get_functions_list, get_keywords_list @@ -51,6 +52,11 @@ threadsafety = 2 paramstyle = "pyformat" +__all__ = ( + "select", + "QDBSelect", +) + class Error(Exception): pass diff --git a/src/questdb_connect/compilers.py b/src/questdb_connect/compilers.py index 7994514..d3ae93b 100644 --- a/src/questdb_connect/compilers.py +++ b/src/questdb_connect/compilers.py @@ -30,6 +30,39 @@ def get_column_specification(self, column: sqlalchemy.Column, **_): class QDBSQLCompiler(sqlalchemy.sql.compiler.SQLCompiler, abc.ABC): + def visit_sample_by(self, sample_by, **kw): + """Compile a SAMPLE BY clause.""" + if sample_by.unit: + return f"SAMPLE BY {sample_by.value}{sample_by.unit}" + return f"SAMPLE BY {sample_by.value}" + + def visit_select(self, select, **kw): + """Add SAMPLE BY support to the standard SELECT compilation.""" + + text = super().visit_select(select, **kw) + + # TODO: The exact positioning is a big funky, fix it + if hasattr(select, '_sample_by_clause') and select._sample_by_clause is not None: + # Add SAMPLE BY before ORDER BY and LIMIT + sample_text = self.process(select._sample_by_clause, **kw) + + # Find positions of ORDER BY and LIMIT + order_by_pos = text.find("ORDER BY") + limit_pos = text.find("LIMIT") + + # Determine where to insert SAMPLE BY + if order_by_pos >= 0: + # Insert before ORDER BY + text = text[:order_by_pos] + sample_text + " " + text[order_by_pos:] + elif limit_pos >= 0: + # Insert before LIMIT + text = text[:limit_pos] + sample_text + " " + text[limit_pos:] + else: + # Append at the end + text += " " + sample_text + + return text + def _is_safe_for_fast_insert_values_helper(self): return True diff --git a/src/questdb_connect/dml.py b/src/questdb_connect/dml.py new file mode 100644 index 0000000..91e0fde --- /dev/null +++ b/src/questdb_connect/dml.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from typing import Any, Optional, Union, Sequence + +from sqlalchemy.sql import Select as StandardSelect +from sqlalchemy.sql import ClauseElement +from sqlalchemy import select as sa_select +from sqlalchemy.sql.visitors import Visitable + + +class SampleByClause(ClauseElement): + """Represents the QuestDB SAMPLE BY clause.""" + + __visit_name__ = "sample_by" + stringify_dialect = "questdb" + + def __init__( + self, + value: Union[int, float], + unit: Optional[str] = None + ): + self.value = value + self.unit = unit.lower() if unit else None + + def __str__(self) -> str: + if self.unit: + return f"SAMPLE BY {self.value}{self.unit}" + return f"SAMPLE BY {self.value}" + + def get_children(self, **kwargs: Any) -> Sequence[Visitable]: + return [] + + +class QDBSelect(StandardSelect): + """QuestDB-specific implementation of SELECT. + + Adds methods for QuestDB-specific syntaxes such as SAMPLE BY. + + The :class:`_questdb.QDBSelect` object is created using the + :func:`sqlalchemy.dialects.questdb.select` function. + """ + + stringify_dialect = "questdb" + _sample_by_clause: Optional[SampleByClause] = None + + def get_children(self, **kwargs: Any) -> Sequence[Visitable]: + children = super().get_children(**kwargs) + if self._sample_by_clause is not None: + children = children + [self._sample_by_clause] + return children + + def sample_by( + self, + value: Union[int, float], + unit: Optional[str] = None + ) -> QDBSelect: + """Add a SAMPLE BY clause to the select statement. + + The SAMPLE BY clause allows time-based sampling of data. + + :param value: + For time-based sampling: the time interval + + + :param unit: + Time unit for sampling: + - 's': seconds + - 'm': minutes + - 'h': hours + - 'd': days + + Example time-based sampling:: + + select([table.c.value]).sample_by(1, 'h') # sample every hour + select([table.c.value]).sample_by(30, 'm') # sample every 30 minutes + + """ + # Create a copy of our object with _generative + s = self.__class__.__new__(self.__class__) + s.__dict__ = self.__dict__.copy() + + # Set the sample by clause + s._sample_by_clause = SampleByClause(value, unit) + return s + + +def select(*entities: Any, **kwargs: Any) -> QDBSelect: + """Construct a QuestDB-specific variant :class:`_questdb.Select` construct. + + .. container:: inherited_member + + The :func:`sqlalchemy.dialects.questdb.select` function creates + a :class:`sqlalchemy.dialects.questdb.Select`. This class is based + on the dialect-agnostic :class:`_sql.Select` construct which may + be constructed using the :func:`_sql.select` function in + SQLAlchemy Core. + + The :class:`_questdb.Select` construct includes additional method + :meth:`_questdb.Select.sample_by` for QuestDB's SAMPLE BY clause. + """ + stmt = sa_select(*entities, **kwargs) + # Convert the SQLAlchemy Select into our QDBSelect + qdbs = QDBSelect.__new__(QDBSelect) + qdbs.__dict__ = stmt.__dict__.copy() + return qdbs \ No newline at end of file