Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test deletion of object version (i.e. vclock) corresponding to CRDT version (i.e. context) #335

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/riakc_set.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
del_element/2]).

%% Query functions
-export([size/1,
-export([context/1,
size/1,
is_element/2,
fold/3]).

Expand Down Expand Up @@ -140,6 +141,10 @@ del_element(_Bin, #set{context=undefined}) ->
del_element(Bin, #set{removes=R0}=Set) when is_binary(Bin) ->
Set#set{removes=ordsets:add_element(Bin, R0)}.

%% @doc Returns the original context of the set.
-spec context(riakc_set()) -> riakc_datatype:context().
context(#set{context=C}) -> C.

%% @doc Returns the cardinality (size) of the set. <em>Note: this only
%% operates on the original value as retrieved from Riak.</em>
-spec size(riakc_set()) -> pos_integer().
Expand Down
117 changes: 117 additions & 0 deletions test/riakc_pb_socket_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1401,9 +1401,126 @@ integration_tests() ->
Rsp ->
?debugFmt("gsets bucket is not present, skipping (~p)", [Rsp])
end
end)},
{"delete with vclock set with context",
?_test(begin
riakc_test_utils:reset_riak(),
{ok, Pid} = riakc_test_utils:start_link(),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))),
{ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assert(riakc_set:is_element(<<"X">>, S0)),
?assertEqual(1, riakc_set:size(S0)),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))),
{ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assertNot(riakc_set:is_element(<<"X">>, S1)),
?assertEqual(0, riakc_set:size(S1)),
SC1 = riakc_set:context(S1),
?debugFmt("The set with context ~1000p is empty - attempt once to delete the corresponding object. In order to do so, on the server: fetch the latest object by key (mapreduce), assert it has no siblings, assert it stores a CRDT, extract object vclock and CRDT context, finally return to the client. If successfully returned object vclock and CRDT context, on the client: if the CRDT context is the expected one, delete the object specifying the vclock detected as corresponding to the CRDT context.~n", [SC1]),
S = map_strfun_returning_object_vclock_and_crdt_context(),
R = riakc_pb_socket:mapred(
Pid,
[{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}],
[{map, {strfun, S}, none, true}]),
{ok, [{_, [{{object_vclock, OV2},
{crdt_context, SC2}}]}]} = R,
?assertEqual(SC1, SC2),
ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>)
end)},
{"delete with vclock set with context - case determination of vclock corresponding to context failed for concurrent write",
?_test(begin
riakc_test_utils:reset_riak(),
{ok, Pid} = riakc_test_utils:start_link(),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))),
{ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assert(riakc_set:is_element(<<"X">>, S0)),
?assertEqual(1, riakc_set:size(S0)),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))),
{ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assertNot(riakc_set:is_element(<<"X">>, S1)),
?assertEqual(0, riakc_set:size(S1)),
%% Set is empty hence worth deleting. But
%% concurrent actor modifies object...
{ok, PidConcurrent} = riakc_test_utils:start_link(),
{ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(0, riakc_set:size(S0Concurrent)),
ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))),
{ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(1, riakc_set:size(S1Concurrent)),
%% ... before managing to determine object vclock
%% corresponding to CRDT context...
S = map_strfun_returning_object_vclock_and_crdt_context(),
R = riakc_pb_socket:mapred(
Pid,
[{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}],
[{map, {strfun, S}, none, true}]),
{ok, [{_, [{{object_vclock, _OV2},
{crdt_context, SC2}}]}]} = R,
?assertNotEqual(riakc_set:context(S1), SC2)
%% ... hence decision can be taken on what to do
%% e.g. not calling
%% `riakc_pb_socket:delete_vclock/3`.
end)},
{"delete with vclock set with context - case delete failed for concurrent write",
?_test(begin
riakc_test_utils:reset_riak(),
{ok, Pid} = riakc_test_utils:start_link(),
{error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))),
{ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assert(riakc_set:is_element(<<"X">>, S0)),
?assertEqual(1, riakc_set:size(S0)),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))),
{ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>),
?assertNot(riakc_set:is_element(<<"X">>, S1)),
?assertEqual(0, riakc_set:size(S1)),
%% Set is empty hence worth deleting. Determine
%% object vclock corresponding to CRDT context.
S = map_strfun_returning_object_vclock_and_crdt_context(),
R = riakc_pb_socket:mapred(
Pid,
[{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}],
[{map, {strfun, S}, none, true}]),
{ok, [{_, [{{object_vclock, OV2},
{crdt_context, SC2}}]}]} = R,
?assertEqual(riakc_set:context(S1), SC2),
%% But concurrent actor modifies object...
{ok, PidConcurrent} = riakc_test_utils:start_link(),
{ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(0, riakc_set:size(S0Concurrent)),
ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))),
{ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>),
?assertEqual(1, riakc_set:size(S1Concurrent)),
%% ... hence attempt to delete object at specific
%% vclock does not delete the object.
ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2),
{ok, S1Concurrent} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>)
end)}
].

map_strfun_returning_object_vclock_and_crdt_context() ->
"fun(O, undefined, none) ->
%% TODO Tombstone in mapreduce %% Ref http://docs.basho.com/riak/kv/2.1.4/developing/app-guide/advanced-mapreduce/#map-phase
%% Assert no siblings.
1 = riak_object:value_count(O),
%% riak_kv_crdt:value/1 infers type
%% but discards CRDT context: follow
%% its implementation but keep CRDT
%% context.
B = riak_object:bucket(O),
BProps = [_|_] = riak_core_bucket:get_bucket(B),
{{Ctx, _V}, _Stats} =
riak_kv_crdt:value(
O, riak_kv_crdt:to_mod(
proplists:get_value(
datatype, BProps))),
[{{object_vclock, riak_object:encode_vclock(
riak_object:vclock(O))},
{crdt_context, Ctx}}]
end.".

integration_test_() ->
SetupFun = fun() ->
%% Grab the riakclient_pb.proto file
Expand Down
2 changes: 1 addition & 1 deletion tools
Submodule tools updated 1 files
+1 −0 lib/gen_adv_conf.bash