Skip to content

Commit

Permalink
Merge pull request #52 from mbta/gg-1202207406750018-worker-for-schedule-dmap
Browse files Browse the repository at this point in the history

[elixir] feat: schedule dmap worker
  • Loading branch information
grejdi-mbta authored Jun 9, 2022
2 parents c47b372 + 6485bad commit b34a5e1
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 2 deletions.
12 changes: 10 additions & 2 deletions ex_cubic_ingestion/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@ config :ex_cubic_ingestion,

# Oban configuration: the Cron plugin fires ScheduleDmap once a day at 15:00
# UTC; each named queue runs with the given concurrency limit.
# (The diff artifacts — the removed `plugins: []` and duplicate `ingest: 5`
# lines — are reconciled here into the post-commit state.)
config :ex_cubic_ingestion, Oban,
  repo: ExCubicIngestion.Repo,
  plugins: [
    {
      Oban.Plugins.Cron,
      crontab: [
        # max_attempts: 1 — the scheduler job must not retry; a rerun would
        # enqueue duplicate FetchDmap jobs for every feed.
        {"0 15 * * *", ExCubicIngestion.Workers.ScheduleDmap, max_attempts: 1}
      ]
    }
  ],
  queues: [
    archive: 5,
    error: 5,
    fetch_dmap: 1,
    ingest: 5,
    schedule_dmap: 1
  ]

# Import environment specific config. This must remain at the bottom
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ defmodule ExCubicIngestion.Schema.CubicDmapFeed do
Repo.get!(not_deleted(), id)
end

@doc """
Gets a single not-deleted feed matching the given clauses.

Raises `Ecto.NoResultsError` when no record matches — like all bang
functions it never returns `nil`, so the spec must not include it
(the `get_by!/2` tests in this commit assert the raise).
"""
@spec get_by!(Keyword.t() | map(), Keyword.t()) :: t()
def get_by!(clauses, opts \\ []) do
  Repo.get_by!(not_deleted(), clauses, opts)
end

@doc """
Returns every feed record that has not been soft-deleted.
"""
@spec all :: [t()]
def all, do: Repo.all(not_deleted())

@doc """
Finds the dataset that was last updated and updates the feed's last updated value
"""
Expand Down
26 changes: 26 additions & 0 deletions ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule ExCubicIngestion.Workers.ScheduleDmap do
  @moduledoc """
  This worker will be run daily, and it will queue up fetch DMAP jobs for
  each active (not-deleted) feed.
  """

  use Oban.Worker,
    queue: :schedule_dmap,
    max_attempts: 1

  alias ExCubicIngestion.Repo
  alias ExCubicIngestion.Schema.CubicDmapFeed
  alias ExCubicIngestion.Workers.FetchDmap

  # NOTE: no `require Oban` / `require Oban.Job` needed — Oban.insert!/1 and
  # FetchDmap.new/1 are plain functions, not macros.

  @impl Oban.Worker
  def perform(_job) do
    # Enqueue one FetchDmap job per active feed, atomically: Oban.insert!/1
    # raises on an invalid changeset, rolling the transaction back instead of
    # silently dropping a feed (Oban.insert/1's error tuple would have been
    # discarded by Enum.each/2).
    Repo.transaction(fn ->
      CubicDmapFeed.all()
      |> Enum.each(fn feed ->
        Oban.insert!(FetchDmap.new(%{feed_id: feed.id}))
      end)
    end)

    :ok
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule ExCubicIngestion.Repo.Migrations.AddUseTransactionDmap do
  @moduledoc """
  Adds a unique index on `cubic_dmap_feeds.relative_url` and seeds the
  use_transaction_location DMAP feed and its target table.

  NOTE(review): seeding through schema modules couples this migration to the
  current shape of CubicDmapFeed/CubicTable — if those schemas change, this
  migration may no longer run from scratch. Consider `execute/1` with raw SQL
  if that becomes a problem.
  """
  use Ecto.Migration

  alias ExCubicIngestion.Repo
  alias ExCubicIngestion.Schema.CubicDmapFeed
  alias ExCubicIngestion.Schema.CubicTable

  def up do
    create unique_index("cubic_dmap_feeds", [:relative_url])

    # Ecto queues DDL commands and only executes them after up/0 returns;
    # flush/0 forces the index to exist before the seed rows are inserted,
    # so duplicate seeds fail fast on the unique constraint.
    flush()

    Repo.insert!(%CubicDmapFeed{
      relative_url: "/controlledresearchusersapi/transactional/use_transaction_location",
      last_updated_at: ~U[2022-06-01 00:00:00.000000Z]
    })

    Repo.insert!(%CubicTable{
      name: "cubic_dmap__use_transaction_location",
      s3_prefix: "cubic/dmap/use_transaction_location/"
    })
  end

  def down do
    # Remove the seed rows first, then drop the index (drop is queued and
    # runs after down/0 returns, so the order here is already safe).
    Repo.delete!(CubicTable.get_by!(name: "cubic_dmap__use_transaction_location"))

    Repo.delete!(
      CubicDmapFeed.get_by!(
        relative_url: "/controlledresearchusersapi/transactional/use_transaction_location"
      )
    )

    drop index("cubic_dmap_feeds", [:relative_url])
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,36 @@ defmodule ExCubicIngestion.Schema.CubicDmapFeedTest do
alias ExCubicIngestion.Schema.CubicDmapDataset
alias ExCubicIngestion.Schema.CubicDmapFeed

