[elixir] feat: remove athena partitioning #123

Merged 2 commits on Dec 31, 2024
3 changes: 0 additions & 3 deletions .env.template
@@ -25,9 +25,6 @@ GLUE_DATABASE_INCOMING=
GLUE_DATABASE_SPRINGBOARD=
GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING=

# athena
ATHENA_WORKGROUP=

# cubic dmap
CUBIC_DMAP_BASE_URL=
CUBIC_DMAP_CONTROLLED_USER_API_KEY=
3 changes: 1 addition & 2 deletions ex_cubic_ingestion/config/runtime.exs
@@ -32,5 +32,4 @@ config :ex_cubic_ingestion,
System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", ""),
dmap_base_url: System.get_env("CUBIC_DMAP_BASE_URL", ""),
dmap_controlled_user_api_key: System.get_env("CUBIC_DMAP_CONTROLLED_USER_API_KEY", ""),
dmap_public_user_api_key: System.get_env("CUBIC_DMAP_PUBLIC_USER_API_KEY", ""),
athena_workgroup: System.get_env("ATHENA_WORKGROUP", "")
Collaborator (Author) commented:
There will be a complete cleanup effort on the Infra side after this set of changes is working well.

dmap_public_user_api_key: System.get_env("CUBIC_DMAP_PUBLIC_USER_API_KEY", "")
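
Side note on this hunk: removing athena_workgroup makes dmap_public_user_api_key the last entry in the keyword list, which is why its trailing comma goes away. A minimal sketch of the resulting tail of runtime.exs, assuming only the entries visible in this hunk:

# sketch of the config tail after this change (only entries visible in this hunk)
import Config

config :ex_cubic_ingestion,
  dmap_base_url: System.get_env("CUBIC_DMAP_BASE_URL", ""),
  dmap_controlled_user_api_key: System.get_env("CUBIC_DMAP_CONTROLLED_USER_API_KEY", ""),
  dmap_public_user_api_key: System.get_env("CUBIC_DMAP_PUBLIC_USER_API_KEY", "")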
54 changes: 0 additions & 54 deletions ex_cubic_ingestion/lib/ex_aws/ex_aws_athena.ex

This file was deleted.
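
The deleted module's body is not shown in this view. Judging from its call sites in ExAws.Helpers, the Ingest worker, and MockExAws below, it most likely wrapped the two Athena API actions as ExAws JSON operations, roughly along these lines (a hedged reconstruction from the call sites, not the original file):

defmodule ExAws.Athena do
  @moduledoc """
  Request builders for the Athena API actions used by the ingestion app
  (reconstructed sketch; the real module may have differed in detail).
  """

  # StartQueryExecution for a single SQL statement, e.g. ALTER TABLE ... ADD PARTITION
  def start_query_execution(query_string, result_configuration) do
    request("StartQueryExecution", %{
      QueryString: query_string,
      ResultConfiguration: result_configuration
    })
  end

  # BatchGetQueryExecution for a list of query execution ids
  def batch_get_query_execution(query_execution_ids) do
    request("BatchGetQueryExecution", %{QueryExecutionIds: query_execution_ids})
  end

  # builds the ExAws.Operation.JSON struct that MockExAws pattern matches on
  # (service :athena, a data map, and an x-amz-target header naming the action)
  defp request(action, data) do
    ExAws.Operation.JSON.new(:athena, %{
      data: data,
      headers: [
        {"x-amz-target", "AmazonAthena.#{action}"},
        {"content-type", "application/x-amz-json-1.1"}
      ]
    })
  end
end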

79 changes: 0 additions & 79 deletions ex_cubic_ingestion/lib/ex_aws/helpers.ex
@@ -47,83 +47,4 @@ defmodule ExAws.Helpers do
end
end
end

@doc """
Athena: Do a batch call to get the status of all the query executions. Based on all the
statuses, return :ok if all succeeded, and {:error, ...} otherwise.
"""
@spec monitor_athena_query_executions(module(), [{:ok, map()}]) :: :ok | {:error, String.t()}
def monitor_athena_query_executions(lib_ex_aws, success_requests) do
{request_status, %{"QueryExecutions" => query_executions}} =
success_requests
|> Enum.map(fn {:ok, %{"QueryExecutionId" => query_execution_id}} ->
query_execution_id
end)
|> get_athena_query_executions_status(lib_ex_aws)

# return ok only if all the queries succeeded
with :ok <- request_status,
true <- Enum.all?(query_executions, &query_succeeded?/1) do
Logger.info("Athena Query Executions Status: #{Jason.encode!(query_executions)}")

:ok
else
_errored ->
{:error, "Athena Batch Get Query Execution: #{Jason.encode!(query_executions)}"}
end
end

@spec get_athena_query_executions_status([String.t()], module()) ::
{:ok, map()} | {:error, String.t()}
defp get_athena_query_executions_status(query_execution_ids, lib_ex_aws) do
# pause a little before getting status
Process.sleep(2_000)

batch_get_query_execution_request =
query_execution_ids
|> ExAws.Athena.batch_get_query_execution()
|> lib_ex_aws.request()

with {:ok, %{"QueryExecutions" => query_executions}} <- batch_get_query_execution_request,
true <- queries_running?(query_executions) do
Logger.info("Athena Query Executions Status: #{Jason.encode!(query_executions)}")

get_athena_query_executions_status(query_execution_ids, lib_ex_aws)
else
_succeeded_or_errored ->
batch_get_query_execution_request
end
end

defp queries_running?(query_executions) do
Enum.any?(query_executions, fn %{"Status" => %{"State" => state}} ->
state == "QUEUED" || state == "RUNNING"
end)
end

defp query_succeeded?(%{"Status" => %{"State" => "SUCCEEDED"}}) do
true
end

defp query_succeeded?(%{
"Status" => %{
"State" => "FAILED",
"AthenaError" => %{"ErrorMessage" => "Partition already exists."}
}
}) do
true
end

defp query_succeeded?(%{
"Status" => %{
"State" => "FAILED",
"AthenaError" => %{"ErrorMessage" => "Partition entries already exist."}
}
}) do
true
end

defp query_succeeded?(_query_execution_status) do
false
end
end
59 changes: 2 additions & 57 deletions ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/ingest.ex
@@ -37,8 +37,7 @@ defmodule ExCubicIngestion.Workers.Ingest do
# gather the information needed to make AWS requests
job_payload = construct_job_payload(load_rec_ids)

with :ok <- run_glue_job(lib_ex_aws, job_payload),
:ok <- add_athena_partitions(lib_ex_aws, job_payload) do
with :ok <- run_glue_job(lib_ex_aws, job_payload) do
update_statuses(job_payload)
end
end
@@ -146,61 +145,7 @@
end
end

# If the Glue job is successful, adds the Athena partition for each load by starting a query
# execution with the "ALTER TABLE" statement, and then doing a batched status call for all the
# queries.
@spec add_athena_partitions(module(), {map(), map()}) :: Oban.Worker.result()
defp add_athena_partitions(lib_ex_aws, {_env_payload, %{loads: loads}}) do
success_error_requests =
loads
# make requests to start query executions
|> Enum.map(&start_add_partition_query_execution(lib_ex_aws, &1))
# split into successful requests and failures
|> Enum.split_with(fn {status, _response_body} ->
status == :ok
end)

case success_error_requests do
# if all successful, monitor their status
{success_requests, []} ->
ExAws.Helpers.monitor_athena_query_executions(lib_ex_aws, success_requests)

# if any failures, fail the job as well
{_success_requests, error_requests} ->
error_requests_bodies =
error_requests
|> Enum.map(fn {:error, {exception, message}} ->
%{exception: exception, message: message}
end)
|> Jason.encode!()

{:error, "Athena Start Query Executions: #{error_requests_bodies}"}
end
end

@spec start_add_partition_query_execution(module(), map()) :: {:ok, term()} | {:error, term()}
defp start_add_partition_query_execution(lib_ex_aws, load) do
bucket_operations = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_operations)

prefix_operations = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_operations)

partitions =
Enum.map_join(load.partition_columns, ", ", fn partition_column ->
"#{partition_column.name} = '#{partition_column.value}'"
end)

# sleep a little to avoid throttling, at most 10 requests will be made per job
Process.sleep(1000)

lib_ex_aws.request(
ExAws.Athena.start_query_execution(
"ALTER TABLE #{load.destination_table_name} ADD PARTITION (#{partitions});",
%{OutputLocation: "s3://#{bucket_operations}/#{prefix_operations}athena/"}
)
)
end

# If adding the partition to Athena is successful, update the status of all loads
# If Glue job run is successful, update the status of all loads
# to 'ready_for_archiving' allowing the archiving process to begin.
@spec update_statuses({map(), map()}) :: Oban.Worker.result()
defp update_statuses({_env_payload, %{loads: loads}}) do
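
Net effect on the worker: with the partition step gone, only the Glue job run gates the status update. A minimal sketch of the surviving happy path, reusing the construct_job_payload/1, run_glue_job/2, and update_statuses/1 helpers shown in the diff (the wrapper name here is illustrative, not from the file):

# illustrative fragment; the real code lives inside the Oban worker's perform callback,
# and the helpers referenced below are the ones defined elsewhere in ingest.ex
defp ingest_loads(lib_ex_aws, load_rec_ids) do
  # gather the information needed to make AWS requests
  job_payload = construct_job_payload(load_rec_ids)

  # success of the Glue job run alone now moves loads to 'ready_for_archiving'
  with :ok <- run_glue_job(lib_ex_aws, job_payload) do
    update_statuses(job_payload)
  end
end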
40 changes: 0 additions & 40 deletions ex_cubic_ingestion/test/ex_aws/helpers_test.exs
@@ -67,44 +67,4 @@ defmodule ExAws.HelpersTest do
)
end
end

describe "monitor_athena_query_executions/2" do
test "returns immediately if all queries have succeeded" do
query_executions = [
{:ok, %{"QueryExecutionId" => "success_query_id_1"}},
{:ok, %{"QueryExecutionId" => "success_query_id_2"}}
]

assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
end

test "returns immediately if any of the queries have failed or were cancelled" do
query_executions = [
{:ok, %{"QueryExecutionId" => "cancel_query_id"}},
{:ok, %{"QueryExecutionId" => "fail_query_id"}}
]

assert {:error, _message} =
ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
end

test "returns success even if we have a failed query due to an already added partition" do
query_executions = [
{:ok, %{"QueryExecutionId" => "success_query_id"}},
{:ok, %{"QueryExecutionId" => "already_added_partition_query_id"}}
]

assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
end

# note: multiple added partitions
test "returns success even if we have a failed query due to already added partitions" do
query_executions = [
{:ok, %{"QueryExecutionId" => "success_query_id"}},
{:ok, %{"QueryExecutionId" => "already_added_partitions_query_id"}}
]

assert :ok = ExAws.Helpers.monitor_athena_query_executions(MockExAws, query_executions)
end
end
end
100 changes: 0 additions & 100 deletions ex_cubic_ingestion/test/support/ex_aws.ex
@@ -350,106 +350,6 @@ defmodule MockExAws do
end
end

def request(
%{service: :athena, data: %{QueryExecutionIds: ["success_query_id", "success_query_id"]}},
_config_overrides
) do
{:ok,
%{
"QueryExecutions" => [
%{"Status" => %{"State" => "SUCCEEDED"}},
%{"Status" => %{"State" => "SUCCEEDED"}}
]
}}
end

def request(
%{service: :athena, data: %{QueryExecutionIds: ["success_query_id"]}},
_config_overrides
) do
{:ok, %{"QueryExecutions" => [%{"Status" => %{"State" => "SUCCEEDED"}}]}}
end

def request(
%{
service: :athena,
data: %{QueryExecutionIds: ["success_query_id_1", "success_query_id_2"]}
},
_config_overrides
) do
{:ok,
%{
"QueryExecutions" => [
%{"Status" => %{"State" => "SUCCEEDED"}},
%{"Status" => %{"State" => "SUCCEEDED"}}
]
}}
end

def request(
%{service: :athena, data: %{QueryExecutionIds: ["cancel_query_id", "fail_query_id"]}},
_config_overrides
) do
{:ok,
%{
"QueryExecutions" => [
%{"Status" => %{"State" => "CANCELLED"}},
%{"Status" => %{"State" => "FAILED"}}
]
}}
end

def request(
%{
service: :athena,
data: %{QueryExecutionIds: ["success_query_id", "already_added_partition_query_id"]}
},
_config_overrides
) do
{:ok,
%{
"QueryExecutions" => [
%{"Status" => %{"State" => "SUCCEEDED"}},
%{
"Status" => %{
"State" => "FAILED",
"AthenaError" => %{"ErrorMessage" => "Partition already exists."}
}
}
]
}}
end

def request(
%{
service: :athena,
# note: multiple added partitions
data: %{QueryExecutionIds: ["success_query_id", "already_added_partitions_query_id"]}
},
_config_overrides
) do
{:ok,
%{
"QueryExecutions" => [
%{"Status" => %{"State" => "SUCCEEDED"}},
%{
"Status" => %{
"State" => "FAILED",
"AthenaError" => %{"ErrorMessage" => "Partition entries already exist."}
}
}
]
}}
end

def request(%{service: :athena} = op, _config_overrides) do
if Enum.member?(op.headers, {"x-amz-target", "AmazonAthena.StartQueryExecution"}) do
{:ok, %{"QueryExecutionId" => "success_query_id"}}
else
{:error, "athena failed"}
end
end

@spec request!(ExAws.Operation.t(), keyword) :: term
def request!(op, config_overrides \\ []) do
case request(op, config_overrides) do