Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROTOTYPE] Add unnamed types support #115

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ index 50b5b41a..535b7a0e 100755

# Bring up more containers
-docker-compose up --no-recreate -d schemaregistry connect control-center
+# FIXME
+# PATCHED
+# docker-compose up --no-recreate -d schemaregistry connect control-center
+docker-compose up --no-recreate -d schemaregistry

Expand All @@ -59,7 +59,7 @@ index 50b5b41a..535b7a0e 100755
-MAX_WAIT=240
-echo -e "\nWaiting up to $MAX_WAIT seconds for Connect to start"
-retry $MAX_WAIT host_check_up connect || exit 1
+# FIXME
+# PATCHED
+# MAX_WAIT=240
+# echo -e "\nWaiting up to $MAX_WAIT seconds for Connect to start"
+# retry $MAX_WAIT host_check_up connect || exit 1
Expand All @@ -68,7 +68,7 @@ index 50b5b41a..535b7a0e 100755

-echo -e "\nStart streaming from the Wikipedia SSE source connector:"
-${DIR}/connectors/submit_wikipedia_sse_config.sh || exit 1
+# FIXME
+# PATCHED
+# echo -e "\nStart streaming from the Wikipedia SSE source connector:"
+# ${DIR}/connectors/submit_wikipedia_sse_config.sh || exit 1

Expand Down Expand Up @@ -102,7 +102,7 @@ index 50b5b41a..535b7a0e 100755
-echo "Waiting up to $MAX_WAIT seconds for Confluent Control Center to start"
-retry $MAX_WAIT host_check_up control-center || exit 1
+# # Verify Confluent Control Center has started
+# FIXME
+# PATCHED
+# MAX_WAIT=300
+# echo
+# echo "Waiting up to $MAX_WAIT seconds for Confluent Control Center to start"
Expand All @@ -120,7 +120,7 @@ index 50b5b41a..535b7a0e 100755

-# Start more containers
-docker-compose up --no-recreate -d ksqldb-server ksqldb-cli restproxy
+# FIXME
+# PATCHED
+# # Start more containers
+# docker-compose up --no-recreate -d ksqldb-server ksqldb-cli restproxy

