Skip to content

Commit

Permalink
🐛 v1.3.1 Fix dtype enforcement issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
bmeares authored Oct 6, 2022
2 parents 1d035a6 + f485061 commit 2ea3cd8
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 91 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@

This is the current release cycle, so stay tuned for future releases!

### v1.3.1

- **Fixed data type enforcement issues.**
A serious bug in data type enforcement has been patched.
- **Allow `Pipe.dtypes` to be edited.**
You can now set keys in `Pipe.dtypes` and persist them with `Pipe.edit()`.
- **Added `Pipe.update()`.**
`Pipe.update()` is an alias to `Pipe.edit(interactive=False)`.
- **`Pipe.delete()` no longer deletes local attributes.**
It still removes `Pipe.id`, but local attributes will now remain intact.
- **Fixed dynamic columns on DuckDB.**
DuckDB does not allow for altering tables when indices are created, so this patch will drop and rebuild indices when tables are altered.
- **Replaced `CLOB` with `NVARCHAR(2000)` on Oracle SQL.**
This may require migrating existing pipes to use the new data type.
- **Enforce that integers are stored as `INTEGER` on Oracle SQL.**
  Additional data type enforcement has been added for Oracle SQL.
- **Removed datetime warnings when syncing pipes without a datetime column.**
- **No longer falls back to the current time when a sync time cannot be determined.**

### v1.3.0: Dynamic Columns

**Improvements**
Expand Down
19 changes: 19 additions & 0 deletions docs/mkdocs/news/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@

This is the current release cycle, so stay tuned for future releases!

### v1.3.1

- **Fixed data type enforcement issues.**
A serious bug in data type enforcement has been patched.
- **Allow `Pipe.dtypes` to be edited.**
You can now set keys in `Pipe.dtypes` and persist them with `Pipe.edit()`.
- **Added `Pipe.update()`.**
`Pipe.update()` is an alias to `Pipe.edit(interactive=False)`.
- **`Pipe.delete()` no longer deletes local attributes.**
It still removes `Pipe.id`, but local attributes will now remain intact.
- **Fixed dynamic columns on DuckDB.**
DuckDB does not allow for altering tables when indices are created, so this patch will drop and rebuild indices when tables are altered.
- **Replaced `CLOB` with `NVARCHAR(2000)` on Oracle SQL.**
This may require migrating existing pipes to use the new data type.
- **Enforce that integers are stored as `INTEGER` on Oracle SQL.**
  Additional data type enforcement has been added for Oracle SQL.
- **Removed datetime warnings when syncing pipes without a datetime column.**
- **No longer falls back to the current time when a sync time cannot be determined.**

### v1.3.0: Dynamic Columns

