
[AWS-S3] Add a timestamp filter for s3 polling mode #41232

Closed
kaiyan-sheng opened this issue Oct 14, 2024 · 11 comments · Fixed by #41817
@kaiyan-sheng
Contributor

Describe the enhancement:

The current S3 input without SQS notification calls the ListObjects API to collect all logs/objects from the given S3 bucket. There is no filtering functionality, so users get both old and new logs from the bucket.

It would be nice to have a start_timestamp config parameter that lets users specify a timestamp. Instead of ingesting all logs from the bucket, we can make the same ListObjects API call, filter the results using start_timestamp, and only store objects whose LastModified is >= start_timestamp.

Describe a specific use case for the enhancement or feature:
With the config below, we should only store logs with a LastModified at or after start_timestamp: 2024-10-11T00:00:00+00:00.

filebeat.inputs:
- type: aws-s3
  enabled: true
  bucket_arn: arn:aws:s3:::test-s3-bucket
  start_timestamp: 2024-10-11T00:00:00+00:00

This is what the ListObjects API call returns:

kaiyansheng ~  $ aws s3api list-objects --profile elastic-observability --bucket test-s3-bucket-ks
{
    "Contents": [
        {
            "Key": "AWSLogs/123/",
            "LastModified": "2024-10-11T17:14:41+00:00",
            "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\"",
            "Size": 0,
            "StorageClass": "STANDARD",
            "Owner": {
                "ID": "xxx"
            }
        },
        {
            "Key": "AWSLogs/123/vpcflowlogs/us-east-1/2024/10/11/627286350134_vpcflowlogs_us-east-1_fl-076d15c25200b764f_20241011T1715Z_b12bee6c.log.gz",
            "LastModified": "2024-10-11T17:21:43+00:00",
            "ETag": "\"910555fdc5893bc433a020f5baee904e\"",
            "Size": 1021,
            "StorageClass": "STANDARD",
            "Owner": {
                "ID": "xxx"
            }
        },
...
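
For illustration, here is a minimal boto3 sketch of the proposed filter semantics, using the bucket and timestamp from the examples above; the actual beats implementation is in Go, so this is only a model of the intended behavior:

    from datetime import datetime, timezone

    import boto3

    s3_client = boto3.client('s3')
    start_timestamp = datetime(2024, 10, 11, tzinfo=timezone.utc)

    # List all objects and keep only those modified at or after start_timestamp.
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket='test-s3-bucket-ks'):
        for obj in page.get('Contents', []):
            if obj['LastModified'] >= start_timestamp:
                print(obj['Key'], obj['LastModified'])
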
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Oct 14, 2024
@kaiyan-sheng kaiyan-sheng added the Team:obs-ds-hosted-services Label for the Observability Hosted Services team label Oct 14, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Oct 14, 2024
@elasticmachine
Collaborator

Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)

@Kavindu-Dodan
Contributor

This is a nice addition for end users. It should also improve the performance of object parsing and storing, since beats will only process objects that satisfy the timestamp requirement.

@anuj-elastic

@Kavindu-Dodan @kaiyan-sheng For your reference, I’m sharing the workaround the customer developed to retrieve only the new files. Please take a look and let me know if the logic might be useful for us as well.

Overview

The script uses the Boto3 library to fetch AWS logs from an S3 bucket. Once the logs are fetched, they are sent over UDP to a designated port for further processing.

Logic Breakdown

  1. AWS credentials and Boto3 setup

     First, the script initializes the Boto3 client with the necessary AWS credentials, allowing it to authenticate and interact with AWS services.

        import boto3

        s3_client = boto3.client(
            's3',
            aws_access_key_id=' ',
            aws_secret_access_key=' ',
            region_name=' ',
        )
  2. get_object_key()

     This function retrieves the keys (file names) from the AWS bucket based on last-modified time. A predefined bucket name and prefix must be provided.

     • First, we determine the current time and the time 15 minutes ago. Both times are then adjusted to set the seconds and microseconds to zero, ensuring that no files are lost due to potential differences in microseconds.

        from datetime import datetime, timedelta, timezone

        current_time = datetime.now(timezone.utc)
        time_15_minutes_ago = current_time - timedelta(minutes=15)

        year = time_15_minutes_ago.year
        month = time_15_minutes_ago.month
        day = time_15_minutes_ago.day

        current_time = current_time.replace(second=0, microsecond=0)
        time_15_minutes_ago = time_15_minutes_ago.replace(second=0, microsecond=0)
    
     • Next, we take the provided prefix and append '{year}/{month}/{day}' (extracted from the time 15 minutes before current_time) in order to retrieve files from the current day.

        prefix = "path/to/prefix/{year}/{month}/{day}"
        prefix = prefix.format(year=year, month=month, day=day)
    
                    
     • We currently set the last-modified cutoff to 15 minutes ago and retrieve file names with the following logic:

       If the current time is 00:15 and time_15_minutes_ago is 00:00, the prefix will be '{2024}/{10}/{28}'. However, if there are files in the current (present-day) prefix with a LastModified such as 2024-10-23 11:59:58, we include all files present under the current prefix:

        # 'page' comes from paginating the ListObjects results for the
        # bucket and prefix built above.
        if time_15_minutes_ago.hour == 0 and time_15_minutes_ago.minute == 0 and time_15_minutes_ago.second == 0:
            for obj in page['Contents']:
                if obj['LastModified'] <= current_time:
                    obj_list.append(obj['Key'])

       Otherwise, we check that LastModified is no later than current_time and falls within the last 15 minutes, and add that file to our list:

        else:
            for obj in page['Contents']:
                if obj['LastModified'] <= current_time and obj['LastModified'] >= time_15_minutes_ago:
                    obj_list.append(obj['Key'])
    
  3. get_object_data()

     We then pass the bucket name and object key asynchronously to retrieve the file data based on the file suffix.

        response = s3_client.get_object(Bucket=bucket, Key=object_key)
  4. Finally, we forward the data on a particular UDP port, as sketched below.
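
A minimal sketch of that forwarding step, using Python's standard socket module; the destination host and port are hypothetical, since the workaround does not specify them:

    import socket

    # Hypothetical destination; the workaround does not name a host or port.
    UDP_HOST = '127.0.0.1'
    UDP_PORT = 5514

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # 'response' is the get_object() result from the previous step.
    # Note: a single UDP datagram tops out around 64 KB, so large log
    # files would need to be sent in chunks.
    payload = response['Body'].read()
    sock.sendto(payload, (UDP_HOST, UDP_PORT))
    sock.close()
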

@anuj-elastic

Hi @Kavindu-Dodan @kaiyan-sheng ,

Could you please provide an update and share the ETA for this? This would help us communicate effectively with the customer, as they consider this a critical feature and have been following up regularly.

@Kavindu-Dodan
Contributor

@anuj-elastic I am working on adding this feature through PR #41817, and I am aiming to release it with 8.18.0, which is planned for early next year. Along with that, I am planning to upgrade the integrations to support the new feature, tracked in elastic/integrations#11919.

@anuj-elastic

Thanks for the update @Kavindu-Dodan, that's wonderful to hear. The customer has been consistently following up on this for the past couple of months, and now I have a timeline to share with them and can set expectations accordingly.

@anuj-elastic

@Kavindu-Dodan Do you know whether the similar issue with the Cloudflare integration will also be addressed by the same fix?

@bturquet
Contributor

bturquet commented Dec 9, 2024

@anuj-elastic related to your question, please see the update here:

  1. Once 8.16.2 is released, we can check whether the registry cleanup improves performance for this customer.
  2. The registry cleanup mentioned above is only the first step of the improvement; if it is not enough, we will have to wait for 8.18.0 to be released to benefit from the timestamp filter.

@Kavindu-Dodan
Contributor

Kavindu-Dodan commented Dec 30, 2024

@anuj-elastic In addition to what @bturquet said, PR #41817 will bring configurations to address the performance considerations. Since the Cloudflare integration internally uses the S3 implementation, I expect these new configurations to also fix the referenced issue.

The update of the integrations will be done through elastic/integrations#11919

@Kavindu-Dodan
Contributor

PR #41817 was merged on 07-Jan-2025. The configurations to avoid registry state growth, ignore_older and start_timestamp, will be delivered with beats 8.18.0.
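
For illustration, the new options would extend the config from the issue description roughly like this; treat the exact syntax as an assumption until the 8.18.0 documentation is published:

    filebeat.inputs:
    - type: aws-s3
      enabled: true
      bucket_arn: arn:aws:s3:::test-s3-bucket
      # Only ingest objects modified at or after this timestamp.
      start_timestamp: 2024-10-11T00:00:00Z
      # Skip objects older than this duration to bound registry growth.
      ignore_older: 72h
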

@Kavindu-Dodan
Contributor

Update - We are backporting this improvement to 8.16.x & 8.17.x releases.
