Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy incoming files to landing S3 bucket for Prefect flows #278

Merged
merged 8 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions aodncore/bin/upload_to_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python

import argparse
import logging

from aodncore.util.aws import upload_to_s3

def main():
    """Command-line entry point: parse arguments and upload one file to S3.

    Exposes ``aodncore.util.aws.upload_to_s3`` as a small CLI so operators can
    push a file into a landing bucket manually.
    """
    parser = argparse.ArgumentParser(description='Upload a file to an AWS S3 bucket.')
    parser.add_argument('local_file', type=str, help='Path to the local file')
    parser.add_argument('-b', '--bucket', type=str, help='Name of the S3 bucket',
                        default="aodn-dataflow-dev")
    parser.add_argument('-k', '--s3-key', type=str,
                        help='Key in the S3 bucket (defaults to the local path)')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging')

    args = parser.parse_args()

    # Default the object key to the local path so the bucket mirrors the
    # local directory layout
    if not args.s3_key:
        args.s3_key = args.local_file

    # INFO by default so the upload outcome is visible on the console;
    # -v additionally enables boto3/botocore wire-level debugging
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
    if args.verbose:
        logging.getLogger('boto3').setLevel(logging.DEBUG)
        logging.getLogger('botocore').setLevel(logging.DEBUG)

    upload_to_s3(args.local_file, args.bucket, key=args.s3_key)


if __name__ == "__main__":
    main()
3 changes: 2 additions & 1 deletion aodncore/pipeline/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@
'upload_uri': {'type': 'string'},
'wfs_url': {'type': 'string'},
'wfs_version': {'type': 'string'},
'wip_dir': {'type': 'string'}
'wip_dir': {'type': 'string'},
'landing_bucket': {'type': 'string'}
},
'required': ['admin_recipients', 'archive_uri', 'error_uri', 'processing_dir', 'upload_uri', 'wip_dir'],
'additionalProperties': False
Expand Down
37 changes: 34 additions & 3 deletions aodncore/pipeline/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .storage import get_storage_broker
from ..util import (ensure_regex_list, format_exception, lazyproperty, mkdir_p, rm_f, rm_r, validate_dir_writable,
validate_file_writable, validate_membership)
from ..util.aws import upload_to_s3

# OS X test compatibility, due to absence of pyinotify (which is specific to the Linux kernel)
try:
Expand Down Expand Up @@ -185,6 +186,8 @@ def __init__(self):
def run(self, incoming_file):
try:
logging.config.dictConfig(config.get_worker_logging_config(task_name))
logging.getLogger('boto3').setLevel(logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.DEBUG)
logging_extra = {
'celery_task_id': self.request.id,
'celery_task_name': task_name
Expand All @@ -198,13 +201,18 @@ def run(self, incoming_file):
"{self.__class__.__name__}.error_exit_policies -> "
"{policies}".format(self=self, policies=[p.name for p in error_exit_policies]))

self.logger.sysinfo(
f"{self.__class__.__name__}.kwargs -> {kwargs}"
)

file_state_manager = IncomingFileStateManager(input_file=incoming_file,
pipeline_name=pipeline_name,
config=config,
logger=self.logger,
celery_request=self.request,
error_exit_policies=error_exit_policies,
success_exit_policies=success_exit_policies)
success_exit_policies=success_exit_policies,
params=kwargs)

file_state_manager.move_to_processing()

Expand Down Expand Up @@ -386,7 +394,7 @@ class IncomingFileStateManager(object):
'trigger': 'move_to_processing',
'source': 'FILE_IN_INCOMING',
'dest': 'FILE_IN_PROCESSING',
'before': ['_pre_processing_checks', '_move_to_processing']
'before': ['_pre_processing_checks', '_copy_to_landing', '_move_to_processing']
},
{
'trigger': 'move_to_error',
Expand All @@ -404,7 +412,7 @@ class IncomingFileStateManager(object):
]