**Improvements**
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/config/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Specify the Meerschaum release version.
"""

__version__ = "1.3.0"
__version__ = "1.3.1"
44 changes: 31 additions & 13 deletions meerschaum/connectors/sql/_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ def get_pipe_data(
query = f"SELECT * FROM {sql_item_name(pipe.target, self.flavor)}"
where = ""

existing_cols = pipe.get_columns_types(debug=debug)

if not pipe.columns.get('datetime', None):
_dt = pipe.guess_datetime()
dt = sql_item_name(_dt, self.flavor) if _dt else None
Expand All @@ -708,7 +710,7 @@ def get_pipe_data(
)


if begin is not None:
if begin is not None and dt in existing_cols:
begin_da = dateadd_str(
flavor = self.flavor,
datepart = 'minute',
Expand All @@ -717,7 +719,7 @@ def get_pipe_data(
)
where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")

if end is not None:
if end is not None and dt in existing_cols:
end_da = dateadd_str(
flavor = self.flavor,
datepart = 'minute',
Expand All @@ -735,15 +737,23 @@ def get_pipe_data(
if len(where) > 0:
query += "\nWHERE " + where

if _dt:
if _dt and dt in existing_cols:
query += "\nORDER BY " + dt + " DESC"

if debug:
dprint(f"Getting pipe data with begin = '{begin}' and end = '{end}'")
kw['dtype'] = pipe.dtypes
if self.flavor == 'sqlite':
if _dt and 'datetime' not in kw['dtype'].get(_dt, 'object'):
kw['dtype'][_dt] = 'datetime64[ns]'

existing_cols = pipe.get_columns_types(debug=debug)
dtypes = pipe.dtypes
if dtypes:
if self.flavor == 'sqlite':
if _dt and 'datetime' not in dtypes.get(_dt, 'object'):
dtypes[_dt] = 'datetime64[ns]'
if existing_cols:
dtypes = {col: typ for col, typ in dtypes.items() if col in existing_cols}
if dtypes:
kw['dtypes'] = dtypes

df = self.read(
query,
debug = debug,
Expand Down Expand Up @@ -926,6 +936,9 @@ def sync_pipe(
if debug:
dprint("Fetched data:\n" + str(df))

if not isinstance(df, pd.DataFrame):
df = pipe.enforce_dtypes(df, debug=debug)

### if table does not exist, create it with indices
is_new = False
add_cols_query = None
Expand All @@ -939,16 +952,13 @@ def sync_pipe(
if not self.exec_queries(add_cols_queries, debug=debug):
warn(f"Failed to add new columns to {pipe}.")

if not isinstance(df, pd.DataFrame):
df = pipe.enforce_dtypes(df, debug=debug)

unseen_df, update_df, delta_df = (
pipe.filter_existing(
df, chunksize=chunksize, begin=begin, end=end, debug=debug, **kw
) if check_existing else (df, None, df)
)
if debug:
dprint("Delta data :\n" + str(delta_df))
dprint("Delta data:\n" + str(delta_df))
dprint("Unseen data:\n" + str(unseen_df))
if update_df is not None:
dprint("Update data:\n" + str(update_df))
Expand Down Expand Up @@ -1067,7 +1077,6 @@ def get_sync_time(
is_guess = False

if _dt is None:
warn(f"Unable to determine the column for the sync time of {pipe}.", stack=False)
return None

ASC_or_DESC = "DESC" if newest else "ASC"
Expand Down Expand Up @@ -1478,6 +1487,7 @@ def get_add_columns_queries(
if not pipe.exists(debug=debug):
return []
from meerschaum.utils.sql import get_pd_type, get_db_type, sql_item_name
from meerschaum.utils.misc import flatten_list
table_obj = self.get_pipe_table(pipe, debug=debug)
df_cols_types = {col: str(typ) for col, typ in df.dtypes.items()}
db_cols_types = {col: get_pd_type(str(typ)) for col, typ in table_obj.columns.items()}
Expand All @@ -1498,4 +1508,12 @@ def get_add_columns_queries(
query = query[:-1]
if self.flavor != 'duckdb':
return [query]
return [query] + [q for ix, q in self.get_create_index_queries(pipe, debug=debug)]

drop_index_queries = list(flatten_list(
[q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
))
create_index_queries = list(flatten_list(
[q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
))

return drop_index_queries + [query] + create_index_queries
15 changes: 14 additions & 1 deletion meerschaum/connectors/sql/_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ def to_sql(

from meerschaum.utils.sql import sql_item_name, table_exists
from meerschaum.connectors.sql._create_engine import flavor_configs
from meerschaum.utils.packages import attempt_import
sqlalchemy = attempt_import('sqlalchemy', debug=debug)

stats = {'target': name, }
### resort to defaults if None
Expand Down Expand Up @@ -559,6 +561,17 @@ def to_sql(
if not success:
warn(f"Unable to drop {name}")


### Enforce NVARCHAR(2000) as text instead of CLOB.
dtype = to_sql_kw.get('dtype', {})
for col, typ in df.dtypes.items():
if str(typ) == 'object':
dtype[col] = sqlalchemy.types.NVARCHAR(2000)
elif str(typ).lower().startswith('int'):
dtype[col] = sqlalchemy.types.INTEGER

to_sql_kw['dtype'] = dtype

try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore', 'case sensitivity issues')
Expand All @@ -575,7 +588,7 @@ def to_sql(
except Exception as e:
if not silent:
warn(str(e))
success, msg = None, str(e)
success, msg = False, str(e)

end = time.perf_counter()
if success:
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/core/Pipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class Pipe:
guess_datetime,
)
from ._show import show
from ._edit import edit, edit_definition
from ._edit import edit, edit_definition, update
from ._sync import sync, get_sync_time, exists, filter_existing
from ._delete import delete
from ._drop import drop
Expand Down
4 changes: 3 additions & 1 deletion meerschaum/core/Pipe/_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def dtypes(self) -> Union[Dict[str, Any], None]:
from meerschaum.config._patch import apply_patch_to_config
configured_dtypes = self.parameters.get('dtypes', {})
remote_dtypes = self.infer_dtypes(persist=False)
return apply_patch_to_config(remote_dtypes, configured_dtypes)
patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
self.parameters['dtypes'] = patched_dtypes
return self.parameters['dtypes']


@dtypes.setter
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/core/Pipe/_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def delete(
if not isinstance(result, tuple):
return False, f"Received unexpected result from '{self.instance_connector}': {result}"
if result[0]:
to_delete = ['_id', '_attributes', '_columns', '_tags', '_data']
to_delete = ['_id']
for member in to_delete:
if member in self.__dict__:
del self.__dict__[member]
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/core/Pipe/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def enforce_dtypes(self, df: 'pd.DataFrame', debug: bool=False) -> 'pd.DataFrame
for col, typ in common_diff_dtypes.items():
if 'datetime' in typ and 'datetime' in common_dtypes[col]:
df_dtypes[col] = typ
detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[typ])
detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
for col in detected_dt_cols:
del common_diff_dtypes[col]

Expand Down
8 changes: 8 additions & 0 deletions meerschaum/core/Pipe/_edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
from __future__ import annotations
from meerschaum.utils.typing import Any, SuccessTuple

def update(self, *args, **kw) -> SuccessTuple:
    """
    Persist a pipe's in-memory parameters to its instance connector.

    Convenience alias for `Pipe.edit()` with `interactive=False`
    forced, so no prompt is ever shown.

    Parameters
    ----------
    *args:
        Positional arguments forwarded to `Pipe.edit()`.
    **kw:
        Keyword arguments forwarded to `Pipe.edit()`;
        any caller-supplied `interactive` value is overridden.

    Returns
    -------
    A `SuccessTuple` of success status and message from `Pipe.edit()`.
    """
    # Override (rather than default) `interactive` so this method is
    # always non-interactive, matching the v1.3.1 changelog entry.
    return self.edit(*args, **{**kw, 'interactive': False})


def edit(
self,
patch: bool = False,
Expand Down
26 changes: 15 additions & 11 deletions meerschaum/core/Pipe/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,16 +429,16 @@ def filter_existing(
try:
min_dt = pd.to_datetime(df[self.get_columns('datetime')].min(skipna=True)).to_pydatetime()
except Exception as e:
### NOTE: This will fetch the entire pipe!
min_dt = self.get_sync_time(newest=False, debug=debug)
min_dt = None
if not isinstance(min_dt, datetime.datetime) or str(min_dt) == 'NaT':
### min_dt might be None, a user-supplied value, or the sync time.
min_dt = begin
### If `min_dt` is None, use `datetime.utcnow()`.
begin = round_time(
min_dt,
to = 'down'
) - datetime.timedelta(minutes=1)
begin = (
round_time(
min_dt,
to = 'down'
) - datetime.timedelta(minutes=1)
) if min_dt is not None else None

### end is the newest data in the new dataframe
try:
Expand Down Expand Up @@ -480,14 +480,18 @@ def filter_existing(
)
if debug:
dprint("Existing data:\n" + str(backtrack_df), **kw)

### Detect changes between the old target and new source dataframes.
from meerschaum.utils.misc import filter_unseen_df
delta_df = filter_unseen_df(backtrack_df, df, dtypes=self.dtypes, debug=debug)
dprint("Existing dtypes:\n" + str(backtrack_df.dtypes))

### Separate new rows from changed ones.
on_cols = [col for col_key, col in self.columns.items() if col_key != 'value']
on_cols_dtypes = {col: typ for col, typ in self.dtypes.items() if col in on_cols}

### Detect changes between the old target and new source dataframes.
from meerschaum.utils.misc import filter_unseen_df, add_missing_cols_to_df
delta_df = add_missing_cols_to_df(
filter_unseen_df(backtrack_df, df, dtypes=self.dtypes, debug=debug),
on_cols_dtypes,
)
joined_df = pd.merge(
delta_df,
backtrack_df,
Expand Down
15 changes: 10 additions & 5 deletions meerschaum/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,13 +868,17 @@ def add_missing_cols_to_df(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataF
"""
if set(df.columns) == set(dtypes):
return df

