diff --git a/README.rst b/README.rst
index 023e203..bffbde5 100644
--- a/README.rst
+++ b/README.rst
@@ -75,11 +75,7 @@ Example:
           "tap_stream_id": "sample-dbname.public.sample-name",
           "stream": "sample-stream",
           "database_name": "sample-dbname",
           "table_name": "public.sample-name",
-          "is_view": false,
-          "key_properties": [
-            "id"
-          ],
           "schema": {
             "properties": {
               "id": {
@@ -113,14 +109,21 @@ Example:
             {
               "metadata": {
                 "selected-by-default": false,
-                "selected": true
+                "selected": true,
+                "is-view": false,
+                "table-key-properties": ["id"],
+                "schema-name": "sample-stream",
+                "valid-replication-keys": [
+                  "updated_at"
+                ]
               },
               "breadcrumb": []
             },
             {
               "metadata": {
                 "selected-by-default": true,
-                "sql-datatype": "int2"
+                "sql-datatype": "int2",
+                "inclusion": "automatic"
               },
               "breadcrumb": [
                 "properties",
@@ -130,7 +133,8 @@ Example:
             {
               "metadata": {
                 "selected-by-default": true,
-                "sql-datatype": "varchar"
+                "sql-datatype": "varchar",
+                "inclusion": "available"
               },
               "breadcrumb": [
                 "properties",
@@ -140,7 +144,8 @@ Example:
             {
               "metadata": {
                 "selected-by-default": true,
-                "sql-datatype": "datetime"
+                "sql-datatype": "datetime",
+                "inclusion": "available"
               },
               "breadcrumb": [
                 "properties",
@@ -171,7 +176,8 @@ Example:
             {
               "breadcrumb": [],
               "metadata": {
-                "selected-by-default": false
+                "selected-by-default": false,
+                ...
               }
             }
           ]
@@ -187,7 +193,8 @@ Example:
               "breadcrumb": [],
               "metadata": {
                 "selected": true,
-                "selected-by-default": false
+                "selected-by-default": false,
+                ...
               }
             }
           ]
@@ -217,21 +224,24 @@
 Incremental replication works in conjunction with a state file to only extract new records each
 time the tap is invoked, i.e. continuing from the last synced record.
 
 To use incremental replication, we need to add the ``replication_method`` and ``replication_key``
-to the top level under each stream in the ``catalog.json`` file.
+to the metadata of each stream (table) in the ``catalog.json`` file.
 
 Example:
 
 .. code-block:: json
 
-    {
-      "streams": [
-        {
-          "replication_method": "INCREMENTAL",
-          "replication_key": "updated_at",
+    "metadata": [
+      {
+        "breadcrumb": [],
+        "metadata": {
+          "selected": true,
+          "selected-by-default": false,
+          "replication-method": "INCREMENTAL",
+          "replication-key": "updated_at",
           ...
         }
-      ]
-    }
+      }
+    ]
 
 We can then invoke the tap again in sync mode. This time the output will have ``STATE`` messages
 that contain a ``replication_key_value`` and ``bookmark`` for the data that was extracted.
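For orientation, the sketch below shows how a consumer of a catalog shaped like the README example can read the replication settings back out of the stream-level metadata entry (the one whose ``breadcrumb`` is empty). It is plain Python with no singer-python dependency; the ``stream_level_metadata`` helper is illustrative, not part of the tap.

.. code-block:: python

    import json

    # Catalog fragment shaped like the README example (illustrative values).
    catalog = json.loads("""
    {
      "streams": [{
        "tap_stream_id": "sample-dbname.public.sample-name",
        "metadata": [{
          "breadcrumb": [],
          "metadata": {
            "selected": true,
            "replication-method": "INCREMENTAL",
            "replication-key": "updated_at"
          }
        }]
      }]
    }
    """)

    def stream_level_metadata(stream):
        # Stream-level settings live on the entry with an empty breadcrumb;
        # column-level entries use breadcrumbs like ["properties", "id"].
        for entry in stream["metadata"]:
            if entry["breadcrumb"] == []:
                return entry["metadata"]
        return {}

    mdata = stream_level_metadata(catalog["streams"][0])
    print(mdata["replication-method"])  # INCREMENTAL
    print(mdata["replication-key"])     # updated_at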
diff --git a/tap_redshift/__init__.py b/tap_redshift/__init__.py
index ca8e096..473da72 100644
--- a/tap_redshift/__init__.py
+++ b/tap_redshift/__init__.py
@@ -36,7 +36,7 @@ from tap_redshift import resolve
 
-__version__ = '1.0.0b7'
+__version__ = '1.0.0b8'
 
 LOGGER = singer.get_logger()
@@ -126,24 +126,22 @@ def discover_catalog(conn, db_schema):
         schema = Schema(type='object',
                         properties={
                             c['name']: schema_for_column(c) for c in cols})
-        metadata = create_column_metadata(cols)
+        key_properties = [
+            column for column in table_pks.get(table_name, [])
+            if schema.properties[column].inclusion != 'unsupported']
+        is_view = table_types.get(table_name) == 'VIEW'
+        db_name = conn.get_dsn_parameters()['dbname']
+        metadata = create_column_metadata(
+            db_name, cols, is_view, table_name, key_properties)
         tap_stream_id = '{}.{}'.format(
-            conn.get_dsn_parameters()['dbname'], qualified_table_name)
+            db_name, qualified_table_name)
         entry = CatalogEntry(
-            database=conn.get_dsn_parameters()['dbname'],
             tap_stream_id=tap_stream_id,
             stream=table_name,
             schema=schema,
             table=qualified_table_name,
             metadata=metadata)
 
-        key_properties = [
-            column for column in table_pks.get(table_name, [])
-            if schema.properties[column].inclusion != 'unsupported']
-
-        if key_properties:
-            entry.key_properties = key_properties
-        entry.is_view = table_types.get(table_name) == 'VIEW'
         entries.append(entry)
 
     return Catalog(entries)
@@ -200,9 +198,20 @@ def schema_for_column(c):
     return result
 
 
-def create_column_metadata(cols):
+def create_column_metadata(
+        db_name, cols, is_view,
+        table_name, key_properties=[]):
     mdata = metadata.new()
     mdata = metadata.write(mdata, (), 'selected-by-default', False)
+    if not is_view:
+        mdata = metadata.write(
+            mdata, (), 'table-key-properties', key_properties)
+    else:
+        mdata = metadata.write(
+            mdata, (), 'view-key-properties', key_properties)
+    mdata = metadata.write(mdata, (), 'is-view', is_view)
+    mdata = metadata.write(mdata, (), 'schema-name', table_name)
+    mdata = metadata.write(mdata, (), 'database-name', db_name)
     valid_rep_keys = []
 
     for c in cols:
@@ -219,6 +228,10 @@ def create_column_metadata(
                                ('properties', c['name']),
                                'sql-datatype',
                                c['type'].lower())
+        mdata = metadata.write(mdata,
+                               ('properties', c['name']),
+                               'inclusion',
+                               schema.inclusion)
 
     if valid_rep_keys:
         mdata = metadata.write(mdata, (), 'valid-replication-keys',
                                valid_rep_keys)
@@ -297,9 +310,8 @@ def sync_table(connection, catalog_entry, state):
     formatted_start_date = str(datetime.datetime.strptime(
         start_date, '%Y-%m-%dT%H:%M:%SZ'))
 
-    replication_key = singer.get_bookmark(state,
-                                          tap_stream_id,
-                                          'replication_key')
+    replication_key = metadata.to_map(catalog_entry.metadata).get(
+        (), {}).get('replication-key')
     replication_key_value = None
     bookmark_is_empty = state.get('bookmarks', {}).get(
         tap_stream_id) is None
@@ -389,6 +401,13 @@ def generate_messages(conn, db_schema, catalog, state):
     for catalog_entry in catalog.streams:
         state = singer.set_currently_syncing(state,
                                              catalog_entry.tap_stream_id)
+        catalog_md = metadata.to_map(catalog_entry.metadata)
+
+        if catalog_md.get((), {}).get('is-view'):
+            key_properties = catalog_md.get((), {}).get('view-key-properties')
+        else:
+            key_properties = catalog_md.get((), {}).get('table-key-properties')
+        bookmark_properties = catalog_md.get((), {}).get('replication-key')
 
         # Emit a state message to indicate that we've started this stream
         yield singer.StateMessage(value=copy.deepcopy(state))
@@ -397,7 +416,8 @@ def generate_messages(conn, db_schema, catalog, state):
         yield singer.SchemaMessage(
             stream=catalog_entry.stream,
             schema=catalog_entry.schema.to_dict(),
-            key_properties=catalog_entry.key_properties)
+            key_properties=key_properties,
+            bookmark_properties=bookmark_properties)
 
         # Emit a RECORD message for each record in the result set
         with metrics.job_timer('sync_table') as timer:
@@ -439,33 +459,37 @@ def build_state(raw_state, catalog):
     for catalog_entry in catalog.streams:
         tap_stream_id = catalog_entry.tap_stream_id
-        if catalog_entry.replication_key:
-            state = singer.write_bookmark(state,
-                                          tap_stream_id,
-                                          'replication_key',
-                                          catalog_entry.replication_key)
+        catalog_metadata = metadata.to_map(catalog_entry.metadata)
+        replication_method = catalog_metadata.get(
+            (), {}).get('replication-method')
+        raw_stream_version = singer.get_bookmark(
+            raw_state, tap_stream_id, 'version')
+
+        if replication_method == 'INCREMENTAL':
+            replication_key = catalog_metadata.get(
+                (), {}).get('replication-key')
+
+            state = singer.write_bookmark(
+                state, tap_stream_id, 'replication_key', replication_key)
 
             # Only keep the existing replication_key_value if the
             # replication_key hasn't changed.
             raw_replication_key = singer.get_bookmark(raw_state,
                                                       tap_stream_id,
                                                       'replication_key')
-            if raw_replication_key == catalog_entry.replication_key:
-                rep_key_val = singer.get_bookmark(raw_state,
-                                                  tap_stream_id,
-                                                  'replication_key_value')
-                raw_replication_key_value = rep_key_val
+            if raw_replication_key == replication_key:
+                raw_replication_key_value = singer.get_bookmark(
+                    raw_state, tap_stream_id, 'replication_key_value')
                 state = singer.write_bookmark(state,
                                               tap_stream_id,
                                               'replication_key_value',
                                               raw_replication_key_value)
 
-        # Persist any existing version, even if it's None
-        if raw_state.get('bookmarks', {}).get(tap_stream_id):
-            raw_stream_version = singer.get_bookmark(raw_state,
-                                                     tap_stream_id,
-                                                     'version')
+            if raw_stream_version is not None:
+                state = singer.write_bookmark(
+                    state, tap_stream_id, 'version', raw_stream_version)
 
+        elif replication_method == 'FULL_TABLE' and raw_stream_version is None:
             state = singer.write_bookmark(state,
                                           tap_stream_id,
                                           'version',
diff --git a/tap_redshift/resolve.py b/tap_redshift/resolve.py
index 10d45d9..3f38442 100644
--- a/tap_redshift/resolve.py
+++ b/tap_redshift/resolve.py
@@ -118,12 +118,9 @@ def resolve_catalog(discovered, catalog, state):
         result.streams.append(CatalogEntry(
             tap_stream_id=catalog_entry.tap_stream_id,
             stream=catalog_entry.stream,
-            database=catalog_entry.database,
             table=catalog_entry.table,
-            is_view=catalog_entry.is_view,
             schema=schema,
-            replication_key=catalog_entry.replication_key,
-            key_properties=catalog_entry.key_properties
+            metadata=catalog_entry.metadata
         ))
 
     return result
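The sync path above now derives key properties and the bookmark key from the compiled metadata map rather than from attributes on the catalog entry. Below is a minimal sketch of that lookup, using a plain dict in place of the map that ``singer.metadata.to_map`` produces (the values are illustrative):

.. code-block:: python

    # Plain dict standing in for the compiled metadata map: breadcrumb
    # tuples map to their metadata dicts, with () as the stream level.
    catalog_md = {
        (): {
            'is-view': False,
            'table-key-properties': ['id'],
            'replication-key': 'updated_at',
        },
        ('properties', 'id'): {'inclusion': 'automatic'},
    }

    top = catalog_md.get((), {})
    if top.get('is-view'):
        key_properties = top.get('view-key-properties')
    else:
        key_properties = top.get('table-key-properties')
    bookmark_properties = top.get('replication-key')

    print(key_properties, bookmark_properties)  # ['id'] updated_at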
diff --git a/tests/conftest.py b/tests/conftest.py
index ab9d899..3a1cc28 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -83,9 +83,7 @@ def column_specs_cursor():
 def expected_catalog_from_db():
     return Catalog.from_dict({'streams': [
         {'tap_stream_id': 'test-db.public.table1',
-         'database_name': 'test-db',
          'table_name': 'public.table1',
-         'key_properties': ['col1'],
          'schema': {
              'properties': {
                  'col1': {
@@ -109,33 +107,39 @@ def expected_catalog_from_db():
                      'format': 'date-time',
                      'type': 'string'}},
              'type': 'object'},
-         'is_view': False,
          'stream': 'table1',
          'metadata': [
              {'breadcrumb': (),
               'metadata': {'selected-by-default': False,
                            'valid-replication-keys': [
-                               'col3', 'col4', 'col5']}},
+                               'col3', 'col4', 'col5'],
+                           'table-key-properties': ['col1'],
+                           'is-view': False,
+                           'schema-name': 'table1',
+                           'database-name': 'test-db'}},
              {'breadcrumb': ('properties', 'col1'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'int2'}},
+                           'sql-datatype': 'int2',
+                           'inclusion': 'available'}},
              {'breadcrumb': ('properties', 'col2'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'float8'}},
+                           'sql-datatype': 'float8',
+                           'inclusion': 'available'}},
              {'breadcrumb': ('properties', 'col3'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'timestamptz'}},
+                           'sql-datatype': 'timestamptz',
+                           'inclusion': 'available'}},
              {'breadcrumb': ('properties', 'col4'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'timestamp'}},
+                           'sql-datatype': 'timestamp',
+                           'inclusion': 'available'}},
              {'breadcrumb': ('properties', 'col5'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'timestamp with time zone'}}
+                           'sql-datatype': 'timestamp with time zone',
+                           'inclusion': 'available'}}
         ]},
         {'tap_stream_id': 'test-db.public.table2',
-         'database_name': 'test-db',
          'table_name': 'public.table2',
-         'key_properties': ['col1', 'col2'],
          'schema': {
              'properties': {
                  'col1': {
@@ -148,7 +152,6 @@ def expected_catalog_from_db():
                      'type': ['null', 'boolean']}},
              'type': 'object'
          },
-         'is_view': False,
          'stream': 'table2',
          'metadata': [
              {'breadcrumb': (),
@@ -156,15 +159,20 @@ def expected_catalog_from_db():
               'metadata': {'selected-by-default': False,
                            'forced-replication-method': {
                                'replication-method': 'FULL_TABLE',
                                'reason': 'No replication keys found from table'
-                           }}},
+                           },
+                           'table-key-properties': ['col1', 'col2'],
+                           'is-view': False,
+                           'schema-name': 'table2',
+                           'database-name': 'test-db'}},
              {'breadcrumb': ('properties', 'col1'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'int4'}},
+                           'sql-datatype': 'int4',
+                           'inclusion': 'available'}},
              {'breadcrumb': ('properties', 'col2'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'bool'}}]},
+                           'sql-datatype': 'bool',
+                           'inclusion': 'available'}}]},
         {'tap_stream_id': 'test-db.public.view1',
-         'database_name': 'test-db',
          'table_name': 'public.view1',
          'schema': {
              'properties': {
@@ -175,7 +183,6 @@ def expected_catalog_from_db():
                      'inclusion': 'unsupported',
                      'description': 'Unsupported column type unknown'}},
              'type': 'object'},
-         'is_view': True,
          'stream': 'view1',
          'metadata': [
              {'breadcrumb': (),
@@ -183,13 +190,19 @@ def expected_catalog_from_db():
               'metadata': {'selected-by-default': False,
                            'forced-replication-method': {
                                'replication-method': 'FULL_TABLE',
                                'reason': 'No replication keys found from table'
-                           }}},
+                           },
+                           'view-key-properties': [],
+                           'is-view': True,
+                           'schema-name': 'view1',
+                           'database-name': 'test-db'}},
              {'breadcrumb': ('properties', 'col1'),
               'metadata': {'selected-by-default': True,
-                           'sql-datatype': 'varchar'}},
+                           'sql-datatype': 'varchar',
+                           'inclusion': 'available'}},
              {'breadcrumb': ('properties', 'col2'),
               'metadata': {'selected-by-default': False,
-                           'sql-datatype': 'unknown'}}
+                           'sql-datatype': 'unknown',
+                           'inclusion': 'unsupported'}}
         ]}
     ]})
diff --git a/tests/tap_redshift/test_tap_redshift.py b/tests/tap_redshift/test_tap_redshift.py
index 31d8fb2..748e7e7 100644
--- a/tests/tap_redshift/test_tap_redshift.py
+++ b/tests/tap_redshift/test_tap_redshift.py
@@ -225,7 +225,8 @@ def test_discover_catalog(self, discovery_conn, expected_catalog_from_db):
                     equal_to(metadata.get(
                         expected_metadata, bcrumb, mdata_key)))
 
-        assert_that(actual_entry, equal_to(expected_entry))
+            assert_that(sorted(actual_entry.to_dict()),
+                        equal_to(sorted(expected_entry.to_dict())))
 
     def test_create_column_metadata(self):
         cols = [{'pos': 1, 'name': 'col1', 'type': 'int2', 'nullable': 'NO'},
                 {'pos': 2, 'name': 'col2', 'type': 'float8',
@@ -233,18 +234,29 @@ def test_create_column_metadata(self):
                  'nullable': 'YES'},
                 {'pos': 3, 'name': 'col3', 'type': 'timestamptz',
                  'nullable': 'NO'}]
+        db_name = 'test-db'
+        table_name = 'test_table'
+        key_properties = ['col1']
+        is_view = False
         expected_mdata = metadata.new()
         metadata.write(expected_mdata, (), 'selected-by-default', False)
+        metadata.write(expected_mdata, (), 'valid-replication-keys', ['col3'])
+        metadata.write(expected_mdata, (),
+                       'table-key-properties', key_properties)
+        metadata.write(expected_mdata, (), 'is-view', is_view)
+        metadata.write(expected_mdata, (), 'schema-name', table_name)
+        metadata.write(expected_mdata, (), 'database-name', db_name)
 
         for col in cols:
-            metadata.write(expected_mdata, (),
-                           'valid-replication-keys',
-                           ['col3'])
+            schema = tap_redshift.schema_for_column(col)
             metadata.write(expected_mdata, (
                 'properties', col['name']), 'selected-by-default', True)
             metadata.write(expected_mdata, (
                 'properties', col['name']), 'sql-datatype', col['type'])
+            metadata.write(expected_mdata, (
+                'properties', col['name']), 'inclusion', schema.inclusion)
 
-        actual_mdata = tap_redshift.create_column_metadata(cols)
+        actual_mdata = tap_redshift.create_column_metadata(
+            db_name, cols, is_view, table_name, key_properties)
         assert_that(actual_mdata, equal_to(metadata.to_list(expected_mdata)))
 
     def test_type_int4(self):
@@ -296,18 +308,44 @@ def test_type_date_time(self):
         expected_schema = stream_schema['schema']['properties']['created_at']
         assert_that(column_schema, equal_to(expected_schema))
 
-    def test_valid_rep_keys(self, discovery_conn, expected_catalog_from_db):
+    def test_table_metadata(self, discovery_conn, expected_catalog_from_db):
         actual_catalog = tap_redshift.discover_catalog(discovery_conn,
                                                        'public')
         for i, actual_entry in enumerate(actual_catalog.streams):
             expected_entry = expected_catalog_from_db.streams[i]
             actual_metadata = metadata.to_map(actual_entry.metadata)
             expected_metadata = metadata.to_map(expected_entry.metadata)
+
             actual_valid_rep_keys = metadata.get(
                 actual_metadata, (), 'valid-replication-keys')
             expected_valid_rep_keys = metadata.get(
                 expected_metadata, (), 'valid-replication-keys')
+
             assert_that(actual_valid_rep_keys,
                         equal_to(expected_valid_rep_keys))
+            actual_table_key_properties = metadata.get(
+                actual_metadata, (), 'table-key-properties')
+            expected_table_key_properties = metadata.get(
+                expected_metadata, (), 'table-key-properties')
+
+            assert_that(actual_table_key_properties,
+                        equal_to(expected_table_key_properties))
+
+            actual_schema_name = metadata.get(
+                actual_metadata, (), 'schema-name')
+            expected_schema_name = metadata.get(
+                expected_metadata, (), 'schema-name')
+
+            assert_that(actual_schema_name,
+                        equal_to(expected_schema_name))
+
+            actual_is_view = metadata.get(
+                actual_metadata, (), 'is-view')
+            expected_is_view = metadata.get(
+                expected_metadata, (), 'is-view')
+
+            assert_that(actual_is_view,
+                        equal_to(expected_is_view))
+
     # TODO write tests for full and incremental sync
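The reworked ``build_state`` keeps a saved ``replication_key_value`` only while the configured replication key is unchanged. Below is a minimal sketch of that carry-over rule, with a hypothetical ``carried_over_value`` helper (not part of the tap) and made-up state values:

.. code-block:: python

    def carried_over_value(raw_state, tap_stream_id, replication_key):
        # The old bookmark value survives only if the replication key
        # recorded in state matches the key configured in the catalog.
        bookmarks = raw_state.get('bookmarks', {}).get(tap_stream_id, {})
        if bookmarks.get('replication_key') == replication_key:
            return bookmarks.get('replication_key_value')
        return None  # key changed, so the old bookmark no longer applies

    old_state = {'bookmarks': {'test-db.public.table1': {
        'replication_key': 'updated_at',
        'replication_key_value': '2018-01-01T00:00:00Z'}}}

    print(carried_over_value(old_state, 'test-db.public.table1', 'updated_at'))
    # -> '2018-01-01T00:00:00Z'
    print(carried_over_value(old_state, 'test-db.public.table1', 'created_at'))
    # -> None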