From b4e6d02e9f3a22c09f0c5c314947ffa8425d91fa Mon Sep 17 00:00:00 2001
From: Grejdi Gjura
Date: Fri, 27 Dec 2024 12:04:44 -0500
Subject: [PATCH 1/2] chore: update elixir to 1.18

---
 .../schema/cubic_load_test.exs | 27 +++++++++++++++------------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs
index d8b2a4f..7f5fd51 100644
--- a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs
+++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs
@@ -86,7 +86,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_table: dmap_table,
       dmap_load_objects: dmap_load_objects
     } do
-      {:ok, {nil, new_dmap_load_recs}} =
+      {:ok, {_table_original, nil, new_dmap_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       assert Enum.map(dmap_load_objects, & &1.key) ==
@@ -96,7 +96,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
               load.s3_key
             end)
 
-      assert {:ok, {nil, []}} ==
+      assert {:ok, {dmap_table, nil, []}} ==
               CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       # add a new object
@@ -111,7 +111,8 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
 
       # adding one more load object, should only insert it as a load record
       assert {:ok,
-              {nil, [{%CubicLoad{s3_key: "cubic/dmap/sample/20220103.csv.gz"}, nil, _table, nil}]}} =
+              {_table_after_one_more_load, nil,
+               [{%CubicLoad{s3_key: "cubic/dmap/sample/20220103.csv.gz"}, nil, _table, nil}]}} =
               CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
     end
 
@@ -120,7 +121,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       ods_table: ods_table,
       ods_load_objects: ods_load_objects
     } do