from meerschaum.utils.packages import import_pandas
pd = import_pandas()

df = df.copy()
for col, typ in dtypes.items():
if col in df.columns:
continue
df[col] = None
df[col] = df[col].astype(typ)
df[col] = pd.Series([None] * len(df), dtype=typ)
# df[col] = None
# df[col] = df[col].astype(typ)

return df

Expand Down Expand Up @@ -924,9 +928,10 @@ def filter_unseen_df(
```
"""
if old_df is None or len(old_df) == 0:
if old_df is None:
return new_df

from meerschaum.utils.warnings import warn
from meerschaum.utils.packages import import_pandas
pd = import_pandas(debug=debug)

Expand All @@ -942,7 +947,6 @@ def filter_unseen_df(
### Order matters when checking equality.
new_df = new_df[old_df.columns]
except Exception as e:
from meerschaum.utils.warnings import warn
warn(
"Was not able to cast old columns onto new DataFrame. " +
f"Are both DataFrames the same shape? Error:\n{e}",
Expand All @@ -953,6 +957,7 @@ def filter_unseen_df(
### assume the old_df knows what it's doing, even if it's technically wrong.
if dtypes is None:
dtypes = dict(old_df.dtypes)
dtypes = {col: typ for col, typ in old_df.dtypes.items()}
cast_cols = True
try:
new_df = new_df.astype(dtypes)
Expand All @@ -971,7 +976,7 @@ def filter_unseen_df(

return new_df[
~new_df.fillna(pd.NA).apply(tuple, 1).isin(old_df.fillna(pd.NA).apply(tuple, 1))
].reset_index(drop=True)[list(new_df_dtypes.keys())]
].reset_index(drop=True)[new_df_dtypes.keys()]


def replace_pipes_in_dict(
Expand Down
Loading

0 comments on commit 2ea3cd8

Please sign in to comment.