feat: remove athena partitioning
grejdi-mbta committed Dec 31, 2024
1 parent b4e6d02 commit d538563
Showing 8 changed files with 15 additions and 350 deletions.
3 changes: 0 additions & 3 deletions .env.template
@@ -25,9 +25,6 @@ GLUE_DATABASE_INCOMING=
GLUE_DATABASE_SPRINGBOARD=
GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING=

-# athena
-ATHENA_WORKGROUP=
-
# cubic dmap
CUBIC_DMAP_BASE_URL=
CUBIC_DMAP_CONTROLLED_USER_API_KEY=
Expand Down
3 changes: 1 addition & 2 deletions ex_cubic_ingestion/config/runtime.exs
@@ -32,5 +32,4 @@ config :ex_cubic_ingestion,
System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", ""),
dmap_base_url: System.get_env("CUBIC_DMAP_BASE_URL", ""),
dmap_controlled_user_api_key: System.get_env("CUBIC_DMAP_CONTROLLED_USER_API_KEY", ""),
-dmap_public_user_api_key: System.get_env("CUBIC_DMAP_PUBLIC_USER_API_KEY", ""),
-athena_workgroup: System.get_env("ATHENA_WORKGROUP", "")
+dmap_public_user_api_key: System.get_env("CUBIC_DMAP_PUBLIC_USER_API_KEY", "")
54 changes: 0 additions & 54 deletions ex_cubic_ingestion/lib/ex_aws/ex_aws_athena.ex

This file was deleted.

79 changes: 0 additions & 79 deletions ex_cubic_ingestion/lib/ex_aws/helpers.ex
@@ -47,83 +47,4 @@ defmodule ExAws.Helpers do
end
end
end

-  @doc """
-  Athena: Do a batch call to get the status of all the query executions. Based on all the
-  statuses, return :ok if all succeeded, and {:error, ...} otherwise.
-  """
-  @spec monitor_athena_query_executions(module(), [{:ok, map()}]) :: :ok | {:error, String.t()}
-  def monitor_athena_query_executions(lib_ex_aws, success_requests) do
-    {request_status, %{"QueryExecutions" => query_executions}} =
-      success_requests
-      |> Enum.map(fn {:ok, %{"QueryExecutionId" => query_execution_id}} ->
-        query_execution_id
-      end)
-      |> get_athena_query_executions_status(lib_ex_aws)
-
-    # return ok only if all the queries succeeded
-    with :ok <- request_status,
-         true <- Enum.all?(query_executions, &query_succeeded?/1) do
-      Logger.info("Athena Query Executions Status: #{Jason.encode!(query_executions)}")
-
-      :ok
-    else
-      _errored ->
-        {:error, "Athena Batch Get Query Execution: #{Jason.encode!(query_executions)}"}
-    end
-  end
-
-  @spec get_athena_query_executions_status([String.t()], module()) ::
-          {:ok, map()} | {:error, String.t()}
-  defp get_athena_query_executions_status(query_execution_ids, lib_ex_aws) do
-    # pause a litte before getting status
-    Process.sleep(2_000)
-
-    batch_get_query_execution_request =
-      query_execution_ids
-      |> ExAws.Athena.batch_get_query_execution()
-      |> lib_ex_aws.request()
-
-    with {:ok, %{"QueryExecutions" => query_executions}} <- batch_get_query_execution_request,
-         true <- queries_running?(query_executions) do
-      Logger.info("Athena Query Executions Status: #{Jason.encode!(query_executions)}")
-
-      get_athena_query_executions_status(query_execution_ids, lib_ex_aws)
-    else
-      _succeeded_or_errored ->
-        batch_get_query_execution_request
-    end
-  end
-
-  defp queries_running?(query_executions) do
-    Enum.any?(query_executions, fn %{"Status" => %{"State" => state}} ->
-      state == "QUEUED" || state == "RUNNING"
-    end)
-  end
-
-  defp query_succeeded?(%{"Status" => %{"State" => "SUCCEEDED"}}) do
-    true
-  end
-
-  defp query_succeeded?(%{
-         "Status" => %{
-           "State" => "FAILED",
-           "AthenaError" => %{"ErrorMessage" => "Partition already exists."}
-         }
-       }) do
-    true
-  end
-
-  defp query_succeeded?(%{
-         "Status" => %{
-           "State" => "FAILED",
-           "AthenaError" => %{"ErrorMessage" => "Partition entries already exist."}
-         }
-       }) do
-    true
-  end
-
-  defp query_succeeded?(_query_execution_status) do
-    false
-  end
end
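
Note on the deleted helper above: its one subtlety was that an Athena query execution reporting FAILED still counted as a success when the failure was only an already-existing partition. A minimal sketch of that criterion, with response maps shaped like Athena's BatchGetQueryExecution output (the entries below are illustrative, not taken from the repository):

    query_executions = [
      %{"Status" => %{"State" => "SUCCEEDED"}},
      %{
        "Status" => %{
          "State" => "FAILED",
          "AthenaError" => %{"ErrorMessage" => "Partition already exists."}
        }
      }
    ]

    # Both entries satisfy query_succeeded?/1 above, so the check used by
    # monitor_athena_query_executions/2 reports the whole batch as :ok.
    Enum.all?(query_executions, &query_succeeded?/1)
    # => true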
59 changes: 2 additions & 57 deletions ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/ingest.ex
@@ -37,8 +37,7 @@ defmodule ExCubicIngestion.Workers.Ingest do
# gather the information needed to make AWS requests
job_payload = construct_job_payload(load_rec_ids)

-with :ok <- run_glue_job(lib_ex_aws, job_payload),
-:ok <- add_athena_partitions(lib_ex_aws, job_payload) do
+with :ok <- run_glue_job(lib_ex_aws, job_payload) do
update_statuses(job_payload)
end
end
@@ -146,61 +145,7 @@ defmodule ExCubicIngestion.Workers.Ingest do
end
end

-  # If Glue job is successful, adds the Athena partition for each load only by start a query
-  # execution with the "ALTER TABLE" statement, and then doing a batched status call for all the
-  # queries.
-  @spec add_athena_partitions(module(), {map(), map()}) :: Oban.Worker.result()
-  defp add_athena_partitions(lib_ex_aws, {_env_payload, %{loads: loads}}) do
-    success_error_requests =
-      loads
-      # make requests to start query executions
-      |> Enum.map(&start_add_partition_query_execution(lib_ex_aws, &1))
-      # split into successful requests and failures
-      |> Enum.split_with(fn {status, _response_body} ->
-        status == :ok
-      end)
-
-    case success_error_requests do
-      # if all succesful, monitor their status
-      {success_requests, []} ->
-        ExAws.Helpers.monitor_athena_query_executions(lib_ex_aws, success_requests)
-
-      # if any failures, fail the job as well
-      {_success_requests, error_requests} ->
-        error_requests_bodies =
-          error_requests
-          |> Enum.map(fn {:error, {exception, message}} ->
-            %{exception: exception, message: message}
-          end)
-          |> Jason.encode!()
-
-        {:error, "Athena Start Query Executions: #{error_requests_bodies}"}
-    end
-  end
-
-  @spec start_add_partition_query_execution(module(), map()) :: {:ok, term()} | {:error, term()}
-  defp start_add_partition_query_execution(lib_ex_aws, load) do
-    bucket_operations = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_operations)
-
-    prefix_operations = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_operations)
-
-    partitions =
-      Enum.map_join(load.partition_columns, ", ", fn partition_column ->
-        "#{partition_column.name} = '#{partition_column.value}'"
-      end)
-
-    # sleep a little to avoid throttling, at most 10 requests will be made per job
-    Process.sleep(1000)
-
-    lib_ex_aws.request(
-      ExAws.Athena.start_query_execution(
-        "ALTER TABLE #{load.destination_table_name} ADD PARTITION (#{partitions});",
-        %{OutputLocation: "s3://#{bucket_operations}/#{prefix_operations}athena/"}
-      )
-    )
-  end
-
-  # If adding the partition to Athena is successful, update the status of all loads
+  # If Glue job run is successful, update the status of all loads
# to 'ready_for_archiving' allowing the archiving process to begin.
@spec update_statuses({map(), map()}) :: Oban.Worker.result()
defp update_statuses({_env_payload, %{loads: loads}}) do
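
For reference, a hypothetical sketch of the statement the removed start_add_partition_query_execution/2 assembled; the table name and partition column below are illustrative, not taken from the repository:

    load = %{
      destination_table_name: "cubic_dmap_sample",
      partition_columns: [%{name: "identifier", value: "20220101"}]
    }

    partitions =
      Enum.map_join(load.partition_columns, ", ", fn partition_column ->
        "#{partition_column.name} = '#{partition_column.value}'"
      end)

    "ALTER TABLE #{load.destination_table_name} ADD PARTITION (#{partitions});"
    # => "ALTER TABLE cubic_dmap_sample ADD PARTITION (identifier = '20220101');"

Each such statement was submitted via ExAws.Athena.start_query_execution/2, which is why this commit also deletes the ExAws.Athena wrapper module and the ATHENA_WORKGROUP configuration.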
40 changes: 0 additions & 40 deletions ex_cubic_ingestion/test/ex_aws/helpers_test.exs
@@ -67,44 +67,4 @@ defmodule ExAws.HelpersTest do
)
end
end

-  describe "monitor_athena_query_executions/2" do
-    test "returns immediately if all queries have succeeded" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "success_query_id_1"}},
-        {:ok, %{"QueryExecutionId" => "success_query_id_2"}}
-      ]
-
-      assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-
-    test "returns immediately if any of the queries have failed or were cancelled" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "cancel_query_id"}},
-        {:ok, %{"QueryExecutionId" => "fail_query_id"}}
-      ]
-
-      assert {:error, _message} =
-               ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-
-    test "returns success even if we have a failed query due to an already added partition" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "success_query_id"}},
-        {:ok, %{"QueryExecutionId" => "already_added_partition_query_id"}}
-      ]
-
-      assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-
-    # note: multiple added partitions
-    test "returns success even if we have a failed query due to already added partitions" do
-      query_executions = [
-        {:ok, %{"QueryExecutionId" => "success_query_id"}},
-        {:ok, %{"QueryExecutionId" => "already_added_partitions_query_id"}}
-      ]
-
-      assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
-    end
-  end
end
@@ -86,7 +86,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
dmap_table: dmap_table,
dmap_load_objects: dmap_load_objects
} do
-{:ok, {_table_original, nil, new_dmap_load_recs}} =
+{:ok, {nil, new_dmap_load_recs}} =
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)

assert Enum.map(dmap_load_objects, & &1.key) ==
@@ -96,7 +96,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
load.s3_key
end)

-assert {:ok, {dmap_table, nil, []}} ==
+assert {:ok, {nil, []}} ==
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)

# add a new object
@@ -111,8 +111,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do

# adding one more load object, should only insert it as a load record
assert {:ok,
-{_table_after_one_more_load, nil,
-[{%CubicLoad{s3_key: "cubic/dmap/sample/20220103.csv.gz"}, nil, _table, nil}]}} =
+{nil, [{%CubicLoad{s3_key: "cubic/dmap/sample/20220103.csv.gz"}, nil, _table, nil}]}} =
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)
end

@@ -121,7 +120,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
ods_table: ods_table,
ods_load_objects: ods_load_objects
} do
-{:ok, {_table_original, _last_ods_table_snapshot, new_ods_load_recs}} =
+{:ok, {_last_ods_table_snapshot, new_ods_load_recs}} =
CubicLoad.insert_new_from_objects_with_table(ods_load_objects, ods_table)

assert Enum.map(ods_load_objects, & &1.key) ==
@@ -133,7 +132,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
end)

# inserting again should not return any new records
-assert {:ok, {_table_after_insert, _last_ods_table_snapshot, []}} =
+assert {:ok, {_last_ods_table_snapshot, []}} =
CubicLoad.insert_new_from_objects_with_table(ods_load_objects, ods_table)

# add a new object
@@ -148,7 +147,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do

# adding one more load object, should only insert it as a load record
assert {:ok,
-{_table_after_one_more_load, _last_ods_table_snapshot,
+{_last_ods_table_snapshot,
[
{%CubicLoad{s3_key: "cubic/ods_qlik/SAMPLE/LOAD3.csv.gz"}, _ods_load_snapshot,
_table, _ods_table_snapshot}
@@ -158,8 +157,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
test "providing an empty list of objects", %{
dmap_table: dmap_table
} do
-assert {:ok, {_table, nil, []}} =
-CubicLoad.insert_new_from_objects_with_table([], dmap_table)
+assert {:ok, {nil, []}} = CubicLoad.insert_new_from_objects_with_table([], dmap_table)
end
end

@@ -238,8 +236,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
dmap_load_objects: dmap_load_objects
} do
# insert records as ready
-{:ok,
-{_table_after_insert, nil, [{first_new_load_rec, nil, _table, nil} | rest_new_load_recs]}} =
+{:ok, {nil, [{first_new_load_rec, nil, _table, nil} | rest_new_load_recs]}} =
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)

# set the first record to 'archived'
@@ -302,7 +299,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
dmap_load_objects: dmap_load_objects
} do
# insert records as ready
-{:ok, {_table_after_insert, nil, [{first_new_load_rec, nil, _table, nil} | _rest]}} =
+{:ok, {nil, [{first_new_load_rec, nil, _table, nil} | _rest]}} =
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)

# update it to 'archived' status
@@ -322,7 +319,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
dmap_load_objects: dmap_load_objects
} do
# insert records as ready
-{:ok, {_table_after_insert, nil, new_load_recs}} =
+{:ok, {nil, new_load_recs}} =
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)

expected =
@@ -349,7 +346,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
dmap_load_objects: dmap_load_objects
} do
# insert records as ready
-{:ok, {_table_after_insert, nil, new_load_recs}} =
+{:ok, {nil, new_load_recs}} =
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)

new_load_rec_ids =
@@ -371,7 +368,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
dmap_table: dmap_table,
dmap_load_objects: dmap_load_objects
} do
-{:ok, {_table, nil, new_load_recs}} =
+{:ok, {nil, new_load_recs}} =
CubicLoad.insert_new_from_objects_with_table(dmap_load_objects, dmap_table)

{_, ready_loads_by_table_query} =
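
The test updates above all track one signature change: alongside the partitioning removal, CubicLoad.insert_new_from_objects_with_table/2 dropped the leading table element from its result tuple, so callers now match a pair instead of a triple. Schematically (variable names here are illustrative):

    # before this commit
    {:ok, {_table, _last_ods_table_snapshot, new_load_recs}} =
      CubicLoad.insert_new_from_objects_with_table(load_objects, table)

    # after this commit
    {:ok, {_last_ods_table_snapshot, new_load_recs}} =
      CubicLoad.insert_new_from_objects_with_table(load_objects, table)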