-      {:ok, {_last_ods_table_snapshot, new_ods_load_recs}} =
+      {:ok, {_table_original, _last_ods_table_snapshot, new_ods_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(ods_load_objects, ods_table)
 
       assert Enum.map(ods_load_objects, & &1.key) ==
@@ -132,7 +133,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
             end)
 
       # inserting again should not return any new records
-      assert {:ok, {_last_ods_table_snapshot, []}} =
+      assert {:ok, {_table_after_insert, _last_ods_table_snapshot, []}} =
               CubicLoad.insert_new_from_objects_with_table(ods_load_objects, ods_table)
 
       # add a new object
@@ -147,7 +148,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
 
       # adding one more load object, should only insert it as a load record
       assert {:ok,
-              {_last_ods_table_snapshot,
+              {_table_after_one_more_load, _last_ods_table_snapshot,
                [
                  {%CubicLoad{s3_key: "cubic/ods_qlik/SAMPLE/LOAD3.csv.gz"}, _ods_load_snapshot,
                   _table, _ods_table_snapshot}
@@ -157,7 +158,8 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
     test "providing an empty list of objects", %{
       dmap_table: dmap_table
     } do
-      assert {:ok, {nil, []}} = CubicLoad.insert_new_from_objects_with_table([], dmap_table)
+      assert {:ok, {_table, nil, []}} =
+               CubicLoad.insert_new_from_objects_with_table([], dmap_table)
     end
   end
 
@@ -236,7 +238,8 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok, {nil, [{first_new_load_rec, nil, _table, nil} | rest_new_load_recs]}} =
+      {:ok,
+       {_table_after_insert, nil, [{first_new_load_rec, nil, _table, nil} | rest_new_load_recs]}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       # set the first record to 'archived'
@@ -299,7 +302,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok, {nil, [{first_new_load_rec, nil, _table, nil} | _rest]}} =
+      {:ok, {_table_after_insert, nil, [{first_new_load_rec, nil, _table, nil} | _rest]}} =
        CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       # update it to 'archived' status
@@ -319,7 +322,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok, {nil, new_load_recs}} =
+      {:ok, {_table_after_insert, nil, new_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       expected =
@@ -346,7 +349,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok, {nil, new_load_recs}} =
+      {:ok, {_table_after_insert, nil, new_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       new_load_rec_ids =
@@ -368,7 +371,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_table: dmap_table,
       dmap_load_objects: dmap_load_objects
     } do
-      {:ok, {nil, new_load_recs}} =
+      {:ok, {_table, nil, new_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       {_, ready_loads_by_table_query} =

From d538563e4952dec66a04a0393d12b00599ee814d Mon Sep 17 00:00:00 2001
From: Grejdi Gjura
Date: Thu, 26 Dec 2024 16:50:44 -0500
Subject: [PATCH 2/2] feat: remove athena partitioning

---
 .env.template                                 |   3 -
 ex_cubic_ingestion/config/runtime.exs         |   3 +-
 .../lib/ex_aws/ex_aws_athena.ex               |  54 ----------
 ex_cubic_ingestion/lib/ex_aws/helpers.ex      |  79 --------------
 .../lib/ex_cubic_ingestion/workers/ingest.ex  |  59 +----------
 .../test/ex_aws/helpers_test.exs              |  40 -------
 .../schema/cubic_load_test.exs                |  27 +++--
 ex_cubic_ingestion/test/support/ex_aws.ex     | 100 ------------------
 8 files changed, 15 insertions(+), 350 deletions(-)
 delete mode 100644 ex_cubic_ingestion/lib/ex_aws/ex_aws_athena.ex

diff --git a/.env.template b/.env.template
index db104b7..a0c8e55 100644
--- a/.env.template
+++ b/.env.template
@@ -25,9 +25,6 @@ GLUE_DATABASE_INCOMING=
 GLUE_DATABASE_SPRINGBOARD=
 GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING=
 
-# athena
-ATHENA_WORKGROUP=
-
 # cubic dmap
 CUBIC_DMAP_BASE_URL=
 CUBIC_DMAP_CONTROLLED_USER_API_KEY=
diff --git a/ex_cubic_ingestion/config/runtime.exs b/ex_cubic_ingestion/config/runtime.exs
index 8fcf3bc..d2a4584 100644
--- a/ex_cubic_ingestion/config/runtime.exs
+++ b/ex_cubic_ingestion/config/runtime.exs
@@ -32,5 +32,4 @@ config :ex_cubic_ingestion,
     System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", ""),
   dmap_base_url: System.get_env("CUBIC_DMAP_BASE_URL", ""),
   dmap_controlled_user_api_key: System.get_env("CUBIC_DMAP_CONTROLLED_USER_API_KEY", ""),
-  dmap_public_user_api_key: System.get_env("CUBIC_DMAP_PUBLIC_USER_API_KEY", ""),
-  athena_workgroup: System.get_env("ATHENA_WORKGROUP", "")
+  dmap_public_user_api_key: System.get_env("CUBIC_DMAP_PUBLIC_USER_API_KEY", "")
diff --git a/ex_cubic_ingestion/lib/ex_aws/ex_aws_athena.ex b/ex_cubic_ingestion/lib/ex_aws/ex_aws_athena.ex
deleted file mode 100644
index e19de1c..0000000
--- a/ex_cubic_ingestion/lib/ex_aws/ex_aws_athena.ex
+++ /dev/null
@@ -1,54 +0,0 @@
-defmodule ExAws.Athena do
-  @moduledoc """
-  ExAws.Athena module for making Athena requests.
-  See https://github.com/aws/aws-sdk-go/blob/main/models/apis/athena/2017-05-18/api-2.json
-  for constructing further requests.
- """ - - require Ecto - - @doc """ - Build operation for 'start_query_execution' API - """ - @spec start_query_execution(String.t(), map()) :: ExAws.Operation.t() - def start_query_execution(query_string, result_configuration) do - %ExAws.Operation.JSON{ - http_method: :post, - path: "/", - headers: [ - {"x-amz-target", "AmazonAthena.StartQueryExecution"}, - {"content-type", "application/x-amz-json-1.1"} - ], - data: %{ - ClientRequestToken: Ecto.UUID.generate(), - QueryString: query_string, - ResultConfiguration: result_configuration, - WorkGroup: Application.fetch_env!(:ex_cubic_ingestion, :athena_workgroup), - QueryExecutionContext: %{ - Database: Application.fetch_env!(:ex_cubic_ingestion, :glue_database_springboard), - Catalog: "AwsDataCatalog" - } - }, - service: :athena - } - end - - @doc """ - Build operation for 'batch_get_query_execution' API - """ - @spec batch_get_query_execution([String.t()]) :: ExAws.Operation.t() - def batch_get_query_execution(query_execution_ids) do - %ExAws.Operation.JSON{ - http_method: :post, - path: "/", - headers: [ - {"x-amz-target", "AmazonAthena.BatchGetQueryExecution"}, - {"content-type", "application/x-amz-json-1.1"} - ], - data: %{ - QueryExecutionIds: query_execution_ids - }, - service: :athena - } - end -end diff --git a/ex_cubic_ingestion/lib/ex_aws/helpers.ex b/ex_cubic_ingestion/lib/ex_aws/helpers.ex index 8551450..705da1b 100644 --- a/ex_cubic_ingestion/lib/ex_aws/helpers.ex +++ b/ex_cubic_ingestion/lib/ex_aws/helpers.ex @@ -47,83 +47,4 @@ defmodule ExAws.Helpers do end end end - - @doc """ - Athena: Do a batch call to get the status of all the query executions. Based on all the - statuses, return :ok if all succeeded, and {:error, ...} otherwise. - """ - @spec monitor_athena_query_executions(module(), [{:ok, map()}]) :: :ok | {:error, String.t()} - def monitor_athena_query_executions(lib_ex_aws, success_requests) do - {request_status, %{"QueryExecutions" => query_executions}} = - success_requests - |> Enum.map(fn {:ok, %{"QueryExecutionId" => query_execution_id}} -> - query_execution_id - end) - |> get_athena_query_executions_status(lib_ex_aws) - - # return ok only if all the queries succeeded - with :ok <- request_status, - true <- Enum.all?(query_executions, &query_succeeded?/1) do - Logger.info("Athena Query Executions Status: #{Jason.encode!(query_executions)}") - - :ok - else - _errored -> - {:error, "Athena Batch Get Query Execution: #{Jason.encode!(query_executions)}"} - end - end - - @spec get_athena_query_executions_status([String.t()], module()) :: - {:ok, map()} | {:error, String.t()} - defp get_athena_query_executions_status(query_execution_ids, lib_ex_aws) do - # pause a litte before getting status - Process.sleep(2_000) - - batch_get_query_execution_request = - query_execution_ids - |> ExAws.Athena.batch_get_query_execution() - |> lib_ex_aws.request() - - with {:ok, %{"QueryExecutions" => query_executions}} <- batch_get_query_execution_request, - true <- queries_running?(query_executions) do - Logger.info("Athena Query Executions Status: #{Jason.encode!(query_executions)}") - - get_athena_query_executions_status(query_execution_ids, lib_ex_aws) - else - _succeeded_or_errored -> - batch_get_query_execution_request - end - end - - defp queries_running?(query_executions) do - Enum.any?(query_executions, fn %{"Status" => %{"State" => state}} -> - state == "QUEUED" || state == "RUNNING" - end) - end - - defp query_succeeded?(%{"Status" => %{"State" => "SUCCEEDED"}}) do - true - end - - defp query_succeeded?(%{ - "Status" => %{ - 
"State" => "FAILED", - "AthenaError" => %{"ErrorMessage" => "Partition already exists."} - } - }) do - true - end - - defp query_succeeded?(%{ - "Status" => %{ - "State" => "FAILED", - "AthenaError" => %{"ErrorMessage" => "Partition entries already exist."} - } - }) do - true - end - - defp query_succeeded?(_query_execution_status) do - false - end end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/ingest.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/ingest.ex index 08290dd..60b092c 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/ingest.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/ingest.ex @@ -37,8 +37,7 @@ defmodule ExCubicIngestion.Workers.Ingest do # gather the information needed to make AWS requests job_payload = construct_job_payload(load_rec_ids) - with :ok <- run_glue_job(lib_ex_aws, job_payload), - :ok <- add_athena_partitions(lib_ex_aws, job_payload) do + with :ok <- run_glue_job(lib_ex_aws, job_payload) do update_statuses(job_payload) end end @@ -146,61 +145,7 @@ defmodule ExCubicIngestion.Workers.Ingest do end end - # If Glue job is successful, adds the Athena partition for each load only by start a query - # execution with the "ALTER TABLE" statement, and then doing a batched status call for all the - # queries. - @spec add_athena_partitions(module(), {map(), map()}) :: Oban.Worker.result() - defp add_athena_partitions(lib_ex_aws, {_env_payload, %{loads: loads}}) do - success_error_requests = - loads - # make requests to start query executions - |> Enum.map(&start_add_partition_query_execution(lib_ex_aws, &1)) - # split into successful requests and failures - |> Enum.split_with(fn {status, _response_body} -> - status == :ok - end) - - case success_error_requests do - # if all succesful, monitor their status - {success_requests, []} -> - ExAws.Helpers.monitor_athena_query_executions(lib_ex_aws, success_requests) - - # if any failures, fail the job as well - {_success_requests, error_requests} -> - error_requests_bodies = - error_requests - |> Enum.map(fn {:error, {exception, message}} -> - %{exception: exception, message: message} - end) - |> Jason.encode!() - - {:error, "Athena Start Query Executions: #{error_requests_bodies}"} - end - end - - @spec start_add_partition_query_execution(module(), map()) :: {:ok, term()} | {:error, term()} - defp start_add_partition_query_execution(lib_ex_aws, load) do - bucket_operations = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_operations) - - prefix_operations = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_operations) - - partitions = - Enum.map_join(load.partition_columns, ", ", fn partition_column -> - "#{partition_column.name} = '#{partition_column.value}'" - end) - - # sleep a little to avoid throttling, at most 10 requests will be made per job - Process.sleep(1000) - - lib_ex_aws.request( - ExAws.Athena.start_query_execution( - "ALTER TABLE #{load.destination_table_name} ADD PARTITION (#{partitions});", - %{OutputLocation: "s3://#{bucket_operations}/#{prefix_operations}athena/"} - ) - ) - end - - # If adding the partition to Athena is successful, update the status of all loads + # If Glue job run is successful, update the status of all loads # to 'ready_for_archiving' allowing the archiving process to begin. 
   @spec update_statuses({map(), map()}) :: Oban.Worker.result()
   defp update_statuses({_env_payload, %{loads: loads}}) do
diff --git a/ex_cubic_ingestion/test/ex_aws/helpers_test.exs b/ex_cubic_ingestion/test/ex_aws/helpers_test.exs
index 1a39c25..7aacbb4 100644
--- a/ex_cubic_ingestion/test/ex_aws/helpers_test.exs
+++ b/ex_cubic_ingestion/test/ex_aws/helpers_test.exs
@@ -67,44 +67,4 @@ defmodule ExAws.HelpersTest do
       )
     end
   end
-
-  describe "monitor_athena_query_executions/2" do
-    test "returns immediately if all queries have succeeded" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "success_query_id_1"}},
-        {:ok, %{"QueryExecutionId" => "success_query_id_2"}}
-      ]
-
-      assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-
-    test "returns immediately if any of the queries have failed or were cancelled" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "cancel_query_id"}},
-        {:ok, %{"QueryExecutionId" => "fail_query_id"}}
-      ]
-
-      assert {:error, _message} =
-               ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-
-    test "returns success even if we have a failed query due to an already added partition" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "success_query_id"}},
-        {:ok, %{"QueryExecutionId" => "already_added_partition_query_id"}}
-      ]
-
-      assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-
-    # note: multiple added partitions
-    test "returns success even if we have a failed query due to already added partitions" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "success_query_id"}},
-        {:ok, %{"QueryExecutionId" => "already_added_partitions_query_id"}}
-      ]
-
-      assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-  end
 end
diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs
index 7f5fd51..d8b2a4f 100644
--- a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs
+++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs
@@ -86,7 +86,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_table: dmap_table,
       dmap_load_objects: dmap_load_objects
     } do
-      {:ok, {_table_original, nil, new_dmap_load_recs}} =
+      {:ok, {nil, new_dmap_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       assert Enum.map(dmap_load_objects, & &1.key) ==
@@ -96,7 +96,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
               load.s3_key
             end)
 
-      assert {:ok, {dmap_table, nil, []}} ==
+      assert {:ok, {nil, []}} ==
               CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       # add a new object
@@ -111,8 +111,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
 
       # adding one more load object, should only insert it as a load record
       assert {:ok,
-              {_table_after_one_more_load, nil,
-               [{%CubicLoad{s3_key: "cubic/dmap/sample/20220103.csv.gz"}, nil, _table, nil}]}} =
+              {nil, [{%CubicLoad{s3_key: "cubic/dmap/sample/20220103.csv.gz"}, nil, _table, nil}]}} =
               CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
     end
 
@@ -121,7 +120,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       ods_table: ods_table,
       ods_load_objects: ods_load_objects
     } do
-      {:ok, {_table_original, _last_ods_table_snapshot, new_ods_load_recs}} =
+      {:ok, {_last_ods_table_snapshot, new_ods_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(ods_load_objects, ods_table)
 
       assert Enum.map(ods_load_objects, & &1.key) ==
@@ -133,7 +132,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
             end)
 
       # inserting again should not return any new records
-      assert {:ok, {_table_after_insert, _last_ods_table_snapshot, []}} =
+      assert {:ok, {_last_ods_table_snapshot, []}} =
               CubicLoad.insert_new_from_objects_with_table(ods_load_objects, ods_table)
 
       # add a new object
@@ -148,7 +147,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
 
       # adding one more load object, should only insert it as a load record
       assert {:ok,
-              {_table_after_one_more_load, _last_ods_table_snapshot,
+              {_last_ods_table_snapshot,
               [
                 {%CubicLoad{s3_key: "cubic/ods_qlik/SAMPLE/LOAD3.csv.gz"}, _ods_load_snapshot,
                  _table, _ods_table_snapshot}
@@ -158,8 +157,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
     test "providing an empty list of objects", %{
       dmap_table: dmap_table
     } do
-      assert {:ok, {_table, nil, []}} =
-               CubicLoad.insert_new_from_objects_with_table([], dmap_table)
+      assert {:ok, {nil, []}} = CubicLoad.insert_new_from_objects_with_table([], dmap_table)
     end
   end
 
@@ -238,8 +236,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok,
-       {_table_after_insert, nil, [{first_new_load_rec, nil, _table, nil} | rest_new_load_recs]}} =
+      {:ok, {nil, [{first_new_load_rec, nil, _table, nil} | rest_new_load_recs]}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       # set the first record to 'archived'
@@ -302,7 +299,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok, {_table_after_insert, nil, [{first_new_load_rec, nil, _table, nil} | _rest]}} =
+      {:ok, {nil, [{first_new_load_rec, nil, _table, nil} | _rest]}} =
        CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       # update it to 'archived' status
@@ -322,7 +319,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok, {_table_after_insert, nil, new_load_recs}} =
+      {:ok, {nil, new_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       expected =
@@ -349,7 +346,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_load_objects: dmap_load_objects
     } do
       # insert records as ready
-      {:ok, {_table_after_insert, nil, new_load_recs}} =
+      {:ok, {nil, new_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       new_load_rec_ids =
@@ -371,7 +368,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
       dmap_table: dmap_table,
       dmap_load_objects: dmap_load_objects
     } do
-      {:ok, {_table, nil, new_load_recs}} =
+      {:ok, {nil, new_load_recs}} =
         CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
 
       {_, ready_loads_by_table_query} =
diff --git a/ex_cubic_ingestion/test/support/ex_aws.ex b/ex_cubic_ingestion/test/support/ex_aws.ex
index c379f9b..d71a529 100644
--- a/ex_cubic_ingestion/test/support/ex_aws.ex
+++ b/ex_cubic_ingestion/test/support/ex_aws.ex
@@ -350,106 +350,6 @@ defmodule MockExAws do
     end
   end
 
-  def request(
-        %{service: :athena, data: %{QueryExecutionIds: ["success_query_id", "success_query_id"]}},
-        _config_overrides
-      ) do
-    {:ok,
-     %{
-       "QueryExecutions" => [
-         %{"Status" => %{"State" => "SUCCEEDED"}},
-         %{"Status" => %{"State" => "SUCCEEDED"}}
-       ]
-     }}
-  end
-
-  def request(
-        %{service: :athena, data: %{QueryExecutionIds: ["success_query_id"]}},
-        _config_overrides
-      ) do
-    {:ok, %{"QueryExecutions" => [%{"Status" => %{"State" => "SUCCEEDED"}}]}}
-  end
-
-  def request(
-        %{
-          service: :athena,
-          data: %{QueryExecutionIds: ["success_query_id_1", "success_query_id_2"]}
-        },
-        _config_overrides
-      ) do
-    {:ok,
-     %{
-       "QueryExecutions" => [
-         %{"Status" => %{"State" => "SUCCEEDED"}},
-         %{"Status" => %{"State" => "SUCCEEDED"}}
-       ]
-     }}
-  end
-
-  def request(
-        %{service: :athena, data: %{QueryExecutionIds: ["cancel_query_id", "fail_query_id"]}},
-        _config_overrides
-      ) do
-    {:ok,
-     %{
-       "QueryExecutions" => [
-         %{"Status" => %{"State" => "CANCELLED"}},
-         %{"Status" => %{"State" => "FAILED"}}
-       ]
-     }}
-  end
-
-  def request(
-        %{
-          service: :athena,
-          data: %{QueryExecutionIds: ["success_query_id", "already_added_partition_query_id"]}
-        },
-        _config_overrides
-      ) do
-    {:ok,
-     %{
-       "QueryExecutions" => [
-         %{"Status" => %{"State" => "SUCCEEDED"}},
-         %{
-           "Status" => %{
-             "State" => "FAILED",
-             "AthenaError" => %{"ErrorMessage" => "Partition already exists."}
-           }
-         }
-       ]
-     }}
-  end
-
-  def request(
-        %{
-          service: :athena,
-          # note: multiple added partitions
-          data: %{QueryExecutionIds: ["success_query_id", "already_added_partitions_query_id"]}
-        },
-        _config_overrides
-      ) do
-    {:ok,
-     %{
-       "QueryExecutions" => [
-         %{"Status" => %{"State" => "SUCCEEDED"}},
-         %{
-           "Status" => %{
-             "State" => "FAILED",
-             "AthenaError" => %{"ErrorMessage" => "Partition entries already exist."}
-           }
-         }
-       ]
-     }}
-  end
-
-  def request(%{service: :athena} = op, _config_overrides) do
-    if Enum.member?(op.headers, {"x-amz-target", "AmazonAthena.StartQueryExecution"}) do
-      {:ok, %{"QueryExecutionId" => "success_query_id"}}
-    else
-      {:error, "athena failed"}
-    end
-  end
-
   @spec request!(ExAws.Operation.t(), keyword) :: term
   def request!(op, config_overrides \\ []) do
     case request(op, config_overrides) do