diff --git a/CHANGELOG.md b/CHANGELOG.md index cc8da128..01250652 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ This is the current release cycle, so stay tuned for future releases! -### v1.4.5 – v1.4.7 +### v1.4.5 – v1.4.8 - **Bugfixes and stability improvements.** These versions included several bugfixes, such as patching `--skip-check-existing` for in-place syncs and fixing the behavior of `--params` ([`build_where()`](https://docs.meerschaum.io/utils/sql.html#meerschaum.utils.sql.build_where)). diff --git a/docs/mkdocs/news/changelog.md b/docs/mkdocs/news/changelog.md index cc8da128..01250652 100644 --- a/docs/mkdocs/news/changelog.md +++ b/docs/mkdocs/news/changelog.md @@ -4,7 +4,7 @@ This is the current release cycle, so stay tuned for future releases! -### v1.4.5 – v1.4.7 +### v1.4.5 – v1.4.8 - **Bugfixes and stability improvements.** These versions included several bugfixes, such as patching `--skip-check-existing` for in-place syncs and fixing the behavior of `--params` ([`build_where()`](https://docs.meerschaum.io/utils/sql.html#meerschaum.utils.sql.build_where)). diff --git a/docs/mkdocs/reference/pipes/syncing.md b/docs/mkdocs/reference/pipes/syncing.md index 40bf9e56..1eaa5000 100644 --- a/docs/mkdocs/reference/pipes/syncing.md +++ b/docs/mkdocs/reference/pipes/syncing.md @@ -17,9 +17,10 @@ } } + # 📥 Syncing -Meerschaum efficiently syncs immutable time-series data, such as IoT sensor data streams. The syncing process consists of three basic stages, similar to ETL: **fetch**, **filter**, and **insert**. +Meerschaum efficiently syncs immutable time-series data, such as IoT sensor data streams. The syncing process consists of three basic stages, similar to ETL: **fetch**, **filter**, and **upsert**. !!! abstract "Want to read more?" I wrote my [master's thesis](https://meerschaum.io/files/pdf/thesis.pdf) on comparing different fetch strategies and came across some intriguing results. [Here are the presentation slides](https://meerschaum.io/files/pdf/slides.pdf) which summarize my findings. @@ -28,7 +29,7 @@ Meerschaum efficiently syncs immutable time-series data, such as IoT sensor data ## Stages -The primary reason for syncing in this way is to take advantage of the properties of immutable time-series data to minimize the stress imposed on remote source databases. +The primary reason for syncing in this way is to take advantage of the properties of time-series data to minimize the stress imposed on remote source databases. ### **Fetch** (*Extract* and *Transform*) This is where the real time-series optimizations come into play. When syncing a SQL pipe, the definition sub-query is executed with additional filtering in the `WHERE` clause to only fetch the newest data. @@ -37,15 +38,15 @@ For example, if the definition of a pipe is `#!sql SELECT * FROM remote_table`, ```sql WITH definition AS ( - SELECT * FROM remote_table + SELECT * + FROM remote_table ) -SELECT DISTINCT * -FROM - definition +SELECT * +FROM definition WHERE definition.datetime >= CAST( '2021-06-23 14:52:00' AS TIMESTAMP - ) + INTERVAL '0 minute' + ) ``` !!! question "How does fetch work?" @@ -91,7 +92,7 @@ After fetching remote data, the difference is taken to remove duplicate rows. Th To skip the filter stage, you can use the `--skip-check-existing` flag. -### Insert (*Load*) +### Upsert (*Load*) Once data are fetched and filtered, they are inserted into the table of the corresponding [Meerschaum instance](/reference/connectors/#instances-and-repositories). Depending on the type of instance connector, the data may be bulk uploaded (for TimescaleDB and PostgreSQL), inserted into a table, or posted to an API endpoint. diff --git a/meerschaum/api/_websockets.py b/meerschaum/api/_websockets.py index aaabb324..d2ce4ad4 100644 --- a/meerschaum/api/_websockets.py +++ b/meerschaum/api/_websockets.py @@ -43,4 +43,3 @@ async def websocket_endpoint( except fastapi.WebSocketDisconnect: del websockets[session_id] break - diff --git a/meerschaum/config/_version.py b/meerschaum/config/_version.py index 79453bbf..bd71f3ba 100644 --- a/meerschaum/config/_version.py +++ b/meerschaum/config/_version.py @@ -2,4 +2,4 @@ Specify the Meerschaum release version. """ -__version__ = "1.4.7" +__version__ = "1.4.8" diff --git a/meerschaum/plugins/_Plugin.py b/meerschaum/plugins/_Plugin.py index f29d2d70..bb9df35e 100644 --- a/meerschaum/plugins/_Plugin.py +++ b/meerschaum/plugins/_Plugin.py @@ -116,13 +116,15 @@ def module(self): self._module = import_plugins(str(self), warn=False) return self._module + @property def __file__(self) -> Union[str, None]: """ Return the file path (str) of the plugin if it exists, otherwise `None`. """ - if '_module' in self.__dict__: + if self.__dict__.get('_module', None) is not None: return self.module.__file__ + potential_dir = PLUGINS_RESOURCES_PATH / self.name if ( potential_dir.exists() diff --git a/meerschaum/utils/sql.py b/meerschaum/utils/sql.py index 88d46fb5..ceb7d1b5 100644 --- a/meerschaum/utils/sql.py +++ b/meerschaum/utils/sql.py @@ -143,6 +143,7 @@ 'substrings': { 'CHAR': 'object', 'TIMESTAMP': 'datetime64[ns]', + 'TIME': 'datetime64[ns]', 'DATE': 'datetime64[ns]', 'DOUBLE': 'float64', 'INT': 'Int64', @@ -670,8 +671,8 @@ def build_where( if excludes: where += f"{leading_and}{_key} NOT IN (" for item in excludes: - quoted_item = str(item).replace("'", "''") item = str(item)[len(negation_prefix):] + quoted_item = str(item).replace("'", "''") where += f"'{quoted_item}', " where = where[:-2] + ")" continue diff --git a/meerschaum/utils/yaml.py b/meerschaum/utils/yaml.py index 3fafb342..b3215ab9 100644 --- a/meerschaum/utils/yaml.py +++ b/meerschaum/utils/yaml.py @@ -10,7 +10,7 @@ from meerschaum.utils.misc import filter_keywords from meerschaum.utils.packages import attempt_import, all_packages, _import_module -from meerschaum.utils.warnings import error +from meerschaum.utils.warnings import error, warn from meerschaum.utils.threading import Lock _lib = None @@ -55,6 +55,7 @@ def safe_load(*args, **kw): return _yaml.load(*args, **filter_keywords(_yaml.load, **kw)) return _yaml.safe_load(*args, **filter_keywords(_yaml.safe_load, **kw)) + @staticmethod def load(*args, **kw): """ @@ -71,6 +72,7 @@ def load(*args, **kw): _args += [_yaml.Loader] return _yaml.load(*_args, **filter_keywords(_yaml.load, **kw)) + @staticmethod def dump(data, stream=None, **kw): """ diff --git a/tests/utils/test_sql.py b/tests/utils/test_sql.py new file mode 100644 index 00000000..bbe02867 --- /dev/null +++ b/tests/utils/test_sql.py @@ -0,0 +1,90 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 + +""" +Test SQL utility functions. +""" + +from typing import Dict, List, Any +import pytest +from meerschaum.utils.sql import ( + build_where, + get_pd_type, +) +import meerschaum as mrsm + +@pytest.mark.parametrize( + 'params,expected_subqueries', + [ + ( + {'a': 1}, + ["\"a\" = '1'"] + ), + ( + {'a': 1, 'b': 2}, + ["\"a\" = '1'", "\"b\" = '2'"] + ), + ( + {'a': [1], 'b': 2}, + ["\"a\" IN ('1')", "\"b\" = '2'"] + ), + ( + {'a': [1], 'b': '_2'}, + ["\"a\" IN ('1')", "\"b\" != '2'"] + ), + ( + {'a': ['_1'], 'b': '_2'}, + ["\"a\" NOT IN ('1')", "\"b\" != '2'"] + ), + ( + {'a': ['_1', 10, '_2', 20], 'b': ['_2', '_3']}, + [ + "\"a\" NOT IN ('1', '2')", + "\"a\" IN ('10', '20')", + "\"b\" NOT IN ('2', '3')", + ] + ), + ] +) +def test_build_where(params: Dict[str, Any], expected_subqueries: List[str]): + """ + Test that build_where() correctly produces the expected query. + """ + where_subquery = build_where( + params, + mrsm.get_connector('sql', 'build_where_test', uri='postgresql://foo:bar@localhost:5432/baz') + ) + for subquery in expected_subqueries: + assert subquery in where_subquery + + +@pytest.mark.parametrize( + 'db_type,pd_type', + [ + ('TEXT', 'object'), + ('DATETIME', 'datetime64[ns]'), + ('NVARCHAR(2000)', 'object'), + ('JSON', 'object'), + ('DATE', 'datetime64[ns]'), + ('TIMESTAMP', 'datetime64[ns]'), + ('BOOL', 'bool'), + ('BOOLEAN', 'bool'), + ('FLOAT', 'float64'), + ('DOUBLE', 'float64'), + ('TIMESTAMPTZ', 'datetime64[ns, UTC]'), + ('TIMESTAMP WITH TIMEZONE', 'datetime64[ns, UTC]'), + ('CLOB', 'object'), + ('NUMBER', 'float64'), + ('INT', 'Int64'), + ('BIGINT', 'Int64'), + ('VARCHAR', 'object'), + ('CHAR', 'object'), + ('not a type', 'object'), + ] +) +def test_get_pd_type(db_type: str, pd_type: str): + """ + Verify that various database types are mapped to Pandas types. + """ + assert get_pd_type(db_type) == pd_type