
v1.6.8 Ultimate Generators Release!

@bmeares released this 12 May 06:43 · 130 commits to main since this release · 432b511

v1.6.8

  • Added as_iterator to Pipe.get_data().
    Passing as_iterator=True (or as_chunks=True) to Pipe.get_data() returns a generator which yields chunks of pandas DataFrames.

    Each DataFrame is the result of a Pipe.get_data() call over intermediate bounds between begin and end, each spanning chunk_interval (default datetime.timedelta(days=1) for datetime indices, 100,000 IDs for integer indices). A time-series sketch follows the example below.

    import meerschaum as mrsm
    
    pipe = mrsm.Pipe(
        'a', 'b',
        columns={'datetime': 'id'},
        dtypes={'id': 'Int64'},
    )
    pipe.sync([
        {'id': 0, 'color': 'red'},
        {'id': 1, 'color': 'blue'},
        {'id': 2, 'color': 'green'},
        {'id': 3, 'color': 'yellow'},
    ])
    
    ### NOTE: due to non-inclusive end bounding,
    ###       chunks sometimes contain
    ###       (chunk_interval - 1) rows.
    chunks = pipe.get_data(
        chunk_interval = 2,
        as_iterator = True,
    )
    for chunk in chunks:
        print(chunk)
    
    #    id color
    # 0   0   red
    # 1   1  blue
    #    id   color
    # 0   2   green
    # 1   3  yellow
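
    For datetime indices, chunk_interval may instead be a datetime.timedelta (per the default noted above). Below is a minimal sketch under that assumption; the pipe keys, column names, and rows are illustrative:

    import datetime
    import meerschaum as mrsm
    
    ts_pipe = mrsm.Pipe(
        'demo', 'weather',
        columns={'datetime': 'dt'},
    )
    ts_pipe.sync([
        {'dt': '2023-01-01 00:00:00', 'temperature': 60.1},
        {'dt': '2023-01-02 00:00:00', 'temperature': 62.4},
    ])
    
    ### Each chunk covers one day of the `dt` index.
    chunks = ts_pipe.get_data(
        chunk_interval = datetime.timedelta(days=1),
        as_iterator = True,
    )
    for chunk in chunks:
        print(chunk)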
  • Added server-side cursor support to SQLConnector.read().
    If chunk_hook is provided, SQLConnector.read() keeps an open cursor and streams the chunks to the hook one at a time. This allows for processing data sets too large to fit in memory; see the streaming sketch after the example below.

    To return the results of the chunk_hook callable rather than a DataFrame, pass as_hook_results=True to receive a list of the hook's return values.

    If as_iterator is provided or chunksize is None, SQLConnector.read() falls back to the default client-side cursor implementation (which loads the entire result set into memory).

    import meerschaum as mrsm
    conn = mrsm.get_connector()
    
    ### Count the rows of each chunk as it streams in.
    def process_chunk(df: 'pd.DataFrame', **kw) -> int:
        return len(df)
    
    results = conn.read(
        "very_large_table",
        chunk_hook = process_chunk,
        as_hook_results = True,
        chunksize = 100,
    )
    
    results[:2]
    # [100, 100]
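
    Because chunk_hook receives each DataFrame as it streams, the hook can forward rows elsewhere without the full table ever residing in memory. A hedged sketch, reusing the illustrative table name above and a hypothetical target pipe:

    import meerschaum as mrsm
    conn = mrsm.get_connector()
    
    ### Hypothetical destination pipe for the streamed chunks.
    target_pipe = mrsm.Pipe('demo', 'copy', instance=conn)
    
    def store_chunk(df: 'pd.DataFrame', **kw) -> tuple:
        """Sync each streamed chunk into `target_pipe`."""
        return target_pipe.sync(df)
    
    conn.read(
        "very_large_table",
        chunk_hook = store_chunk,
        chunksize = 100,
    )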
  • Removed --sync-chunks and made its behavior the default.
    Due to the above changes to SQLConnector.read(), sync_chunks now defaults to True in Pipe.sync(). You may disable this behavior with --chunksize 0, as in the sketch below.
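
    A minimal sketch of disabling chunking from Python, assuming the chunksize keyword of Pipe.sync() mirrors the --chunksize flag:

    import meerschaum as mrsm
    pipe = mrsm.Pipe(
        'a', 'b',
        columns={'datetime': 'id'},
        dtypes={'id': 'Int64'},
    )
    
    ### Assumption: chunksize=0 disables chunked syncing, like `--chunksize 0`.
    pipe.sync(
        [{'id': 4, 'color': 'purple'}],
        chunksize = 0,
    )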