Skip to content

Commit

Permalink
⚡️ v1.4.9 (#92) Performance boost, bugfixes, and more.
Browse files Browse the repository at this point in the history
  • Loading branch information
bmeares authored Nov 13, 2022
2 parents e783609 + 39e1a6a commit e3c3cf3
Show file tree
Hide file tree
Showing 25 changed files with 258 additions and 88 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,32 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.9

- **Fixed in-place syncs for aggregate queries.**
In-place SQL syncs which use aggregation functions are now handled correctly. This version addresses differences in column types between backtrack and new data. For example, the following query will now be correctly synced:

```sql
WITH days_src AS (
SELECT *, DATE_TRUNC('day', "datetime") AS days
FROM plugin_stress_test
)
SELECT days, AVG(val) AS avg_value
FROM days_src
GROUP BY days
```

- **Activate virtual environments for custom instance connectors.**
All pipe methods now activate virtual environments for custom instance connectors.

- **Improved database connection performance.**
Cold connections to a SQL database have been sped up by replacing `sqlalchemy_utils` with handwritten logic (JSON for PostgreSQL-like flavors and SQLite).

- **Fixed an issue with virtual environment verification in a portable environment.**
The portable build has been updated to Python 3.9.15, and this patch includes a check to determine the known `site-packages` path for a virtual environment of `None` instead of relying on the default user `site-packages` directory.

- **Fixed some environment warnings when starting the API.**

### v1.4.5 – v1.4.8

- **Bugfixes and stability improvements.**
Expand Down
26 changes: 26 additions & 0 deletions docs/mkdocs/news/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,32 @@

This is the current release cycle, so stay tuned for future releases!

### v1.4.9

- **Fixed in-place syncs for aggregate queries.**
In-place SQL syncs which use aggregation functions are now handled correctly. This version addresses differences in column types between backtrack and new data. For example, the following query will now be correctly synced:

```sql
WITH days_src AS (
SELECT *, DATE_TRUNC('day', "datetime") AS days
FROM plugin_stress_test
)
SELECT days, AVG(val) AS avg_value
FROM days_src
GROUP BY days
```

- **Activate virtual environments for custom instance connectors.**
All pipe methods now activate virtual environments for custom instance connectors.

- **Improved database connection performance.**
Cold connections to a SQL database have been sped up by replacing `sqlalchemy_utils` with handwritten logic (JSON for PostgreSQL-like flavors and SQLite).

- **Fixed an issue with virtual environment verification in a portable environment.**
The portable build has been updated to Python 3.9.15, and this patch includes a check to determine the known `site-packages` path for a virtual environment of `None` instead of relying on the default user `site-packages` directory.

- **Fixed some environment warnings when starting the API.**

### v1.4.5 – v1.4.8

- **Bugfixes and stability improvements.**
Expand Down
4 changes: 2 additions & 2 deletions meerschaum/actions/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ def _api_start(

env_text = ''
for key, val in env_dict.items():
value = json.dumps(json.dumps(val)) if isinstance(val, dict) else val
env_text += f"{key}={value}\n"
value = json.dumps(val) if isinstance(val, dict) else val
env_text += f"{key}='{value}'\n"
with open(uvicorn_env_path, 'w+', encoding='utf-8') as f:
if debug:
dprint(f"Writing ENV file to '{uvicorn_env_path}'.")
Expand Down
6 changes: 3 additions & 3 deletions meerschaum/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from meerschaum.config._version import __version__
from meerschaum.config._edit import edit_config, write_config
from meerschaum.config.static import _static_config
from meerschaum.config.static import STATIC_CONFIG

from meerschaum.config._paths import (
PERMANENT_PATCH_DIR_PATH,
Expand Down Expand Up @@ -125,7 +125,7 @@ def get_config(
"""
import json

symlinks_key = _static_config()['config']['symlinks_key']
symlinks_key = STATIC_CONFIG['config']['symlinks_key']
if debug:
from meerschaum.utils.debug import dprint
dprint(f"Indexing keys: {keys}", color=False)
Expand Down Expand Up @@ -318,7 +318,7 @@ def write_plugin_config(


### Make sure readline is available for the portable version.
environment_runtime = _static_config()['environment']['runtime']
environment_runtime = STATIC_CONFIG['environment']['runtime']
if environment_runtime in os.environ:
if os.environ[environment_runtime] == 'portable':
from meerschaum.utils.packages import ensure_readline
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/config/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Specify the Meerschaum release version.
"""

__version__ = "1.4.8"
__version__ = "1.4.9"
28 changes: 28 additions & 0 deletions meerschaum/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,31 @@ def load_plugin_connectors():
if not to_import:
return
import_plugins(*to_import)


def get_connector_plugin(
        connector: Connector,
    ) -> Union[str, None, 'meerschaum.Plugin']:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None:

    - `None` if the object has no `type` attribute.
    - A `Plugin` named after the connector's defining module
      if the connector's type is in `custom_types`.
    - A `Plugin` named after the connector's label if its type is `'plugin'`.
    - `'mrsm'` otherwise.
    """
    if not hasattr(connector, 'type'):
        return None

    from meerschaum import Plugin

    if connector.type in custom_types:
        module_name = connector.__module__
        plugins_prefix = 'plugins.'
        ### Only strip the leading package prefix.
        ### A blanket `str.replace()` would also mangle names which merely contain
        ### 'plugins.' (e.g. 'plugins.myplugins.sub' would become 'mysub').
        if module_name.startswith(plugins_prefix):
            module_name = module_name[len(plugins_prefix):]
        return Plugin(module_name.split('.')[0])

    if connector.type == 'plugin':
        return Plugin(connector.label)

    return 'mrsm'
20 changes: 10 additions & 10 deletions meerschaum/connectors/sql/_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ def register_pipe(
'metric_key' : pipe.metric_key,
'location_key' : pipe.location_key,
'parameters' : (
json.dumps(parameters) if self.flavor in ('duckdb',) else parameters
# json.dumps(parameters) if self.flavor not in json_flavors else parameters
json.dumps(parameters) if self.flavor not in json_flavors else parameters
),
}
query = sqlalchemy.insert(pipes).values(**values)
Expand Down Expand Up @@ -1384,6 +1383,7 @@ def get_temp_table_name(label: str) -> str:
debug = debug,
)
backtrack_cols = {str(col.name): str(col.type) for col in backtrack_table_obj.columns}
common_cols = [col for col in new_cols if col in backtrack_cols]
on_cols = {
col: new_cols.get(col, 'object')
for col_key, col in pipe.columns.items()
Expand Down Expand Up @@ -1419,17 +1419,17 @@ def get_temp_table_name(label: str) -> str:
+ '\nAND\n'.join([
(
'COALESCE(new.' + sql_item_name(c, self.flavor) + ", "
+ get_null_replacement(typ, self.flavor) + ") "
+ get_null_replacement(new_cols[c], self.flavor) + ") "
+ ' = '
+ 'COALESCE(old.' + sql_item_name(c, self.flavor) + ", "
+ get_null_replacement(typ, self.flavor) + ") "
) for c, typ in new_cols.items()
+ get_null_replacement(backtrack_cols[c], self.flavor) + ") "
) for c in common_cols
])
+ "\nWHERE\n"
+ '\nAND\n'.join([
(
'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
) for c in new_cols
) for c in common_cols
])
# + "\nAND\n"
# + '\nAND\n'.join([
Expand All @@ -1455,17 +1455,17 @@ def get_temp_table_name(label: str) -> str:
+ '\nAND\n'.join([
(
'COALESCE(new.' + sql_item_name(c, self.flavor) + ", "
+ get_null_replacement(typ, self.flavor) + ") "
+ get_null_replacement(new_cols[c], self.flavor) + ") "
+ ' = '
+ 'COALESCE(old.' + sql_item_name(c, self.flavor) + ", "
+ get_null_replacement(typ, self.flavor) + ") "
) for c, typ in new_cols.items()
+ get_null_replacement(backtrack_cols[c], self.flavor) + ") "
) for c in common_cols
])
+ "\nWHERE\n"
+ '\nAND\n'.join([
(
'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
) for c in new_cols
) for c in common_cols
])
# + "\nAND\n"
# + '\nAND\n'.join([
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/connectors/sql/_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def register_user(
'password_hash' : user.password_hash,
'user_type' : user.type,
'attributes' : (
json.dumps(user.attributes) if self.flavor in ('duckdb',) else user.attributes
json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes
),
}
if old_id is not None:
Expand Down
15 changes: 7 additions & 8 deletions meerschaum/connectors/sql/tables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ def get_tables(
from meerschaum.utils.warnings import error
from meerschaum.connectors.parse import parse_instance_keys
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.sql import json_flavors
from meerschaum import get_connector

sqlalchemy, sqlalchemy_dialects_postgresql, sqlalchemy_utils_types_json = attempt_import(
sqlalchemy, sqlalchemy_dialects_postgresql = attempt_import(
'sqlalchemy',
'sqlalchemy.dialects.postgresql',
'sqlalchemy_utils.types.json',
lazy = False
)
if not sqlalchemy:
Expand Down Expand Up @@ -79,15 +79,14 @@ def get_tables(
dprint(f"Creating tables for connector '{conn}'.")

id_type = sqlalchemy.Integer
params_type = (
sqlalchemy_utils_types_json.JSONType if conn.flavor not in ('duckdb',)
else sqlalchemy.String
)

if conn.flavor in json_flavors:
params_type = sqlalchemy.types.JSON
else:
params_type = sqlalchemy.types.Text
id_names = ('user_id', 'plugin_id', 'pipe_id')
sequences = {
k: sqlalchemy.Sequence(k + '_seq')
for k in id_names
for k in id_names
}
id_col_args = { k: [k, id_type] for k in id_names }
id_col_kw = { k: {'primary_key': True} for k in id_names }
Expand Down
18 changes: 15 additions & 3 deletions meerschaum/core/Pipe/_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def attributes(self) -> Dict[str, Any]:
import time
from meerschaum.config import get_config
from meerschaum.config._patch import apply_patch_to_config
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector_plugin

timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')

if '_attributes' not in self.__dict__:
Expand All @@ -34,7 +37,8 @@ def attributes(self) -> Dict[str, Any]:
if timed_out:
self._attributes_sync_time = now
local_attributes = self.__dict__.get('_attributes', {})
instance_attributes = self.instance_connector.get_pipe_attributes(self)
with Venv(get_connector_plugin(self.instance_connector)):
instance_attributes = self.instance_connector.get_pipe_attributes(self)
self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
return self._attributes

Expand Down Expand Up @@ -199,15 +203,23 @@ def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
}
>>>
"""
return self.instance_connector.get_pipe_columns_types(self, debug=debug)
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector_plugin

with Venv(get_connector_plugin(self.instance_connector)):
return self.instance_connector.get_pipe_columns_types(self, debug=debug)


def get_id(self, **kw: Any) -> Union[int, None]:
"""
Fetch a pipe's ID from its instance connector.
If the pipe does not exist, return `None`.
"""
return self.instance_connector.get_pipe_id(self, **kw)
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector_plugin

with Venv(get_connector_plugin(self.instance_connector)):
return self.instance_connector.get_pipe_id(self, **kw)


@property
Expand Down
7 changes: 6 additions & 1 deletion meerschaum/core/Pipe/_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ def bootstrap(
from meerschaum.utils.formatting._shell import clear_screen
from meerschaum.utils.formatting import print_tuple
from meerschaum.actions import actions
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector_plugin

_clear = get_config('shell', 'clear_screen', patch=True)

Expand All @@ -74,7 +76,10 @@ def bootstrap(
)
except KeyboardInterrupt as e:
return False, f"Aborting bootstrapping {self}."
register_tuple = self.instance_connector.register_pipe(self, debug=debug)

with Venv(get_connector_plugin(self.instance_connector)):
register_tuple = self.instance_connector.register_pipe(self, debug=debug)

if not register_tuple[0]:
return register_tuple

Expand Down
15 changes: 10 additions & 5 deletions meerschaum/core/Pipe/_clear.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ def clear(
"""
from meerschaum.utils.warnings import warn
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector_plugin

if self.cache_pipe is not None:
success, msg = self.cache_pipe.clear(begin=begin, end=end, debug=debug, **kw)
if not success:
warn(msg)
return self.instance_connector.clear_pipe(
self,
begin=begin, end=end, params=params, debug=debug,
**kw
)

with Venv(get_connector_plugin(self.instance_connector)):
return self.instance_connector.clear_pipe(
self,
begin=begin, end=end, params=params, debug=debug,
**kw
)
Loading

0 comments on commit e3c3cf3

Please sign in to comment.