Skip to content

Commit

Permalink
fix-ri-sp-without-crawler (#792)
Browse files Browse the repository at this point in the history
  • Loading branch information
iakov-aws authored Apr 23, 2024
1 parent 2143394 commit 05f179e
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions cid/helpers/cur.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,36 @@ def ensure_column(self, column: str, column_type: str=None):
if column in [col.get('Name', '').lower() for col in self.metadata.get('Columns', [])]:
return

table_can_be_updated = False
# Check Crawler Behavior - if it has a Configuration/CrawlerOutput/TablesAddOrUpdateBehavior != MergeNewColumns, it will override columns
crawler_name = self.metadata.get('Parameters', {}).get('UPDATED_BY_CRAWLER')
if crawler_name:
# Check Crawler Behavior - if it does not have a Configuration/CrawlerOutput/TablesAddOrUpdateBehavior == MergeNewColumns, it will override columns
try:
crawler = self.glue.get_crawler(crawler_name)
config = json.loads(crawler.get('Configuration', '{}'))
add_or_update = config.get('CrawlerOutput', {}).get('Tables', {}).get('AddOrUpdateBehavior')
if add_or_update == 'MergeNewColumns':
table_can_be_updated = True
except self.glue.client.exceptions.ClientError as exc:
raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to check if crawler {crawler_name} is configured to override columns.') from exc
config = json.loads(crawler.get('Configuration', '{}'))
add_or_update = config.get('CrawlerOutput', {}).get('Tables', {}).get('AddOrUpdateBehavior')
if add_or_update != 'MergeNewColumns':
raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to add it as crawler {crawler_name} is configured to override columns. Change crawler settings and run again.')
logger.debug('Failed to get crawler info')
if table_can_be_updated:
column_type = column_type or self.get_type_of_column(column)
try:
self.athena.query(f'ALTER TABLE {self.table_name} ADD COLUMNS ({column} {column_type})')
except (self.athena.client.exceptions.ClientError, CidCritical) as exc:
raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc
self._metadata = self.athena.get_table_metadata(self.table_name) # refresh table metadata
logger.critical(f"Column '{column}' was added to CUR ({self.table_name}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'")
return

# if table cannot be updated, check if it is ri/sp case - let's hope dashboard views can handle it:
if column in self.sp_required_columns + self.ri_required_columns:
logger.warn(f"Column '{column}' is not present in ({self.table_name}). Will try to continue without.")
return

# if a required column is not there and not ri/sp -> stop
logger.critical(f"Column '{column}' is not in ({self.table_name}). Cannot continue")

column_type = column_type or self.get_type_of_column(column)
try:
self.athena.query(f'ALTER TABLE {self.table_name} ADD COLUMNS ({column} {column_type})')
except (self.athena.client.exceptions.ClientError, CidCritical) as exc:
raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc
self._metadata = self.athena.get_table_metadata(self.table_name) # refresh table metadata
logger.critical(f"Column '{column}' was added to CUR ({self.table_name}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'")

def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool:
""" return True if table metadata fits CUR definition. """
Expand Down

0 comments on commit 05f179e

Please sign in to comment.