✨ v1.5.3 (#104) Add support for syncing dictionaries and lists (`dtype=json`).

* ✨ v1.5.3 Add support for syncing dictionaries and lists (`dtype=json`).

* 🐛 Fixed an issue with plugin archive creation.
bmeares authored Jan 16, 2023
1 parent c773463 commit c931515
Showing 18 changed files with 640 additions and 74 deletions.
38 changes: 38 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,44 @@

This is the current release cycle, so stay tuned for future releases!

### v1.5.3

- **Pipes now support syncing dictionaries and lists.**
Complex columns (dicts or lists) will now be preserved:

```python
import meerschaum as mrsm
pipe = mrsm.Pipe('a', 'b')
pipe.sync([{'a': {'b': 1}}])
df = pipe.get_data()
print(df['a'][0])
# {'b': 1}
```

You can also force strings to be parsed by setting the data type to `json`:

```python
import meerschaum as mrsm
pipe = mrsm.Pipe(
'foo', 'bar',
columns = {'datetime': 'id'},
dtypes = {'data': 'json', 'id': 'Int64'},
)
docs = [{'id': 1, 'data': '{"foo": "bar"}'}]
pipe.sync(docs)
df = pipe.get_data()
print(df['data'][0])
# {'foo': 'bar'}
```

For PostgreSQL-like databases (e.g. TimescaleDB), this is stored as `JSONB` under the hood. For all other flavors, it's stored as the equivalent of `TEXT`.
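On non-`JSONB` flavors, that storage strategy amounts to serializing on write and parsing on read; a minimal stdlib sketch of the round trip (illustrative only, not the connector's actual code path):

```python
import json

doc = {'a': {'b': 1}}

# What lands in the TEXT column on non-JSONB flavors:
stored = json.dumps(doc['a'])

# What `pipe.get_data()` hands back after parsing:
restored = json.loads(stored)

print(stored)    # {"b": 1}
print(restored)  # {'b': 1}
```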

- **Fixed version detection when installing plugins.**
Like the `required` list, the `__version__` string must be set explicitly for the correct version to be detected.

- **Automatically cast `postgres` to `postgresql`.**
When a `SQLConnector` is built with the flavor `postgres`, it is automatically set to `postgresql`.
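The cast boils down to a one-line scheme rewrite, since SQLAlchemy 1.4+ rejects the deprecated `postgres://` scheme. A standalone sketch (`normalize_uri` is a hypothetical helper name, not Meerschaum's API):

```python
def normalize_uri(uri: str) -> str:
    """Rewrite the deprecated 'postgres://' scheme to 'postgresql://'."""
    if uri.startswith('postgres://'):
        # Replace only the scheme, never a later occurrence in the URI.
        uri = uri.replace('postgres://', 'postgresql://', 1)
    return uri

print(normalize_uri('postgres://user:pass@localhost:5432/db'))
# postgresql://user:pass@localhost:5432/db
```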

### v1.5.0 – v1.5.2

- **Pipes may now use integers for the `datetime` column.**
38 changes: 38 additions & 0 deletions docs/mkdocs/news/changelog.md
@@ -4,6 +4,44 @@

This is the current release cycle, so stay tuned for future releases!

### v1.5.3

- **Pipes now support syncing dictionaries and lists.**
Complex columns (dicts or lists) will now be preserved:

```python
import meerschaum as mrsm
pipe = mrsm.Pipe('a', 'b')
pipe.sync([{'a': {'b': 1}}])
df = pipe.get_data()
print(df['a'][0])
# {'b': 1}
```

You can also force strings to be parsed by setting the data type to `json`:

```python
import meerschaum as mrsm
pipe = mrsm.Pipe(
'foo', 'bar',
columns = {'datetime': 'id'},
dtypes = {'data': 'json', 'id': 'Int64'},
)
docs = [{'id': 1, 'data': '{"foo": "bar"}'}]
pipe.sync(docs)
df = pipe.get_data()
print(df['data'][0])
# {'foo': 'bar'}
```

For PostgreSQL-like databases (e.g. TimescaleDB), this is stored as `JSONB` under the hood. For all other flavors, it's stored as the equivalent of `TEXT`.

- **Fixed version detection when installing plugins.**
Like the `required` list, the `__version__` string must be set explicitly for the correct version to be detected.

- **Automatically cast `postgres` to `postgresql`.**
When a `SQLConnector` is built with the flavor `postgres`, it is automatically set to `postgresql`.

### v1.5.0 – v1.5.2

- **Pipes may now use integers for the `datetime` column.**
23 changes: 22 additions & 1 deletion docs/mkdocs/reference/compose.md
@@ -62,6 +62,16 @@ mkdir awesome-sauce && \
vim mrsm-compose.yaml
```

!!! tip "Plugins directories"

You may set multiple paths for `plugins_dir`. This is useful for grouping plugins together. A value of `null` will include the environment's plugins in your project.

```yaml
plugins_dir:
- "./plugins"
- null
```

## 🪖 Commands

If you've used `docker-compose`, you'll catch on to Meerschaum Compose pretty quickly. Here's a quick summary:
@@ -118,7 +128,7 @@ The supported top-level keys in a Compose file are the following:
- **`root_dir`** (*optional*, default `./root/`)
A path to the root directory; see [`MRSM_ROOT_DIR`](/reference/environment/#mrsm_root_dir).
- **`plugins_dir`** (*optional*, default `./plugins/`)
A path to the plugins directory; see [`MRSM_PLUGINS_DIR`](/reference/environment/#mrsm_plugins_dir).
Either a path or a list of paths to plugins directories. A value of `null` will include the current environment's plugins directories in the project. See [`MRSM_PLUGINS_DIR`](/reference/environment/#mrsm_plugins_dir).
- **`plugins`** (*optional*)
A list of plugins expected to be in the plugins directory. Missing plugins will be [installed from `api:mrsm`](/reference/plugins/using-plugins/).
To install from a custom repository, append `@api:<label>` to the plugins' names or set the configuration variable `meerschaum:default_repository`.
@@ -127,6 +137,17 @@ The supported top-level keys in a Compose file are the following:
- **`environment`** (*optional*)
Additional environment variables to pass to subprocesses.

!!! tip "Accessing the host configuration"

The Meerschaum Compose YAML file also supports Meerschaum symlinks. For example, to alias a new connector `sql:foo` to your host's `sql:main`:

```yaml
config:
meerschaum:
sql:
foo: MRSM{meerschaum:connectors:sql:main}
```

### The `sync` Key

Keys under the root key `sync` are the following:
11 changes: 11 additions & 0 deletions docs/mkdocs/reference/environment.md
@@ -27,6 +27,17 @@ MRSM_PLUGINS_DIR=plugins \
mrsm show plugins
```

### Multiple Plugins Directories

To allow you to group plugins together, Meerschaum supports multiple plugins directories at once. Just set `MRSM_PLUGINS_DIR` to a JSON-encoded list of paths:

```bash
export MRSM_PLUGINS_DIR='[
"./plugins",
"./plugins_2"
]'
```

## **`MRSM_<TYPE>_<LABEL>`**

You can temporarily register new connectors in a variable in the form `MRSM_<TYPE>_<LABEL>`, where `<TYPE>` is either `SQL` or `API`, and `<LABEL>` is the label for the connector (converted to lower case). Check here for more information about [environment connectors](/reference/connectors/#-environment-connectors), but in a nutshell, set the variable to the [URI](https://en.wikipedia.org/wiki/Uniform_Resource_Identifier) of your connector.
3 changes: 2 additions & 1 deletion meerschaum/actions/stop.py
@@ -102,7 +102,8 @@ def _stop_jobs(
(("Stopped job" + ("s" if len(_quit_daemons) != 1 else '') +
" '" + "', '".join([d.daemon_id for d in _quit_daemons]) + "'.")
if _quit_daemons else '')
+ (("Killed job" + ("s" if len(_kill_daemons) != 1 else '') +
+ (("\n" if _quit_daemons else "")
+ ("Killed job" + ("s" if len(_kill_daemons) != 1 else '') +
" '" + "', '".join([d.daemon_id for d in _kill_daemons]) + "'.")
if _kill_daemons else '')
)
2 changes: 1 addition & 1 deletion meerschaum/config/_version.py
@@ -2,4 +2,4 @@
Specify the Meerschaum release version.
"""

__version__ = "1.5.2"
__version__ = "1.5.3"
8 changes: 8 additions & 0 deletions meerschaum/connectors/sql/SQLConnector.py
@@ -54,6 +54,7 @@ class SQLConnector(Connector):
clear_pipe,
get_pipe_table,
get_pipe_columns_types,
get_to_sql_dtype,
)
from ._plugins import (
register_plugin,
@@ -118,6 +119,10 @@ def __init__(
as long enough parameters are supplied to the constructor.
"""
if 'uri' in kw:
uri = kw['uri']
if uri.startswith('postgres://'):
uri = uri.replace('postgres://', 'postgresql://', 1)
kw['uri'] = uri
from_uri_params = self.from_uri(kw['uri'], as_dict=True)
label = label or from_uri_params.get('label', None)
from_uri_params.pop('label', None)
@@ -151,6 +156,9 @@ def __init__(
)
self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

if self.flavor == 'postgres':
self.flavor = 'postgresql'

self._debug = debug
### Store the PID and thread at initialization
### so we can dispose of the Pool in child processes or threads.
98 changes: 87 additions & 11 deletions meerschaum/connectors/sql/_pipes.py
@@ -56,7 +56,9 @@ def register_pipe(
'metric_key' : pipe.metric_key,
'location_key' : pipe.location_key,
'parameters' : (
json.dumps(parameters) if self.flavor not in json_flavors else parameters
json.dumps(parameters)
if self.flavor not in json_flavors
else parameters
),
}
query = sqlalchemy.insert(pipes).values(**values)
@@ -115,8 +117,9 @@ def edit_pipe(
sqlalchemy = attempt_import('sqlalchemy')

values = {
'parameters' : (
json.dumps(parameters) if self.flavor in ('duckdb',)
'parameters': (
json.dumps(parameters)
if self.flavor not in json_flavors
else parameters
),
}
@@ -685,6 +688,7 @@ def get_pipe_data(
A `pd.DataFrame` of the pipe's data.
"""
import json
from meerschaum.utils.sql import sql_item_name
dtypes = pipe.dtypes
if dtypes:
@@ -706,9 +710,6 @@
existing_cols = pipe.get_columns_types(debug=debug)
if existing_cols:
dtypes = {col: typ for col, typ in dtypes.items() if col in existing_cols}
if dtypes:
kw['dtypes'] = dtypes

query = self.get_pipe_data_query(
pipe,
begin = begin,
@@ -734,6 +735,10 @@
df = parse_df_datetimes(df, debug=debug) if isinstance(df, pd.DataFrame) else (
[parse_df_datetimes(c, debug=debug) for c in df]
)
for col, typ in dtypes.items():
if typ != 'json':
continue
df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
return df
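The `json.loads` pass added above can be illustrated without a database; the same per-column step over plain rows (a stdlib-only sketch, not the function's actual pandas code path):

```python
import json

dtypes = {'id': 'Int64', 'data': 'json'}
rows = [
    {'id': 1, 'data': '{"foo": "bar"}'},
    {'id': 2, 'data': None},  # NULLs pass through untouched
]

# Only columns declared as 'json' are deserialized.
for col, typ in dtypes.items():
    if typ != 'json':
        continue
    for row in rows:
        if row[col] is not None:
            row[col] = json.loads(row[col])

print(rows[0]['data'])  # {'foo': 'bar'}
```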


@@ -1063,8 +1068,8 @@ def sync_pipe(
from meerschaum.utils.warnings import warn
from meerschaum.utils.debug import dprint
from meerschaum.utils.packages import import_pandas
from meerschaum.utils.sql import get_update_queries, sql_item_name
from meerschaum.utils.misc import generate_password
from meerschaum.utils.sql import get_update_queries, sql_item_name, json_flavors
from meerschaum.utils.misc import generate_password, get_json_cols
from meerschaum import Pipe
import time
import copy
@@ -1137,6 +1142,7 @@ def sync_pipe(
'name': temp_target,
'if_exists': 'append',
'chunksize': chunksize,
'dtype': self.get_to_sql_dtype(pipe, update_df, update_dtypes=False),
'debug': debug,
})
self.to_sql(update_df, **update_kw)
@@ -1182,7 +1188,18 @@
'debug': debug,
'as_dict': True,
'chunksize': chunksize,
'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
})

### Account for first-time syncs of JSON columns.
json_cols = get_json_cols(unseen_df)
if json_cols:
if not pipe.exists(debug=debug):
pipe.dtypes.update({col: 'json' for col in json_cols})
edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
if not edit_success:
warn(f"Unable to update JSON dtypes for {pipe}:\n{edit_msg}")

stats = self.to_sql(unseen_df, **unseen_kw)
if is_new:
if not self.create_indices(pipe, debug=debug):
@@ -2293,13 +2310,27 @@ def get_add_columns_queries(
"""
if not pipe.exists(debug=debug):
return []
import copy
from meerschaum.utils.sql import get_pd_type, get_db_type, sql_item_name
from meerschaum.utils.misc import flatten_list
table_obj = self.get_pipe_table(pipe, debug=debug)
df_cols_types = (
{col: str(typ) for col, typ in df.dtypes.items()}
if not isinstance(df, dict) else df
{
col: str(typ)
for col, typ in df.dtypes.items()
}
if not isinstance(df, dict)
else copy.deepcopy(df)
)
if len(df) > 0 and not isinstance(df, dict):
for col, typ in list(df_cols_types.items()):
if typ != 'object':
continue
val = df.iloc[0][col]
if isinstance(val, (dict, list)):
df_cols_types[col] = 'json'
elif isinstance(val, str):
df_cols_types[col] = 'str'
db_cols_types = {col: get_pd_type(str(typ.type)) for col, typ in table_obj.columns.items()}
new_cols = set(df_cols_types) - set(db_cols_types)
if not new_cols:
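The sample-value sniffing above reduces to a small rule, since pandas reports dicts, lists, and strings alike as `object`. A standalone sketch (`infer_object_dtype` is a hypothetical name, not part of Meerschaum's API):

```python
def infer_object_dtype(val) -> str:
    """Map a sample value from an 'object' column to a pipe dtype."""
    if isinstance(val, (dict, list)):
        return 'json'   # stored as JSONB / TEXT-encoded JSON
    if isinstance(val, str):
        return 'str'    # plain text column
    return 'object'     # leave the generic dtype alone

print(infer_object_dtype({'b': 1}))  # json
print(infer_object_dtype([1, 2]))    # json
print(infer_object_dtype('text'))    # str
```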
@@ -2371,7 +2402,7 @@ def get_alter_columns_queries(
if not altered_cols:
return []

text_type = get_db_type('object', self.flavor)
text_type = get_db_type('str', self.flavor)
altered_cols_types = {
col: text_type
for col in altered_cols
@@ -2462,3 +2493,48 @@ def get_alter_columns_queries(
))

return drop_index_queries + queries + create_index_queries


def get_to_sql_dtype(
self,
pipe: 'meerschaum.Pipe',
df: 'pd.DataFrame',
update_dtypes: bool = True,
) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
"""
Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.

Parameters
----------
pipe: meerschaum.Pipe
The pipe which may contain a `dtypes` parameter.
df: pd.DataFrame
The DataFrame to be pushed via `to_sql()`.
update_dtypes: bool, default True
If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

Returns
-------
A dictionary with `sqlalchemy` datatypes.

Examples
--------
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> conn.get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
"""
from meerschaum.utils.sql import get_db_type
df_dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
if update_dtypes:
df_dtypes.update(pipe.dtypes)
return {
col: get_db_type(typ, self.flavor, as_sqlalchemy=True)
for col, typ in df_dtypes.items()
}
