diff --git a/.gitignore b/.gitignore index 4e80dc6..6eb978d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,7 @@ -m +_build/ +*.beam .eunit -deps/* -ebin/* -rel/antidote dev -apps/antidote/src/* *.swp *.swo src/*.swo @@ -13,9 +10,16 @@ dialyzer_unhandled_warnings dialyzer_warnings doc/ .DS_Store -log/ +logs/ tags rel/vars/*.config -.rebar/* -*.dump - +rebar.lock +docs/_site/ +docs/.sass-cache/ +docs/.jekyll-metadata +docs/_pdf +docs/.idea/ +log/ +data/ +compile_commands.json +test/.rebar3/ \ No newline at end of file diff --git a/README.md b/README.md index e88e93c..638c9b3 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,13 @@ # AntidoteDB -Erlang Rebar application, used as the DataStore for [Antidote](https://github.com/SyncFree/antidote). This module exposes a simple API, so the datastore can be configurable and changed, without affecting Antidote code. +Erlang Rebar application, used as the backend DataStore for [Antidote](https://github.com/SyncFree/antidote). This module exposes a simple API, so the datastore can be configurable and changed, without affecting Antidote code. + +This project is structured as a rebar3 project, and today contains two modules: + ++ **antidote_db**: contains a basic API to interact with the DB. Today it only contains the methods for a eleveldb backend. ++ **leveldb_wrapper**: contains a wrapper of the API provided by eleveldb, with added code containing Antidote logic. + +All tests are under the respective *_SUITE modules, and should be run using `rebar3 ct`. ## ElevelDB @@ -8,3 +15,30 @@ Today, this application uses SyncFree [ElevelDB](https://github.com/SyncFree/ele As this application is intended to be used in [Antidote](https://github.com/SyncFree/antidote), it uses the 'Antidote Comparator' by default. +### Composition of keys + +Keys are composed as follows: + +`{antidote_key, max_value_in_vc, vc_hash, op/snap, vc}` + ++ **antidote_key**: Same key as in Antidote. 
+
+ **max_value_in_vc**: The max value for all DCs in the VC.
+ **vc_hash**: A hash of the VC. This is done in order to provide a more performant sorting algorithm, since we don't need to compare all the VC, to find that keys are different. If the hash matches, we also compare the VC, just to make sure it's not a collision.
+ **op/snap**: an atom indicating if the stored value corresponds to an operation or a snapshot.
+ **vc**: the vc of when the op/snap occurred.
+
+### Sorting of keys
+
+Keys get sorted first by the Antidote key.
+Then we look at the MAX value of its VC, sorting first the biggest one. We reverse the "natural" order, so we can have the most recent operations first.
+If the max value for two keys matches, we sort taking into account the hash value.
+For matching hashes, we check the rest of the key, sorting accordingly.
+
+### Testing
+
+As stated before, the modules have their corresponding *_SUITE modules for unit testing. The antidote_db module uses [meck](https://github.com/eproxus/meck) to mock the calls to eleveldb and the leveldb_wrapper.
+
+In addition to these tests, and thanks to @peterzeller, we have a Proper module to test the get_ops method. This module generates operations by simulating incrementing a
+vectorclock and merging them. These tests can be run using "rebar3 proper -m prop_get_ops -n TESTS" where TESTS is the amount of random inputs that Proper should run with.
+
+These two modules have more than 90% code coverage.
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..e2e589e
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,26 @@
+{deps, [
+    {eleveldb, ".*", {git, "git://github.com/SyncFree/eleveldb", {branch, "max_order_comparator"}}},
+    {antidote_utils, ".*", {git, "git://github.com/SyncFree/antidote_utils", {branch, "additions_for_antidote_db"}}},
+    {meck, ".*", {git, "https://github.com/eproxus/meck.git", {tag, "0.8.3"}}}
+]}.
+ +%% Required to assemble a native library like eleveldb when compiling with Rebar3 +{overrides, [ + {override, eleveldb, [ + {plugins, [pc]}, + {artifacts, ["priv/eleveldb.so"]}, + + {provider_hooks, [ + {post, [ + {compile, {pc, compile}}, + {clean, {pc, clean}} + ]} + ]} + ]} +]}. + +{plugins, [rebar3_proper]}. + +{profiles, + [{test, [{deps, [{proper, "1.1.1-beta"}]}]}] +}. diff --git a/src/antidote_db.app.src b/src/antidote_db.app.src new file mode 100644 index 0000000..8d0eb98 --- /dev/null +++ b/src/antidote_db.app.src @@ -0,0 +1,13 @@ +{application, antidote_db, + [ + {description, "Antidote DataBase application"}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + eleveldb + ]}, + {mod, { antidote_db, []}}, + {env, []} + ]}. diff --git a/src/antidote_db.erl b/src/antidote_db.erl new file mode 100644 index 0000000..15f6dd0 --- /dev/null +++ b/src/antidote_db.erl @@ -0,0 +1,114 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(antidote_db). + +-include_lib("antidote_utils/include/antidote_utils.hrl"). + +-export([ + new/2, + close_and_destroy/2, + close/1, + get_snapshot/3, + put_snapshot/3, + get_ops/4, + put_op/4]). + +-type antidote_db() :: {leveldb, eleveldb:db_ref()}. 
+
+-type antidote_db_type() :: leveldb.
+
+-export_type([antidote_db/0, antidote_db_type/0]).
+
+%% Given a name, returns a new AntidoteDB (for now, only ElevelDB is supported)
+%% OpenOptions are set to use Antidote special comparator in the case of Eleveldb
+-spec new(atom(), antidote_db_type()) -> {ok, antidote_db()} | {error, any()}.
+new(Name, Type) ->
+    case Type of
+        leveldb ->
+            {ok, Ref} = eleveldb:open(Name, [{create_if_missing, true}, {antidote, true}]),
+            {ok, {leveldb, Ref}};
+        _ ->
+            {error, type_not_supported}
+    end.
+
+
+%% Closes and destroys the given database
+-spec close_and_destroy(antidote_db(), atom()) -> ok | {error, any()}.
+close_and_destroy({Type, DB}, Name) ->
+    case Type of
+        leveldb ->
+            eleveldb:close(DB),
+            eleveldb:destroy(Name, []);
+        _ ->
+            {error, type_not_supported}
+    end.
+
+-spec close(antidote_db()) -> ok | {error, any()}.
+close({Type, DB}) ->
+    case Type of
+        leveldb ->
+            eleveldb:close(DB);
+        _ ->
+            {error, type_not_supported}
+    end.
+
+%% Gets the most suitable snapshot for Key that has been committed
+%% before CommitTime. If nothing is found, returns {error, not_found}
+-spec get_snapshot(antidote_db:antidote_db(), key(),
+    snapshot_time()) -> {ok, #materialized_snapshot{}} | {error, not_found}.
+get_snapshot({Type, DB}, Key, CommitTime) ->
+    case Type of
+        leveldb ->
+            leveldb_wrapper:get_snapshot(DB, Key, CommitTime);
+        _ ->
+            {error, type_not_supported}
+    end.
+
+%% Saves the snapshot into AntidoteDB
+-spec put_snapshot(antidote_db:antidote_db(), key(), #materialized_snapshot{}) -> ok | error.
+put_snapshot({Type, DB}, Key, Snapshot) ->
+    case Type of
+        leveldb ->
+            leveldb_wrapper:put_snapshot(DB, Key, Snapshot);
+        _ ->
+            {error, type_not_supported}
+    end.
+
+%% Returns a list of operations that have commit time in the range [VCFrom, VCTo]
+-spec get_ops(antidote_db:antidote_db(), key(), vectorclock(), vectorclock()) -> [#log_record{}].
+get_ops({Type, DB}, Key, VCFrom, VCTo) -> + case Type of + leveldb -> + leveldb_wrapper:get_ops(DB, Key, VCFrom, VCTo); + _ -> + {error, type_not_supported} + end. + + +%% Saves the operation into AntidoteDB +-spec put_op(antidote_db:antidote_db(), key(), vectorclock(), #log_record{}) -> ok | error. +put_op({Type, DB}, Key, VC, Record) -> + case Type of + leveldb -> + leveldb_wrapper:put_op(DB, Key, VC, Record); + _ -> + {error, type_not_supported} + end. \ No newline at end of file diff --git a/src/antidote_db_sup.erl b/src/antidote_db_sup.erl new file mode 100644 index 0000000..afb94da --- /dev/null +++ b/src/antidote_db_sup.erl @@ -0,0 +1,45 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(antidote_db_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). 
+ +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl new file mode 100644 index 0000000..7ae92ed --- /dev/null +++ b/src/leveldb_wrapper.erl @@ -0,0 +1,199 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(leveldb_wrapper). + +-include_lib("antidote_utils/include/antidote_utils.hrl"). + +-export([ + get_snapshot/3, + put_snapshot/3, + get_ops/4, + put_op/4]). + +%% Gets the most suitable snapshot for Key that has been committed +%% before CommitTime. If no snapshot is found, returns {error, not_found} +-spec get_snapshot(eleveldb:db_ref(), key(), snapshot_time()) -> + {ok, #materialized_snapshot{}} | {error, not_found}. 
+get_snapshot(DB, Key, CommitTime) -> + StartingTime = get_max_time_in_VC(CommitTime), + try + eleveldb:fold(DB, + fun({K, V}, AccIn) -> + {Key1, _MAX, _HASH, SNAP, VC} = binary_to_term(K), + case (Key1 == Key) of %% check same key + true -> + %% check it's a snapshot and has time less than the one required + case (SNAP == snap) and + not vectorclock:gt(vectorclock:from_list(VC), CommitTime) of + true -> + Snapshot = binary_to_term(V), + throw({break, Snapshot}); + _ -> + AccIn + end; + false -> + throw({break}) + end + end, + [], + [{first_key, term_to_binary({Key, StartingTime})}]), + {error, not_found} + catch + {break, SNAP} -> + {ok, SNAP}; + _ -> + {error, not_found} + end. + +%% Saves the snapshot into the DB +-spec put_snapshot(antidote_db:antidote_db(), key(), #materialized_snapshot{}) -> ok | error. +put_snapshot(DB, Key, Snapshot) -> + VCDict = vectorclock_to_dict(Snapshot#materialized_snapshot.snapshot_time), + put(DB, {binary_to_atom(Key), get_max_time_in_VC(VCDict), + erlang:phash2(VCDict), snap, vectorclock_to_list(VCDict)}, Snapshot). + +%% Returns a list of operations that have commit time in the range [VCFrom, VCTo). +%% In other words, it returns all ops which have a VectorClock concurrent or larger than VCFrom, +%% and smaller or equal (for all entries) than VCTo. +%% Examples of what this method returns, can be seen in the tests. +-spec get_ops(eleveldb:db_ref(), key(), vectorclock(), vectorclock()) -> [#log_record{}]. 
+get_ops(DB, Key, VCFrom, VCTo) -> + %% Convert the VCs passed in to Dicts (if necessary) + VCFromDict = vectorclock_to_dict(VCFrom), + VCToDict = vectorclock_to_dict(VCTo), + + %% Calculate the min time in the VCs that compose the search range + MinTimeToSearch = get_min_time_in_VCs(VCFromDict, VCToDict), + + %% Get the max time in the lower upper bound so the fold starts from keys that have that max value + StartingTime = get_max_time_in_VC(VCToDict), + try + eleveldb:fold(DB, + fun({K, V}, AccIn) -> + {Key1, MAX, _HASH, OP, VC1} = binary_to_term(K), + VC1Dict = vectorclock:from_list(VC1), + case Key == Key1 of + true -> + %% Check that the MinTimeToSearch is smaller than the MAX value of VC1 + case MinTimeToSearch =< MAX of + true -> + %% Check VC in range and the DB entry corresponds to an OP + case vc_in_range(VC1Dict, VCFromDict, VCToDict) and (OP == op) of + true -> + [binary_to_term(V) | AccIn]; + false -> + AccIn + end; + false -> %% All entries of VC1 are smaller than the MinTime to search + throw({break, AccIn}) + end; + false -> %% Not the same key we are looking for + throw({break, AccIn}) + end + end, + [], + [{first_key, term_to_binary({Key, StartingTime})}]) + catch + {break, OPS} -> + OPS; + _ -> + [] + end. + +%% If VCs have the same keys, returns the min time found, +%% otherwise returns 0. +get_min_time_in_VCs(VC1, VC2) -> + case same_keys_in_vcs(VC1, VC2) of + true -> + min(get_min_time_in_VC(VC1), get_min_time_in_VC(VC2)); + false -> + 0 + end. + +%% Checks that the 2 VCs passed in, have the same keys +same_keys_in_vcs(VC1, VC2) -> + dict:fetch_keys(VC1) == dict:fetch_keys(VC2). + + +get_min_time_in_VC(VC) -> + dict:fold(fun find_min_value/3, undefined, VC). + +get_max_time_in_VC(VC) -> + dict:fold(fun find_max_value/3, undefined, VC). + +find_min_value(_Key, Value, Acc) -> + case Acc of + undefined -> + Value; + _ -> + min(Value, Acc) + end. 
+ +find_max_value(_Key, Value, Acc) -> + case Acc of + undefined -> + Value; + _ -> + max(Value, Acc) + end. + +%% Returns true if the VC is in the required range +vc_in_range(VC, VCFrom, VCTo) -> + not vectorclock:lt(VC, VCFrom) and vectorclock:le(VC, VCTo). + +%% Saves the operation into the DB +-spec put_op(eleveldb:db_ref(), key(), vectorclock(), #log_record{}) -> ok | error. +put_op(DB, Key, VC, Record) -> + VCDict = vectorclock_to_dict(VC), + put(DB, {binary_to_atom(Key), get_max_time_in_VC(VCDict), + erlang:phash2(VCDict), op, vectorclock_to_list(VC)}, Record). + +vectorclock_to_dict(VC) -> + case is_list(VC) of + true -> vectorclock:from_list(VC); + false -> VC + end. + +vectorclock_to_list(VC) -> + case is_list(VC) of + true -> VC; + false -> dict:to_list(VC) + end. + +%% Workaround for basho bench +%% TODO find a better solution to this +binary_to_atom(Key) -> + case is_binary(Key) of + true -> list_to_atom(integer_to_list(binary_to_integer(Key))); + false -> Key + end. + +%% @doc puts the Value associated to Key in eleveldb +-spec put(eleveldb:db_ref(), any(), any()) -> ok | {error, any()}. +put(DB, Key, Value) -> + AKey = case is_binary(Key) of + true -> Key; + false -> term_to_binary(Key) + end, + ATerm = case is_binary(Value) of + true -> Value; + false -> term_to_binary(Value) + end, + eleveldb:put(DB, AKey, ATerm, []). diff --git a/test/antidote_db_SUITE.erl b/test/antidote_db_SUITE.erl new file mode 100644 index 0000000..1d5e887 --- /dev/null +++ b/test/antidote_db_SUITE.erl @@ -0,0 +1,121 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(antidote_db_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("antidote_utils/include/antidote_utils.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-export([all/0]). +-export([wrong_types_passed_in/1, + leveldb_get_snapshot/1, + leveldb_put_snapshot/1, + leveldb_get_op/1, + leveldb_put_op/1]). + +all() -> [wrong_types_passed_in, leveldb_get_snapshot, leveldb_put_snapshot, leveldb_get_op, leveldb_put_op]. + +wrong_types_passed_in(_Config) -> + ?assertEqual({error, type_not_supported}, antidote_db:new("TEST", type)), + ?assertEqual({error, type_not_supported}, antidote_db:close_and_destroy({type, db}, name)), + ?assertEqual({error, type_not_supported}, antidote_db:close({type, db})), + ?assertEqual({error, type_not_supported}, antidote_db:get_snapshot({type, db}, key, [])), + ?assertEqual({error, type_not_supported}, antidote_db:put_snapshot({type, db}, key, [])), + ?assertEqual({error, type_not_supported}, antidote_db:get_ops({type, db}, key, [], [])), + ?assertEqual({error, type_not_supported}, antidote_db:put_op({type, db}, key, [], [])). 
+ +%% This tests stopped working in Erlang 18+ because of a problem with Meck and eleveldb dependency +%% +%%leveldb_new(_Config) -> +%% Name = "test_db", +%% Type = leveldb, +%% Ref = ref, +%% meck:new(eleveldb), +%% meck:expect(eleveldb, open, fun(_Name, _Op) -> {ok, Ref} end), +%% ?assertEqual({ok, {leveldb, Ref}}, antidote_db:new(Name, Type)), +%% ?assert(meck:validate(eleveldb)), +%% meck:unload(eleveldb). +%% +%%leveldb_close_and_destroy(_Config) -> +%% DB = ref, +%% AntidoteDB = {leveldb, DB}, +%% Name = "test_db", +%% meck:new(eleveldb), +%% meck:expect(eleveldb, close, fun(_DB) -> ok end), +%% meck:expect(eleveldb, destroy, fun(_DB, _Name) -> ok end), +%% ?assertEqual(ok, antidote_db:close_and_destroy(AntidoteDB, Name)), +%% ?assert(meck:validate(eleveldb)), +%% meck:unload(eleveldb). +%% +%%leveldb_close(_Config) -> +%% DB = ref, +%% AntidoteDB = {leveldb, DB}, +%% meck:new(eleveldb), +%% meck:expect(eleveldb, close, fun(_DB) -> ok end), +%% ?assertEqual(ok, antidote_db:close(AntidoteDB)), +%% ?assert(meck:validate(eleveldb)), +%% meck:unload(eleveldb). + +leveldb_get_snapshot(_Config) -> + DB = ref, + AntidoteDB = {leveldb, DB}, + Key = key, + CommitTime = ct, + Snapshot = {ok, snapshot}, + meck:new(leveldb_wrapper), + meck:expect(leveldb_wrapper, get_snapshot, fun(_DB, _Key, _CommitTime) -> Snapshot end), + ?assertEqual(Snapshot, antidote_db:get_snapshot(AntidoteDB, Key, CommitTime)), + ?assert(meck:validate(leveldb_wrapper)), + meck:unload(leveldb_wrapper). + +leveldb_put_snapshot(_Config) -> + DB = ref, + AntidoteDB = {leveldb, DB}, + Key = key, + Snapshot = snapshot, + meck:new(leveldb_wrapper), + meck:expect(leveldb_wrapper, put_snapshot, fun(_DB, _Key, _Snapshot) -> ok end), + ?assertEqual(ok, antidote_db:put_snapshot(AntidoteDB, Key, Snapshot)), + ?assert(meck:validate(leveldb_wrapper)), + meck:unload(leveldb_wrapper). 
+ +leveldb_get_op(_Config) -> + DB = ref, + AntidoteDB = {leveldb, DB}, + Key = key, + CommitTime = ct, + OPS = [op1, op2], + meck:new(leveldb_wrapper), + meck:expect(leveldb_wrapper, get_ops, fun(_DB, _Key, _CommitTime, _CommitTime) -> OPS end), + ?assertEqual(OPS, antidote_db:get_ops(AntidoteDB, Key, CommitTime, CommitTime)), + ?assert(meck:validate(leveldb_wrapper)), + meck:unload(leveldb_wrapper). + +leveldb_put_op(_Config) -> + DB = ref, + AntidoteDB = {leveldb, DB}, + Key = key, + CommitTime = ct, + OP = op, + meck:new(leveldb_wrapper), + meck:expect(leveldb_wrapper, put_op, fun(_DB, _Key, _CommitTime, _OP) -> ok end), + ?assertEqual(ok, antidote_db:put_op(AntidoteDB, Key, CommitTime, OP)), + ?assert(meck:validate(leveldb_wrapper)), + meck:unload(leveldb_wrapper). \ No newline at end of file diff --git a/test/eleveldb_wrapper_SUITE.erl b/test/eleveldb_wrapper_SUITE.erl new file mode 100644 index 0000000..28f8aeb --- /dev/null +++ b/test/eleveldb_wrapper_SUITE.erl @@ -0,0 +1,229 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(eleveldb_wrapper_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("antidote_utils/include/antidote_utils.hrl"). +-include_lib("eunit/include/eunit.hrl"). 
+ +-export([all/0, withFreshDb/1]). +-export([get_snapshot_matching_vc/1, + get_snapshot_not_matching_vc/1, + get_operations_empty_result/1, + get_operations_non_empty/1, + operations_and_snapshots_mixed/1, + concurrent_ops/1, + smallest_op_returned/1, + concurrent_op_lower_than_search_range/1]). + +all() -> [get_snapshot_matching_vc, get_snapshot_not_matching_vc, get_operations_empty_result, + get_operations_non_empty, operations_and_snapshots_mixed, concurrent_ops, smallest_op_returned, + concurrent_op_lower_than_search_range]. + +withFreshDb(F) -> + %% Destroy the test DB to prevent having dirty DBs if a test fails + eleveldb:destroy("test_db", []), + {ok, DB} = eleveldb:open("test_db", [{create_if_missing, true}, {antidote, true}]), + try + F(DB) + after + eleveldb:close(DB), + eleveldb:destroy("test_db", []) + end. + +get_snapshot_matching_vc(_Config) -> + withFreshDb(fun(DB) -> + + Key = key, + put_n_snapshots(DB, Key, 10), + + %% Get some of the snapshots inserted (matching VC) + {ok, S1} = leveldb_wrapper:get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), + {ok, S2} = leveldb_wrapper:get_snapshot(DB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), + {ok, S3} = leveldb_wrapper:get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), + + ?assertEqual([{local, 1}, {remote, 1}], vectorclock_to_sorted_list(S1#materialized_snapshot.snapshot_time)), + ?assertEqual(1, S1#materialized_snapshot.value), + + ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S2#materialized_snapshot.snapshot_time)), + ?assertEqual(4, S2#materialized_snapshot.value), + + ?assertEqual([{local, 8}, {remote, 8}], vectorclock_to_sorted_list(S3#materialized_snapshot.snapshot_time)), + ?assertEqual(8, S3#materialized_snapshot.value) + end). 
+ +get_snapshot_not_matching_vc(_Config) -> + withFreshDb(fun(DB) -> + Key = key, + + %% Add 3 snapshots + VC = vectorclock:from_list([{local, 4}, {remote, 4}]), + leveldb_wrapper:put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 4}), + + VC1 = vectorclock:from_list([{local, 2}, {remote, 3}]), + leveldb_wrapper:put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC1, value = 2}), + + VC2 = vectorclock:from_list([{local, 8}, {remote, 7}]), + leveldb_wrapper:put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC2, value = 8}), + + %% Request the snapshots using a VC different than the one used to insert them + S4 = leveldb_wrapper:get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 0}])), + {ok, S5} = leveldb_wrapper:get_snapshot(DB, Key, vectorclock:from_list([{local, 6}, {remote, 5}])), + {ok, S6} = leveldb_wrapper:get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), + + ?assertEqual({error, not_found}, S4), + + ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S5#materialized_snapshot.snapshot_time)), + ?assertEqual(4, S5#materialized_snapshot.value), + + ?assertEqual([{local, 8}, {remote, 7}], vectorclock_to_sorted_list(S6#materialized_snapshot.snapshot_time)), + ?assertEqual(8, S6#materialized_snapshot.value) + end). 
+ +get_operations_empty_result(_Config) -> + withFreshDb(fun(DB) -> + Key = key, + Key1 = key1, + + %% Nothing in the DB returns an empty list + O1 = leveldb_wrapper:get_ops(DB, Key, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + ?assertEqual([], O1), + + %% Insert some operations + put_n_operations(DB, Key, 10), + + %% Requesting for ops in a range with noting, returns an empty list + O2 = leveldb_wrapper:get_ops(DB, Key, [{local, 123}, {remote, 100}], [{local, 200}, {remote, 124}]), + ?assertEqual([], O2), + + %% Getting a key not present, returns an empty list + O3 = leveldb_wrapper:get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 4}, {remote, 5}]), + ?assertEqual([], O3), + + %% Searching for the same range returns an empty list + O4 = leveldb_wrapper:get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 2}, {remote, 2}]), + ?assertEqual([], O4) + end). + +get_operations_non_empty(_Config) -> + withFreshDb(fun(DB) -> + %% Fill the DB with values + Key = key, + Key1 = key1, + Key2 = key2, + + put_n_operations(DB, Key, 100), + put_n_operations(DB, Key1, 10), + put_n_operations(DB, Key2, 25), + + O1 = leveldb_wrapper:get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + ?assertEqual([2, 3, 4, 5, 6, 7, 8], filter_records_into_sorted_numbers(O1)), + + O2 = leveldb_wrapper:get_ops(DB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), + ?assertEqual([5, 6, 7], filter_records_into_sorted_numbers(O2)) + end). + +operations_and_snapshots_mixed(_Config) -> + withFreshDb(fun(DB) -> + Key = key, + Key1 = key1, + Key2 = key2, + + VCTo = [{local, 7}, {remote, 8}], + put_n_operations(DB, Key, 10), + put_n_operations(DB, Key1, 20), + leveldb_wrapper:put_snapshot(DB, Key1, #materialized_snapshot{snapshot_time = [{local, 2}, {remote, 3}], value = 5}), + put_n_operations(DB, Key2, 8), + + %% We want all ops for Key1 that are between the snapshot and + %% [{local, 7}, {remote, 8}]. First get the snapshot, then OPS. 
+ {ok, Snapshot} = leveldb_wrapper:get_snapshot(DB, Key1, vectorclock:from_list(VCTo)), + ?assertEqual([{local, 2}, {remote, 3}], vectorclock_to_sorted_list(Snapshot#materialized_snapshot.snapshot_time)), + ?assertEqual(5, Snapshot#materialized_snapshot.value), + + O1 = leveldb_wrapper:get_ops(DB, Key1, Snapshot#materialized_snapshot.snapshot_time, VCTo), + ?assertEqual([3, 4, 5, 6, 7], filter_records_into_sorted_numbers(O1)) + end). + +%% This test inserts 5 ops, 4 of them are concurrent, and checks that only the first and two of the concurrent are +%% returned, since they are the only ones that match the requested ranged passed in +concurrent_ops(_Config) -> + withFreshDb(fun(DB) -> + ok = leveldb_wrapper:put_op(DB, d, [{dc1, 10}, {dc2, 14}, {dc3, 3}], #log_record{version = 1}), + ok = leveldb_wrapper:put_op(DB, d, [{dc1, 9}, {dc2, 12}, {dc3, 1}], #log_record{version = 2}), + ok = leveldb_wrapper:put_op(DB, d, [{dc1, 7}, {dc2, 2}, {dc3, 12}], #log_record{version = 3}), + ok = leveldb_wrapper:put_op(DB, d, [{dc1, 5}, {dc2, 2}, {dc3, 10}], #log_record{version = 4}), + ok = leveldb_wrapper:put_op(DB, d, [{dc1, 1}, {dc2, 1}, {dc3, 1}], #log_record{version = 5}), + + OPS = leveldb_wrapper:get_ops(DB, d, [{dc1, 10}, {dc2, 17}, {dc3, 2}], [{dc1, 12}, {dc2, 20}, {dc3, 18}]), + + ?assertEqual([1, 3, 4], filter_records_into_sorted_numbers(OPS)) + end). + +smallest_op_returned(_Config) -> + withFreshDb(fun(DB) -> + ok = leveldb_wrapper:put_op(DB, key, [{dc3, 1}], #log_record{version = 4}), + ok = leveldb_wrapper:put_op(DB, key, [{dc2, 4}, {dc3, 1}], #log_record{version = 3}), + ok = leveldb_wrapper:put_op(DB, key, [{dc2, 2}], #log_record{version = 2}), + ok = leveldb_wrapper:put_op(DB, key, [{dc2, 1}], #log_record{version = 1}), + + OPS = leveldb_wrapper:get_ops(DB, key, [{dc2, 3}], [{dc2, 3}, {dc3, 1}]), + + ?assertEqual([4], filter_records_into_sorted_numbers(OPS)) + end). 
+ +%% This test ensures that the break condition doesn't stop in the min value of DCS +%% if the VCs in the search range, have different keyset. +concurrent_op_lower_than_search_range(_Config) -> + withFreshDb(fun(DB) -> + ok = leveldb_wrapper:put_op(DB, key, [{dc3, 2}], #log_record{version = 4}), + ok = leveldb_wrapper:put_op(DB, key, [{dc2, 2}], #log_record{version = 3}), + ok = leveldb_wrapper:put_op(DB, key, [{dc1, 3}, {dc2, 5}], #log_record{version = 2}), + ok = leveldb_wrapper:put_op(DB, key, [{dc1, 3}], #log_record{version = 1}), + + OPS = leveldb_wrapper:get_ops(DB, key, [{dc1, 3}], [{dc1, 3}, {dc2, 5}]), + + ?assertEqual([1, 2, 3], filter_records_into_sorted_numbers(OPS)) + end). + +put_n_snapshots(_DB, _Key, 0) -> + ok; +put_n_snapshots(DB, Key, N) -> + VC = vectorclock:from_list([{local, N}, {remote, N}]), + leveldb_wrapper:put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = N}), + put_n_snapshots(DB, Key, N - 1). + +put_n_operations(_DB, _Key, 0) -> + ok; +put_n_operations(DB, Key, N) -> + %% For testing purposes, we use only the version in the record to identify + %% the different ops, since it's easier than reproducing the whole record + leveldb_wrapper:put_op(DB, Key, [{local, N}, {remote, N}], + #log_record{version = N}), + put_n_operations(DB, Key, N - 1). + +filter_records_into_sorted_numbers(List) -> + lists:sort(lists:foldr(fun(Record, Acum) -> [Record#log_record.version | Acum] end, [], List)). + +vectorclock_to_sorted_list(VC) -> + case is_list(VC) of + true -> lists:sort(VC); + false -> lists:sort(dict:to_list(VC)) + end. diff --git a/test/prop_db_wrapper.erl b/test/prop_db_wrapper.erl new file mode 100644 index 0000000..0344c58 --- /dev/null +++ b/test/prop_db_wrapper.erl @@ -0,0 +1,118 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. 
+%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(prop_db_wrapper). + +-behaviour(proper_statem). + +-define(PROPER_NO_TRANS, true). +-include_lib("proper/include/proper.hrl"). +-include_lib("antidote_utils/include/antidote_utils.hrl"). + +-export([command/1, initial_state/0, next_state/3, precondition/2, postcondition/3, put_op/3, get_ops/3]). +-export([prop_ops_put_get/0]). + +-record(state, { + operations = [] :: [{key(), vectorclock(), any()}] +}). + +withFreshDb(F) -> + {ok, Db} = antidote_db:new("test_db", leveldb), + try + F(Db) + after + antidote_db:close_and_destroy(Db, "test_db") + end. + +prop_ops_put_get() -> + ?FORALL(Cmds, commands(?MODULE), + begin + withFreshDb(fun(Db) -> + put(antidote_db_instance, Db), + {_, _, Result} = run_commands(?MODULE, Cmds), + % io:format("Result = ~p~n", [Result]), + Result =:= ok + end) + end). + +command(_S) -> + oneof([ + {call, ?MODULE, put_op, [randomKey(), randomVectorclock(), randomLogEntry()]}, + {call, ?MODULE, get_ops, [randomKey(), randomVectorclock(), randomVectorclock()]} + ]). + +put_op(Key, Clock, LogEntry) -> + Db = get(antidote_db_instance), + antidote_db:put_op(Db, Key, vectorclock:from_list(Clock), LogEntry). + +get_ops(Key, FromClock, ToClock) -> + Db = get(antidote_db_instance), + antidote_db:get_ops(Db, Key, vectorclock:from_list(FromClock), vectorclock:from_list(ToClock)). 
+
%% proper_statem callback: the model starts with no recorded operations.
initial_state() ->
    #state{}.

%% proper_statem callback: record every put in the model, newest first;
%% reads leave the model unchanged.
next_state(S, _V, {call, _, put_op, [Key, Clock, Entry]}) ->
    S#state{operations = [{Key, Clock, Entry} | S#state.operations]};
next_state(S, _V, {call, _, get_ops, [_Key, _From, _To]}) ->
    S.

%% Only generate get_ops calls whose range is well-formed, i.e. From <= To
%% in vectorclock order; puts are always allowed.
precondition(_S, {call, _, get_ops, [_Key, From, To]}) ->
    FromVc = vectorclock:from_list(From),
    ToVc = vectorclock:from_list(To),
    vectorclock:le(FromVc, ToVc);
precondition(_S, {call, _, put_op, [_Key, _Clock, _Entry]}) -> true.

%% Puts always succeed. A get_ops result must equal (as a set) the model's
%% ops for Key whose clock C satisfies: not (C <= From) and C <= To.
postcondition(_S, {call, _, put_op, [_Key, _Clock, _Entry]}, _Res) ->
    true;
postcondition(S, {call, _, get_ops, [Key, From, To]}, Res) ->
    FromVc = vectorclock:from_list(From),
    ToVc = vectorclock:from_list(To),
    ModelOps = [Op || {K, C, Op} <- S#state.operations, K == Key
        , not vectorclock:le(vectorclock:from_list(C), FromVc)
        , vectorclock:le(vectorclock:from_list(C), ToVc)],
    % io:format("Expected: ~p, Actual: ~p~n", [ModelOps, Res]),
    lists:sort(ModelOps) == lists:sort(Res).

%% Generator: one of three fixed keys.
randomKey() ->
    elements([keyA, keyB, keyC]).

%% Generator: a vectorclock over dc1..dc3 as a {dc, time} proplist.
%% Entries whose time is 0 are omitted (see makeDot/2), so generated clocks
%% may have different key sets — which is what the wrapper must handle.
randomVectorclock() ->
    ?LET(T, vector(3, randomDot()), case T of [X1, X2, X3] ->
        makeDot(dc1, X1) ++ makeDot(dc2, X2) ++ makeDot(dc3, X3) end).

%% A time of 0 produces no entry for the DC; anything else becomes one dot.
makeDot(_Dc, 0) -> [];
makeDot(Dc, Time) -> [{Dc, Time}].

%% Generator: a single DC's time, biased so that 0 (absent entry) is common.
randomDot() ->
    frequency([
        {1, 0},
        {9, choose(0, 20)}
    ]).

%% Generator: one of three opaque log entries; their identity is all the
%% model cares about.
randomLogEntry() ->
    elements([x, y, z]).

diff --git a/test/prop_get_ops.erl b/test/prop_get_ops.erl new file mode 100644 index 0000000..4375532 --- /dev/null +++ b/test/prop_get_ops.erl @@ -0,0 +1,111 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. 
+
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% PropEr test for leveldb_wrapper:get_ops/4: random histories of replica
%% increments and pulls are turned into sets of vectorclocks, inserted as
%% ops, and then every valid (From, To) range read is checked.
-module(prop_get_ops).

-define(PROPER_NO_TRANS, true).
-include_lib("proper/include/proper.hrl").

-include_lib("antidote_utils/include/antidote_utils.hrl").

-export([prop_get_ops/0, blub/0]).

%% Hand-written regression scenario, kept runnable from the shell; exercises
%% checkSpec on a small fixed history.
blub() ->
    Ops = [{inc, dc2, 1}, {inc, dc3, 1}, {inc, dc2, 1}, {pull, dc2, dc3}, {inc, dc3, 1}],
    checkSpec(opsToClocks(Ops)).

%% Property: for any generated history of operations, get_ops returns exactly
%% the expected ops for every valid clock range.
prop_get_ops() ->
    ?FORALL(Ops, generateOps(),
        checkSpec(opsToClocks(Ops))
    ).

%% Inserts one op per clock in Clocks into a fresh DB, then checks get_ops
%% for every pair (From, To) with From < To in vectorclock order.
checkSpec(Clocks) ->
%% io:format("Clocks = ~p~n", [Clocks]),
    VClocks = [vectorclock:from_list(C) || C <- Clocks],
    eleveldb_wrapper_SUITE:withFreshDb(
        fun(Db) ->
            % insert all the operations
            {_, Entries} = lists:foldl(
                fun(Clock, {I, Entries}) ->
                    Entry = {logEntry, I},
                    ok = leveldb_wrapper:put_op(Db, key, Clock, Entry),
                    {I + 1, [{Clock, Entry} | Entries]}
                end, {1, []}, VClocks),
            % generate all possible {From, To} pairs from the clocks, except the cases where From == To
            ClockPairs = [{From, To} || From <- VClocks, To <- VClocks, vectorclock:le(From, To) and not vectorclock:eq(From, To)],
            conjunction([{{dict:to_list(From), dict:to_list(To)}, checkGetOps(Db, Entries, From, To)} || {From, To} <- ClockPairs])
        end). 
+
%% Runs leveldb_wrapper:get_ops/4 for the range (From, To] and compares the
%% result, order-independently, against the model: every inserted entry
%% whose clock C satisfies (not C < From) andalso C <= To must be returned.
checkGetOps(Db, Entries, From, To) ->
    Records = leveldb_wrapper:get_ops(Db, key, From, To),
    Expected = [Rec ||
        {Clock, Rec} <- Entries,
        (not vectorclock:lt(Clock, From)) andalso vectorclock:le(Clock, To)
    ],
    %% On failure, print a self-contained testcase that can be pasted into
    %% eleveldb_wrapper_SUITE to reproduce the bug.
    ?WHENFAIL(
        begin
            io:format("~n---- Start of testcase -------~n"),
            [io:format("ok = put_op(Db, key, ~w, ~w),~n",
                [dict:to_list(C), E]) || {C, E} <- Entries],
            io:format("Records = get_ops(Db, key, ~w, ~w),~n",
                [dict:to_list(From), dict:to_list(To)]),
            %% Fixed: the printed assertion previously read
            %% "...sorted_numbers((Records)),", i.e. a doubled opening paren
            %% and a missing closing paren, so the emitted repro would not
            %% compile when pasted into a test module.
            io:format("?assertEqual(~w, filter_records_into_sorted_numbers(Records)),~n", [lists:sort(Expected)]),
            io:format("% returned ~w~n", [Records]),
            io:format("---- End of testcase -------~n")
        end,
        lists:sort(Records) == lists:sort(Expected)).

%% Replays Ops starting from an empty clock on every replica and returns the
%% sorted list of clocks (as {dc, time} proplists) at which increments
%% happened — one clock per generated operation.
opsToClocks(Ops) ->
    {Clocks, _} = execOps(Ops, {[], orddict:from_list([{R, [vectorclock:new()]} || R <- replicas()])}),
    lists:sort([dict:to_list(C) || C <- Clocks]).

%% Interpreter for the generated operations.
%% State is {OpClocks, ReplicaClocks} where ReplicaClocks maps each replica
%% to the list of clocks it has seen, oldest first (the last one is current).
execOps([], State) -> State;
execOps([{pull, SourceR, TargetR} | RemainingOps], {OpClocks, State}) ->
    %% Pull: merge the oldest clock of SourceR that TargetR has not yet seen
    %% into TargetR's current clock (no-op if TargetR has seen everything).
    SourceClocks = orddict:fetch(SourceR, State),
    TargetClock = lists:last(orddict:fetch(TargetR, State)),
    NewSourceClocks = [C || C <- SourceClocks, not vectorclock:le(C, TargetClock)],
    MergedClock =
        case NewSourceClocks of
            [] -> TargetClock;
            [C | _] ->
                vectorclock:max([C, TargetClock])
        end,
    NewState = orddict:append(TargetR, MergedClock, State),
    execOps(RemainingOps, {OpClocks, NewState});
execOps([{inc, R, Amount} | RemainingOps], {OpClocks, State}) ->
    %% Increment: advance R's own entry by Amount and record the resulting
    %% clock as the clock of a new operation.
    RClock = lists:last(orddict:fetch(R, State)),
    NewClock = vectorclock:set_clock_of_dc(R, Amount + vectorclock:get_clock_of_dc(R, RClock), RClock),
    NewState = orddict:append(R, NewClock, State),
    execOps(RemainingOps, {[NewClock | OpClocks], NewState}). 
+
%% Generator for a random history: a list of operations over the three
%% replicas, where each operation is either a pull between two replicas or
%% an increment of one replica's clock by 1..3.
generateOps() ->
    PullOp = {pull, replica(), replica()},
    IncOp = {inc, replica(), range(1, 3)},
    list(oneof([PullOp, IncOp])).

%% Generator: picks one of the known replicas.
replica() ->
    oneof(replicas()).

%% The fixed set of replica (DC) names used by the model.
replicas() -> [dc1, dc2, dc3].