Expand Down Expand Up @@ -149,7 +149,7 @@ index 50b5b41a..535b7a0e 100755
-echo -e "\nStart additional consumers to read from topics WIKIPEDIANOBOT, WIKIPEDIA_COUNT_GT_1"
-${DIR}/consumers/listen_WIKIPEDIANOBOT.sh
-${DIR}/consumers/listen_WIKIPEDIA_COUNT_GT_1.sh
+# FIXME
+# PATCHED
+# echo -e "\nStart additional consumers to read from topics WIKIPEDIANOBOT, WIKIPEDIA_COUNT_GT_1"
+# ${DIR}/consumers/listen_WIKIPEDIANOBOT.sh
+# ${DIR}/consumers/listen_WIKIPEDIA_COUNT_GT_1.sh
Expand Down
2 changes: 1 addition & 1 deletion lib/avrora/codec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ defmodule Avrora.Codec do
<<72, 48, 48, 48, 48, 48, 48, 48, 48, 45, 48, 48, 48, 48, 45, 48, 48, 48, 48, 45, 48, 48, 48,
48, 45, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 123, 20, 174, 71, 225, 250, 47, 64>>
"""
@callback encode(payload :: binary() | map(), options :: keyword(Avrora.Schema.t())) ::
@callback encode(payload :: term(), options :: keyword(Avrora.Schema.t())) ::
{:ok, result :: binary()} | {:error, reason :: term()}
end
2 changes: 1 addition & 1 deletion lib/avrora/codec/object_container_file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Avrora.Codec.ObjectContainerFile do
end

@impl true
def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do
def encode(payload, schema: schema) do
with {:ok, schema} <- resolve(schema),
{:ok, body} <- Codec.Plain.encode(payload, schema: schema),
{:ok, schema} <- SchemaEncoder.to_erlavro(schema) do
Expand Down
2 changes: 1 addition & 1 deletion lib/avrora/codec/plain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Avrora.Codec.Plain do
end

@impl true
def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do
def encode(payload, schema: schema) do
with {:ok, schema} <- resolve(schema), do: do_encode(payload, schema)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/avrora/codec/schema_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule Avrora.Codec.SchemaRegistry do
end

@impl true
def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do
def encode(payload, schema: schema) do
with {:ok, schema} <- resolve(schema) do
schema = if is_nil(schema.id), do: {:error, :invalid_schema_id}, else: {:ok, schema}

Expand Down
118 changes: 59 additions & 59 deletions lib/avrora/schema/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Avrora.Schema.Encoder do
alias Avrora.Schema.ReferenceCollector

@type reference_lookup_fun :: (String.t() -> {:ok, String.t()} | {:error, term()})
@undefined_name :undefined
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not much value

@reference_lookup_fun &__MODULE__.reference_lookup/1

@doc """
Expand All @@ -20,23 +21,27 @@ defmodule Avrora.Schema.Encoder do
iex> schema.full_name
"io.acme.Payment"
"""
@spec from_json(String.t(), reference_lookup_fun) :: {:ok, Schema.t()} | {:error, term()}
def from_json(payload, reference_lookup_fun \\ @reference_lookup_fun) when is_binary(payload) do
@spec from_json(String.t()) :: {:ok, Schema.t()} | {:error, term()}
def from_json(definition),
do: from_json(definition, name: @undefined_name, reference_lookup_fun: @reference_lookup_fun)

def from_json(definition, name: name),
do: from_json(definition, name: name, reference_lookup_fun: @reference_lookup_fun)

def from_json(definition, reference_lookup_fun: reference_lookup_fun),
do: from_json(definition, name: @undefined_name, reference_lookup_fun: reference_lookup_fun)

@spec from_json(String.t(), name: :undefined | String.t(), reference_lookup_fun: reference_lookup_fun()) ::
{:ok, Schema.t()} | {:error, term()}
def from_json(definition, name: name, reference_lookup_fun: reference_lookup_fun) do
lookup_table = ets().new()

with {:ok, [schema | _]} <- parse_recursive(payload, lookup_table, reference_lookup_fun),
{:ok, full_name} <- extract_full_name(schema),
{:ok, schema} <- do_compile(full_name, lookup_table) do
{
:ok,
%Schema{
id: nil,
version: nil,
full_name: full_name,
lookup_table: lookup_table,
json: to_json(schema)
}
}
with {:ok, full_name} <- parse_recursive(definition, name, lookup_table, reference_lookup_fun),
{:ok, erlavro} <- do_expand(full_name, lookup_table) do
# NOTE: It could be that json field will be moved to be a method because of
# schema registry support of references. OR we should care about how
# to calculate it
{:ok, %Schema{full_name: full_name, lookup_table: lookup_table, json: to_json(erlavro)}}
else
{:error, reason} ->
true = :ets.delete(lookup_table)
Expand Down Expand Up @@ -70,22 +75,13 @@ defmodule Avrora.Schema.Encoder do
"io.acme.Payment"
"""
@spec from_erlavro(term(), keyword()) :: {:ok, Schema.t()} | {:error, term()}
def from_erlavro(schema, attributes \\ []) do
def from_erlavro(erlavro, attributes \\ []) do
lookup_table = ets().new()

