Skip to content

Commit

Permalink
🐛 v1.4.11 (#94) Add support for MySQL 5.7 and fix edge case bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
bmeares authored Nov 19, 2022
2 parents 656279f + 23bc406 commit a9200a7
Show file tree
Hide file tree
Showing 15 changed files with 243 additions and 51 deletions.
50 changes: 50 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,56 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.11

- **Add support for older versions of MySQL.**
The `WITH` keyword for CTE blocks was not introduced until MySQL 8.0. This patch uses the older syntax for older versions of MySQL and MariaDB. MySQL 5.7 was added to the test suite.

- **Allow for any iterable in `items_str()`**
If an iterable other than a list is passed to `items_str()`, it will convert to a list before building the string:

```python
from meerschaum.utils.misc import items_str
print(items_str({'apples': 1, 'bananas': 2}, quotes=False)
# apples and bananas
```

- **Fixed an edge case with `datetime` set to `None`.**
This patch will ignore the datetime index even if it was set explicitly to `None`.

- **Added `Pipe.children`.**
To complement `Pipe.parents`, setting the parameters key `children` to a list of pipes' keys will be treated the same as `Pipe.parents`:

```python
import meerschaum as mrsm
pipe = mrsm.Pipe(
'a', 'b',
parameters = {
'children': [
{
'connector': 'a',
'metric': 'b',
'location': 'c',
},
]
}
)
print(pipe.children)
# [Pipe('a', 'b', 'c')]
```

- **Added support for `type:label` syntax in `mrsm.get_connector()`.**
The factory function `mrsm.get_connector()` expects the type and label as two arguments, but this patch allows for passing a single string with both arguments:

```python
import meerschaum as mrsm
print(mrsm.get_connector('sql:local'))
# sql:local
```

- **Fixed more edge case bugs.**
For example, converting to `Int64` sometimes breaks with older versions of `pandas`. This patch adds a workaround.

### v1.4.10

- **Fixed an issue with syncing background jobs.**
Expand Down
50 changes: 50 additions & 0 deletions docs/mkdocs/news/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,56 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.11

- **Add support for older versions of MySQL.**
The `WITH` keyword for CTE blocks was not introduced until MySQL 8.0. This patch uses the older syntax for older versions of MySQL and MariaDB. MySQL 5.7 was added to the test suite.

- **Allow for any iterable in `items_str()`**
If an iterable other than a list is passed to `items_str()`, it will convert to a list before building the string:

```python
from meerschaum.utils.misc import items_str
print(items_str({'apples': 1, 'bananas': 2}, quotes=False)
# apples and bananas
```

- **Fixed an edge case with `datetime` set to `None`.**
This patch will ignore the datetime index even if it was set explicitly to `None`.

- **Added `Pipe.children`.**
To complement `Pipe.parents`, setting the parameters key `children` to a list of pipes' keys will be treated the same as `Pipe.parents`:

```python
import meerschaum as mrsm
pipe = mrsm.Pipe(
'a', 'b',
parameters = {
'children': [
{
'connector': 'a',
'metric': 'b',
'location': 'c',
},
]
}
)
print(pipe.children)
# [Pipe('a', 'b', 'c')]
```

- **Added support for `type:label` syntax in `mrsm.get_connector()`.**
The factory function `mrsm.get_connector()` expects the type and label as two arguments, but this patch allows for passing a single string with both arguments:

```python
import meerschaum as mrsm
print(mrsm.get_connector('sql:local'))
# sql:local
```

- **Fixed more edge case bugs.**
For example, converting to `Int64` sometimes breaks with older versions of `pandas`. This patch adds a workaround.

### v1.4.10

- **Fixed an issue with syncing background jobs.**
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/config/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Specify the Meerschaum release version.
"""

__version__ = "1.4.10"
__version__ = "1.4.11"
2 changes: 2 additions & 0 deletions meerschaum/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ def get_connector(
from meerschaum.config import get_config
from meerschaum.config.static import STATIC_CONFIG
global _loaded_plugin_connectors
if isinstance(type, str) and not label and ':' in type:
type, label = type.split(':', maxsplit=1)
with _locks['_loaded_plugin_connectors']:
if not _loaded_plugin_connectors:
load_plugin_connectors()
Expand Down
19 changes: 15 additions & 4 deletions meerschaum/connectors/sql/_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def get_pipe_metadef(
definition = get_pipe_query(pipe)
btm = get_pipe_backtrack_minutes(pipe)

if not pipe.columns or 'datetime' not in pipe.columns:
if not pipe.columns.get('datetime', None):
_dt = pipe.guess_datetime()
dt_name = sql_item_name(_dt, self.flavor) if _dt else None
is_guess = True
Expand Down Expand Up @@ -252,7 +252,11 @@ def get_pipe_backtrack_minutes(pipe) -> Union[int, float]:
def _simple_fetch_query(pipe, debug: bool=False, **kw) -> str:
"""Build a fetch query from a pipe's definition."""
definition = get_pipe_query(pipe)
return f"WITH definition AS ({definition}) SELECT * FROM definition"
return (
f"WITH definition AS ({definition}) SELECT * FROM definition"
if pipe.connector.flavor not in ('mysql', 'mariadb')
else f"SELECT * FROM ({definition}) AS definition"
)

def _join_fetch_query(
pipe,
Expand Down Expand Up @@ -299,10 +303,17 @@ def _join_fetch_query(
_sync_times_q = _sync_times_q[:(-1 * len('UNION ALL\n'))] + ")"

definition = get_pipe_query(pipe)
query = f"""
query = (
f"""
WITH definition AS ({definition}){_sync_times_q}
SELECT definition.*
FROM definition
FROM definition"""
if pipe.connector.flavor not in ('mysql', 'mariadb')
else (
f"""
SELECT * FROM ({definition}) AS definition"""
)
) + f"""
LEFT OUTER JOIN {sync_times_remote_name} AS st
ON st.{id_remote_name} = definition.{id_remote_name}
WHERE definition.{dt_remote_name} > st.{dt_remote_name}
Expand Down
17 changes: 12 additions & 5 deletions meerschaum/connectors/sql/_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2015,11 +2015,18 @@ def get_pipe_rowcount(
f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}"
if not remote else get_pipe_query(pipe)
)
query = f"""
WITH src AS ({src})
SELECT COUNT(*)
FROM src
"""
query = (
f"""
WITH src AS ({src})
SELECT COUNT(*)
FROM src
"""
) if self.flavor not in ('mysql', 'mariadb') else (
f"""
SELECT COUNT(*)
FROM ({src}) AS src
"""
)
if begin is not None or end is not None:
query += "WHERE"
if begin is not None:
Expand Down
1 change: 1 addition & 0 deletions meerschaum/core/Pipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class Pipe:
id,
get_val_column,
parents,
children,
target,
_target_legacy,
guess_datetime,
Expand Down
32 changes: 28 additions & 4 deletions meerschaum/core/Pipe/_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,7 @@ def get_val_column(self, debug: bool = False) -> Union[str, None]:
@property
def parents(self) -> List[meerschaum.Pipe]:
"""
Return a list of `meerschaum.Pipe` objects.
These pipes will be synced before this pipe.
NOTE: Not yet in use!
Return a list of `meerschaum.Pipe` objects to be designated as parents.
"""
if 'parents' not in self.parameters:
return []
Expand All @@ -333,6 +330,33 @@ def parents(self) -> List[meerschaum.Pipe]:
return _parents


@property
def children(self) -> List[meerschaum.Pipe]:
"""
Return a list of `meerschaum.Pipe` objects to be designated as children.
"""
if 'children' not in self.parameters:
return []
from meerschaum.utils.warnings import warn
_children_keys = self.parameters['children']
if not isinstance(_children_keys, list):
warn(
f"Please ensure the children for {self} are defined as a list of keys.",
stacklevel = 4
)
return []
from meerschaum import Pipe
_children = []
for keys in _children_keys:
try:
p = Pipe(**keys)
except Exception as e:
warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
continue
_children.append(p)
return _children


@property
def target(self) -> str:
"""
Expand Down
6 changes: 6 additions & 0 deletions meerschaum/core/Pipe/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ def enforce_dtypes(self, df: 'pd.DataFrame', debug: bool=False) -> 'pd.DataFrame
except Exception as e:
if debug:
dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
if 'int' in str(t.lower()):
try:
new_df[d] = new_df[d].astype('float64').astype(t)
except Exception as e:
if debug:
dprint(f"Was unable to convert to float then {t}.")
return new_df


Expand Down
4 changes: 2 additions & 2 deletions meerschaum/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1539,10 +1539,10 @@ def items_str(
c = comma_str if commas else ''

if len(items) == 1:
return q + str(items[0]) + q
return q + str(list(items)[0]) + q

if len(items) == 2:
return q + str(items[0]) + q + s + a + s + q + str(items[1]) + q
return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q

sep = q + c + s + q
output = q + sep.join(str(i) for i in items[:-1]) + q
Expand Down
41 changes: 27 additions & 14 deletions meerschaum/utils/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@
{and_subquery_f}
""",
'mysql': """
UPDATE {target_table_name} AS f
INNER JOIN (SELECT DISTINCT * FROM {patch_table_name}) AS p
ON {and_subquery_f}
UPDATE {target_table_name} AS f,
(SELECT DISTINCT * FROM {patch_table_name}) AS p
{sets_subquery_f}
WHERE
{and_subquery_f}
""",
'mariadb': """
UPDATE {target_table_name} AS f
INNER JOIN (SELECT DISTINCT * FROM {patch_table_name}) AS p
ON {and_subquery_f}
UPDATE {target_table_name} AS f,
(SELECT DISTINCT * FROM {patch_table_name}) AS p
{sets_subquery_f}
WHERE
{and_subquery_f}
Expand Down Expand Up @@ -128,7 +126,10 @@
'FLOAT': 'float64',
'DOUBLE_PRECISION': 'float64',
'DOUBLE': 'float64',
'DECIMAL': 'float64',
'BIGINT': 'Int64',
'INT': 'Int64',
'INTEGER': 'Int64',
'NUMBER': 'float64',
'TIMESTAMP': 'datetime64[ns]',
'TIMESTAMP WITH TIMEZONE': 'datetime64[ns, UTC]',
Expand All @@ -146,6 +147,7 @@
'TIME': 'datetime64[ns]',
'DATE': 'datetime64[ns]',
'DOUBLE': 'float64',
'DECIMAL': 'float64',
'INT': 'Int64',
'BOOL': 'bool',
},
Expand All @@ -154,12 +156,12 @@
### MySQL doesn't allow for casting as BIGINT, so this is a workaround.
DB_FLAVORS_CAST_DTYPES = {
'mariadb': {
'BIGINT': 'DOUBLE',
'BIGINT': 'DECIMAL',
'TINYINT': 'INT',
'TEXT': 'CHAR(10000) CHARACTER SET utf8',
},
'mysql': {
'BIGINT': 'DOUBLE',
'BIGINT': 'DECIMAL',
'TINYINT': 'INT',
'TEXT': 'CHAR(10000) CHARACTER SET utf8',
},
Expand All @@ -169,6 +171,8 @@
'mssql': {
'NVARCHAR COLLATE "SQL Latin1 General CP1 CI AS"': 'NVARCHAR(MAX)',
'NVARCHAR COLLATE "SQL_Latin1_General_CP1_CI_AS"': 'NVARCHAR(MAX)',
'VARCHAR COLLATE "SQL Latin1 General CP1 CI AS"': 'NVARCHAR(MAX)',
'VARCHAR COLLATE "SQL_Latin1_General_CP1_CI_AS"': 'NVARCHAR(MAX)',
},
}
### Map pandas dtypes to flavor-specific dtypes.
Expand Down Expand Up @@ -202,8 +206,8 @@
'float64': {
'timescaledb': 'DOUBLE PRECISION',
'postgresql': 'DOUBLE PRECISION',
'mariadb': 'DOUBLE',
'mysql': 'DOUBLE',
'mariadb': 'DECIMAL',
'mysql': 'DECIMAL',
'mssql': 'FLOAT',
'oracle': 'FLOAT',
'sqlite': 'FLOAT',
Expand Down Expand Up @@ -490,10 +494,19 @@ def get_distinct_col_count(

_col_name = sql_item_name(col, connector.flavor)

_meta_query = f"""
WITH src AS ( {query} ),
dist AS ( SELECT DISTINCT {_col_name} FROM src )
SELECT COUNT(*) FROM dist"""
_meta_query = (
f"""
WITH src AS ( {query} ),
dist AS ( SELECT DISTINCT {_col_name} FROM src )
SELECT COUNT(*) FROM dist"""
) if self.flavor not in ('mysql', 'mariadb') else (
f"""
SELECT COUNT(*)
FROM (
SELECT DISTINCT {_col_name}
FROM ({query}) AS src
) AS dist"""
)

result = connector.value(_meta_query, debug=debug)
try:
Expand Down
Loading

0 comments on commit a9200a7

Please sign in to comment.