Skip to content

Commit

Permalink
Implement a custom SELECT with SAMPLE BY support
Browse files Browse the repository at this point in the history
TODO:
- tests, including both sqlalchemy 1.4 and 2.0 and in combination with other clauses (GROUP BY etc)
- documentation

Example usage:
```python
from sqlalchemy import create_engine, MetaData, Table, Column
from questdb_connect import (
    Timestamp,
    Double,
    Symbol,
)
from sqlalchemy import func
from questdb_connect import select

engine = create_engine('questdb://admin:quest@localhost:8812/main')
metadata = MetaData()

# Define a table for sensor readings
sensors = Table(
    'sensors',
    metadata,
    Column('ts', Timestamp),
    Column('temperature', Double),
    Column('humidity', Double),
    Column('location', Symbol),
)

def main():
    metadata.create_all(engine)

    location_samples = select(
        sensors.c.ts,
        func.avg(sensors.c.temperature).label('avg_temp'),
        func.min(sensors.c.temperature).label('min_temp'),
        func.max(sensors.c.temperature).label('max_temp')
    ).where(
        sensors.c.location == 'warehouse'
    ).sample_by(1, 'd');

    with engine.connect() as conn:
        for row in conn.execute(location_samples).fetchall():
            print(f"Time: {row.ts}, Average Temp: {row.avg_temp}, Minimal Temp: {row.min_temp}, Maximal Temp: {row.max_temp}")

if __name__ == '__main__':
    main()
```
  • Loading branch information
jerrinot committed Jan 3, 2025
1 parent 166a068 commit 09869e3
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/questdb_connect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
create_engine,
create_superset_engine,
)
from questdb_connect.dml import select, QDBSelect
from questdb_connect.identifier_preparer import QDBIdentifierPreparer
from questdb_connect.inspector import QDBInspector
from questdb_connect.keywords_functions import get_functions_list, get_keywords_list
Expand Down Expand Up @@ -51,6 +52,11 @@
threadsafety = 2
paramstyle = "pyformat"

__all__ = (
"select",
"QDBSelect",
)


class Error(Exception):
pass
Expand Down
33 changes: 33 additions & 0 deletions src/questdb_connect/compilers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,39 @@ def get_column_specification(self, column: sqlalchemy.Column, **_):


class QDBSQLCompiler(sqlalchemy.sql.compiler.SQLCompiler, abc.ABC):
def visit_sample_by(self, sample_by, **kw):
"""Compile a SAMPLE BY clause."""
if sample_by.unit:
return f"SAMPLE BY {sample_by.value}{sample_by.unit}"
return f"SAMPLE BY {sample_by.value}"

def visit_select(self, select, **kw):
"""Add SAMPLE BY support to the standard SELECT compilation."""

text = super().visit_select(select, **kw)

# TODO: The exact positioning is a big funky, fix it
if hasattr(select, '_sample_by_clause') and select._sample_by_clause is not None:
# Add SAMPLE BY before ORDER BY and LIMIT
sample_text = self.process(select._sample_by_clause, **kw)

# Find positions of ORDER BY and LIMIT
order_by_pos = text.find("ORDER BY")
limit_pos = text.find("LIMIT")

# Determine where to insert SAMPLE BY
if order_by_pos >= 0:
# Insert before ORDER BY
text = text[:order_by_pos] + sample_text + " " + text[order_by_pos:]
elif limit_pos >= 0:
# Insert before LIMIT
text = text[:limit_pos] + sample_text + " " + text[limit_pos:]
else:
# Append at the end
text += " " + sample_text

return text

def _is_safe_for_fast_insert_values_helper(self):
return True

Expand Down
105 changes: 105 additions & 0 deletions src/questdb_connect/dml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from __future__ import annotations

from typing import Any, Optional, Union, Sequence

from sqlalchemy.sql import Select as StandardSelect
from sqlalchemy.sql import ClauseElement
from sqlalchemy import select as sa_select
from sqlalchemy.sql.visitors import Visitable


class SampleByClause(ClauseElement):
"""Represents the QuestDB SAMPLE BY clause."""

__visit_name__ = "sample_by"
stringify_dialect = "questdb"

def __init__(
self,
value: Union[int, float],
unit: Optional[str] = None
):
self.value = value
self.unit = unit.lower() if unit else None

def __str__(self) -> str:
if self.unit:
return f"SAMPLE BY {self.value}{self.unit}"
return f"SAMPLE BY {self.value}"

def get_children(self, **kwargs: Any) -> Sequence[Visitable]:
return []


class QDBSelect(StandardSelect):
"""QuestDB-specific implementation of SELECT.
Adds methods for QuestDB-specific syntaxes such as SAMPLE BY.
The :class:`_questdb.QDBSelect` object is created using the
:func:`sqlalchemy.dialects.questdb.select` function.
"""

stringify_dialect = "questdb"
_sample_by_clause: Optional[SampleByClause] = None

def get_children(self, **kwargs: Any) -> Sequence[Visitable]:
children = super().get_children(**kwargs)
if self._sample_by_clause is not None:
children = children + [self._sample_by_clause]
return children

def sample_by(
self,
value: Union[int, float],
unit: Optional[str] = None
) -> QDBSelect:
"""Add a SAMPLE BY clause to the select statement.
The SAMPLE BY clause allows time-based sampling of data.
:param value:
For time-based sampling: the time interval
:param unit:
Time unit for sampling:
- 's': seconds
- 'm': minutes
- 'h': hours
- 'd': days
Example time-based sampling::
select([table.c.value]).sample_by(1, 'h') # sample every hour
select([table.c.value]).sample_by(30, 'm') # sample every 30 minutes
"""
# Create a copy of our object with _generative
s = self.__class__.__new__(self.__class__)
s.__dict__ = self.__dict__.copy()

# Set the sample by clause
s._sample_by_clause = SampleByClause(value, unit)
return s


def select(*entities: Any, **kwargs: Any) -> QDBSelect:
"""Construct a QuestDB-specific variant :class:`_questdb.Select` construct.
.. container:: inherited_member
The :func:`sqlalchemy.dialects.questdb.select` function creates
a :class:`sqlalchemy.dialects.questdb.Select`. This class is based
on the dialect-agnostic :class:`_sql.Select` construct which may
be constructed using the :func:`_sql.select` function in
SQLAlchemy Core.
The :class:`_questdb.Select` construct includes additional method
:meth:`_questdb.Select.sample_by` for QuestDB's SAMPLE BY clause.
"""
stmt = sa_select(*entities, **kwargs)
# Convert the SQLAlchemy Select into our QDBSelect
qdbs = QDBSelect.__new__(QDBSelect)
qdbs.__dict__ = stmt.__dict__.copy()
return qdbs

0 comments on commit 09869e3

Please sign in to comment.