with {:ok, full_name} <- extract_full_name(schema),
lookup_table <- :avro_schema_store.add_type(schema, lookup_table),
json <- Keyword.get_lazy(attributes, :json, fn -> to_json(schema) end) do
{
:ok,
%Schema{
id: nil,
version: nil,
full_name: full_name,
lookup_table: lookup_table,
json: json
}
}
with {:ok, full_name} <- extract_full_name(erlavro),
lookup_table <- :avro_schema_store.add_type(erlavro, lookup_table),
json <- Keyword.get_lazy(attributes, :json, fn -> to_json(erlavro) end) do
{:ok, %Schema{full_name: full_name, lookup_table: lookup_table, json: json}}
else
{:error, reason} ->
true = :ets.delete(lookup_table)
Expand All @@ -108,48 +104,42 @@ defmodule Avrora.Schema.Encoder do
"""
@spec to_erlavro(Schema.t()) :: {:ok, term()} | {:error, term()}
def to_erlavro(%Schema{} = schema),
do: do_compile(schema.full_name, schema.lookup_table)

defp to_json(schema), do: :avro_json_encoder.encode_type(schema)

defp parse_recursive(payload, lookup_table, reference_lookup_fun) do
with {:ok, schema} <- do_parse(payload),
{:ok, _} <- extract_full_name(schema),
{:ok, references} <- ReferenceCollector.collect(schema),
lookup_table <- :avro_schema_store.add_type(schema, lookup_table) do
payloads =
references
|> Enum.reject(&:avro_schema_store.lookup_type(&1, lookup_table))
|> Enum.map(fn reference ->
reference |> reference_lookup_fun.() |> unwrap!()
end)

schemas =
Enum.flat_map(payloads, fn payload ->
payload |> parse_recursive(lookup_table, reference_lookup_fun) |> unwrap!()
end)

{:ok, [schema | schemas]}
do: do_expand(schema.full_name, schema.lookup_table)

defp to_json(erlavro), do: :avro_json_encoder.encode_type(erlavro)

defp unwrap!({:ok, result}), do: result
defp unwrap!({:error, error}), do: throw(error)

defp parse_recursive(definition, name, lookup_table, reference_lookup_fun) do
with {:ok, erlavro} <- do_parse(definition),
{:ok, references} <- ReferenceCollector.collect(erlavro),
{:ok, full_name} <- do_add_type(name, erlavro, lookup_table) do
references
|> Enum.reject(&:avro_schema_store.lookup_type(&1, lookup_table))
|> Enum.each(fn reference_name ->
reference_lookup_fun.(reference_name)
|> unwrap!()
|> parse_recursive(reference_name, lookup_table, reference_lookup_fun)
|> unwrap!()
end)

{:ok, full_name}
end
catch
error -> {:error, error}
end

defp unwrap!({:ok, result}), do: result
defp unwrap!({:error, error}), do: throw(error)

defp extract_full_name(schema) do
case schema do
defp extract_full_name(erlavro) do
case erlavro do
{:avro_fixed_type, _, _, _, _, full_name, _} -> {:ok, full_name}
{:avro_enum_type, _, _, _, _, _, full_name, _} -> {:ok, full_name}
{:avro_record_type, _, _, _, _, _, full_name, _} -> {:ok, full_name}
_ -> {:error, :unnamed_type}
end
end

# Compile complete version of the `erlavro` format with all references
# being resolved, converting errors to error return
defp do_compile(full_name, lookup_table) do
defp do_expand(full_name, lookup_table) do
{:ok, :avro_util.expand_type(full_name, lookup_table)}
rescue
_ in MatchError -> {:error, :bad_reference}
Expand All @@ -165,5 +155,15 @@ defmodule Avrora.Schema.Encoder do
error in ErlangError -> {:error, error.original}
end

defp do_add_type(name, erlavro, lookup_table) do
name = if :avro.is_named_type(erlavro), do: :undefined, else: name
full_name = if :avro.is_named_type(erlavro), do: :avro.get_type_fullname(erlavro), else: name

:avro_schema_store.add_type(name, erlavro, lookup_table)
{:ok, full_name}
rescue
error in ErlangError -> {:error, error.original}
end

defp ets, do: Config.self().ets_lib()
end
8 changes: 6 additions & 2 deletions lib/avrora/storage/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ defmodule Avrora.Storage.File do
"""
@impl true
def get(key) when is_binary(key) do
with {:ok, body} <- read_schema_file_by_name(key),
do: SchemaEncoder.from_json(body, &read_schema_file_by_name/1)
with {:ok, schema_name} <- Name.parse(key),
{:ok, body} <- read_schema_file_by_name(key) do
SchemaEncoder.from_json(body, name: schema_name.name, reference_lookup_fun: &read_schema_file_by_name/1)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a refactoring here, nested calls generate too much lookup table. Considering unnamed types, we need to overhaul how we store lookup tables and how we work with them.

end
end

@impl true
def get(key) when is_integer(key), do: {:error, :unsupported}

@impl true
def put(_key, _value), do: {:error, :unsupported}

