Skip to content

Commit

Permalink
🐛 v1.4.8 Added tests and fixed an issue with build_where().
Browse files Browse the repository at this point in the history
  • Loading branch information
bmeares authored Nov 5, 2022
2 parents 8eb645a + dd54efa commit e783609
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 15 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.5 – v1.4.7
### v1.4.5 – v1.4.8

- **Bugfixes and stability improvements.**
These versions included several bugfixes, such as patching `--skip-check-existing` for in-place syncs and fixing the behavior of `--params` ([`build_where()`](https://docs.meerschaum.io/utils/sql.html#meerschaum.utils.sql.build_where)).
Expand Down
2 changes: 1 addition & 1 deletion docs/mkdocs/news/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.5 – v1.4.7
### v1.4.5 – v1.4.8

- **Bugfixes and stability improvements.**
These versions included several bugfixes, such as patching `--skip-check-existing` for in-place syncs and fixing the behavior of `--params` ([`build_where()`](https://docs.meerschaum.io/utils/sql.html#meerschaum.utils.sql.build_where)).
Expand Down
17 changes: 9 additions & 8 deletions docs/mkdocs/reference/pipes/syncing.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
}
}
</style>

# 📥 Syncing

Meerschaum efficiently syncs immutable time-series data, such as IoT sensor data streams. The syncing process consists of three basic stages, similar to ETL: **fetch**, **filter**, and **insert**.
Meerschaum efficiently syncs immutable time-series data, such as IoT sensor data streams. The syncing process consists of three basic stages, similar to ETL: **fetch**, **filter**, and **upsert**.

!!! abstract "Want to read more?"
I wrote my [master's thesis](https://meerschaum.io/files/pdf/thesis.pdf) on comparing different fetch strategies and came across some intriguing results. [Here are the presentation slides](https://meerschaum.io/files/pdf/slides.pdf) which summarize my findings.
Expand All @@ -28,7 +29,7 @@ Meerschaum efficiently syncs immutable time-series data, such as IoT sensor data
<asciinema-player src="/assets/casts/sync-pipes.cast" preload="true" rows="37"></asciinema-player>

## Stages
The primary reason for syncing in this way is to take advantage of the properties of immutable time-series data to minimize the stress imposed on remote source databases.
The primary reason for syncing in this way is to take advantage of the properties of time-series data to minimize the stress imposed on remote source databases.

### **Fetch** (*Extract* and *Transform*)
This is where the real time-series optimizations come into play. When syncing a SQL pipe, the definition sub-query is executed with additional filtering in the `WHERE` clause to only fetch the newest data.
Expand All @@ -37,15 +38,15 @@ For example, if the definition of a pipe is `#!sql SELECT * FROM remote_table`,

```sql
WITH definition AS (
SELECT * FROM remote_table
SELECT *
FROM remote_table
)
SELECT DISTINCT *
FROM
definition
SELECT *
FROM definition
WHERE
definition.datetime >= CAST(
'2021-06-23 14:52:00' AS TIMESTAMP
) + INTERVAL '0 minute'
)
```

!!! question "How does fetch work?"
Expand Down Expand Up @@ -91,7 +92,7 @@ After fetching remote data, the difference is taken to remove duplicate rows. Th
To skip the filter stage, you can use the `--skip-check-existing` flag.


### Insert (*Load*)
### Upsert (*Load*)

Once data are fetched and filtered, they are inserted into the table of the corresponding [Meerschaum instance](/reference/connectors/#instances-and-repositories). Depending on the type of instance connector, the data may be bulk uploaded (for TimescaleDB and PostgreSQL), inserted into a table, or posted to an API endpoint.

Expand Down
1 change: 0 additions & 1 deletion meerschaum/api/_websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,3 @@ async def websocket_endpoint(
except fastapi.WebSocketDisconnect:
del websockets[session_id]
break

2 changes: 1 addition & 1 deletion meerschaum/config/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Specify the Meerschaum release version.
"""

__version__ = "1.4.7"
__version__ = "1.4.8"
4 changes: 3 additions & 1 deletion meerschaum/plugins/_Plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ def module(self):
self._module = import_plugins(str(self), warn=False)
return self._module


@property
def __file__(self) -> Union[str, None]:
"""
Return the file path (str) of the plugin if it exists, otherwise `None`.
"""
if '_module' in self.__dict__:
if self.__dict__.get('_module', None) is not None:
return self.module.__file__

potential_dir = PLUGINS_RESOURCES_PATH / self.name
if (
potential_dir.exists()
Expand Down
3 changes: 2 additions & 1 deletion meerschaum/utils/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
'substrings': {
'CHAR': 'object',
'TIMESTAMP': 'datetime64[ns]',
'TIME': 'datetime64[ns]',
'DATE': 'datetime64[ns]',
'DOUBLE': 'float64',
'INT': 'Int64',
Expand Down Expand Up @@ -670,8 +671,8 @@ def build_where(
if excludes:
where += f"{leading_and}{_key} NOT IN ("
for item in excludes:
quoted_item = str(item).replace("'", "''")
item = str(item)[len(negation_prefix):]
quoted_item = str(item).replace("'", "''")
where += f"'{quoted_item}', "
where = where[:-2] + ")"
continue
Expand Down
4 changes: 3 additions & 1 deletion meerschaum/utils/yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from meerschaum.utils.misc import filter_keywords
from meerschaum.utils.packages import attempt_import, all_packages, _import_module
from meerschaum.utils.warnings import error
from meerschaum.utils.warnings import error, warn
from meerschaum.utils.threading import Lock

_lib = None
Expand Down Expand Up @@ -55,6 +55,7 @@ def safe_load(*args, **kw):
return _yaml.load(*args, **filter_keywords(_yaml.load, **kw))
return _yaml.safe_load(*args, **filter_keywords(_yaml.safe_load, **kw))


@staticmethod
def load(*args, **kw):
"""
Expand All @@ -71,6 +72,7 @@ def load(*args, **kw):
_args += [_yaml.Loader]
return _yaml.load(*_args, **filter_keywords(_yaml.load, **kw))


@staticmethod
def dump(data, stream=None, **kw):
"""
Expand Down
90 changes: 90 additions & 0 deletions tests/utils/test_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Test SQL utility functions.
"""

from typing import Dict, List, Any
import pytest
from meerschaum.utils.sql import (
build_where,
get_pd_type,
)
import meerschaum as mrsm

@pytest.mark.parametrize(
'params,expected_subqueries',
[
(
{'a': 1},
["\"a\" = '1'"]
),
(
{'a': 1, 'b': 2},
["\"a\" = '1'", "\"b\" = '2'"]
),
(
{'a': [1], 'b': 2},
["\"a\" IN ('1')", "\"b\" = '2'"]
),
(
{'a': [1], 'b': '_2'},
["\"a\" IN ('1')", "\"b\" != '2'"]
),
(
{'a': ['_1'], 'b': '_2'},
["\"a\" NOT IN ('1')", "\"b\" != '2'"]
),
(
{'a': ['_1', 10, '_2', 20], 'b': ['_2', '_3']},
[
"\"a\" NOT IN ('1', '2')",
"\"a\" IN ('10', '20')",
"\"b\" NOT IN ('2', '3')",
]
),
]
)
def test_build_where(params: Dict[str, Any], expected_subqueries: List[str]):
"""
Test that build_where() correctly produces the expected query.
"""
where_subquery = build_where(
params,
mrsm.get_connector('sql', 'build_where_test', uri='postgresql://foo:bar@localhost:5432/baz')
)
for subquery in expected_subqueries:
assert subquery in where_subquery


@pytest.mark.parametrize(
'db_type,pd_type',
[
('TEXT', 'object'),
('DATETIME', 'datetime64[ns]'),
('NVARCHAR(2000)', 'object'),
('JSON', 'object'),
('DATE', 'datetime64[ns]'),
('TIMESTAMP', 'datetime64[ns]'),
('BOOL', 'bool'),
('BOOLEAN', 'bool'),
('FLOAT', 'float64'),
('DOUBLE', 'float64'),
('TIMESTAMPTZ', 'datetime64[ns, UTC]'),
('TIMESTAMP WITH TIMEZONE', 'datetime64[ns, UTC]'),
('CLOB', 'object'),
('NUMBER', 'float64'),
('INT', 'Int64'),
('BIGINT', 'Int64'),
('VARCHAR', 'object'),
('CHAR', 'object'),
('not a type', 'object'),
]
)
def test_get_pd_type(db_type: str, pd_type: str):
"""
Verify that various database types are mapped to Pandas types.
"""
assert get_pd_type(db_type) == pd_type

0 comments on commit e783609

Please sign in to comment.