From a9ea6998ec70a7789b0e01994424997981fc8cc9 Mon Sep 17 00:00:00 2001 From: Luca Favatella Date: Thu, 1 Sep 2016 13:34:58 +0100 Subject: [PATCH 1/5] Log skipped test group Comment taken from 4557759. --- test/riakc_pb_socket_tests.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/riakc_pb_socket_tests.erl b/test/riakc_pb_socket_tests.erl index b613493c..1e800493 100644 --- a/test/riakc_pb_socket_tests.erl +++ b/test/riakc_pb_socket_tests.erl @@ -1377,7 +1377,9 @@ integration_test_() -> GenFun = fun() -> case catch net_adm:ping(riakc_test_utils:test_riak_node()) of pong -> integration_tests(); - _ -> [] + _ -> + ?debugMsg("Skipped - needs live server"), + [] end end, {setup, SetupFun, CleanupFun, {generator, GenFun}}. From 781969fb56de77afcf12c60919a6cfaf9cdf32ac Mon Sep 17 00:00:00 2001 From: Luca Favatella Date: Thu, 1 Sep 2016 13:50:47 +0100 Subject: [PATCH 2/5] Run integration tests on Travis CI ... by running Riak. Excerpt from https://docs.travis-ci.com/user/database-setup/#Riak : > Riak uses the default configuration apart from the storage backend, > which is LevelDB. Riak Search is enabled. --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 4b6fd17a..2488a516 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,3 +9,5 @@ otp_release: - 17.5 - R16B03 - R15B03 +services: + - riak From 46e7c0256c3825fac8617bfeb45714b08ef15c2b Mon Sep 17 00:00:00 2001 From: Luca Favatella Date: Thu, 1 Sep 2016 17:10:09 +0100 Subject: [PATCH 3/5] Configure running riak before running integration tests Refs: * https://github.com/basho/riak-erlang-client/pull/316#issuecomment-244091506 * https://github.com/basho/riak-erlang-client/pull/315#issuecomment-244108321 --- .travis.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.travis.yml b/.travis.yml index 2488a516..2e58721d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +sudo: required language: erlang notifications: webhooks: http://basho-engbot.herokuapp.com/travis?key=8f07584549e458d4c83728f3397ecbd4368e60a8 @@ -11,3 +12,13 @@ otp_release: - R15B03 services: - riak +before_install: + - ls -l "$(command -v riak-admin)" + - mkdir tmp + - mkdir tmp/bin + - echo 'exec sudo riak-admin "$@"' > tmp/bin/riak-admin + - chmod u+x tmp/bin/riak-admin + - cat tmp/bin/riak-admin + - ls -l tmp/bin +before_script: + - make -C buildbot configure RIAK_DIR="$(pwd)/tmp" From c071cd9ae94e6429aa5883fa0cbb253e5a7521d0 Mon Sep 17 00:00:00 2001 From: Luca Favatella Date: Sat, 3 Sep 2016 23:20:42 +0100 Subject: [PATCH 4/5] Allow mapreduce function as string --- .travis.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2e58721d..e9cfd994 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,8 +13,14 @@ otp_release: services: - riak before_install: - - ls -l "$(command -v riak-admin)" - mkdir tmp + - test ! -e /etc/riak/advanced.config + - echo "[{riak_kv, [{allow_strfun, true}]}]." > tmp/advanced.config + - sudo cp -p tmp/advanced.config /etc/riak/advanced.config + - cat /etc/riak/advanced.config + - ls -l /etc/riak/advanced.config + - sudo service riak --full-restart + - ls -l "$(command -v riak-admin)" - mkdir tmp/bin - echo 'exec sudo riak-admin "$@"' > tmp/bin/riak-admin - chmod u+x tmp/bin/riak-admin From 994ccea57b9ddab045ccbad2828296868567fcc1 Mon Sep 17 00:00:00 2001 From: Luca Favatella Date: Thu, 1 Sep 2016 15:32:54 +0100 Subject: [PATCH 5/5] Test riak set delete with vclock TODO Make extraction of context from Riak KV client-side CRDT representation part of behaviour `riakc_datatype` - not only of `riakc_set`. --- src/riakc_datatype.erl | 3 + src/riakc_set.erl | 5 ++ test/riakc_pb_socket_tests.erl | 117 +++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+) diff --git a/src/riakc_datatype.erl b/src/riakc_datatype.erl index dd6c5a10..a5e1b9a8 100644 --- a/src/riakc_datatype.erl +++ b/src/riakc_datatype.erl @@ -63,6 +63,9 @@ %% internally by the client code. -callback new(Value::term(), context()) -> datatype(). +%% Returns the context. +%-callback context(datatype()) -> context(). + %% Returns the original, unmodified value of the type. This does %% not include the application of any locally-queued operations. -callback value(datatype()) -> term(). diff --git a/src/riakc_set.erl b/src/riakc_set.erl index a10e9642..aecb1343 100644 --- a/src/riakc_set.erl +++ b/src/riakc_set.erl @@ -54,6 +54,7 @@ %% Callbacks -export([new/0, new/1, new/2, + context/1, value/1, to_op/1, is_type/1, @@ -97,6 +98,10 @@ new(Value, Context) when is_list(Value) -> #set{value=ordsets:from_list(Value), context=Context}. +%% @doc Returns the original context of the set. +-spec context(riakc_set()) -> riakc_datatype:context(). +context(#set{context=C}) -> C. + %% @doc Returns the original value of the set as an ordset. -spec value(riakc_set()) -> ordsets:ordset(binary()). value(#set{value=V}) -> V. diff --git a/test/riakc_pb_socket_tests.erl b/test/riakc_pb_socket_tests.erl index 1e800493..a2e15875 100644 --- a/test/riakc_pb_socket_tests.erl +++ b/test/riakc_pb_socket_tests.erl @@ -1364,9 +1364,126 @@ integration_tests() -> %% Make sure card and value are the same ?assertEqual(riakc_hll:card(Hll1), Value) + end)}, + {"delete with vclock set with context", + ?_test(begin + riakc_test_utils:reset_riak(), + {ok, Pid} = riakc_test_utils:start_link(), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))), + {ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assert(riakc_set:is_element(<<"X">>, S0)), + ?assertEqual(1, riakc_set:size(S0)), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))), + {ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertNot(riakc_set:is_element(<<"X">>, S1)), + ?assertEqual(0, riakc_set:size(S1)), + SC1 = riakc_set:context(S1), + ?debugFmt("The set with context ~1000p is empty - attempt once to delete the corresponding object. In order to do so, on the server: fetch the latest object by key (mapreduce), assert it has no siblings, assert it stores a CRDT, extract object vclock and CRDT context, finally return to the client. If successfully returned object vclock and CRDT context, on the client: if the CRDT context is the expected one, delete the object specifying the vclock detected as corresponding to the CRDT context.~n", [SC1]), + S = map_strfun_returning_object_vclock_and_crdt_context(), + R = riakc_pb_socket:mapred( + Pid, + [{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}], + [{map, {strfun, S}, none, true}]), + {ok, [{_, [{{object_vclock, OV2}, + {crdt_context, SC2}}]}]} = R, + ?assertEqual(SC1, SC2), + ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>) + end)}, + {"delete with vclock set with context - case determination of vclock corresponding to context failed for concurrent write", + ?_test(begin + riakc_test_utils:reset_riak(), + {ok, Pid} = riakc_test_utils:start_link(), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))), + {ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assert(riakc_set:is_element(<<"X">>, S0)), + ?assertEqual(1, riakc_set:size(S0)), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))), + {ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertNot(riakc_set:is_element(<<"X">>, S1)), + ?assertEqual(0, riakc_set:size(S1)), + %% Set is empty hence worth deleting. But + %% concurrent actor modifies object... + {ok, PidConcurrent} = riakc_test_utils:start_link(), + {ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(0, riakc_set:size(S0Concurrent)), + ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))), + {ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(1, riakc_set:size(S1Concurrent)), + %% ... before managing to determine object vclock + %% corresponding to CRDT context... + S = map_strfun_returning_object_vclock_and_crdt_context(), + R = riakc_pb_socket:mapred( + Pid, + [{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}], + [{map, {strfun, S}, none, true}]), + {ok, [{_, [{{object_vclock, _OV2}, + {crdt_context, SC2}}]}]} = R, + ?assertNotEqual(riakc_set:context(S1), SC2) + %% ... hence decision can be taken on what to do + %% e.g. not calling + %% `riakc_pb_socket:delete_vclock/3`. + end)}, + {"delete with vclock set with context - case delete failed for concurrent write", + ?_test(begin + riakc_test_utils:reset_riak(), + {ok, Pid} = riakc_test_utils:start_link(), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))), + {ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assert(riakc_set:is_element(<<"X">>, S0)), + ?assertEqual(1, riakc_set:size(S0)), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))), + {ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertNot(riakc_set:is_element(<<"X">>, S1)), + ?assertEqual(0, riakc_set:size(S1)), + %% Set is empty hence worth deleting. Determine + %% object vclock corresponding to CRDT context. + S = map_strfun_returning_object_vclock_and_crdt_context(), + R = riakc_pb_socket:mapred( + Pid, + [{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}], + [{map, {strfun, S}, none, true}]), + {ok, [{_, [{{object_vclock, OV2}, + {crdt_context, SC2}}]}]} = R, + ?assertEqual(riakc_set:context(S1), SC2), + %% But concurrent actor modifies object... + {ok, PidConcurrent} = riakc_test_utils:start_link(), + {ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(0, riakc_set:size(S0Concurrent)), + ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))), + {ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(1, riakc_set:size(S1Concurrent)), + %% ... hence attempt to delete object at specific + %% vclock does not delete the object. + ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2), + {ok, S1Concurrent} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>) end)} ]. +map_strfun_returning_object_vclock_and_crdt_context() -> + "fun(O, undefined, none) -> + %% TODO Tombstone in mapreduce %% Ref http://docs.basho.com/riak/kv/2.1.4/developing/app-guide/advanced-mapreduce/#map-phase + %% Assert no siblings. + 1 = riak_object:value_count(O), + %% riak_kv_crdt:value/1 infers type + %% but discards CRDT context: follow + %% its implementation but keep CRDT + %% context. + B = riak_object:bucket(O), + BProps = [_|_] = riak_core_bucket:get_bucket(B), + {{Ctx, _V}, _Stats} = + riak_kv_crdt:value( + O, riak_kv_crdt:to_mod( + proplists:get_value( + datatype, BProps))), + [{{object_vclock, riak_object:encode_vclock( + riak_object:vclock(O))}, + {crdt_context, Ctx}}] + end.". + integration_test_() -> SetupFun = fun() -> %% Grab the riakclient_pb.proto file