Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC - DO NOT MERGE - Test deletion of object version (i.e. vclock) corresponding to CRDT version (i.e. context) #317

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
sudo: required
language: erlang
notifications:
webhooks: http://basho-engbot.herokuapp.com/travis?key=8f07584549e458d4c83728f3397ecbd4368e60a8
@@ -9,3 +10,21 @@ otp_release:
- 17.5
- R16B03
- R15B03
services:
- riak
before_install:
- mkdir tmp
- test ! -e /etc/riak/advanced.config
- echo "[{riak_kv, [{allow_strfun, true}]}]." > tmp/advanced.config
- sudo cp -p tmp/advanced.config /etc/riak/advanced.config
- cat /etc/riak/advanced.config
- ls -l /etc/riak/advanced.config
- sudo service riak --full-restart
- ls -l "$(command -v riak-admin)"
- mkdir tmp/bin
- echo 'exec sudo riak-admin "$@"' > tmp/bin/riak-admin
- chmod u+x tmp/bin/riak-admin
- cat tmp/bin/riak-admin
- ls -l tmp/bin
before_script:
- make -C buildbot configure RIAK_DIR="$(pwd)/tmp"
3 changes: 3 additions & 0 deletions src/riakc_datatype.erl
Original file line number Diff line number Diff line change
@@ -63,6 +63,9 @@
%% internally by the client code.
-callback new(Value::term(), context()) -> datatype().

%% Returns the context.
%-callback context(datatype()) -> context().

%% Returns the original, unmodified value of the type. This does
%% not include the application of any locally-queued operations.
-callback value(datatype()) -> term().
5 changes: 5 additions & 0 deletions src/riakc_set.erl
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@

