Skip to content

Commit

Permalink
mix format
Browse files Browse the repository at this point in the history
  • Loading branch information
fredwu committed Oct 12, 2021
1 parent d67f424 commit b108123
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 82 deletions.
3 changes: 3 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
17 changes: 9 additions & 8 deletions lib/opq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ defmodule OPQ do

def enqueue(feeder, mod, fun, args)
when is_atom(mod) and
is_atom(fun) and
is_list(args) do
is_atom(fun) and
is_list(args) do
enqueue(feeder, {mod, fun, args})
end

Expand All @@ -29,32 +29,33 @@ defmodule OPQ do
Opt.stop(feeder)
end

def pause(feeder), do: GenStage.cast(feeder, :pause)
def pause(feeder), do: GenStage.cast(feeder, :pause)
def resume(feeder), do: GenStage.cast(feeder, :resume)
def info(feeder), do: GenStage.call(feeder, :info, Opt.timeout(feeder))
def info(feeder), do: GenStage.call(feeder, :info, Opt.timeout(feeder))

defp start_links(opts) do
{:ok, feeder} = Feeder.start_link(opts[:name])
{:ok, feeder} = Feeder.start_link(opts[:name])

Opt.save_opts(opts[:name] || feeder, opts)

opts
|> Keyword.merge([name: feeder])
|> Keyword.merge(name: feeder)
|> start_consumers(interval: opts[:interval])

{:ok, feeder}
end

defp start_consumers(opts, interval: 0) do
opts
|> Keyword.merge([producer_consumer: opts[:name]])
|> Keyword.merge(producer_consumer: opts[:name])
|> WorkerSupervisor.start_link()
end

defp start_consumers(opts, _) do
{:ok, rate_limiter} = RateLimiter.start_link(opts)

opts
|> Keyword.merge([producer_consumer: rate_limiter])
|> Keyword.merge(producer_consumer: rate_limiter)
|> WorkerSupervisor.start_link()
end
end
5 changes: 3 additions & 2 deletions lib/opq/feeder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ defmodule OPQ.Feeder do

use GenStage

def start_link(nil), do: GenStage.start_link(__MODULE__, :ok)
def start_link(nil), do: GenStage.start_link(__MODULE__, :ok)
def start_link(name), do: GenStage.start_link(__MODULE__, :ok, name: name)

def init(:ok) do
{:producer, {:normal, :queue.new, 0}}
{:producer, {:normal, :queue.new(), 0}}
end

def handle_cast(:stop, state) do
Expand Down Expand Up @@ -58,6 +58,7 @@ defmodule OPQ.Feeder do
case :queue.out(queue) do
{{:value, event}, queue} ->
dispatch_events(status, queue, demand - 1, [event | events])

