From fe70da7f9829b3bea81966159ae4719f7fc1beb0 Mon Sep 17 00:00:00 2001 From: Kethan Cherukuri <105211331+kethan1122@users.noreply.github.com> Date: Mon, 20 Mar 2023 12:07:54 +0530 Subject: [PATCH] TDL-22338 add new fields, streams and bug fixes (#7) * add account_balance stream * adds account_balance stream * add account_sid field to keys stream * change query params for usage_records and alerts and pagination_url for alerts stream * next_page url * revert bookmarking >= and fallback to default value for unset config values * Tdl 22160 add integration tests (#6) * setup circleci * Update config.yml * fix community issues * fix pagination issue * update query params for calls stream * updated replication_key and query_param for calls stream * add basetest for integration tests * remove replication_keys for full_table stream accounts * changelog and version bump * update changelog * remove foreign keys * add discovery test * add pagination test * add all fields and automatic fields * deleted old tests * deleted old test and removed extra lines in circleci config * add start_date and bookmark test * PR comments * update comments * PR comments * update bookmarks test * update pagination test * changelog and version update * update README --- .circleci/config.yml | 2 +- CHANGELOG.md | 11 + README.md | 63 ++- setup.py | 2 +- tap_twilio/__init__.py | 5 +- .../schemas/conference_participants.json | 3 + tap_twilio/schemas/conferences.json | 6 + .../schemas/incoming_phone_numbers.json | 3 + tap_twilio/schemas/messages.json | 3 + tap_twilio/streams.py | 21 +- tap_twilio/sync.py | 34 +- tests/__init__.py | 0 tests/base.py | 416 ++++++++++++++++++ tests/test_all_fields.py | 84 ++++ tests/test_automatic_fields.py | 95 ++++ tests/test_bookmarks.py | 168 +++++++ tests/test_configuration.py | 41 -- tests/test_discovery.py | 141 ++++++ tests/test_pagination.py | 72 +++ tests/test_start_date.py | 137 ++++++ tests/test_sync.py | 117 ----- 21 files changed, 1221 insertions(+), 203 deletions(-) delete mode 100644 tests/__init__.py create mode 100644 tests/base.py create mode 100644 tests/test_all_fields.py create mode 100644 tests/test_automatic_fields.py create mode 100644 tests/test_bookmarks.py delete mode 100644 tests/test_configuration.py create mode 100644 tests/test_discovery.py create mode 100644 tests/test_pagination.py create mode 100644 tests/test_start_date.py delete mode 100644 tests/test_sync.py diff --git a/.circleci/config.yml b/.circleci/config.yml index ea1222f..3b03d4c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -50,4 +50,4 @@ workflows: - master jobs: - build: - context: circleci-user \ No newline at end of file + context: circleci-user diff --git a/CHANGELOG.md b/CHANGELOG.md index d3257d1..3a1a424 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## 0.1.0 + * Fixes following issues [#7](https://github.com/singer-io/tap-twilio/pull/7) + * Adds `emergency_address_status` field to `incoming_phone_numbers` stream + * Adds `label` field to `conference_participants` stream + * Adds `reason_conference_ended` and `call_sid_ending_conference` fields to `conferences` stream + * Adds `price_unit` field to `messages` stream + * Adds `account_sid` field to `keys` stream during process_records + * Adds `account_balance` stream implementation + * Updates/reverts query_params for `alerts` and `usage_records` streams + * Fixes pagination issue for `alerts` issue + ## 0.0.3 * Fixes following issues [#4](https://github.com/singer-io/tap-twilio/pull/4) * Fixes 
pagination issue diff --git a/README.md b/README.md index 01cfe76..71bdd13 100644 --- a/README.md +++ b/README.md @@ -8,27 +8,29 @@ This tap: - Pulls raw data from the [twilio Advertiser API]([xxx](https://support.twilio.com/s/advertiser-api-documentation)) - Extracts the following resources: - - [accounts](https://www.twilio.com/docs/usage/api/account#read-multiple-account-resources) - - [addresses](https://www.twilio.com/docs/usage/api/address#read-multiple-address-resources) - - [dependent_phone_numbers](https://www.twilio.com/docs/usage/api/address?code-sample=code-list-dependent-pns-subresources&code-language=curl&code-sdk-version=json#instance-subresources) - - [applications](https://www.twilio.com/docs/usage/api/applications#read-multiple-application-resources) - - [available_phone_number_countries](https://www.twilio.com/docs/phone-numbers/api/availablephonenumber-resource#read-a-list-of-countries) - - [available_phone_numbers_local](https://www.twilio.com/docs/phone-numbers/api/availablephonenumberlocal-resource#read-multiple-availablephonenumberlocal-resources) - - [available_phone_numbers_mobile](https://www.twilio.com/docs/phone-numbers/api/availablephonenumber-mobile-resource#read-multiple-availablephonenumbermobile-resources) - - [available_phone_numbers_toll_free](https://www.twilio.com/docs/phone-numbers/api/availablephonenumber-tollfree-resource#read-multiple-availablephonenumbertollfree-resources) - - [incoming_phone_numbers](https://www.twilio.com/docs/phone-numbers/api/incomingphonenumber-resource#read-multiple-incomingphonenumber-resources) - - [keys](https://www.twilio.com/docs/usage/api/keys#read-a-key-resource) - - [calls](https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources) - - [conferences](https://www.twilio.com/docs/voice/api/conference-resource#read-multiple-conference-resources) - - [conference_participants](https://www.twilio.com/docs/voice/api/conference-participant-resource#read-multiple-participant-resources) - - [outgoing_caller_ids](https://www.twilio.com/docs/voice/api/outgoing-caller-ids#outgoingcallerids-list-resource) - - [recordings](https://www.twilio.com/docs/voice/api/recording#read-multiple-recording-resources) - - [transcriptions](https://www.twilio.com/docs/voice/api/recording-transcription?code-sample=code-read-list-all-transcriptions&code-language=curl&code-sdk-version=json#read-multiple-transcription-resources) - - [queues](https://www.twilio.com/docs/voice/api/queue-resource#read-multiple-queue-resources) - - [message_media](https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources) - - [usage_records](https://www.twilio.com/docs/usage/api/usage-record#read-multiple-usagerecord-resources) - - [usage_triggers](https://www.twilio.com/docs/usage/api/usage-trigger#read-multiple-usagetrigger-resources) - - [alerts](https://www.twilio.com/docs/usage/monitor-alert#read-multiple-alert-resources) +- [accounts](https://www.twilio.com/docs/usage/api/account#read-multiple-account-resources) + - [account_balance](https://www.twilio.com/docs/usage/api/account#read-multiple-account-resources) + - [addresses](https://www.twilio.com/docs/usage/api/address#read-multiple-address-resources) + - [dependent_phone_numbers](https://www.twilio.com/docs/usage/api/address?code-sample=code-list-dependent-pns-subresources&code-language=curl&code-sdk-version=json#instance-subresources) + - [applications](https://www.twilio.com/docs/usage/api/applications#read-multiple-application-resources) + - 
[available_phone_number_countries](https://www.twilio.com/docs/phone-numbers/api/availablephonenumber-resource#read-a-list-of-countries) + - [available_phone_numbers_local](https://www.twilio.com/docs/phone-numbers/api/availablephonenumberlocal-resource#read-multiple-availablephonenumberlocal-resources) + - [available_phone_numbers_mobile](https://www.twilio.com/docs/phone-numbers/api/availablephonenumber-mobile-resource#read-multiple-availablephonenumbermobile-resources) + - [available_phone_numbers_toll_free](https://www.twilio.com/docs/phone-numbers/api/availablephonenumber-tollfree-resource#read-multiple-availablephonenumbertollfree-resources) + - [incoming_phone_numbers](https://www.twilio.com/docs/phone-numbers/api/incomingphonenumber-resource#read-multiple-incomingphonenumber-resources) + - [keys](https://www.twilio.com/docs/usage/api/keys#read-a-key-resource) + - [calls](https://www.twilio.com/docs/voice/api/call-resource#read-multiple-call-resources) + - [conferences](https://www.twilio.com/docs/voice/api/conference-resource#read-multiple-conference-resources) + - [conference_participants](https://www.twilio.com/docs/voice/api/conference-participant-resource#read-multiple-participant-resources) + - [outgoing_caller_ids](https://www.twilio.com/docs/voice/api/outgoing-caller-ids#outgoingcallerids-list-resource) + - [recordings](https://www.twilio.com/docs/voice/api/recording#read-multiple-recording-resources) + - [transcriptions](https://www.twilio.com/docs/voice/api/recording-transcription?code-sample=code-read-list-all-transcriptions&code-language=curl&code-sdk-version=json#read-multiple-transcription-resources) + - [queues](https://www.twilio.com/docs/voice/api/queue-resource#read-multiple-queue-resources) + - [messages](https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources) + - [message_media](https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources) + - [usage_records](https://www.twilio.com/docs/usage/api/usage-record#read-multiple-usagerecord-resources) + - [usage_triggers](https://www.twilio.com/docs/usage/api/usage-trigger#read-multiple-usagetrigger-resources) +- [alerts](https://www.twilio.com/docs/usage/monitor-alert#read-multiple-alert-resources) - Outputs the schema for each resource - Incrementally pulls data based on the input state @@ -45,9 +47,17 @@ This tap: - Transformations: subresources_to_array +[account_balance](https://www.twilio.com/docs/usage/api/account#read-multiple-account-resources) +- Endpoint: https://api.twilio.com/2010-04-01/Accounts/{AccountSid}/Balance.json +- Parent: accounts +- Primary key fields: account_sid +- Replication strategy: FULL_TABLE +- Transformations: None + + [addresses](https://www.twilio.com/docs/usage/api/address#read-multiple-address-resources) - Endpoint: https://api.twilio.com/2010-04-01/Accounts/{AccountSid}/Addresses.json -- Parent: account +- Parent: accounts - Primary key fields: sid - Replication strategy: INCREMENTAL - Transformations: subresources_to_array @@ -116,6 +126,7 @@ This tap: - Replication strategy: INCREMENTAL - Transformations: subresources_to_array + [calls](https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources) - Endpoint: https://api.twilio.com/2010-04-01/Accounts/{AccountSid}/Calls.json - Parent: accounts @@ -172,6 +183,14 @@ This tap: - Transformations: subresources_to_array +[messages](https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources) +- Endpoint: 
https://api.twilio.com/2010-04-01/Accounts/{AccountSid}/Messages.json +- Parent: accounts +- Primary key fields: sid +- Replication strategy: INCREMENTAL +- Transformations: subresources_to_array + + [message_media](https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources) - Endpoint: https://api.twilio.com/2010-04-01/Accounts/{AccountSid}/Messages/{ParentId}/Media.json - Parent: messages diff --git a/setup.py b/setup.py index d7c4682..7a53ea4 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages setup(name='tap-twilio', - version='0.0.3', + version='0.1.0', description='Singer.io tap for extracting data from the Twilio API', author='jeff.huth@bytecode.io', classifiers=['Programming Language :: Python :: 3 :: Only'], diff --git a/tap_twilio/__init__.py b/tap_twilio/__init__.py index 39cbb08..67986b4 100644 --- a/tap_twilio/__init__.py +++ b/tap_twilio/__init__.py @@ -2,9 +2,7 @@ import sys import json -import argparse import singer -from singer import metadata, utils from tap_twilio.client import TwilioClient from tap_twilio.discover import discover from tap_twilio.sync import sync @@ -14,8 +12,7 @@ REQUIRED_CONFIG_KEYS = [ 'account_sid', 'auth_token', - 'start_date', - 'user_agent' + 'start_date' ] def do_discover(): diff --git a/tap_twilio/schemas/conference_participants.json b/tap_twilio/schemas/conference_participants.json index c7379d5..c4c2a46 100644 --- a/tap_twilio/schemas/conference_participants.json +++ b/tap_twilio/schemas/conference_participants.json @@ -22,6 +22,9 @@ "end_conference_on_exit": { "type": ["null", "boolean"] }, + "label": { + "type": ["null", "string"] + }, "muted": { "type": ["null", "boolean"] }, diff --git a/tap_twilio/schemas/conferences.json b/tap_twilio/schemas/conferences.json index 36afc93..36e60f0 100644 --- a/tap_twilio/schemas/conferences.json +++ b/tap_twilio/schemas/conferences.json @@ -8,6 +8,9 @@ "api_version": { "type": ["null", "string"] }, + "call_sid_ending_conference": { + "type": ["null", "string"] + }, "date_created": { "type": ["null", "string"], "format": "date-time" @@ -22,6 +25,9 @@ "sid": { "type": ["null", "string"] }, + "reason_conference_ended": { + "type": ["null", "string"] + }, "region": { "type": ["null", "string"] }, diff --git a/tap_twilio/schemas/incoming_phone_numbers.json b/tap_twilio/schemas/incoming_phone_numbers.json index 557e8a6..065e25c 100644 --- a/tap_twilio/schemas/incoming_phone_numbers.json +++ b/tap_twilio/schemas/incoming_phone_numbers.json @@ -103,6 +103,9 @@ "emergency_address_sid": { "type": ["null", "string"] }, + "emergency_address_status": { + "type": ["null", "string"] + }, "address_sid": { "type": ["null", "string"] }, diff --git a/tap_twilio/schemas/messages.json b/tap_twilio/schemas/messages.json index 66210c7..d67389e 100644 --- a/tap_twilio/schemas/messages.json +++ b/tap_twilio/schemas/messages.json @@ -48,6 +48,9 @@ "type": ["null", "number"], "multipleOf": 1e-8 }, + "price_unit": { + "type": ["null", "string"] + }, "sid": { "type": ["null", "string"] }, diff --git a/tap_twilio/streams.py b/tap_twilio/streams.py index 4fe6fa7..4ed5a9e 100644 --- a/tap_twilio/streams.py +++ b/tap_twilio/streams.py @@ -25,6 +25,16 @@ 'params': {}, 'pagingation': 'root', 'children': { + 'account_balance': { + 'api_url': 'https://api.twilio.com', + 'api_version': '2010-04-01', + 'path': 'Accounts/{ParentId}/Balance.json', + 'data_key': 'account_balance', + 'sub_resource_key': 'balance', + 'key_properties': ['account_sid'], + 'replication_method': 
'FULL_TABLE', + 'params': {} + }, # pylint: disable=line-too-long # Reference: https://www.twilio.com/docs/usage/api/address#read-multiple-address-resources 'addresses': { @@ -146,6 +156,7 @@ 'replication_method': 'INCREMENTAL', # Fetch ALL, filter results 'replication_keys': ['date_updated'], 'params': {}, + 'parent': 'accounts', 'pagingation': 'root' }, # pylint: disable=line-too-long @@ -286,9 +297,8 @@ 'data_key': 'usage_records', 'key_properties': ['account_sid', 'category', 'start_date'], 'replication_method': 'INCREMENTAL', # Filter query - 'replication_keys': ['end_date'], - 'bookmark_query_field_from': 'start_date', # Daily - 'bookmark_query_field_to': 'end_date', + 'replication_keys': ['start_date'], + 'bookmark_query_field_from': 'StartDate', # Daily 'params': {}, 'pagingation': 'root' }, @@ -318,10 +328,11 @@ 'key_properties': ['sid'], 'replication_method': 'INCREMENTAL', # Filter query 'replication_keys': ['date_updated'], - 'bookmark_query_field_from': 'start_date', # Bookmark - 'bookmark_query_field_to': 'end_date', # Current Date + 'bookmark_query_field_from': 'StartDate', # Bookmark + 'bookmark_query_field_to': 'EndDate', 'max_days_ago': 30, 'params': {}, + 'pagination_key': 'next_page_url', 'pagingation': 'meta' } } diff --git a/tap_twilio/sync.py b/tap_twilio/sync.py index e1bddec..026cfdd 100644 --- a/tap_twilio/sync.py +++ b/tap_twilio/sync.py @@ -76,13 +76,16 @@ def process_records(catalog, # pylint: disable=too-many-branches for record in records: # If child object, add parent_id to record if parent_id and parent: - record[parent + '_id'] = parent_id + if parent == 'accounts': + record['account_sid'] = parent_id + else: + record[parent + '_id'] = parent_id # Transform record for Singer.io with Transformer() as transformer: try: transformed_record = transformer.transform( - record, + dict(record), schema, stream_metadata) except Exception as err: @@ -173,7 +176,8 @@ def sync_endpoint( date_window_days=None, parent=None, parent_id=None, - account_sid=None): + account_sid=None, + required_streams=None): static_params = endpoint_config.get('params', {}) bookmark_query_field_from = endpoint_config.get('bookmark_query_field_from') bookmark_query_field_to = endpoint_config.get('bookmark_query_field_to') @@ -202,8 +206,9 @@ def sync_endpoint( params = static_params # adds in endpoint specific, sort, filter params - if bookmark_query_field_from and bookmark_query_field_to: + if bookmark_query_field_from: params[bookmark_query_field_from] = strftime(start_window)[:10] # truncate date + if bookmark_query_field_to: params[bookmark_query_field_to] = strftime(end_window)[:10] # truncate date # pagination: loop thru all pages of data using next (if not None) @@ -248,8 +253,10 @@ def sync_endpoint( break # No data results # Get pagination details - if data.get("next_page_uri"): - next_url = endpoint_config.get("api_url") + data["next_page_uri"] + # Next page url key in API response is different for alerts and remaining streams + next_url_key_in_api_response = endpoint_config.get('pagination_key', 'next_page_uri') + if data.get(next_url_key_in_api_response): + next_url = endpoint_config.get("api_url") + data[next_url_key_in_api_response] else: next_url = None @@ -301,8 +308,9 @@ def sync_endpoint( children = endpoint_config.get('children') if children: for child_stream_name, child_endpoint_config in children.items(): - # will this work if only grandchildren are selected - if child_stream_name in selected_streams: + # Following check will make sure tap extracts the data for all the 
child streams + # even if the parent isn't selected + if child_stream_name in selected_streams or child_stream_name in required_streams: LOGGER.info('START Syncing: {}'.format(child_stream_name)) write_schema(catalog, child_stream_name) # For each parent record @@ -333,7 +341,7 @@ def sync_endpoint( ParentId=parent_id) elif child_stream_name == 'dependent_phone_numbers': child_path = child_endpoint_config.get('path').format( - ParentId=parent_id, AccountSid=config.get('account_sid')) + ParentId=parent_id, AccountSid=record.get('account_sid')) else: child_path = record.get('_subresource_uris', {}).get( child_endpoint_config.get('sub_resource_key', @@ -355,10 +363,11 @@ def sync_endpoint( selected_streams=selected_streams, # The child endpoint may be an endpoint that needs to window # so we'll re-pull from the config here (or pass in the default) - date_window_days=int(config.get('date_window_days', '30')), + date_window_days=int(config.get('date_window_days') or'30'), parent=child_endpoint_config.get('parent'), parent_id=parent_id, - account_sid=account_sid) + account_sid=account_sid, + required_streams=required_streams) else: LOGGER.info( 'No child stream {} for parent stream {} in subresource uris' @@ -486,7 +495,8 @@ def sync(client, config, catalog, state): endpoint_config=endpoint_config, bookmark_field=bookmark_field, selected_streams=selected_streams, - date_window_days=int(config.get('date_window_days', '30'))) + date_window_days=int(config.get('date_window_days') or '30'), + required_streams=required_streams) update_currently_syncing(state, None) LOGGER.info('FINISHED Syncing: {}, total_records: {}'.format( diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..9eff419 --- /dev/null +++ b/tests/base.py @@ -0,0 +1,416 @@ +import unittest +import os +from datetime import datetime as dt +from datetime import timedelta +import dateutil.parser +from tap_tester import LOGGER, connections, menagerie, runner + + +class TwilioBaseTest(unittest.TestCase): + """Setup expectations for test sub classes. Metadata describing streams. + + A bunch of shared methods that are used in tap-tester tests. Shared + tap-specific methods (as needed). 
+ """ + + AUTOMATIC_FIELDS = "automatic" + REPLICATION_KEYS = "valid-replication-keys" + PRIMARY_KEYS = "table-key-properties" + REPLICATION_METHOD = "forced-replication-method" + INCREMENTAL = "INCREMENTAL" + FULL_TABLE = "FULL_TABLE" + EXPECTED_PAGE_SIZE = "expected-page-size" + OBEYS_START_DATE = "obey-start-date" + EXPECTED_PARENT_STREAM = "expected-parent-stream" + START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" + BOOKMARK_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" + + start_date = "2023-02-01T00:00:00Z" + + # Skipping below streams beacuse they require a paid account to generate data + NO_DATA_STREAMS = {"applications", "conference_participants", "dependent_phone_numbers", "transcriptions", "message_media"} + + # Below stream don't support pagination as we have less data + NON_PAGINATION_STREAMS = {"accounts", "keys", "incoming_phone_numbers", "outgoing_caller_ids", "usage_triggers"} + + # For below streams Twilio's API returns duplicate records and moreover data for below streams gets generated automatically + DUPLICATE_RECORD_STREAMS = {"available_phone_numbers_toll_free", "available_phone_numbers_mobile", "available_phone_numbers_local"} + + @staticmethod + def tap_name(): + """The name of the tap.""" + return "tap-twilio" + + @staticmethod + def get_type(): + """the expected url route ending.""" + return "platform.twilio" + + def get_properties(self, original=True): + """Configuration properties required for the tap.""" + return_value = {"start_date": os.getenv("TWILIO_START_DATE", "2023-01-01T00:00:00Z")} + + if not original: + return_value["start_date"] = self.start_date + + return return_value + + def get_credentials(self): + """Authentication information for the test account.""" + return { + "account_sid": os.getenv("TWILIO_ACCOUNT_SID"), + "auth_token": os.getenv("TWILIO_AUTH_TOKEN"), + } + + def required_environment_variables(self): + return {"TWILIO_ACCOUNT_SID", "TWILIO_AUTH_TOKEN"} + + def expected_metadata(self): + """The expected streams and metadata about the streams.""" + return { + "accounts": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 1, + self.OBEYS_START_DATE: False + }, + "account_balance": { + self.PRIMARY_KEYS: {"account_sid"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 1, + self.OBEYS_START_DATE: False + }, + "addresses": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "alerts": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "applications": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "available_phone_number_countries": { + self.PRIMARY_KEYS: {"country_code"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: False + }, + "available_phone_numbers_local": { + self.PRIMARY_KEYS: {"iso_country", "phone_number"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: False + }, + "available_phone_numbers_mobile": { + self.PRIMARY_KEYS: {"iso_country", "phone_number"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: False + }, + 
"available_phone_numbers_toll_free": { + self.PRIMARY_KEYS: {"iso_country", "phone_number"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: False + }, + "calls": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "conference_participants": { + self.PRIMARY_KEYS: {"uri"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: False + }, + "conferences": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "dependent_phone_numbers": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: False + }, + "incoming_phone_numbers": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "keys": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "message_media": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: False + }, + "messages": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "outgoing_caller_ids": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "queues": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "recordings": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_created"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "transcriptions": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "usage_records": { + self.PRIMARY_KEYS: {"account_sid", "category", "start_date"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"start_date"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + }, + "usage_triggers": { + self.PRIMARY_KEYS: {"sid"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"date_updated"}, + self.EXPECTED_PAGE_SIZE: 50, + self.OBEYS_START_DATE: True + } + } + + def expected_streams(self): + """A set of expected stream names.""" + return set(self.expected_metadata().keys()) + + def expected_primary_keys(self): + """return a dictionary with key of table name and value as a set of + primary key fields.""" + return { + table: properties.get(self.PRIMARY_KEYS, set()) for table, properties in self.expected_metadata().items() + } + + def expected_replication_keys(self): + """return a dictionary with key of table name and value as a set of + replication key fields.""" + return { + table: properties.get(self.REPLICATION_KEYS, 
set()) + for table, properties in self.expected_metadata().items() + } + + def expected_automatic_fields(self): + """return a dictionary with key of table name and value as a set of + automatic fields""" + auto_fields = {} + for k, v in self.expected_metadata().items(): + + auto_fields[k] = ( + v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) + ) + return auto_fields + + def expected_replication_method(self): + """return a dictionary with key of table name and value of replication + method.""" + return { + table: properties.get(self.REPLICATION_METHOD, None) + for table, properties in self.expected_metadata().items() + } + + def expected_page_limits(self): + """return a dictionary with key of table name and value of expected page size""" + return { + table: properties.get(self.EXPECTED_PAGE_SIZE, set()) + for table, properties in self.expected_metadata().items() + } + + def setUp(self): + missing_envs = [x for x in self.required_environment_variables() if os.getenv(x) is None] + if missing_envs: + raise Exception(f"Missing environment variables, please set {missing_envs}.") + + def run_and_verify_check_mode(self, conn_id): + """Run the tap in check mode and verify it succeeds. This should be ran + prior to field selection and initial sync. + + Return the connection id and found catalogs from menagerie. + """ + # run in check mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + found_catalogs = menagerie.get_catalogs(conn_id) + self.assertGreater(len(found_catalogs), 0, msg=f"unable to locate schemas for connection " f"{conn_id}") + + found_catalog_names = {found_catalog["stream_name"] for found_catalog in found_catalogs} + self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match") + LOGGER.info("discovered schemas are OK") + + return found_catalogs + + def run_and_verify_sync(self, conn_id): + """Run a sync job and make sure it exited properly. + + Return a dictionary with keys of streams synced and values of + records synced for each stream + """ + # Run a sync job using orchestrator + sync_job_name = runner.run_sync_mode(self, conn_id) + + # Verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # Verify actual rows were synced + sync_record_count = runner.examine_target_output_file( + self, conn_id, self.expected_streams(), self.expected_primary_keys() + ) + self.assertGreater( + sum(sync_record_count.values()), 0, msg=f"failed to replicate any data:" f" {sync_record_count}" + ) + LOGGER.info(f"total replicated row count: {sum(sync_record_count.values())}") + + return sync_record_count + + def perform_and_verify_table_and_field_selection(self, conn_id, test_catalogs, select_all_fields=True): + """Perform table and field selection based off of the streams to select + set and field selection parameters. + + Verify this results in the expected streams selected and all or + no fields selected for those streams. 
+ """ + + # Select all available fields or select no fields from all testable streams + self.select_all_streams_and_fields(conn_id=conn_id, catalogs=test_catalogs, select_all_fields=select_all_fields) + + catalogs = menagerie.get_catalogs(conn_id) + + # Ensure our selection affects the catalog + expected_selected = [tc.get("stream_name") for tc in test_catalogs] + for cat in catalogs: + catalog_entry = menagerie.get_annotated_schema(conn_id, cat["stream_id"]) + + # Verify all testable streams are selected + selected = catalog_entry.get("annotated-schema").get("selected") + LOGGER.info(f"Validating selection on {cat['stream_name']}: {selected}") + if cat["stream_name"] not in expected_selected: + self.assertFalse(selected, msg="Stream selected, but not testable.") + continue # Skip remaining assertions if we aren't selecting this stream + self.assertTrue(selected, msg="Stream not selected.") + + if select_all_fields: + # Verify all fields within each selected stream are selected + for field, field_props in catalog_entry.get("annotated-schema").get("properties").items(): + field_selected = field_props.get("selected") + LOGGER.info(f"\tValidating selection on {cat['stream_name']}.{field}:" f" {field_selected}") + self.assertTrue(field_selected, msg="Field not selected.") + + else: + # Verify only automatic fields are selected + expected_automatic_fields = self.expected_automatic_fields().get(cat["stream_name"]) + selected_fields = self.get_selected_fields_from_metadata(catalog_entry["metadata"]) + self.assertEqual(expected_automatic_fields, selected_fields) + + @staticmethod + def get_selected_fields_from_metadata(metadata): + selected_fields = set() + for field in metadata: + is_field_metadata = len(field["breadcrumb"]) > 1 + inclusion_automatic_or_selected = ( + field["metadata"]["selected"] is True or field["metadata"]["inclusion"] == "automatic" + ) + if is_field_metadata and inclusion_automatic_or_selected: + selected_fields.add(field["breadcrumb"][1]) + return selected_fields + + @staticmethod + def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True): + """Select all streams and all fields within streams.""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema(conn_id, catalog["stream_id"]) + + non_selected_properties = [] + if not select_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = schema.get("annotated-schema", {}).get("properties", {}).keys() + + connections.select_catalog_and_fields_via_metadata(conn_id, catalog, schema, [], non_selected_properties) + + def calculated_states_by_stream(self, current_state): + # {stream_name: [days, hours, minutes], ...} + timedelta_by_stream = {stream: [5, 0, 0] for stream in self.expected_streams()} + + stream_to_calculated_state = {stream: "" for stream in current_state["bookmarks"].keys()} + for stream, state in current_state["bookmarks"].items(): + + state_as_datetime = dateutil.parser.parse(state) + + days, hours, minutes = timedelta_by_stream[stream] + calculated_state_as_datetime = state_as_datetime - timedelta(days=days, hours=hours, minutes=minutes) + + state_format = "%Y-%m-%dT00:00:00Z" + calculated_state_formatted = dt.strftime(calculated_state_as_datetime, state_format) + + stream_to_calculated_state[stream] = calculated_state_formatted + + return stream_to_calculated_state + + @staticmethod + def assertIsDateFormat(value, str_format): + """ + Assertion Method that verifies a string value is a formatted datetime with + the specified format. 
+ """ + try: + _ = dt.strptime(value, str_format) + except ValueError as err: + raise AssertionError( + f"Value does not conform to expected format: {str_format}" + ) from err + diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py new file mode 100644 index 0000000..ecf8ec1 --- /dev/null +++ b/tests/test_all_fields.py @@ -0,0 +1,84 @@ +from base import TwilioBaseTest +from tap_tester import LOGGER, connections, menagerie, runner + + +class AllFieldsTest(TwilioBaseTest): + + # Skipping following fields since we are not able to generate test data + MISSING_FIELDS = { + "incoming_phone_numbers": {"voice_receive_mode"} + } + + def name(self): + return "tap_twilio_all_fields_test" + + def test_name(self): + LOGGER.info("All Fields Test for tap-twilio") + + def test_run(self): + """ + - Verify no unexpected streams were replicated. + - Verify that more than just the automatic fields are replicated + for each stream. + - Verify all fields for each stream are + replicated + """ + + # Instantiate connection + conn_id = connections.ensure_connection(self) + + streams_to_test = self.expected_streams() - self.NO_DATA_STREAMS + + # Run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [ + catalog for catalog in found_catalogs if catalog.get("tap_stream_id") in streams_to_test + ] + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs_all_fields, select_all_fields=True) + + # Run sync mode + sync_record_count = self.run_and_verify_sync(conn_id) + sync_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + self.assertSetEqual(streams_to_test, set(sync_records.keys())) + + # get all fields metadata after performing table and field selection + catalog_all_fields = dict() + for catalog in test_catalogs_all_fields: + stream_id, stream_name = catalog["stream_id"], catalog["stream_name"] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [ + md_entry["breadcrumb"][1] for md_entry in catalog_entry["metadata"] if md_entry["breadcrumb"] != [] + ] + catalog_all_fields[stream_name] = set(fields_from_field_level_md) + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # expected automatic fields + expected_automatic_fields = self.expected_automatic_fields().get(stream) + + # expected all fields + expected_all_fields = catalog_all_fields[stream] + + # verify that we get some records for each stream + self.assertGreater(sync_record_count.get(stream), 0) + + # collect actual fields + messages = sync_records.get(stream) + actual_all_fields = set() # aggregate across all records + all_record_fields = [ + set(message["data"].keys()) for message in messages["messages"] if message["action"] == "upsert" + ] + for fields in all_record_fields: + actual_all_fields.update(fields) + + # Verify that more than just the automatic fields are replicated for each stream + self.assertTrue(expected_automatic_fields.issubset(actual_all_fields), + msg=f'{expected_automatic_fields-actual_all_fields} is not in "expected_all_keys"') + + # verify all fields for each stream were replicated + self.assertSetEqual(expected_all_fields - self.MISSING_FIELDS.get(stream, set()), actual_all_fields) diff --git a/tests/test_automatic_fields.py b/tests/test_automatic_fields.py new file mode 100644 index 0000000..602840d --- /dev/null +++ b/tests/test_automatic_fields.py @@ -0,0 +1,95 @@ +import json + +from base import 
TwilioBaseTest +from tap_tester import LOGGER, connections, runner + + + +class AutomaticFieldsTest(TwilioBaseTest): + def name(self): + return "tap_twilio_automatic_fields_test" + + def test_name(self): + LOGGER.info("Automatic Field Test for tap-twilio") + + def test_run(self): + """ + - Verify we can deselect all fields except when inclusion=automatic, + which is handled by base.py methods Verify that only the automatic + fields are sent to the target. + - Verify that all replicated records have unique primary key + values. + """ + # instantiate connection + conn_id = connections.ensure_connection(self) + + streams_to_test = self.expected_streams() - self.NO_DATA_STREAMS - self.DUPLICATE_RECORD_STREAMS + + # run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_automatic_fields = [ + catalog for catalog in found_catalogs if catalog.get("stream_name") in streams_to_test + ] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_automatic_fields, select_all_fields=False + ) + + # run initial sync + record_count_by_stream = self.run_and_verify_sync(conn_id) + all_messages = runner.get_records_from_target_output() + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # expected values + expected_primary_keys = self.expected_primary_keys() + expected_keys = self.expected_automatic_fields().get(stream) + + # Verify that you get some records for each stream + self.assertGreater( + record_count_by_stream.get(stream, -1), 0, + msg="The number of records is not over the stream max limit", + ) + + # collect actual values + stream_messages = all_messages.get(stream) + record_messages_keys = [ + set(message["data"].keys()) + for message in stream_messages["messages"] + if message["action"] == "upsert" + ] + + # automatically Verify that only the automatic fields are sent to the target + for actual_keys in record_messages_keys: + self.assertSetEqual(expected_keys, actual_keys) + + # Get records + records = [ + message.get("data") + for message in stream_messages.get("messages", []) + if message.get("action") == "upsert" + ] + + # Remove duplicate records + records_pks_list = [ + tuple(message.get(pk) for pk in expected_primary_keys[stream]) + for message in [json.loads(t) for t in {json.dumps(d) for d in records}] + ] + + # Remove duplicate primary keys + records_pks_set = set(records_pks_list) + + # Verify there are no duplicate records + self.assertEqual( + len(records), len(records_pks_set), + msg=f"{stream} contains duplicate records") + + # Verify defined primary key is unique + self.assertEqual( + len(records_pks_set), + len(records_pks_list), + msg=f"{expected_primary_keys} are not unique primary keys for {stream} stream.", + ) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py new file mode 100644 index 0000000..e4540c2 --- /dev/null +++ b/tests/test_bookmarks.py @@ -0,0 +1,168 @@ +from base import TwilioBaseTest +from tap_tester import LOGGER, connections, menagerie, runner + + +class BookmarksTest(TwilioBaseTest): + def name(self): + return "tap_twilio_bookmarks_test" + + def test_name(self): + LOGGER.info("Bookmarks Test for tap-twilio") + + def test_run(self): + """ + - Verify for each incremental stream you can do a sync which records bookmarks + and that the format matches expectations. + - Verify that a bookmark doesn't exist for full table streams. + - Verify the bookmark is the max value sent to the target for the a given replication key. 
+ - Verify 2nd sync respects the bookmark. All data of the 2nd sync is >= the bookmark + from the first sync. The number of records in the 2nd sync is less then the first + """ + # Instantiate connection + conn_id = connections.ensure_connection(self) + + streams_to_test = self.expected_streams() - self.NO_DATA_STREAMS - self.DUPLICATE_RECORD_STREAMS + expected_replication_keys = self.expected_replication_keys() + expected_replication_methods = self.expected_replication_method() + + # Run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Table and field selection + test_catalogs_all_fields = [ + catalog for catalog in found_catalogs if catalog.get("tap_stream_id") in streams_to_test + ] + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs_all_fields, select_all_fields=True) + + ######################## + # Run first sync + ######################## + + first_sync_record_count = self.run_and_verify_sync(conn_id) + first_sync_records = runner.get_records_from_target_output() + first_sync_bookmarks = menagerie.get_state(conn_id) + + ####################### + # Update State between Syncs + ####################### + + new_state = {"bookmarks": dict()} + simulated_states = self.calculated_states_by_stream(first_sync_bookmarks) + + for stream, updated_state in simulated_states.items(): + new_state["bookmarks"][stream] = updated_state + menagerie.set_state(conn_id, new_state) + + ####################### + # Run Second sync + ####################### + + second_sync_record_count = self.run_and_verify_sync(conn_id) + second_sync_records = runner.get_records_from_target_output() + second_sync_bookmarks = menagerie.get_state(conn_id) + + ######################## + # Test by Stream + ######################## + + # Verify currently syncing is set to None after successful sync + self.assertNotIn("currently_syncing", first_sync_bookmarks) + self.assertNotIn("currently_syncing", second_sync_bookmarks) + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # Expected values + expected_replication_method = expected_replication_methods[stream] + + # Information required for assetions from sync 1 & 2 based on expected values + first_sync_count = first_sync_record_count.get(stream, 0) + second_sync_count = second_sync_record_count.get(stream, 0) + + # Verify at least 1 record was replicated in the second sync + self.assertGreater(second_sync_count, 0, msg=f"We are not fully testing bookmarking for {stream}") + + first_sync_messages = [ + record.get("data") + for record in first_sync_records.get(stream, {}).get("messages", []) + if record.get("action") == "upsert" + ] + second_sync_messages = [ + record.get("data") + for record in second_sync_records.get(stream, {}).get("messages", []) + if record.get("action") == "upsert" + ] + first_bookmark_value = first_sync_bookmarks.get("bookmarks", {stream: None}).get(stream) + second_bookmark_value = second_sync_bookmarks.get("bookmarks", {stream: None}).get(stream) + + if expected_replication_method == self.INCREMENTAL: + + # Collect information specific to incremental streams from sync 1 & 2 + replication_key = next(iter(expected_replication_keys[stream])) + simulated_bookmark = new_state["bookmarks"][stream] + + # Verify the bookmark incremental stremas is not None + self.assertIsNotNone(first_bookmark_value) + self.assertIsNotNone(second_bookmark_value) + + # Verify the bookmark value is of type str + self.assertIsInstance(first_bookmark_value, str) + self.assertIsInstance(second_bookmark_value, str) + 
+ # Verify the bookmark has expected DATE_FORMAT + self.assertIsDateFormat(first_bookmark_value, self.BOOKMARK_DATE_FORMAT) + self.assertIsDateFormat(second_bookmark_value, self.BOOKMARK_DATE_FORMAT) + + # Verify the 2nd bookmark is equal to 1st sync bookmark + self.assertEqual(first_bookmark_value, second_bookmark_value) + + for record in first_sync_messages: + replication_key_value = record.get(replication_key) + # Verify 1st sync bookmark value is the max replication key value for a given stream + self.assertLessEqual( + replication_key_value, + first_bookmark_value, + msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced", + ) + + for record in second_sync_messages: + replication_key_value = record.get(replication_key) + # Verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks + self.assertGreaterEqual( + replication_key_value, + simulated_bookmark, + msg="Second sync records do not respect the previous bookmark", + ) + # Verify the 2nd sync bookmark value is the max replication key value for a given stream + self.assertLessEqual( + replication_key_value, + second_bookmark_value, + msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced", + ) + + # Verify that we get less data in the 2nd sync + self.assertLessEqual( + second_sync_count, + first_sync_count, + msg="Second sync does not have less records, bookmark usage not verified", + ) + + elif expected_replication_method == self.FULL_TABLE: + + # Verify the syncs do not set a bookmark for full table streams + self.assertIsNone(first_bookmark_value) + self.assertIsNone(second_bookmark_value) + + # Verify the number of records in the second sync is the same as the first + self.assertEqual(second_sync_count, first_sync_count) + + # Verify both syncs have same records + for record in first_sync_messages: + self.assertIn(record, second_sync_messages) + + else: + raise NotImplementedError( + "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format( + stream, expected_replication_method + ) + ) diff --git a/tests/test_configuration.py b/tests/test_configuration.py deleted file mode 100644 index 3dea9e4..0000000 --- a/tests/test_configuration.py +++ /dev/null @@ -1,41 +0,0 @@ -def config(): - return { - "test_name": "test_sync", - "tap_name": "tap-twilio", - "type": "platform.twilio", - "properties": { - "account_sid": "TWILIO_ACCOUNT_SID", - }, - "credentials": { - "auth_token": "TWILIO_AUTH_TOKEN" - }, - "bookmark": { - "bookmark_key": "addresses", - "bookmark_timestamp": "2023-03-09T05:26:26.000000Z" - }, - "streams": { - "accounts": {"sid"}, - "addresses": {"sid"}, - "dependent_phone_numbers": {"sid"}, - "applications": {"sid"}, - "available_phone_number_countries": {"country_code"}, - "available_phone_numbers_local": {"iso_country", "phone_number"}, - "available_phone_numbers_mobile": {"iso_country", "phone_number"}, - "available_phone_numbers_toll_free": {"iso_country", "phone_number"}, - "incoming_phone_numbers": {"sid"}, - "keys": {"sid"}, - "calls": {"sid"}, - "conferences": {"sid"}, - "conference_participants": {"uri"}, - "outgoing_caller_ids": {"sid"}, - "recordings": {"sid"}, - "transcriptions": {"sid"}, - "queues": {"sid"}, - "messages": {"sid"}, - "message_media": {"sid"}, - "usage_records": {"account_sid", "category", "start_date"}, - "usage_triggers": {"sid"}, - "alerts": {"sid"} - }, - "exclude_streams": [] - } diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 
100644 index 0000000..251d600 --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,141 @@ +"""Test tap discovery mode and metadata.""" +import re + +from base import TwilioBaseTest +from tap_tester import LOGGER, connections, menagerie + + +class DiscoveryTest(TwilioBaseTest): + """Test tap discovery mode and metadata conforms to standards.""" + + @staticmethod + def name(): + return "tap_twilio_discovery_test" + + def test_name(self): + LOGGER.info("Discovery Test for tap-twilio") + + def test_run(self): + """Testing that discovery creates the appropriate catalog with valid + metadata. + + - Verify number of actual streams discovered match expected. + - Verify the stream names discovered were what we expect. + - Verify stream names follow naming convention streams should only have lowercase alphas and + underscores. + - Verify there is only 1 top level breadcrumb. + - Verify replication key(s). + - Verify primary key(s). + - Verify that if there is a replication key we are doing INCREMENTAL otherwise FULL. + - Verify the actual replication matches our expected replication method. + - Verify that primary, replication keys are given the inclusion of automatic. + - Verify that all other fields have inclusion of available metadata. + """ + + conn_id = connections.ensure_connection(self) + + streams_to_test = self.expected_streams() + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + LOGGER.info("Established Connection to Twilio") + + # Verify stream names follow naming convention + # streams should only have lowercase alphas and underscores + + found_catalog_names = {c["tap_stream_id"] for c in found_catalogs} + self.assertTrue( + all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), + msg="One or more streams don't follow standard naming", + ) + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # Verify the catalog is found for a given stream + catalog = next(iter([catalog for catalog in found_catalogs if catalog["stream_name"] == stream])) + self.assertIsNotNone(catalog) + + # collecting expected values + expected_primary_keys = self.expected_primary_keys()[stream] + expected_replication_keys = self.expected_replication_keys()[stream] + expected_automatic_fields = self.expected_automatic_fields()[stream] + expected_replication_method = self.expected_replication_method()[stream] + + # collecting actual values + schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog["stream_id"]) + metadata = schema_and_metadata["metadata"] + stream_properties = [item for item in metadata if item.get("breadcrumb") == []] + actual_primary_keys = set( + stream_properties[0].get("metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, []) + ) + actual_replication_keys = set( + stream_properties[0].get("metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) + ) + actual_replication_method = ( + stream_properties[0].get("metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + ) + actual_automatic_fields = { + item.get("breadcrumb", ["properties", None])[1] + for item in metadata + if item.get("metadata").get("inclusion") == "automatic" + } + + ########################################################################## + # metadata assertions + ########################################################################## + + # verify there is only 1 top level breadcrumb in metadata + self.assertTrue( + len(stream_properties) == 1, + msg=f"There is NOT only one top level breadcrumb for {stream}" + + 
f"\nstream_properties | {stream_properties}", + ) + + # verify primary key(s) + self.assertSetEqual( + expected_primary_keys, + actual_primary_keys, + msg=f"expected primary keys is {expected_primary_keys} " + f"but actual primary keys is {actual_primary_keys}", + ) + + # verify replication method + self.assertEqual( + expected_replication_method, + actual_replication_method, + msg=f"expected replication method is {expected_replication_method}" + f" but actual replication method is {actual_replication_method}", + ) + + # Verify replication key is present for any stream with replication + # method = INCREMENTAL + if actual_replication_method == "INCREMENTAL": + # verify replication key(s) + self.assertEqual( + expected_replication_keys, + actual_replication_keys, + msg=f"expected replication key is {expected_replication_keys}" + f" but actual replication key is {actual_replication_keys}", + ) + else: + self.assertEqual(actual_replication_keys, set()) + + # verify the stream is given the inclusion of available + self.assertEqual(catalog["metadata"]["inclusion"], "available", msg=f"{stream} cannot be selected") + + # verify the primary, replication keys are given the inclusions of automatic + self.assertSetEqual(expected_automatic_fields, actual_automatic_fields) + + # verify all other fields are given inclusion of available + self.assertTrue( + all( + { + item.get("metadata").get("inclusion") == "available" + for item in metadata + if item.get("breadcrumb", []) != [] + and item.get("breadcrumb", ["properties", None])[1] not in actual_automatic_fields + } + ), + msg="Not all non key properties are set to available in metadata", + ) diff --git a/tests/test_pagination.py b/tests/test_pagination.py new file mode 100644 index 0000000..de97ae2 --- /dev/null +++ b/tests/test_pagination.py @@ -0,0 +1,72 @@ +from math import ceil +from base import TwilioBaseTest +from tap_tester import LOGGER, connections, runner + + +class PaginationTest(TwilioBaseTest): + def name(self): + return "tap_twilio_pagination_test" + + def test_name(self): + LOGGER.info("Pagination Test for tap-twilio") + + def test_run(self): + + # instantiate connection + conn_id = connections.ensure_connection(self) + + streams_to_test = self.expected_streams() - self.DUPLICATE_RECORD_STREAMS + + # Run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [ + catalog for catalog in found_catalogs if catalog.get("tap_stream_id") in streams_to_test + ] + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs_all_fields, select_all_fields=True) + + # Run sync mode + sync_record_count = self.run_and_verify_sync(conn_id) + sync_records = runner.get_records_from_target_output() + + # Test by stream + for stream in streams_to_test: + with self.subTest(stream=stream): + + record_count = sync_record_count.get(stream, 0) + + sync_messages = sync_records.get(stream, {"messages": []}).get("messages") + + primary_keys = self.expected_primary_keys().get(stream) + + stream_page_size = self.expected_page_limits()[stream] + if stream not in self.NO_DATA_STREAMS.union(self.NON_PAGINATION_STREAMS): + self.assertLessEqual(stream_page_size, record_count) + + # Verify there are no duplicates across pages + records_pks_list = [ + tuple(message.get("data").get(primary_key) for primary_key in primary_keys) + for message in sync_messages + ] + + self.assertCountEqual(set(records_pks_list), records_pks_list, msg=f"We have duplicate records for {stream}") + + # Chunk 
the replicated records (just primary keys) into expected pages + pages = [] + page_count = ceil(len(records_pks_list) / stream_page_size) + for page_index in range(page_count): + page_start = page_index * stream_page_size + page_end = (page_index + 1) * stream_page_size + pages.append(set(records_pks_list[page_start:page_end])) + + # Verify by primary keys that data is unique for each page + for current_index, current_page in enumerate(pages): + with self.subTest(current_page_primary_keys=current_page): + + for other_index, other_page in enumerate(pages): + # don't compare the page to itself + if current_index == other_index: + continue + + self.assertTrue(current_page.isdisjoint(other_page), msg=f'other_page_primary_keys={other_page}') diff --git a/tests/test_start_date.py b/tests/test_start_date.py new file mode 100644 index 0000000..ca5be42 --- /dev/null +++ b/tests/test_start_date.py @@ -0,0 +1,137 @@ +from base import TwilioBaseTest +from tap_tester import connections, runner, LOGGER + + +class StartDateTest(TwilioBaseTest): + + # Creating variables to store two different start dates + + start_date_1 = "" + start_date_2 = "" + + def name(self): + return "tap_twilio_start_date_test" + + def test_run(self): + """ + - Test that the start_date configuration is respected + - Verify that a sync with a later start date has at least one record synced + and less records than the 1st sync with a previous start date + - Verify that each stream has less records than the earlier start date sync + - Verify all data from later start data has bookmark values >= start_date + - Verify that the minimum bookmark sent to the target for the later start_date sync + is greater than or equal to the start date + - Verify by primary key values, that all records in the 1st sync are included in the 2nd sync. 
+        """
+
+        self.start_date_1 = self.get_properties()
+        self.start_date_2 = '2023-02-12T00:00:00Z'
+        self.START_DATE = self.start_date_1
+
+        ##########################################################################
+        ### First Sync
+        ##########################################################################
+
+        # Instantiate connection
+        conn_id_1 = connections.ensure_connection(self)
+
+        streams_to_test = self.expected_streams() - self.NO_DATA_STREAMS
+
+        # Run check mode
+        found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1)
+
+        # Table and field selection
+        test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1
+                                      if catalog.get('tap_stream_id') in streams_to_test]
+        self.perform_and_verify_table_and_field_selection(
+            conn_id_1, test_catalogs_1_all_fields, select_all_fields=True)
+
+        # Run initial sync
+        record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1)
+        synced_records_1 = runner.get_records_from_target_output()
+
+        ##########################################################################
+        ### Update START DATE Between Syncs
+        ##########################################################################
+
+        LOGGER.info(f"REPLICATION START DATE CHANGE: {self.START_DATE} ===>>> {self.start_date_2}")
+        self.START_DATE = self.start_date_2
+
+        ##########################################################################
+        ### Second Sync
+        ##########################################################################
+
+        # Create a new connection with the new start_date
+        conn_id_2 = connections.ensure_connection(self, original_properties=False)
+
+        # Run check mode
+        found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2)
+
+        # Table and field selection
+        test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2
+                                      if catalog.get('tap_stream_id') in streams_to_test]
+        self.perform_and_verify_table_and_field_selection(
+            conn_id_2, test_catalogs_2_all_fields, select_all_fields=True)
+
+        # Run sync
+        record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2)
+        synced_records_2 = runner.get_records_from_target_output()
+
+        for stream in streams_to_test:
+            with self.subTest(stream=stream):
+
+                # Expected values
+                expected_primary_keys = self.expected_primary_keys()[stream]
+                expected_start_date_1 = self.start_date_1['start_date']
+                expected_start_date_2 = self.start_date_2
+
+                # All streams should respect the start date
+                # Collect information for assertions from sync 1 and sync 2 based on expected values
+                record_count_sync_1 = record_count_by_stream_1.get(stream, 0)
+                record_count_sync_2 = record_count_by_stream_2.get(stream, 0)
+
+                # Verify both syncs have extracted at least one record
+                self.assertGreater(record_count_sync_1, 0, msg="First sync should sync at least 1 record for testing")
+                self.assertGreater(record_count_sync_2, 0, msg="Second sync should sync at least 1 record for testing")
+
+                if self.expected_replication_method()[stream] == self.INCREMENTAL:
+
+                    expected_replication_keys = self.expected_replication_keys()[stream]
+                    primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                           for message in synced_records_1.get(stream).get('messages')
+                                           if message.get('action') == 'upsert']
+                    primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                           for message in synced_records_2.get(stream, {'messages': []}).get('messages')
+                                           if message.get('action') == 'upsert']
+
+                    primary_keys_sync_1 = set(primary_keys_list_1)
+                    primary_keys_sync_2 = set(primary_keys_list_2)
+
+                    replication_key_sync_1 = [message.get('data').get(expected_rk) for expected_rk in expected_replication_keys
+                                              for message in synced_records_1.get(stream).get('messages')
+                                              if message.get('action') == 'upsert']
+                    replication_key_sync_2 = [message.get('data').get(expected_rk) for expected_rk in expected_replication_keys
+                                              for message in synced_records_2.get(stream, {'messages': []}).get('messages')
+                                              if message.get('action') == 'upsert']
+
+                    replication_key_sync_1 = list(replication_key_sync_1)
+                    replication_key_sync_2 = list(replication_key_sync_2)
+
+                    # Verify the number of records replicated in sync 1 is greater than or equal to
+                    # the number of records replicated in sync 2
+                    self.assertGreaterEqual(record_count_sync_1, record_count_sync_2)
+
+                    # Verify the records replicated in sync 2 were also replicated in sync 1
+                    self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))
+
+                    # Verify that the replication keys in sync 1 are greater than or equal to start_date_1
+                    for replication_key_value in replication_key_sync_1:
+                        self.assertGreaterEqual(replication_key_value, expected_start_date_1)
+
+                    # Verify that the replication keys in sync 2 are greater than or equal to start_date_2
+                    for replication_key_value in replication_key_sync_2:
+                        self.assertGreaterEqual(replication_key_value, expected_start_date_2)
+
+                else:
+                    self.assertEqual(record_count_sync_1, record_count_sync_2,
+                                     msg="FULL_TABLE streams should extract the same number of records regardless of start date.")
diff --git a/tests/test_sync.py b/tests/test_sync.py
deleted file mode 100644
index 3a4abe1..0000000
--- a/tests/test_sync.py
+++ /dev/null
@@ -1,117 +0,0 @@
-"""
-Test tap combined
-"""
-
-import unittest
-from datetime import datetime, timedelta
-import os
-from test_configuration import config
-from tap_tester import menagerie
-import tap_tester.runner as runner
-import tap_tester.connections as connections
-
-configuration = config()
-
-class TestSyncNonReportStreams(unittest.TestCase):
-    """ Test the non-report streams """
-
-    def name(self):
-        return configuration['test_name']
-
-    def tap_name(self):
-        """The name of the tap"""
-        return configuration['tap_name']
-
-    def get_type(self):
-        """the expected url route ending"""
-        return configuration['type']
-
-    def expected_check_streams(self):
-        return set(configuration['streams'].keys())
-
-    def expected_sync_streams(self):
-        return set(configuration['streams'].keys())
-
-    def expected_pks(self):
-        return configuration['streams']
-
-    def get_properties(self):
-        """Configuration properties required for the tap."""
-        properties_dict = {}
-        props = configuration['properties']
-        for prop in props:
-            properties_dict[prop] = os.getenv(props[prop])
-        properties_dict["start_date"] = "2023-01-01T00:00:00Z"
-        return properties_dict
-
-    def get_credentials(self):
-        """Authentication information for the test account. 
Username is expected as a property.""" - credentials_dict = {} - creds = configuration['credentials'] - for cred in creds: - credentials_dict[cred] = os.getenv(creds[cred]) - - return credentials_dict - - def setUp(self): - missing_envs = [] - props = configuration['properties'] - creds = configuration['credentials'] - - for prop in props: - if os.getenv(props[prop]) is None: - missing_envs.append(prop) - for cred in creds: - if os.getenv(creds[cred]) is None: - missing_envs.append(cred) - - if len(missing_envs) != 0: - raise Exception("set " + ", ".join(missing_envs)) - - def test_run(self): - - conn_id = connections.ensure_connection(self, payload_hook=None) - - # Run the tap in check mode - check_job_name = runner.run_check_mode(self, conn_id) - - # Verify the check's exit status - exit_status = menagerie.get_exit_status(conn_id, check_job_name) - menagerie.verify_check_exit_status(self, exit_status, check_job_name) - - # Verify that there are catalogs found - found_catalogs = menagerie.get_catalogs(conn_id) - self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) - - found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs)) - subset = self.expected_check_streams().issubset(found_catalog_names) - self.assertTrue(subset, msg="Expected check streams are not subset of discovered catalog") - # - # # Select some catalogs - our_catalogs = [c for c in found_catalogs if c.get('tap_stream_id') in self.expected_sync_streams()] - for catalog in our_catalogs: - schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) - connections.select_catalog_and_fields_via_metadata(conn_id, catalog, schema, [], []) - - # # Verify that all streams sync at least one row for initial sync - # # This test is also verifying access token expiration handling. If test fails with - # # authentication error, refresh token was not replaced after expiring. - menagerie.set_state(conn_id, {}) - sync_job_name = runner.run_sync_mode(self, conn_id) - - # # Verify tap and target exit codes - exit_status = menagerie.get_exit_status(conn_id, sync_job_name) - menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) - record_count_by_stream = runner.examine_target_output_file(self, conn_id, self.expected_sync_streams(), - self.expected_pks()) - zero_count_streams = {k for k, v in record_count_by_stream.items() if v == 0} - self.assertFalse(zero_count_streams, - msg="The following streams did not sync any rows {}".format(zero_count_streams)) - - # # Verify that bookmark values are correct after incremental sync - bookmark_props = configuration['bookmark'] - current_state = menagerie.get_state(conn_id) - test_bookmark = current_state['bookmarks'][bookmark_props['bookmark_key']] - print(test_bookmark) - self.assertTrue(test_bookmark == bookmark_props['bookmark_timestamp'], - msg="The bookmark value does not match the expected result") \ No newline at end of file
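
Reviewer note: the new pagination and start-date tests above depend on helpers defined in tests/base.py (added by this patch but not shown in this excerpt), such as expected_page_limits(), expected_primary_keys(), NO_DATA_STREAMS, NON_PAGINATION_STREAMS and DUPLICATE_RECORD_STREAMS. The snippet below is only a minimal sketch of the rough shape such helpers could take, so the tests read on their own; it is not the actual tests/base.py from this patch, and every stream name and page limit in it is an assumption for illustration.

    # Illustrative sketch only -- NOT the tests/base.py shipped in this patch.
    # Stream names and page limits below are assumptions used for demonstration.
    class TwilioBaseTestSketch:
        """Rough shape of the helpers the pagination and start-date tests rely on."""

        INCREMENTAL = "INCREMENTAL"
        FULL_TABLE = "FULL_TABLE"

        # Streams the test account has no data for (assumed examples)
        NO_DATA_STREAMS = {"transcriptions", "usage_triggers"}
        # Streams whose endpoints are not paginated (assumed examples)
        NON_PAGINATION_STREAMS = {"account_balance"}
        # Streams that may legitimately repeat records across pages (assumed examples)
        DUPLICATE_RECORD_STREAMS = {"usage_records"}

        def expected_streams(self):
            # Assumed subset of the streams this tap supports
            return {"accounts", "account_balance", "calls", "messages", "usage_records"}

        def expected_page_limits(self):
            # Assumed default page size used by the tap for paginated endpoints
            return {stream: 50 for stream in self.expected_streams()}

        def expected_primary_keys(self):
            # Assumed: most Twilio resources are keyed on "sid"
            return {stream: {"sid"} for stream in self.expected_streams()}

If the real tests/base.py uses different names or values, the test files added above remain the source of truth; the sketch is only meant to make the review easier to follow.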