diff --git a/.gitignore b/.gitignore index 0476c925..65c850a1 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ test-unchanged.escript *.dump _build/ rebar.lock +.DS_Store diff --git a/.travis.yml b/.travis.yml index 1e0f9ce1..4b70972d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,18 +1,8 @@ -sudo: required -dist: trusty language: erlang otp_release: - - 19.1 - - 18.3 - - 17.5 - - R16B03 - - R15B03 -env: - - RIAK_DOWNLOAD_URL=http://s3.amazonaws.com/downloads.basho.com/riak/2.0/2.0.7/ubuntu/trusty/riak_2.0.7-1_amd64.deb - - RIAK_DOWNLOAD_URL=http://s3.amazonaws.com/downloads.basho.com/riak/2.1/2.1.4/ubuntu/trusty/riak_2.1.4-1_amd64.deb -before_script: - - sudo ./tools/travis-ci/riak-install -d "$RIAK_DOWNLOAD_URL" - - sudo ./tools/setup-riak -notifications: - slack: - secure: JVsrhRuWRTQauP7OjSc1XO6+P3eiOZtkjYhU2R53Hn9dK1KmJRBR5MzO1nq6BUs+bViXiAyW0YOoDTWF0eUw5gdd6sqnvx0+mYJVfYDTfbjp46yqj03Nj+J5HZ1KWPM78NSZ8jpZvdwk35ZpHqhsh/zWOY2RYmIVQKLB9EthHLU= + - 20.3.8 + - 21.3 + - 22.3 +script: + - chmod u+x rebar3 + - ./rebar3 do upgrade, compile, xref, dialyzer, eunit diff --git a/include/riakc.hrl b/include/riakc.hrl index 74495626..1a18f4f2 100644 --- a/include/riakc.hrl +++ b/include/riakc.hrl @@ -32,7 +32,8 @@ auto_reconnect | {auto_reconnect, boolean()} | keepalive | - {keepalive, boolean()}. + {keepalive, boolean()}| + {silence_terminate_crash, boolean()}. %% Options for starting or modifying the connection: %% `queue_if_disconnected' when present or true will cause requests to %% be queued while the connection is down. `auto_reconnect' when diff --git a/rebar b/rebar deleted file mode 100755 index 13bcbf9a..00000000 Binary files a/rebar and /dev/null differ diff --git a/rebar.config b/rebar.config index a6184160..835031c1 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ -{require_otp_vsn, "R15|R16|17|18|19|20|21"}. +{require_otp_vsn, "20|21|22"}. {cover_enabled, true}. 
diff --git a/rebar.config.script b/rebar.config.script deleted file mode 100644 index cbcd06ed..00000000 --- a/rebar.config.script +++ /dev/null @@ -1,17 +0,0 @@ -case erlang:system_info(otp_release) of - - [$R|_] -> - %% Rxx < 17.0, not border inclusive - HashDefine = [{d,pre17}], - case lists:keysearch(erl_opts, 1, CONFIG) of - {value, {erl_opts, Opts}} -> - lists:keyreplace(erl_opts,1,CONFIG,{erl_opts,Opts++HashDefine}); - false -> - CONFIG ++ [{erl_opts, HashDefine}] - end; - - _ -> - %% "17", our future with map - CONFIG - -end. diff --git a/rebar3 b/rebar3 index f0b1ba78..e550663a 100755 Binary files a/rebar3 and b/rebar3 differ diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl index f7db56d6..e3cd9f6a 100644 --- a/src/riakc_pb_socket.erl +++ b/src/riakc_pb_socket.erl @@ -49,6 +49,7 @@ set_client_id/2, set_client_id/3, get_server_info/1, get_server_info/2, get/3, get/4, get/5, + fetch/2, put/2, put/3, put/4, delete/3, delete/4, delete/5, delete_vclock/4, delete_vclock/5, delete_vclock/6, @@ -70,6 +71,11 @@ search/3, search/4, search/5, search/6, get_index/4, get_index/5, get_index/6, get_index/7, %% @deprecated get_index_eq/4, get_index_range/5, get_index_eq/5, get_index_range/6, + aae_merge_root/2, aae_merge_branches/3, aae_fetch_clocks/3, + aae_range_tree/7, aae_range_clocks/5, aae_range_replkeys/5, + aae_find_keys/5, aae_find_tombs/5, aae_reap_tombs/6, aae_erase_keys/6, + aae_list_buckets/1, aae_list_buckets/2, + aae_object_stats/4, cs_bucket_fold/3, default_timeout/1, tunnel/4, @@ -129,6 +135,15 @@ {end_incl, boolean()}. -type cs_opts() :: [cs_opt()]. +-type key_range() :: {riakc_obj:key(), riakc_obj:key()} | all. +-type segment_filter() :: {list(pos_integer()), tree_size()} | all. +-type modified_range() :: {ts(), ts()} | all. +-type ts() :: pos_integer(). +-type hash_method() :: pre_hash | {rehash, non_neg_integer()}. +-type change_method() :: {job, pos_integer()}|local|count. 
+ +-type tree_size() :: xxsmall| xsmall| small| medium| large| xlarge. + %% Which client operation the default timeout is being requested %% for. `timeout' is the global default timeout. Any of these defaults %% can be overridden by setting the application environment variable @@ -177,7 +192,8 @@ % certificate authentication ssl_opts = [], % Arbitrary SSL options, see the erlang SSL % documentation. - reconnect_interval=?FIRST_RECONNECT_INTERVAL :: non_neg_integer()}). + reconnect_interval=?FIRST_RECONNECT_INTERVAL :: non_neg_integer(), + silence_terminate_crash = false :: boolean()}). -export_type([address/0, portnum/0]). @@ -249,12 +265,12 @@ is_connected(Pid, Timeout) -> %% @doc Ping the server %% @equiv ping(Pid, default_timeout(ping_timeout)) --spec ping(pid()) -> pong. +-spec ping(pid()) -> pong|{error, timeout}. ping(Pid) -> call_infinity(Pid, {req, rpbpingreq, default_timeout(ping_timeout)}). %% @doc Ping the server specifying timeout --spec ping(pid(), timeout()) -> pong. +-spec ping(pid(), timeout()) -> pong|{error, timeout}. ping(Pid, Timeout) -> call_infinity(Pid, {req, rpbpingreq, Timeout}). @@ -321,6 +337,15 @@ get(Pid, Bucket, Key, Options, Timeout) -> Req = get_options(Options, #rpbgetreq{type =T, bucket = B, key = Key}), call_infinity(Pid, {req, Req, Timeout}). +%% @doc Fetch replicated objects from a queue +-spec fetch(pid(), binary()) -> + {ok, queue_empty}| + {ok|crc_wonky, {deleted, term(), binary()}|binary()}. +fetch(Pid, QueueName) -> + Req = #rpbfetchreq{queuename = QueueName}, + call_infinity(Pid, {req, Req, default_timeout(get_timeout)}). + + %% @doc Put the metadata/value in the object under bucket/key %% @equiv put(Pid, Obj, []) %% @see put/4 @@ -1315,6 +1340,597 @@ replace_coverage(Pid, Bucket, Cover, Other) -> {req, #rpbcoveragereq{type=T, bucket=B, replace_cover=Cover, unavailable_cover=Other}, Timeout}). 
+ +%% @doc Get the merged aae tictactree root for the given `NVal' +-spec aae_merge_root(pid(), NVal::pos_integer()) -> + {ok, {root, binary()}} | + {error, any()}. +aae_merge_root(Pid, NVal) -> + Timeout = default_timeout(get_coverage_timeout), + call_infinity(Pid, + {req, + #rpbaaefoldmergerootnvalreq{n_val = NVal}, + Timeout}). + + +%% @doc get the aae merged branches for the given `NVal', restricted +%% to the given list of `Branches' +-spec aae_merge_branches(pid(), + NVal::pos_integer(), + Branches::list(pos_integer())) -> + {ok, {branches, [{BranchId::integer(), Branch::binary()}]}} | + {error, any()}. +aae_merge_branches(Pid, NVal, Branches) -> + Timeout = default_timeout(get_coverage_timeout), + call_infinity(Pid, + {req, + #rpbaaefoldmergebranchnvalreq{n_val = NVal, + id_filter = Branches}, + Timeout}). + + +%% @doc fetch the keys and clocks for the given `NVal', restricted +%% to the given list of `Segments' +-spec aae_fetch_clocks(pid(), + NVal::pos_integer(), + Segments::list(pos_integer())) -> + {ok, {keysclocks, + [{{riakc_obj:bucket(), + riakc_obj:key()}, + binary()}]}} | + {error, any()}. +aae_fetch_clocks(Pid, NVal, Segments) -> + Timeout = default_timeout(get_coverage_timeout), + call_infinity(Pid, + {req, + #rpbaaefoldfetchclocksnvalreq{n_val = NVal, + id_filter = Segments}, + Timeout}). + +%% @doc generate a tictac tree by folding over a range of keys +%% in `Bucket'. The fold can be limited to the keys in `KeyRange' which +%% is a pair `{Start::binary(), End::binary()}' that defines a range +%% of keys, or the atom `all'. The `TreeSize' parameter is an atom, +%% one of `xxsmall', `xsmall', `small', `medium', `large', or `xlarge' +%% which determines, well, the tictac tree size.
`SegmentFilter' +%% further limits the returned tree, it can be a pair of `{Segments, +%% TreeSize}' where `Segments' is a list of integers (segments to +%% return) and `TreeSize' the tree size that was initially queried to +%% return the segments in `Segments', or it can be the atom +%% `all'. `ModifiedRange' can restrict the tree fold to only include +%% keys whose last modified date is in the range. The Range is a pair +%% `{Start::pos_integer(), End::pos_integer()}' where both `Start' and +%% `End' are 32-bit unix timestamps that represent seconds since the +%% epoch. Finally `HashMethod' is one of `pre_hash' or `{rehash, +%% IV::non_neg_integer()}'. The former uses the default hashing, the +%% latter instructs the tictac tree to be built hashing the objects' +%% vector clocks with a hash initialised with the value of `IV'. This +%% is for those of you worried about hash collisions. NOTE: what is +%% returned is mochijson2 style {struct, ETC} terms, as this is what +%% leveled_tictact:import_tree expects +-spec aae_range_tree(pid(), riakc_obj:bucket(), + key_range(), tree_size(), + segment_filter(), modified_range(), hash_method()) -> + {ok, {tree, Tree::any()}} | {error, any()}. 
+aae_range_tree(Pid, BucketType, KeyRange, TreeSize, + SegmentFilter, ModifiedRange, HashMethod) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {SF, SFL, FTS} = + case SegmentFilter of + all -> + {false, [], undefined}; + {SFL0, FTS0} -> + {true, SFL0, FTS0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {HM, IV} = + case HashMethod of + pre_hash -> + {true, undefined}; + {rehash, IV0} -> + {false, IV0} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + call_infinity(Pid, + {req, + #rpbaaefoldmergetreesrangereq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + tree_size = TreeSize, + segment_filter = SF, + id_filter = SFL, + filter_tree_size = FTS, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh, + use_prehash = HM, + init_vector = IV}, + Timeout}). + +-spec aae_range_clocks(pid(), + riakc_obj:bucket(), key_range(), + segment_filter(), modified_range()) -> + {ok, + {keysclocks, + [{{riakc_obj:bucket(), + riakc_obj:key()}, + binary()}]}} | + {error, any()}. 
+aae_range_clocks(Pid, BucketType, KeyRange, SegmentFilter, ModifiedRange) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {SF, SFL, FTS} = + case SegmentFilter of + all -> + {false, [], undefined}; + {SFL0, FTS0} -> + {true, SFL0, FTS0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + call_infinity(Pid, + {req, + #rpbaaefoldfetchclocksrangereq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + segment_filter = SF, + id_filter = SFL, + filter_tree_size = FTS, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh}, + Timeout}). + + +%% @doc aae_range_replkeys +%% Fold over a range of keys and queue up those keys to be replicated to the +%% other site. Once the keys are replicated the objects will then be fetched, +%% as long as a site is consuming from that replication queue. +%% Will return the number of keys which have been queued for replication. +-spec aae_range_replkeys(pid(), riakc_obj:bucket(), + key_range(), modified_range(), + atom()) -> + {ok, non_neg_integer()} | + {error, any()}. 
+aae_range_replkeys(Pid, BucketType, KeyRange, ModifiedRange, QueueName) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + QN = atom_to_binary(QueueName, utf8), + call_infinity(Pid, + {req, + #rpbaaefoldreplkeysreq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh, + queuename = QN}, + Timeout}). + + +%% @doc aae_find_keys folds over the tictacaae store to get +%% operational information. `Pid' is the client. `BucketType' is the +%% bucket (binary or `{Type, Bucket}' pair) to fold over. `KeyRange' is a two tuple of +%% `{Start, End}' where both `Start' and `End' are binaries that +%% represent the first and last key of a range to fold over. The atom +%% `all' means fold over all keys in the bucket. `ModifiedRange' is a +%% pair `{StartDate, EndDate}' of 32-bit integer unix timestamps, or +%% the atom `all', that limits the fold to only the keys that have a +%% last-modified date in the range. The `Query' is either +%% `{sibling_count, N}' or `{object_size, N}' where `N' is an +%% integer. For `sibling_count' `N' means return all keys that have +%% more than `N' siblings. NOTE: 1 sibling means a single value in +%% this implementation, therefore if you want all keys that have more +%% than a single value AT THE VNODE then `{sibling_count, 1}' is your +%% query. NOTE NOTE: It is possible that all N vnodes have a single +%% value, and that value is different on each vnode (temporarily +%% only), this query would not detect that state. For `object_size' it +%% means return all keys whose object size is greater than `N'. 
The +%% result is a list of pairs `{Key, Count | Size}' +-spec aae_find_keys(pid(), + riakc_obj:bucket(), key_range(), + modified_range(), Query) -> + {ok, {keys, list({riakc_obj:key(), pos_integer()})}} | + {error, any()} when + Query :: {sibling_count, pos_integer()} | {object_size, pos_integer()}. +aae_find_keys(Pid, BucketType, KeyRange, ModifiedRange, Query) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + call_infinity(Pid, + {req, + #rpbaaefoldfindkeysreq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh, + finder = element(1, Query), + find_limit = element(2, Query) + }, + Timeout}). + + +%% @doc find_tombs will find tombstone keys in a given bucket and key_range +%% returning the key and delete_hash, where the delete_hash is an integer that +%% can be used in a reap request. The SegmentFilter is intended to be used as a +%% mechanism for assisting in scheduling work - a way for splitting out the +%% process of finding/reaping tombstones into batches without having +%% inconsistencies within the AAE trees. +-spec aae_find_tombs(pid(), + riakc_obj:bucket(), key_range(), + segment_filter(), + modified_range()) -> + {ok, {keys, list({riakc_obj:key(), pos_integer()})}} | + {error, any()}. 
+aae_find_tombs(Pid, BucketType, KeyRange, SegmentFilter, ModifiedRange) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + {SF, SFL, FTS} = + case SegmentFilter of + all -> + {false, [], undefined}; + {SFL0, FTS0} -> + {true, SFL0, FTS0} + end, + call_infinity(Pid, + {req, + #rpbaaefoldfindtombsreq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + segment_filter = SF, + id_filter = SFL, + filter_tree_size = FTS, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh + }, + Timeout}). + +%% @doc reap_tombs will find tombstone keys in a given bucket and key_range. +%% The SegmentFilter is intended to be used as a mechanism for assisting in +%% scheduling work - a way for splitting out the process of finding/reaping +%% tombstones into batches without having inconsistencies within the AAE trees. +%% reap_tombs can be passed a change_method of count if a count of matching +%% tombstones is all that is required - this is an alternative to running +%% find_tombs and taking the length of the list. To actually reap either +%% `local' or `{job, ID}' should be passed as the change_method. Using `local' +%% will reap each tombstone from the node local to which it is discovered, +%% which will have the impact of distributing the reap load across the cluster +%% and increasing parallelisation of reap activity. Otherwise a job id can be +%% passed and a specific reaper will be started on the co-ordinating node of the +%% query only. The Id will be a positive integer used to identify this reap +%% task in logs. 
+-spec aae_reap_tombs(pid(), + riakc_obj:bucket(), key_range(), + segment_filter(), + modified_range(), + change_method()) -> + {ok, non_neg_integer()} | {error, any()}. +aae_reap_tombs(Pid, + BucketType, KeyRange, + SegmentFilter, ModifiedRange, + ChangeMethod) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + {SF, SFL, FTS} = + case SegmentFilter of + all -> + {false, [], undefined}; + {SFL0, FTS0} -> + {true, SFL0, FTS0} + end, + {CM0, JobID} = + case ChangeMethod of + {job, ID} when is_integer(ID) -> + {job, ID}; + count -> + {count, undefined}; + local -> + {local, undefined} + end, + call_infinity(Pid, + {req, + #rpbaaefoldreaptombsreq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + segment_filter = SF, + id_filter = SFL, + filter_tree_size = FTS, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh, + change_method = CM0, + job_id = JobID + }, + Timeout}). + +%% @doc erase_keys will find keys in a given bucket and key_range. +%% The SegmentFilter is intended to be used as a mechanism for assisting in +%% scheduling work - a way for splitting out the process of finding/erasing +%% keys into batches without having inconsistencies within the AAE trees. +%% erase_keys can be passed a change_method of count if a count of matching +%% keys is all that is required - this is an alternative to running +%% find_keys and taking the length of the list. To actually erase the object +%% either `local' or `{job, ID}' should be passed as the change_method. 
Using +%% `local' will delete each object from the node local to which it is +%% discovered, which will have the impact of distributing the delete load +%% across the cluster and increasing parallelisation of delete activity. +%% Otherwise a job id can be passed and a specific eraser process will be +%% started on the co-ordinating node of the query only. The Id will be a +%% positive integer used to identify this erase task in logs. +-spec aae_erase_keys(pid(), + riakc_obj:bucket(), key_range(), + segment_filter(), + modified_range(), + change_method()) -> + {ok, non_neg_integer()} | {error, any()}. +aae_erase_keys(Pid, + BucketType, KeyRange, + SegmentFilter, ModifiedRange, + ChangeMethod) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + {SF, SFL, FTS} = + case SegmentFilter of + all -> + {false, [], undefined}; + {SFL0, FTS0} -> + {true, SFL0, FTS0} + end, + {CM0, JobID} = + case ChangeMethod of + {job, ID} when is_integer(ID) -> + {job, ID}; + count -> + {count, undefined}; + local -> + {local, undefined} + end, + call_infinity(Pid, + {req, + #rpbaaefolderasekeysreq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + segment_filter = SF, + id_filter = SFL, + filter_tree_size = FTS, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh, + change_method = CM0, + job_id = JobID + }, + Timeout}). + +%% @doc aae_object_stats folds over the tictacaae store to get +%% operational information. `Pid' is the client. `BucketType' is the +%% bucket to fold over. 
`KeyRange' as before is a two tuple of +%% `{Start, End}' where both `Start' and `End' are binaries that +%% represent the first and last key of a range to fold over. The atom +%% `all' means fold over all keys in the bucket. `ModifiedRange' is a +%% pair `{StartDate, EndDate}' of 32-bit integer unix timestamps, or +%% the atom `all', that limits the fold to only the keys that have a +%% last-modified date in the range. Unlike `aae_find_keys/5' no +%% query is passed; the request carries only the bucket and the two +%% ranges. The reply is `{ok, {stats, Stats}}' where `Stats' is a +%% list sorted by tag, built from the tagged counts in the response. +%% An entry that has no order value is returned as a plain +%% `{Tag, Count}' pair. Entries that share a tag and carry an order +%% value are grouped into a single `{Tag, List}' pair, where `List' +%% holds `{Order, Count}' pairs with `Order' converted to a binary, +%% in descending order. The `stats_output_test' unit test shows a +%% reply containing the `total_count', `total_size', `sizes' and +%% `siblings' tags. +-spec aae_object_stats(pid(), + riakc_obj:bucket(), key_range(), + modified_range()) -> + {ok, {stats, list({Key::atom(), Val::atom() | list()})}} | + {error, any()}. +aae_object_stats(Pid, BucketType, KeyRange, ModifiedRange) -> + Timeout = default_timeout(get_coverage_timeout), + {KR, SK, EK} = + case KeyRange of + all -> + {false, undefined, undefined}; + {SK0, EK0} -> + {true, SK0, EK0} + end, + {MR, MRLow, MRHigh} = + case ModifiedRange of + all -> + {false, undefined, undefined}; + {MRL, MRH} -> + {true, MRL, MRH} + end, + {T, B} = + case BucketType of + B0 when is_binary(B0) -> + {undefined, B0}; + {T0, B0} -> + {T0, B0} + end, + call_infinity(Pid, + {req, + #rpbaaefoldobjectstatsreq{type = T, + bucket = B, + key_range = KR, + start_key = SK, + end_key = EK, + modified_range = MR, + last_mod_start = MRLow, + last_mod_end = MRHigh}, + Timeout}). 
+ +%% @doc +%% List all the buckets with references in the AAE store. For a reasonable +%% number of buckets (e.g. < O(1000)) this should be quick and efficient unless using the +%% leveled_so parallel store. A minimum n_val can be passed if known. If +%% there are buckets (with keys) below the minimum n_val they may not be +%% detected by the query. Will default to 1. +-spec aae_list_buckets(pid()) -> {ok, list(riakc_obj:bucket())} | {error, any()}. +aae_list_buckets(Pid) -> + Timeout = default_timeout(get_coverage_timeout), + call_infinity(Pid, {req, #rpbaaefoldlistbucketsreq{}, Timeout}). + +-spec aae_list_buckets(pid(), pos_integer()) -> {ok, list(riakc_obj:bucket())} | {error, any()}. +aae_list_buckets(Pid, MinNVal) when is_integer(MinNVal), MinNVal > 0 -> + Timeout = default_timeout(get_coverage_timeout), + call_infinity(Pid, + {req, + #rpbaaefoldlistbucketsreq{n_val = MinNVal}, + Timeout}). + %% ==================================================================== %% gen_server callbacks %% ==================================================================== @@ -1328,7 +1944,12 @@ init([Address, Port, Options]) -> queue = queue:new()}), case connect(State) of {error, Reason} when State#state.auto_reconnect /= true -> - {stop, {tcp, Reason}}; + case State#state.silence_terminate_crash of + true -> + {stop, normal}; + _ -> + {stop, {tcp, Reason}} + end; {error, _Reason} -> erlang:send_after(State#state.reconnect_interval, self(), reconnect), {ok, State}; @@ -1503,7 +2124,9 @@ parse_options([{cacertfile, File}|Options], State) -> parse_options([{keyfile, File}|Options], State) -> parse_options(Options, State#state{keyfile=File}); parse_options([{ssl_opts, Opts}|Options], State) -> - parse_options(Options, State#state{ssl_opts=Opts}). + parse_options(Options, State#state{ssl_opts=Opts}); +parse_options([{silence_terminate_crash,Bool}|Options], State) -> + parse_options(Options, State#state{silence_terminate_crash=Bool}). 
maybe_reply({reply, Reply, State}) -> Request = State#state.active, @@ -1549,6 +2172,9 @@ get_options([{n_val, N} | Rest], Req) get_options([{sloppy_quorum, Bool} | Rest], Req) when Bool == true; Bool == false -> get_options(Rest, Req#rpbgetreq{sloppy_quorum = Bool}); +get_options([{node_confirms, NodeConfirms} | Rest], Req) -> + NCOpt = riak_pb_kv_codec:encode_quorum(NodeConfirms), + get_options(Rest, Req#rpbgetreq{node_confirms = NCOpt}); get_options([{_, _} | _Rest], _Req) -> erlang:error(badarg). @@ -1662,6 +2288,9 @@ counter_val_options([{r, R} | Rest], Req) -> counter_val_options(Rest, Req#rpbcountergetreq{r=riak_pb_kv_codec:encode_quorum(R)}); counter_val_options([{pr, PR} | Rest], Req) -> counter_val_options(Rest, Req#rpbcountergetreq{pr=riak_pb_kv_codec:encode_quorum(PR)}); +counter_val_options([{node_confirms, NodeConfirms} | Rest], Req) -> + NCOpt = riak_pb_kv_codec:encode_quorum(NodeConfirms), + counter_val_options(Rest, Req#rpbcountergetreq{node_confirms = NCOpt}); counter_val_options([_ | _Rest], _Req) -> erlang:error(badarg). 
@@ -1715,6 +2344,23 @@ process_response(#request{msg = #rpbgetreq{type = Type, bucket = Bucket, key = K B = maybe_make_bucket_type(Type, Bucket), {reply, {ok, riakc_obj:new_obj(B, Key, Vclock, Contents)}, State}; +%% rpbfetchreq +process_response(#request{msg = #rpbfetchreq{}}, + #rpbfetchresp{queue_empty = true}, State) -> + {reply, {ok, queue_empty}, State}; +process_response(#request{msg = #rpbfetchreq{}}, + #rpbfetchresp{deleted = true, + crc_check = CRC, + replencoded_object = ObjBin, + deleted_vclock = VclockBin}, State) -> + {reply, + {crc_check(CRC,ObjBin), {deleted, VclockBin, ObjBin}}, + State}; +process_response(#request{msg = #rpbfetchreq{}}, + #rpbfetchresp{crc_check = CRC, + replencoded_object = ObjBin}, State) -> + {reply, {crc_check(CRC,ObjBin), ObjBin}, State}; + %% rpbputreq process_response(#request{msg = #rpbputreq{}}, rpbputresp, State) -> @@ -1911,6 +2557,86 @@ process_response(#request{msg = #rpbcountergetreq{}}, process_response(#request{msg = #rpbcountergetreq{}}, #rpbcountergetresp{value=Value}, State) -> {reply, {ok, Value}, State}; +%% Responses to AAE fold requests +process_response(#request{msg = #rpbaaefoldmergerootnvalreq{}}, + #rpbaaefoldtreeresp{size = _TreeSize, + level_one = Root}, + State) -> + {reply, {ok, {root, Root}}, State}; +process_response(#request{msg = #rpbaaefoldmergebranchnvalreq{}}, + #rpbaaefoldtreeresp{size = _TreeSize, + level_two = Branches}, + State) -> + {reply, {ok, {branches, lists:map(fun unpack_branch/1, Branches)}}, State}; +process_response(#request{msg = #rpbaaefoldfetchclocksnvalreq{}}, + #rpbaaefoldkeyvalueresp{response_type = <<"clock">>} = Rsp, + State) -> + KeysNClocks = Rsp#rpbaaefoldkeyvalueresp.keys_value, + {reply, + {ok, {keysclocks, lists:map(fun unpack_keyclock_fun/1, KeysNClocks)}}, + State}; +process_response(#request{msg = #rpbaaefoldmergetreesrangereq{tree_size = TS}}, + #rpbaaefoldtreeresp{size = TS, + level_one = Root, + level_two = Branches}, + State) -> + TreeToImport = + {struct, + 
[{<<"level1">>, + base64:encode_to_string(Root)}, + {<<"level2">>, + {struct, + lists:map(fun encode_branch/1, Branches)}}]}, + {reply, {ok, {tree, TreeToImport}}, State}; +process_response(#request{msg = #rpbaaefoldfetchclocksrangereq{}}, + #rpbaaefoldkeyvalueresp{response_type = <<"clock">>} = Rsp, + State) -> + KeysNClocks = Rsp#rpbaaefoldkeyvalueresp.keys_value, + {reply, + {ok, {keysclocks, lists:map(fun unpack_keyclock_fun/1, KeysNClocks)}}, + State}; +process_response(#request{msg = #rpbaaefoldfindkeysreq{}}, + #rpbaaefoldkeycountresp{keys_count = KeysCount}, + State) -> + {reply, + {ok, {keys, lists:map(fun unpack_keycount_fun/1, KeysCount)}}, + State}; +process_response(#request{msg = #rpbaaefoldfindtombsreq{}}, + #rpbaaefoldkeycountresp{keys_count = KeysDH}, + State) -> + %% In this case the integer value in each entry is not a count but a + %% delete hash + {reply, + {ok, {keys, lists:map(fun unpack_keycount_fun/1, KeysDH)}}, + State}; +process_response(#request{msg = #rpbaaefoldreaptombsreq{}}, + #rpbaaefoldkeycountresp{response_type = ReapTag, + keys_count = [ReapCount]}, + State) -> + true = <<"reap_tombs">> == ReapTag, + true = <<"dispatched_count">> == ReapCount#rpbkeyscount.tag, + {reply, {ok, ReapCount#rpbkeyscount.count}, State}; +process_response(#request{msg = #rpbaaefolderasekeysreq{}}, + #rpbaaefoldkeycountresp{response_type = EraseTag, + keys_count = [ReapCount]}, + State) -> + true = <<"erase_keys">> == EraseTag, + true = <<"dispatched_count">> == ReapCount#rpbkeyscount.tag, + {reply, {ok, ReapCount#rpbkeyscount.count}, State}; +process_response(#request{msg = #rpbaaefoldreplkeysreq{}}, + #rpbaaefoldkeycountresp{keys_count = [DispatchCount]}, + State) -> + true = <<"dispatched_count">> == DispatchCount#rpbkeyscount.tag, + {reply, {ok, DispatchCount#rpbkeyscount.count}, State}; +process_response(#request{msg = #rpbaaefoldobjectstatsreq{}}, + #rpbaaefoldkeycountresp{keys_count = KeysCount}, + State) -> + RawStats = lists:map(fun 
unpack_keycount_fun/1, KeysCount), + {reply, {ok, {stats, stats_output(RawStats)}}, State}; +process_response(#request{msg = #rpbaaefoldlistbucketsreq{}}, + #rpbaaefoldlistbucketsresp{bucket_list = BucketList}, + State) -> + {reply, {ok, lists:map(fun unpack_bucket/1, BucketList)}, State}; process_response(#request{msg = #dtfetchreq{}}, #dtfetchresp{}=Resp, State) -> @@ -2055,6 +2781,75 @@ response_type(_ReturnTerms, _ReturnBody) -> keys. +unpack_bucket(RpbAaeFoldBucket) -> + case RpbAaeFoldBucket#rpbaaefoldbucket.type of + undefined -> + RpbAaeFoldBucket#rpbaaefoldbucket.bucket; + T -> + {T, RpbAaeFoldBucket#rpbaaefoldbucket.bucket} + end. + +unpack_keyclock_fun(RpbKeysClock) -> + case RpbKeysClock#rpbkeysvalue.type of + undefined -> + {{RpbKeysClock#rpbkeysvalue.bucket, + RpbKeysClock#rpbkeysvalue.key}, + RpbKeysClock#rpbkeysvalue.value}; + T -> + {{{T, RpbKeysClock#rpbkeysvalue.bucket}, + RpbKeysClock#rpbkeysvalue.key}, + RpbKeysClock#rpbkeysvalue.value} + end. + +unpack_keycount_fun(RpbKeysCount) -> + case RpbKeysCount#rpbkeyscount.order of + undefined -> + {RpbKeysCount#rpbkeyscount.tag, + RpbKeysCount#rpbkeyscount.count}; + Order -> + {RpbKeysCount#rpbkeyscount.tag, + Order, + RpbKeysCount#rpbkeyscount.count} + end. + +%% For absolute equivalence with the HTTP client output we need to +%% sort this list of stats, nest lists for any repeated keys, and then +%% sort any nested lists +stats_output(Stats) -> + Output = + lists:sort(lists:foldr(fun stats_fold_fun/2, [], lists:sort(Stats))), + lists:map(fun sort_nested/1, Output). + + +sort_nested({K, L}) when is_list(L) -> + L0 = + lists:map(fun({O, C}) -> {integer_to_binary(O), C} end, + lists:reverse(lists:sort(L))), + {K, L0}; +sort_nested(AnyOther) -> + AnyOther. + +stats_fold_fun({K, O, C}, Acc) -> + case lists:keyfind(K, 1, Acc) of + false -> + [{K, [{O, C}]}|Acc]; + {K, L} -> + lists:keyreplace(K, 1, Acc, {K, lists:sort([{O, C}|L])}) + end; +stats_fold_fun({K, C}, Acc) -> + [{K, C}|Acc]. 
+ + +encode_branch(BranchBin) -> + {I, CB} = split_branch(BranchBin), + {integer_to_binary(I), base64:encode_to_string(CB)}. + +unpack_branch(BranchBin) -> + {I, CB} = split_branch(BranchBin), + {I, zlib:uncompress(CB)}. + +split_branch(<<I:32/integer, CB/binary>>) -> {I, CB}. + %% Helper for index responses -spec process_index_response('keys'|'terms'|'objects', list(), list()) -> index_stream_result(). @@ -2292,7 +3087,12 @@ disconnect(State) -> erlang:send_after(State#state.reconnect_interval, self(), reconnect), {noreply, increase_reconnect_interval(NewState)}; false -> - {stop, disconnected, NewState} + case State#state.silence_terminate_crash of + true -> + {stop, normal, NewState}; + _ -> + {stop, disconnected, NewState} + end end. %% Double the reconnect interval up to the maximum @@ -2385,6 +3185,12 @@ remove_queued_request(Ref, State) -> NewState#state{queue = queue:from_list(L2)} end. +crc_check(CRC, Bin) -> + case erlang:crc32(Bin) of + CRC -> ok; + _ -> crc_wonky + end. + %% @private -ifdef(deprecated_19). mk_reqid() -> erlang:phash2(crypto:strong_rand_bytes(10)). % only has to be unique per-pid @@ -2557,4 +3363,27 @@ increase_reconnect_interval_test(State) -> increase_reconnect_interval_test(NextState) end. 
+stats_output_test() -> + %% Raw details returned as in the mapped protocol buffer records + RawStats = + [{<<"total_count">>,10000}, + {<<"total_size">>,1218213}, + {<<"sizes">>,2,9994},{<<"sizes">>,3,6}, + {<<"siblings">>,1,9900},{<<"siblings">>,2,10},{<<"siblings">>,3,10}, + {<<"siblings">>,4,10},{<<"siblings">>,5,10},{<<"siblings">>,6,10}, + {<<"siblings">>,7,10},{<<"siblings">>,8,10},{<<"siblings">>,9,10}, + {<<"siblings">>,10,10},{<<"siblings">>,11,10}], + %% What the HTTP client would return for the equivalent + ExpectedStats = + lists:sort( + [{<<"siblings">>, + [{<<"11">>,10},{<<"10">>,10},{<<"9">>,10},{<<"8">>,10}, + {<<"7">>,10},{<<"6">>,10},{<<"5">>,10},{<<"4">>,10}, + {<<"3">>,10},{<<"2">>,10},{<<"1">>,9900}]}, + {<<"sizes">>,[{<<"3">>,6},{<<"2">>,9994}]}, + {<<"total_size">>,1218213}, + {<<"total_count">>,10000}]), + OutStats = stats_output(RawStats), + ?assertEqual(ExpectedStats, OutStats). + -endif.