{:empty, queue} ->
{:noreply, Enum.reverse(events), {status, queue, demand}}
end
Expand Down
27 changes: 15 additions & 12 deletions lib/opq/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ defmodule OPQ.Options do
Options for configuring OPQ.
"""

@worker OPQ.Worker
@workers 10
@worker OPQ.Worker
@workers 10
@interval 0
@timeout 5_000
@timeout 5_000

@doc """
## Examples
Expand All @@ -18,16 +18,19 @@ defmodule OPQ.Options do
4
"""
def assign_defaults(opts) do
Keyword.merge([
worker: worker(),
workers: workers(),
interval: interval(),
timeout: timeout(),
], opts)
Keyword.merge(
[
worker: worker(),
workers: workers(),
interval: interval(),
timeout: timeout()
],
opts
)
end

defp worker(), do: Application.get_env(:opq, :worker, @worker)
defp workers(), do: Application.get_env(:opq, :workers, @workers)
defp worker(), do: Application.get_env(:opq, :worker, @worker)
defp workers(), do: Application.get_env(:opq, :workers, @workers)
defp interval(), do: Application.get_env(:opq, :interval, @interval)
defp timeout(), do: Application.get_env(:opq, :timeout, @timeout)
defp timeout(), do: Application.get_env(:opq, :timeout, @timeout)
end
2 changes: 1 addition & 1 deletion lib/opq/options_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ defmodule OPQ.OptionsHandler do
def stop(feeder), do: Agent.stop(name(feeder))

defp load_opts(feeder), do: Agent.get(name(feeder), & &1)
defp name(feeder), do: :"opq-#{Kernel.inspect(feeder)}"
defp name(feeder), do: :"opq-#{Kernel.inspect(feeder)}"
end
4 changes: 2 additions & 2 deletions lib/opq/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule OPQ.Worker do
Task.start_link(fn -> process_item(item) end)
end

defp process_item({mod, fun, args}), do: apply(mod, fun, args)
defp process_item({mod, fun, args}), do: apply(mod, fun, args)
defp process_item(item) when is_function(item), do: item.()
defp process_item(item), do: item
defp process_item(item), do: item
end
17 changes: 8 additions & 9 deletions lib/opq/worker_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ defmodule OPQ.WorkerSupervisor do
%{id: opts[:worker], start: {opts[:worker], :start_link, []}, restart: :temporary}
]

cs_opts =
[
strategy: :one_for_one,
subscribe_to: [
{
opts[:producer_consumer],
min_demand: 0, max_demand: opts[:workers], timeout: opts[:timeout]
}
]
cs_opts = [
strategy: :one_for_one,
subscribe_to: [
{
opts[:producer_consumer],
min_demand: 0, max_demand: opts[:workers], timeout: opts[:timeout]
}
]
]

ConsumerSupervisor.init(children, cs_opts)
end
Expand Down
42 changes: 21 additions & 21 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,49 @@ defmodule OPQ.Mixfile do

def project do
[
app: :opq,
version: "3.2.0",
elixir: "~> 1.5",
elixirc_paths: elixirc_paths(Mix.env),
package: package(),
name: "OPQ: One Pooled Queue",
description: "A simple, in-memory queue with worker pooling and rate limiting in Elixir.",
start_permanent: Mix.env == :prod,
deps: deps(),
test_coverage: [tool: ExCoveralls],
app: :opq,
version: "3.2.0",
elixir: "~> 1.5",
elixirc_paths: elixirc_paths(Mix.env()),
package: package(),
name: "OPQ: One Pooled Queue",
description: "A simple, in-memory queue with worker pooling and rate limiting in Elixir.",
start_permanent: Mix.env() == :prod,
deps: deps(),
test_coverage: [tool: ExCoveralls],
preferred_cli_env: [coveralls: :test],
aliases: [publish: ["hex.publish", &git_tag/1]],
aliases: [publish: ["hex.publish", &git_tag/1]]
]
end

def application do
[
extra_applications: [:logger],
extra_applications: [:logger]
]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]
defp elixirc_paths(_), do: ["lib"]

defp deps do
[
{:gen_stage, "~> 1.1"},
{:ex_doc, ">= 0.0.0", only: :dev},
{:excoveralls, "~> 0.14", only: :test},
{:gen_stage, "~> 1.1"},
{:ex_doc, ">= 0.0.0", only: :dev},
{:excoveralls, "~> 0.14", only: :test}
]
end

defp package do
[
maintainers: ["Fred Wu"],
licenses: ["MIT"],
links: %{"GitHub" => "https://github.com/fredwu/opq"}
licenses: ["MIT"],
links: %{"GitHub" => "https://github.com/fredwu/opq"}
]
end

defp git_tag(_args) do
System.cmd "git", ["tag", "v" <> Mix.Project.config[:version]]
System.cmd "git", ["push"]
System.cmd "git", ["push", "--tags"]
System.cmd("git", ["tag", "v" <> Mix.Project.config()[:version]])
System.cmd("git", ["push"])
System.cmd("git", ["push", "--tags"])
end
end
1 change: 1 addition & 0 deletions test/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
defmodule Helpers do
def wait(fun), do: wait(500, fun)
def wait(0, fun), do: fun.()

def wait(timeout, fun) do
try do
fun.()
Expand Down
Loading

0 comments on commit b108123

Please sign in to comment.