describe "get_by!/2" do
  test "getting only items that are not deleted or error" do
    active_url = "/controlledresearchusersapi/transactional/sample1"
    deleted_url = "/controlledresearchusersapi/transactional/sample2"
    missing_url = "/controlledresearchusersapi/transactional/does_not_exist"

    active_feed = Repo.insert!(%CubicDmapFeed{relative_url: active_url})

    # soft-deleted record: must be invisible to get_by!/2
    Repo.insert!(%CubicDmapFeed{
      relative_url: deleted_url,
      deleted_at: ~U[2022-01-01 20:50:50Z]
    })

    assert active_feed == CubicDmapFeed.get_by!(relative_url: active_url)

    # both the soft-deleted and the nonexistent URL raise
    for url <- [deleted_url, missing_url] do
      assert_raise Ecto.NoResultsError, fn ->
        CubicDmapFeed.get_by!(relative_url: url)
      end
    end
  end
end

describe "update_last_updated_for_feed/2" do
test "update with the latest updated dataset" do
dmap_feed =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule ExCubicIngestion.Workers.ScheduleDmapTest do
  use ExCubicIngestion.DataCase, async: true
  use Oban.Testing, repo: ExCubicIngestion.Repo

  alias ExCubicIngestion.Schema.CubicDmapFeed
  alias ExCubicIngestion.Workers.FetchDmap
  alias ExCubicIngestion.Workers.ScheduleDmap

  describe "perform/1" do
    test "run job without error" do
      Repo.insert!(%CubicDmapFeed{
        relative_url: "/controlledresearchusersapi/sample1"
      })

      assert :ok == perform_job(ScheduleDmap, %{})
    end

    test "fetch dmap jobs are queued" do
      # two active feeds — each should get a FetchDmap job
      [active_one, active_two] =
        for url <- [
              "/controlledresearchusersapi/sample1",
              "/controlledresearchusersapi/sample2"
            ] do
          Repo.insert!(%CubicDmapFeed{relative_url: url})
        end

      # soft-deleted feed — must NOT get a job
      deleted_feed =
        Repo.insert!(%CubicDmapFeed{
          relative_url: "/deleted",
          deleted_at: ~U[2022-05-01 10:49:50Z]
        })

      :ok = perform_job(ScheduleDmap, %{})

      assert_enqueued(worker: FetchDmap, args: %{feed_id: active_one.id})
      assert_enqueued(worker: FetchDmap, args: %{feed_id: active_two.id})
      refute_enqueued(worker: FetchDmap, args: %{feed_id: deleted_feed.id})
    end
  end
end

0 comments on commit b34a5e1

Please sign in to comment.