Skip to content

Commit

Permalink
šŸ› v1.4.10 (#93) Fixed edge case bugs in syncing, data type enforcemenā€¦
Browse files Browse the repository at this point in the history
…t, and more.
  • Loading branch information
bmeares authored Nov 18, 2022
2 parents f1e029f + 8878775 commit 656279f
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 56 deletions.
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,31 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.10

- **Fixed an issue with syncing background jobs.**
The `--name` flag of background jobs was colliding with the `name` keyword argument of `SQLConnector.to_sql()`.

- **Fixed a datetime bounding issue when `datetime` index is omitted.**
If the minimum datetime value of the incoming dataframe cannot be determined, do not bound the `get_data()` request.

- **Keep existing parameters when registering plugin pipes.**
When a pipe is registered with a plugin as its connector, the return value of the `register()` function will be patched with the existing in-memory parameters.

- **Fixed a data type syncing issue.**
In cases where fetched data types do not match the data types in the pipe's table (e.g. automatic datetime columns), a bug has been patched to ensure the correct data types are enforced.

- **Added `Venv` to the root namespace.**
Now you can access virtual environments directly from `mrsm`:

```python
import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import pandas as pd
```


### v1.4.9

- **Fixed in-place syncs for aggregate queries.**
Expand Down
25 changes: 25 additions & 0 deletions docs/mkdocs/news/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,31 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.10

- **Fixed an issue with syncing background jobs.**
The `--name` flag of background jobs was colliding with the `name` keyword argument of `SQLConnector.to_sql()`.

- **Fixed a datetime bounding issue when `datetime` index is omitted.**
If the minimum datetime value of the incoming dataframe cannot be determined, do not bound the `get_data()` request.

- **Keep existing parameters when registering plugin pipes.**
When a pipe is registered with a plugin as its connector, the return value of the `register()` function will be patched with the existing in-memory parameters.

- **Fixed a data type syncing issue.**
In cases where fetched data types do not match the data types in the pipe's table (e.g. automatic datetime columns), a bug has been patched to ensure the correct data types are enforced.

- **Added `Venv` to the root namespace.**
Now you can access virtual environments directly from `mrsm`:

```python
import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import pandas as pd
```


### v1.4.9

- **Fixed in-place syncs for aggregate queries.**
Expand Down
1 change: 1 addition & 0 deletions meerschaum/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils import get_pipes
from meerschaum.utils.venv import Venv
from meerschaum._internal.docs import index as __doc__
from meerschaum.connectors import get_connector
from meerschaum.config import __version__
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/config/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Specify the Meerschaum release version.
"""

__version__ = "1.4.9"
__version__ = "1.4.10"
9 changes: 6 additions & 3 deletions meerschaum/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,13 @@ def get_connector_plugin(
if not hasattr(connector, 'type'):
return None
from meerschaum import Plugin
return (
Plugin(connector.__module__.replace('plugins.', '').split('.')[0])
plugin_name = (
connector.__module__.replace('plugins.', '').split('.')[0]
if connector.type in custom_types else (
Plugin(connector.label) if connector.type == 'plugin'
connector.label
if connector.type == 'plugin'
else 'mrsm'
)
)
plugin = Plugin(plugin_name)
return plugin if plugin.is_installed() else None
2 changes: 1 addition & 1 deletion meerschaum/connectors/api/_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def login(
else:
msg = (
f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
f" Please verify login details for connector '{self}'."
f" Please verify login details for connector '{self}'."
)
if warn:
_warn(msg, stack=False)
Expand Down
43 changes: 25 additions & 18 deletions meerschaum/connectors/sql/_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def register_pipe(
return False, f"Failed to register {pipe}."
return True, f"Successfully registered {pipe}."


def edit_pipe(
self,
pipe : meerschaum.Pipe = None,
Expand Down Expand Up @@ -1046,6 +1047,7 @@ def sync_pipe(
from meerschaum.utils.misc import generate_password
from meerschaum import Pipe
import time
import copy
pd = import_pandas()
if df is None:
msg = f"DataFrame is None. Cannot sync {pipe}."
Expand Down Expand Up @@ -1094,7 +1096,12 @@ def sync_pipe(

unseen_df, update_df, delta_df = (
pipe.filter_existing(
df, chunksize=chunksize, begin=begin, end=end, debug=debug, **kw
df,
chunksize = chunksize,
begin = begin,
end = end,
debug = debug,
**kw
) if check_existing else (df, None, df)
)
if debug:
Expand All @@ -1106,14 +1113,14 @@ def sync_pipe(
if update_df is not None and not update_df.empty:
transact_id = generate_password(6)
temp_target = '_' + transact_id + '_' + pipe.target
self.to_sql(
update_df,
name = temp_target,
if_exists = 'append',
chunksize = chunksize,
debug = debug,
**kw
)
update_kw = copy.deepcopy(kw)
update_kw.update({
'name': temp_target,
'if_exists': 'append',
'chunksize': chunksize,
'debug': debug,
})
self.to_sql(update_df, **update_kw)
temp_pipe = Pipe(
pipe.connector_keys + '_', pipe.metric_key, pipe.location_key,
instance = pipe.instance_keys,
Expand Down Expand Up @@ -1148,15 +1155,15 @@ def sync_pipe(
kw.pop('name')

### Insert new data into Pipe's table.
stats = self.to_sql(
unseen_df,
name = pipe.target,
if_exists = if_exists,
debug = debug,
as_dict = True,
chunksize = chunksize,
**kw
)
unseen_kw = copy.deepcopy(kw)
unseen_kw.update({
'name': pipe.target,
'if_exists': if_exists,
'debug': debug,
'as_dict': True,
'chunksize': chunksize,
})
stats = self.to_sql(unseen_df, **unseen_kw)
if is_new:
if not self.create_indices(pipe, debug=debug):
if debug:
Expand Down
12 changes: 4 additions & 8 deletions meerschaum/core/Pipe/_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ def register(
A `SuccessTuple` of success, message.
"""
from meerschaum.connectors import custom_types
from meerschaum.utils.formatting import get_console
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector_plugin
from meerschaum.connectors import get_connector_plugin, custom_types
from meerschaum.config._patch import apply_patch_to_config

import warnings
with warnings.catch_warnings():
Expand All @@ -45,12 +45,8 @@ def register(
and
getattr(_conn, 'register', None) is not None
):
if _conn.type == 'plugin':
venv = _conn._plugin
elif _conn.__module__.startswith('plugins'):
venv = _conn.__module__[len('plugins:'):].split('.')[0]
try:
with Venv(venv, debug=debug):
with Venv(get_connector_plugin(_conn), debug=debug):
params = self.connector.register(self)
except Exception as e:
get_console().print_exception()
Expand All @@ -63,7 +59,7 @@ def register(
+ f"{params}"
)
else:
self.parameters = params
self.parameters = apply_patch_to_config(params, self.parameters)

if not self.parameters:
cols = self.columns if self.columns else {'datetime': None, 'id': None}
Expand Down
20 changes: 14 additions & 6 deletions meerschaum/core/Pipe/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,13 @@ def filter_existing(
df = self.enforce_dtypes(df, debug=debug)
### begin is the oldest data in the new dataframe
try:
min_dt = pd.to_datetime(df[self.get_columns('datetime')].min(skipna=True)).to_pydatetime()
min_dt = pd.to_datetime(
df[self.columns['datetime']].min(skipna=True)
).to_pydatetime()
except Exception as e:
min_dt = None
if not isinstance(min_dt, datetime.datetime) or str(min_dt) == 'NaT':
### min_dt might be None, a user-supplied value, or the sync time.
min_dt = begin
min_dt = None
begin = (
round_time(
min_dt,
Expand All @@ -452,9 +453,11 @@ def filter_existing(

### end is the newest data in the new dataframe
try:
max_dt = pd.to_datetime(df[self.get_columns('datetime')].max(skipna=True)).to_pydatetime()
max_dt = pd.to_datetime(
df[self.columns['datetime']].max(skipna=True)
).to_pydatetime()
except Exception as e:
max_dt = end
max_dt = None
if not isinstance(max_dt, datetime.datetime) or str(max_dt) == 'NaT':
max_dt = None

Expand Down Expand Up @@ -507,7 +510,12 @@ def filter_existing(
### Detect changes between the old target and new source dataframes.
from meerschaum.utils.misc import filter_unseen_df, add_missing_cols_to_df
delta_df = add_missing_cols_to_df(
filter_unseen_df(backtrack_df, df, dtypes=self.dtypes, debug=debug),
filter_unseen_df(
backtrack_df,
df,
dtypes = self.dtypes,
debug = debug
),
on_cols_dtypes,
)
joined_df = pd.merge(
Expand Down
18 changes: 1 addition & 17 deletions meerschaum/plugins/_Plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,31 +153,15 @@ def requirements_file_path(self) -> Union[pathlib.Path, None]:
return path


def is_installed(self, try_import: bool = True) -> bool:
def is_installed(self, **kw) -> bool:
"""
Check whether a plugin is correctly installed.
**NOTE:** This plugin will import the plugin's module.
Set `try_import` to `False` to avoid importing.
Parameters
----------
try_import: bool, default True
If `True`, attempt importing the plugin's module.
Returns
-------
A `bool` indicating whether a plugin exists and is successfully imported.
"""
# if not self.__file__:
# return False
return self.__file__ is not None
# try:
# _installed = (
# self.__dict__.get('_module', None) is not None and self.__file__ is not None
# ) if try_import else (self.__file__ is not None)
# except ModuleNotFoundError as e:
# _installed = False
# return _installed


def make_tar(self, debug: bool = False) -> pathlib.Path:
Expand Down
13 changes: 12 additions & 1 deletion meerschaum/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ def round_time(

return dt + datetime.timedelta(0, rounding - seconds, - dt.microsecond)


def parse_df_datetimes(
df: 'pd.DataFrame',
debug: bool = False
Expand Down Expand Up @@ -961,11 +962,21 @@ def filter_unseen_df(
### assume the old_df knows what it's doing, even if it's technically wrong.
if dtypes is None:
dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

dtypes = {
col: (
str(typ) if str(typ) != 'int64' else 'Int64'
) for col, typ in new_df.dtypes.items()
) for col, typ in dtypes.items()
if col in new_df_dtypes and col in old_df_dtypes
}
for col, typ in new_df_dtypes.items():
if col not in dtypes:
dtypes[col] = typ

for col, typ in {k: v for k, v in dtypes.items()}.items():
if new_df_dtypes.get(col, None) != old_df_dtypes.get(col, None):
### Fallback to object if the types don't match.
dtypes[col] = 'object'

cast_cols = True
try:
Expand Down
5 changes: 4 additions & 1 deletion meerschaum/utils/venv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,10 @@ def get_python_version(python_path: pathlib.Path) -> Union[str, None]:
if filename == python_versioned_name:
real_path = pathlib.Path(os.path.realpath(python_path))
if not real_path.exists():
python_path.unlink()
try:
python_path.unlink()
except Exception as e:
pass
init_venv(venv, verify=False, force=True, debug=debug)
if not python_path.exists():
raise FileNotFoundError(f"Unable to verify Python symlink:\n{python_path}")
Expand Down
1 change: 1 addition & 0 deletions meerschaum/utils/warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def _no_stack_sw(message, category, filename, lineno, file=None, line=None):
if not stack:
warnings.showwarning = _old_sw


def exception_with_traceback(
message: str,
exception_class = Exception,
Expand Down

0 comments on commit 656279f

Please sign in to comment.