def __init__(self, input_file, pipeline_name, config, logger, celery_request, error_exit_policies=None,
success_exit_policies=None, error_broker=None):
success_exit_policies=None, error_broker=None, params=None):
self.input_file = input_file
self.pipeline_name = pipeline_name
self.config = config
Expand All @@ -413,6 +421,12 @@ def __init__(self, input_file, pipeline_name, config, logger, celery_request, er
self.error_exit_policies = error_exit_policies or []
self.success_exit_policies = success_exit_policies or []
self._error_broker = error_broker
if params and "custom_params" in params:
self.custom_params = params["custom_params"]
self.copy_to_landing_enabled = self.custom_params.get("copy_to_landing_bucket", False)
self.landing_prefix = self.custom_params.get("landing_prefix", "")
else:
self.copy_to_landing_enabled = False

self._machine = Machine(model=self, states=self.states, initial='FILE_IN_INCOMING', auto_transitions=False,
transitions=self.transitions, after_state_change='_after_state_change')
Expand Down Expand Up @@ -461,6 +475,10 @@ def error_name(self):
def error_uri(self):
return os.path.join(self.config.pipeline_config['global']['error_uri'], self.pipeline_name)

@property
def landing_bucket(self):
    """Name of the S3 landing bucket from the global pipeline config, or
    ``None`` when ``landing_bucket`` is not configured (``.get`` avoids a
    KeyError so the copy-to-landing step can be skipped cleanly)."""
    return self.config.pipeline_config['global'].get('landing_bucket')

def _after_state_change(self):
self._log_state()

Expand All @@ -477,6 +495,19 @@ def _pre_processing_checks(self):
self.logger.exception('exception occurred initialising IncomingFileStateManager')
raise

def _copy_to_landing(self):
    """Best-effort copy of the incoming file to the S3 landing bucket.

    Runs as a 'before' callback of the move_to_processing transition, after
    _pre_processing_checks and before _move_to_processing. A failed upload is
    logged as a warning and never blocks the pipeline.
    """
    # skip this step altogether if not configured to copy to landing
    # (landing_bucket comes from global config; copy_to_landing_enabled from
    # the pipeline's custom_params)
    if not self.landing_bucket or not self.copy_to_landing_enabled:
        return

    self.logger.info(
        f"{self.__class__.__name__}.copy_to_landing -> 's3://{self.landing_bucket}/{self.landing_prefix}'")
    self.logger.sysinfo(f"Uploading {self.input_file} to 's3://{self.landing_bucket}/{self.landing_prefix}'")
    try:
        # upload as <landing_prefix>/<basename>; self.basename is presumably
        # the incoming file's base name — TODO confirm against the class
        upload_to_s3(self.input_file, self.landing_bucket, self.landing_prefix, self.basename)
    except Exception as e:
        # deliberate broad catch: the landing copy is optional, so any upload
        # failure is reported but must not abort the incoming-file handling
        self.logger.warning(f"Failed to upload file to s3://{self.landing_bucket}/{self.landing_prefix}: {e}")

def _move_to_processing(self):
self.logger.info("{self.__class__.__name__}.move_to_processing -> '{self.processing_path}'".format(self=self))
safe_move_file(self.input_file, self.processing_path)
Expand Down
3 changes: 2 additions & 1 deletion aodncore/testlib/conf/pipeline.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"upload_uri": "file:///tmp/probably/doesnt/exist/upload",
"wfs_url": "http://geoserver.example.com/geoserver/wfs",
"wfs_version": "1.1.0",
"wip_dir": "/tmp/probably/doesnt/exist/wip"
"wip_dir": "/tmp/probably/doesnt/exist/wip",
"landing_bucket": "probably-doesnt-exist-landing-bucket"
},
"logging": {
"level": "SYSINFO",
Expand Down
38 changes: 38 additions & 0 deletions aodncore/util/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import os
from typing import Optional

import boto3
from botocore.exceptions import NoCredentialsError


def upload_to_s3(local_file: str,
                 bucket_name: str,
                 bucket_prefix: Optional[str] = None,
                 key: Optional[str] = None,
                 aws_profile: Optional[str] = None) -> None:
    """
    Upload a file to an S3 bucket.

    Missing local files and missing AWS credentials are handled best-effort:
    they are logged as errors and *not* re-raised, so callers that treat the
    upload as optional (e.g. the landing-bucket copy in the watch service) are
    not interrupted. Any other boto3/botocore error propagates to the caller.

    :param local_file: Full path to local file
    :param bucket_name: Bucket to upload to
    :param bucket_prefix: Prefix to add to key
    :param key: Path in bucket to upload to (defaults to local file name)
    :param aws_profile: AWS profile to use
    :return: None
    """
    # Use the logging framework rather than print() so output respects the
    # pipeline's logging configuration (and the CLI's verbosity flag)
    logger = logging.getLogger(__name__)

    if not key:
        key = os.path.basename(local_file)

    if bucket_prefix:
        key = f"{bucket_prefix}/{key}"

    # An explicit session allows selecting a named profile; profile_name=None
    # falls back to the default credential chain
    session = boto3.session.Session(profile_name=aws_profile)
    s3 = session.client('s3')
    try:
        s3.upload_file(local_file, bucket_name, key)
        logger.info("Upload Successful: %s to %s/%s", local_file, bucket_name, key)
    except FileNotFoundError:
        logger.error("The file %s was not found", local_file)
    except NoCredentialsError:
        logger.error("Credentials not available")
4 changes: 2 additions & 2 deletions test_aodncore/pipeline/test_configlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
'upload_uri': 'file:///tmp/probably/doesnt/exist/upload',
'wfs_url': 'http://geoserver.example.com/geoserver/wfs',
'wfs_version': '1.1.0',
'wip_dir': '/tmp/probably/doesnt/exist/wip'

'wip_dir': '/tmp/probably/doesnt/exist/wip',
'landing_bucket': 'probably-doesnt-exist-landing-bucket'
},
'logging': {
'level': 'SYSINFO',
Expand Down
Loading