# TODO: Move `Name.parse` outside of the method
defp read_schema_file_by_name(name) do
with {:ok, schema_name} <- Name.parse(name),
filepath <- name_to_filepath(schema_name.name) do
Expand Down
26 changes: 21 additions & 5 deletions lib/avrora/storage/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ defmodule Avrora.Storage.Registry do
{:ok, version} <- Map.fetch(response, "version"),
{:ok, schema} <- Map.fetch(response, "schema"),
{:ok, references} <- extract_references(response),
{:ok, schema} <- SchemaEncoder.from_json(schema, make_reference_lookup_fun(references)) do
{:ok, schema} <-
SchemaEncoder.from_json(schema, name: name, reference_lookup_fun: make_reference_lookup_fun(references)) do
Logger.debug("obtaining schema `#{schema_name.name}` with version `#{version}`")

{:ok, %{schema | id: id, version: version}}
Expand All @@ -51,10 +52,18 @@ defmodule Avrora.Storage.Registry do
with {:ok, response} <- http_client_get("schemas/ids/#{key}"),
{:ok, schema} <- Map.fetch(response, "schema"),
{:ok, references} <- extract_references(response),
{:ok, schema} <- SchemaEncoder.from_json(schema, make_reference_lookup_fun(references)) do
Logger.debug("obtaining schema with global id `#{key}`")

{:ok, %{schema | id: key}}
# TODO: Add note in the readme that:
# There is no such endpoint in registry versions < 5.5.0
{:ok, response} <- http_client_get("schemas/ids/#{key}/versions"),
{:ok, schema_name} <- extract_name(response),
{:ok, schema} <-
SchemaEncoder.from_json(schema,
name: schema_name.name,
reference_lookup_fun: make_reference_lookup_fun(references)
) do
Logger.debug("obtaining schema and version with global id `#{key}`")

{:ok, %{schema | id: key, version: schema_name.version}}
end
end

Expand Down Expand Up @@ -110,6 +119,13 @@ defmodule Avrora.Storage.Registry do
error -> {:error, error}
end

defp extract_name(response) do
case response do
[%{"subject" => name, "version" => version}] -> {:ok, %Name{name: name, version: version}}
_ -> {:error, :invalid_versions}
end
end

defp make_reference_lookup_fun(map) when map_size(map) == 0,
do: &SchemaEncoder.reference_lookup/1

Expand Down
43 changes: 43 additions & 0 deletions test/avrora/codec/plain_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,42 @@ defmodule Avrora.Codec.PlainTest do

assert encoded == "59B02128"
end

test "when payload is matching the Union schema and schema is resolvable" do
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Add decoding test!

union_schema = union_schema()

Avrora.Storage.MemoryMock
|> expect(:get, fn key ->
assert key == "io.acme.Union"

{:ok, nil}
end)
|> expect(:put, fn key, value ->
assert key == "io.acme.Union"
assert value == union_schema

{:ok, value}
end)

Avrora.Storage.RegistryMock
|> expect(:put, fn key, value ->
assert key == "io.acme.Union"
assert value == union_json()

{:error, :unconfigured_registry_url}
end)

Avrora.Storage.FileMock
|> expect(:get, fn key ->
assert key == "io.acme.Union"

{:ok, union_schema}
end)

{:ok, encoded} = Codec.Plain.encode(123, schema: %Schema{full_name: "io.acme.Union"})

assert encoded == <<0, 246, 1>>
end
end

defp missing_field_error do
Expand Down Expand Up @@ -306,6 +342,11 @@ defmodule Avrora.Codec.PlainTest do
%{schema | id: nil, version: nil}
end

defp union_schema do
{:ok, schema} = Schema.Encoder.from_json(union_json(), name: "io.acme.Union")
%{schema | id: nil, version: nil}
end

defp payment_json do
~s({"namespace":"io.acme","name":"Payment","type":"record","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]})
end
Expand All @@ -329,4 +370,6 @@ defmodule Avrora.Codec.PlainTest do
defp fixed_json do
~s({"namespace":"io.acme","name":"CRC32","type":"fixed","size":8})
end

defp union_json, do: ~s(["int","string"])
end
Loading
Loading