%% Callbacks
-export([new/0, new/1, new/2,
context/1,
value/1,
to_op/1,
is_type/1,
@@ -97,6 +98,10 @@ new(Value, Context) when is_list(Value) ->
#set{value=ordsets:from_list(Value),
context=Context}.

%% @doc Returns the original context of the set.
-spec context(riakc_set()) -> riakc_datatype:context().
context(#set{context=C}) -> C.

%% @doc Returns the original value of the set as an ordset.
-spec value(riakc_set()) -> ordsets:ordset(binary()).
value(#set{value=V}) -> V.
121 changes: 120 additions & 1 deletion test/riakc_pb_socket_tests.erl
Original file line number Diff line number Diff line change
@@ -1364,9 +1364,126 @@ integration_tests() ->

%% Make sure card and value are the same
?assertEqual(riakc_hll:card(Hll1), Value)
end)},
{"delete with vclock set with context",
?_test(begin
riakc_test_utils:reset_riak(),
{ok, Pid} = riakc_test_utils:start_link(),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))),
{ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assert(riakc_set:is_element(<<"X">>, S0)),
?assertEqual(1, riakc_set:size(S0)),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))),
{ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assertNot(riakc_set:is_element(<<"X">>, S1)),
?assertEqual(0, riakc_set:size(S1)),
SC1 = riakc_set:context(S1),
?debugFmt("The set with context ~1000p is empty - attempt once to delete the corresponding object. In order to do so, on the server: fetch the latest object by key (mapreduce), assert it has no siblings, assert it stores a CRDT, extract object vclock and CRDT context, finally return to the client. If successfully returned object vclock and CRDT context, on the client: if the CRDT context is the expected one, delete the object specifying the vclock detected as corresponding to the CRDT context.~n", [SC1]),
S = map_strfun_returning_object_vclock_and_crdt_context(),
R = riakc_pb_socket:mapred(
Pid,
[{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}],
[{map, {strfun, S}, none, true}]),
{ok, [{_, [{{object_vclock, OV2},
{crdt_context, SC2}}]}]} = R,
?assertEqual(SC1, SC2),
ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>)
end)},
{"delete with vclock set with context - case determination of vclock corresponding to context failed for concurrent write",
?_test(begin
riakc_test_utils:reset_riak(),
{ok, Pid} = riakc_test_utils:start_link(),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))),
{ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assert(riakc_set:is_element(<<"X">>, S0)),
?assertEqual(1, riakc_set:size(S0)),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))),
{ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assertNot(riakc_set:is_element(<<"X">>, S1)),
?assertEqual(0, riakc_set:size(S1)),
%% Set is empty hence worth deleting. But
%% concurrent actor modifies object...
{ok, PidConcurrent} = riakc_test_utils:start_link(),
{ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(0, riakc_set:size(S0Concurrent)),
ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))),
{ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(1, riakc_set:size(S1Concurrent)),
%% ... before managing to determine object vclock
%% corresponding to CRDT context...
S = map_strfun_returning_object_vclock_and_crdt_context(),
R = riakc_pb_socket:mapred(
Pid,
[{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}],
[{map, {strfun, S}, none, true}]),
{ok, [{_, [{{object_vclock, _OV2},
{crdt_context, SC2}}]}]} = R,
?assertNotEqual(riakc_set:context(S1), SC2)
%% ... hence decision can be taken on what to do
%% e.g. not calling
%% `riakc_pb_socket:delete_vclock/3`.
end)},
{"delete with vclock set with context - case delete failed for concurrent write",
?_test(begin
riakc_test_utils:reset_riak(),
{ok, Pid} = riakc_test_utils:start_link(),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))),
{ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assert(riakc_set:is_element(<<"X">>, S0)),
?assertEqual(1, riakc_set:size(S0)),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))),
{ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assertNot(riakc_set:is_element(<<"X">>, S1)),
?assertEqual(0, riakc_set:size(S1)),
%% Set is empty hence worth deleting. Determine
%% object vclock corresponding to CRDT context.
S = map_strfun_returning_object_vclock_and_crdt_context(),
R = riakc_pb_socket:mapred(
Pid,
[{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}],
[{map, {strfun, S}, none, true}]),
{ok, [{_, [{{object_vclock, OV2},
{crdt_context, SC2}}]}]} = R,
?assertEqual(riakc_set:context(S1), SC2),
%% But concurrent actor modifies object...
{ok, PidConcurrent} = riakc_test_utils:start_link(),
{ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(0, riakc_set:size(S0Concurrent)),
ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))),
{ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(1, riakc_set:size(S1Concurrent)),
%% ... hence attempt to delete object at specific
%% vclock does not delete the object.
ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2),
{ok, S1Concurrent} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>)
end)}
].

map_strfun_returning_object_vclock_and_crdt_context() ->
"fun(O, undefined, none) ->
%% TODO Tombstone in mapreduce %% Ref http://docs.basho.com/riak/kv/2.1.4/developing/app-guide/advanced-mapreduce/#map-phase
%% Assert no siblings.
1 = riak_object:value_count(O),
%% riak_kv_crdt:value/1 infers type
%% but discards CRDT context: follow
%% its implementation but keep CRDT
%% context.
B = riak_object:bucket(O),
BProps = [_|_] = riak_core_bucket:get_bucket(B),
{{Ctx, _V}, _Stats} =
riak_kv_crdt:value(
O, riak_kv_crdt:to_mod(
proplists:get_value(
datatype, BProps))),
[{{object_vclock, riak_object:encode_vclock(
riak_object:vclock(O))},
{crdt_context, Ctx}}]
end.".

integration_test_() ->
SetupFun = fun() ->
%% Grab the riakclient_pb.proto file
@@ -1377,7 +1494,9 @@ integration_test_() ->
GenFun = fun() ->
case catch net_adm:ping(riakc_test_utils:test_riak_node()) of
pong -> integration_tests();
_ -> []
_ ->
?debugMsg("Skipped - needs live server"),
[]
end
end,
{setup, SetupFun, CleanupFun, {generator, GenFun}}.