From 074949b214c5fe6840e93ebfbf066537d3168e71 Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Thu, 8 Jan 2015 10:18:34 +0000 Subject: [PATCH 01/22] Move spec to correct line so we can gen docs --- src/eleveldb.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eleveldb.erl b/src/eleveldb.erl index cbe8b897..0e75563e 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -166,10 +166,10 @@ async_write(_CallerRef, _Ref, _Updates, _Opts) -> erlang:nif_error({error, not_loaded}). -spec async_iterator(reference(), db_ref(), read_options()) -> ok. --spec async_iterator(reference(), db_ref(), read_options(), keys_only) -> ok. async_iterator(_CallerRef, _Ref, _Opts) -> erlang:nif_error({error, not_loaded}). +-spec async_iterator(reference(), db_ref(), read_options(), keys_only) -> ok. async_iterator(_CallerRef, _Ref, _Opts, keys_only) -> erlang:nif_error({error, not_loaded}). From 1fb0a2dd57d1853fef62b02efa0d3336100c1886 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Tue, 17 Feb 2015 15:48:15 -0500 Subject: [PATCH 02/22] use version 2.0.1 of cuttlefish --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index f02d3680..95b3a33c 100644 --- a/rebar.config +++ b/rebar.config @@ -9,7 +9,7 @@ {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}. {deps, [ - {cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {branch, "develop"}}} + {cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {tag, "2.0.1"}}} ]}. {port_env, [ From 9df7c69b7930ec685444aa2382b086a91a878760 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Thu, 19 Feb 2015 13:27:03 -0800 Subject: [PATCH 03/22] async_put (immutable work-in-progress) --- src/eleveldb.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/eleveldb.erl b/src/eleveldb.erl index 0e75563e..d0803e42 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -25,6 +25,7 @@ close/1, get/3, put/4, + async_put/5, delete/3, write/3, fold/4, @@ -161,6 +162,11 @@ write(Ref, Updates, Opts) -> async_write(CallerRef, Ref, Updates, Opts), ?WAIT_FOR_REPLY(CallerRef). +async_put(Ref, Context, Key, Value, Opts) -> + Updates = [{put, Key, Value}], + async_write(Context, Ref, Updates, Opts), + ok. + -spec async_write(reference(), db_ref(), write_actions(), write_options()) -> ok. async_write(_CallerRef, _Ref, _Updates, _Opts) -> erlang:nif_error({error, not_loaded}). From 26a1ac6224287476e28e0635c0b8919e998e019b Mon Sep 17 00:00:00 2001 From: Fred Dushin Date: Thu, 26 Mar 2015 23:13:10 -0400 Subject: [PATCH 04/22] Fixed dialyzer spec for asyc_put --- src/eleveldb.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eleveldb.erl b/src/eleveldb.erl index d0803e42..8046e158 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -162,6 +162,7 @@ write(Ref, Updates, Opts) -> async_write(CallerRef, Ref, Updates, Opts), ?WAIT_FOR_REPLY(CallerRef). +-spec async_put(db_ref(), reference(), binary(), binary(), write_options()) -> ok. async_put(Ref, Context, Key, Value, Opts) -> Updates = [{put, Key, Value}], async_write(Context, Ref, Updates, Opts), From d946c610ebd1aff5780293cc985eef611b8cd747 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Sun, 29 Mar 2015 19:20:39 -0600 Subject: [PATCH 05/22] Extended EQC test for async_put. --- src/eleveldb.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/eleveldb.erl b/src/eleveldb.erl index 8046e158..fa32975b 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -484,13 +484,23 @@ values() -> eqc_gen:non_empty(list(binary())). ops(Keys, Values) -> - {oneof([put, delete]), oneof(Keys), oneof(Values)}. + {oneof([put, async_put, delete]), oneof(Keys), oneof(Values)}. apply_kv_ops([], _Ref, Acc0) -> Acc0; apply_kv_ops([{put, K, V} | Rest], Ref, Acc0) -> ok = eleveldb:put(Ref, K, V, []), apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); +apply_kv_ops([{async_put, K, V} | Rest], Ref, Acc0) -> + MyRef = make_ref(), + Context = {my_context, MyRef}, + ok = eleveldb:async_put(Ref, Context, K, V, []), + receive + {Context, ok} -> + apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); + Msg -> + error({unexpected_msg, Msg}) + end; apply_kv_ops([{delete, K, _} | Rest], Ref, Acc0) -> ok = eleveldb:delete(Ref, K, []), apply_kv_ops(Rest, Ref, orddict:store(K, deleted, Acc0)). From b544b7587204d337d14969b543f9ed81e8c11f78 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 29 May 2015 13:16:50 -0600 Subject: [PATCH 06/22] Bumped leveldb version to pick up sequential tunings --- c_src/build_deps.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 7c211af2..81bc4ab5 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -LEVELDB_VSN="2.0.0" +LEVELDB_VSN="2.0.1" SNAPPY_VSN="1.0.4" From 58bb3881dff67cee867c1497709143b5a788e2b4 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Wed, 3 Jun 2015 16:34:13 -0400 Subject: [PATCH 07/22] Update build_deps to pull leveldb 2.0.2 --- c_src/build_deps.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 81bc4ab5..930fd853 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -LEVELDB_VSN="2.0.1" +LEVELDB_VSN="2.0.2" SNAPPY_VSN="1.0.4" From 812335f353fa2c8a3473ab9194fa5375587f3027 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Mon, 15 Jun 2015 12:01:39 -0400 Subject: [PATCH 08/22] push leveldb version string into environment for build. --- c_src/build_deps.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 930fd853..757ef8e6 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -LEVELDB_VSN="2.0.2" +LEVELDB_VSN="2.0" SNAPPY_VSN="1.0.4" @@ -47,6 +47,7 @@ case "$1" in export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include" export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" export LD_LIBRARY_PATH="$BASEDIR/system/lib:$LD_LIBRARY_PATH" + export LEVELDB_VSN="$LEVELDB_VSN" (cd leveldb && $MAKE check) @@ -71,6 +72,7 @@ case "$1" in export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include" export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" export LD_LIBRARY_PATH="$BASEDIR/system/lib:$LD_LIBRARY_PATH" + export LEVELDB_VSN="$LEVELDB_VSN" if [ ! -d leveldb ]; then git clone git://github.com/basho/leveldb From cbcee672e62e709db22ac50d88cbc8e38ceb7575 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Mon, 15 Jun 2015 14:32:32 -0600 Subject: [PATCH 09/22] Bumping leveldb to 2.0.3 --- c_src/build_deps.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 7c211af2..40512b02 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -LEVELDB_VSN="2.0.0" +LEVELDB_VSN="2.0.3" SNAPPY_VSN="1.0.4" From 4f8e032e5f825cea6fa5e54609a9d0ce462bfb65 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Thu, 9 Jul 2015 17:16:21 -0600 Subject: [PATCH 10/22] Merge pull request #140 from ahf/cleanup-lager Remove lager parse transform --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 95b3a33c..b1d912e3 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ {port_sources, ["c_src/*.cc"]}. -{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}. +{erl_opts, [warnings_as_errors, debug_info]}. {deps, [ {cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {tag, "2.0.1"}}} From f9cae4fcaa167a9a38dbf7d5249e357b9fd5b322 Mon Sep 17 00:00:00 2001 From: Gordon Guthrie Date: Fri, 7 Aug 2015 08:58:23 +0000 Subject: [PATCH 11/22] interim commit of changed API for TS - needs dialyzer fix --- src/eleveldb.erl | 15 ++++++++++++++- test/range_scan.erl | 8 ++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/eleveldb.erl b/src/eleveldb.erl index a254eefa..560f8e1d 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -45,6 +45,11 @@ parse_string/1, is_empty/1]). +%% for testing +-export([ + ts_key_TEST/1 + ]). + -export([option_types/1, validate_options/2]). @@ -507,7 +512,15 @@ ts_batch_to_binary({ts_batch, Family, Series, Points}) -> B3 = append_string(Series, B2), append_points(Points, B3). -ts_key({Family, Series, Time}) -> +ts_key(List) when is_list(List) -> + ts_l2(lists:reverse(List), <<>>). + +ts_l2([], Acc) -> + Acc; +ts_l2([H | T], Acc) -> + ts_l2(T, append_string(H, Acc)). + +ts_key_TEST({Family, Series, Time}) -> B1 = <>, B2 = append_string(Family, B1), append_string(Series, B2). diff --git a/test/range_scan.erl b/test/range_scan.erl index 68b07179..88c156f4 100644 --- a/test/range_scan.erl +++ b/test/range_scan.erl @@ -60,8 +60,8 @@ test_range_query(Ref) -> end}. read_items_with_level_filter(Ref, Start0, End0, F1, F2) -> - Start = eleveldb:ts_key({?FAMILY, ?SERIES, Start0}), - End = eleveldb:ts_key({?FAMILY, ?SERIES, End0}), + Start = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, Start0}), + End = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, End0}), Opts = [{range_filter, {"and", [ {">=", [{field, "field_1"}, {const, F1}]}, {"<=", [{field, "field_2"}, {const, F2}]} @@ -76,8 +76,8 @@ calc_expected_rows(End0, F2, Start0, F1) -> lists:min([End0, F2]) - lists:max([Start0, F1]) + 1. read_items_with_erlang_filter(Ref, Start0, End0, F1, F2) -> - Start = eleveldb:ts_key({?FAMILY, ?SERIES, Start0}), - End = eleveldb:ts_key({?FAMILY, ?SERIES, End0}), + Start = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, Start0}), + End = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, End0}), Opts = [], ExpectedRows = calc_expected_rows(End0, F2, Start0, F1), time_result("With Erlang Filtering", fun() -> From e31a6de88b9c7a9d3e00397099950711184f61b7 Mon Sep 17 00:00:00 2001 From: Matthew Von-Maszewski Date: Tue, 11 Aug 2015 15:24:27 -0400 Subject: [PATCH 12/22] backport develop to 2.1 branch on 8/11/15 --- c_src/build_deps.sh | 2 +- c_src/eleveldb.cc | 78 ++++++++++++++++++++++++-------------------- c_src/eleveldb.h | 1 + c_src/workitems.cc | 26 +++++++++++++++ c_src/workitems.h | 28 ++++++++++++++++ src/eleveldb.app.src | 6 ++-- src/eleveldb.erl | 13 +++++--- 7 files changed, 110 insertions(+), 44 deletions(-) diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 01ae21fd..2fea18c8 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -LEVELDB_VSN="2.0.3" +LEVELDB_VSN="2.1.3" SNAPPY_VSN="1.0.4" diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index 95147cb7..ddb800db 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -63,7 +63,7 @@ static ErlNifFunc nif_funcs[] = {"async_close", 2, eleveldb::async_close}, {"async_iterator_close", 2, eleveldb::async_iterator_close}, {"status", 2, eleveldb_status}, - {"destroy", 2, eleveldb_destroy}, + {"async_destroy", 3, eleveldb::async_destroy}, {"repair", 2, eleveldb_repair}, {"is_empty", 1, eleveldb_is_empty}, @@ -1024,9 +1024,48 @@ async_iterator_close( } // async_iterator_close + +ERL_NIF_TERM +async_destroy( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + char db_name[4096]; + + if(!enif_get_string(env, argv[1], db_name, sizeof(db_name), ERL_NIF_LATIN1) || + !enif_is_list(env, argv[2])) + { + return enif_make_badarg(env); + } // if + + ERL_NIF_TERM caller_ref = argv[0]; + + eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); + + leveldb::Options *opts = new leveldb::Options; + fold(env, argv[2], parse_open_option, *opts); + + eleveldb::WorkTask *work_item = new eleveldb::DestroyTask(env, caller_ref, + db_name, opts); + + if(false == priv.thread_pool.submit(work_item)) + { + delete work_item; + return send_reply(env, caller_ref, + enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); + } + + return eleveldb::ATOM_OK; + +} // async_destroy + } // namespace eleveldb +/** + * HEY YOU ... please make async + */ ERL_NIF_TERM eleveldb_status( ErlNifEnv* env, @@ -1068,6 +1107,9 @@ eleveldb_status( } // eleveldb_status +/** + * HEY YOU ... please make async + */ ERL_NIF_TERM eleveldb_repair( ErlNifEnv* env, @@ -1096,40 +1138,6 @@ eleveldb_repair( } } // eleveldb_repair -/** - * HEY YOU ... please make async - */ -ERL_NIF_TERM -eleveldb_destroy( - ErlNifEnv* env, - int argc, - const ERL_NIF_TERM argv[]) -{ - char name[4096]; - if (enif_get_string(env, argv[0], name, sizeof(name), ERL_NIF_LATIN1) && - enif_is_list(env, argv[1])) - { - // Parse out the options - leveldb::Options opts; - fold(env, argv[1], parse_open_option, opts); - - leveldb::Status status = leveldb::DestroyDB(name, opts); - if (!status.ok()) - { - return error_tuple(env, eleveldb::ATOM_ERROR_DB_DESTROY, status); - } - else - { - return eleveldb::ATOM_OK; - } - } - else - { - return enif_make_badarg(env); - } - -} // eleveldb_destroy - ERL_NIF_TERM eleveldb_is_empty( diff --git a/c_src/eleveldb.h b/c_src/eleveldb.h index b007156f..13361882 100644 --- a/c_src/eleveldb.h +++ b/c_src/eleveldb.h @@ -45,6 +45,7 @@ ERL_NIF_TERM async_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM async_destroy(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/workitems.cc b/c_src/workitems.cc index fdb07e38..8f6c3aac 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -361,6 +361,32 @@ MoveTask::recycle() } // MoveTask::recycle +/** + * DestroyTask functions + */ + +DestroyTask::DestroyTask( + ErlNifEnv* caller_env, + ERL_NIF_TERM& _caller_ref, + const std::string& db_name_, + leveldb::Options *open_options_) + : WorkTask(caller_env, _caller_ref), + db_name(db_name_), open_options(open_options_) +{ +} // DestroyTask::DestroyTask + + +work_result +DestroyTask::operator()() +{ + leveldb::Status status = leveldb::DestroyDB(db_name, *open_options); + + if(!status.ok()) + return error_tuple(local_env(), ATOM_ERROR_DB_DESTROY, status); + + return work_result(ATOM_OK); + +} // DestroyTask::operator() } // namespace eleveldb diff --git a/c_src/workitems.h b/c_src/workitems.h index 62ef92a6..807934b3 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -439,6 +439,34 @@ class ItrCloseTask : public WorkTask }; // class ItrCloseTask + +/** + * Background object for async open of a leveldb instance + */ + +class DestroyTask : public WorkTask +{ +protected: + std::string db_name; + leveldb::Options *open_options; // associated with db handle, we don't free it + +public: + DestroyTask(ErlNifEnv* caller_env, ERL_NIF_TERM& _caller_ref, + const std::string& db_name_, leveldb::Options *open_options_); + + virtual ~DestroyTask() {}; + + virtual work_result operator()(); + +private: + DestroyTask(); + DestroyTask(const DestroyTask &); + DestroyTask & operator=(const DestroyTask &); + +}; // class DestroyTask + + + } // namespace eleveldb diff --git a/src/eleveldb.app.src b/src/eleveldb.app.src index 991a8d61..cc2b4343 100644 --- a/src/eleveldb.app.src +++ b/src/eleveldb.app.src @@ -8,11 +8,11 @@ stdlib ]}, {env, [ - %% what percent of total memory should go to + %% what percent of total memory should go to %% leveldb. Default is 15% on the basis of - %% a single development machine running 5 + %% a single development machine running 5 %% Riak instances would therefore claim 75%. - %% REAL users will want this at 75 to 80 + %% REAL users will want this at 70%. {total_leveldb_mem_percent, 15}, %% Use bloom filter support by default diff --git a/src/eleveldb.erl b/src/eleveldb.erl index fa32975b..1e72bab9 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -250,13 +250,16 @@ status(Ref, Key) -> status_int(_Ref, _Key) -> erlang:nif_error({error, not_loaded}). +-spec async_destroy(reference(), string(), open_options()) -> ok. +async_destroy(_CallerRef, _Name, _Opts) -> + erlang:nif_error({error, not_loaded}). + -spec destroy(string(), open_options()) -> ok | {error, any()}. destroy(Name, Opts) -> - eleveldb_bump:big(), - destroy_int(Name, Opts). - -destroy_int(_Name, _Opts) -> - erlang:nif_error({erlang, not_loaded}). + CallerRef = make_ref(), + Opts2 = add_open_defaults(Opts), + async_destroy(CallerRef, Name, Opts2), + ?WAIT_FOR_REPLY(CallerRef). repair(Name, Opts) -> eleveldb_bump:big(), From 90b263a845e51b18a0a53e2883563ea50e13c3f5 Mon Sep 17 00:00:00 2001 From: Gordon Guthrie Date: Mon, 17 Aug 2015 10:32:38 +0000 Subject: [PATCH 13/22] minor refactoring for time series integration --- src/eleveldb.erl | 10 +++++- src/eleveldb_ts.erl | 80 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 src/eleveldb_ts.erl diff --git a/src/eleveldb.erl b/src/eleveldb.erl index 560f8e1d..3212f3c9 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -43,7 +43,8 @@ ts_batch_to_binary/1, ts_key/1, parse_string/1, - is_empty/1]). + is_empty/1, + encode/2]). %% for testing -export([ @@ -148,6 +149,13 @@ init() -> -opaque itr_ref() :: binary(). +encode(Val, timestamp) -> + <>; +encode(Val, binary) when is_binary(Val)-> + Val; +encode(Val, _) -> + term_to_binary(Val). + -spec async_open(reference(), string(), open_options()) -> ok. async_open(_CallerRef, _Name, _Opts) -> erlang:nif_error({error, not_loaded}). diff --git a/src/eleveldb_ts.erl b/src/eleveldb_ts.erl new file mode 100644 index 00000000..b7d742aa --- /dev/null +++ b/src/eleveldb_ts.erl @@ -0,0 +1,80 @@ +-module(eleveldb_ts). + +-export([ + encode_key/1, + encode_record/1, + decode_record/1 + ]). + +-ifdef(TEST). +-compile(export_all). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +encode_key(Elements) when is_list(Elements) -> + encode_k2(Elements, <<>>). + +%% TODO recheck this packing +%% binary list stuff is shocking here too +encode_k2([], Bin) -> Bin; +encode_k2([{timestamp, Ts} | T], Bin) -> encode_k2(T, append(<>, Bin)); +encode_k2([{float, F} | T], Bin) -> encode_k2(T, append(<>, Bin)); +encode_k2([{int, I} | T], Bin) -> encode_k2(T, append(<>, Bin)); +encode_k2([{binary, B} | T], Bin) when is_binary(B) -> + encode_k2(T, append(B, Bin)); +encode_k2([{binary, L} | T], Bin) when is_list(L) -> + B = list_to_binary(L), + encode_k2(T, append(B, Bin)). + +encode_record(Record) -> + msgpack:pack(Record, [{format, jsx}]). + +decode_record(Bin) when is_binary(Bin) -> + msgpack:unpack(Bin, [{format, jsx}]). + +%% +%% Internal Funs +%% + +append_varint(N, Bin) -> + N2 = N bsr 7, + case N2 of + 0 -> + C = N rem 128, + <>; + _ -> + C = (N rem 128) + 128, + append_varint(N2, <>) + end. + +append(S, Bin) when is_binary(S) andalso + is_binary(Bin) -> + L = byte_size(S), + B2 = append_varint(L, Bin), + <>. + +%% =================================================================== +%% EUnit tests +%% =================================================================== +-ifdef(TEST). + +simple_encode_key_test() -> + Key = [{timestamp, 1}, {binary, <<"abc">>}, {float, 1.0}, {int, 9}], + Got = encode_key(Key), + Exp = <<8,0,0,0,0,0,0,0,01,3,97,98,99,8,63,240,0,0,0,0,0,0,8,0,0,0,0,0,0,0,9>>, + ?assertEqual(Exp, Got). + +simple_encode_record_test() -> + Rec = [{<<"field_1">>, 123}, {<<"field_2">>, "abdce"}], + Got = encode_record(Rec), + Exp = <<130,167,102,105,101,108,100,95,49,123,167,102,105,101,108,100,95,50,149,97,98,100,99,101>>, + ?assertEqual(Exp, Got). + +simple_decode_record_test() -> + Rec = <<130,167,102,105,101,108,100,95,49,123,167,102,105,101,108,100,95,50,149,97,98,100,99,101>>, + {ok, Got} = decode_record(Rec), + Exp = [{<<"field_1">>, 123}, {<<"field_2">>, "abdce"}], + ?assertEqual(Exp, Got). + + +-endif. From e7267052b00847c3cf751de73e15409a0af255d2 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 26 Aug 2015 20:07:04 +0400 Subject: [PATCH 14/22] pin cuttlefish for e2e/ts --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index d49b71b1..b5069263 100644 --- a/rebar.config +++ b/rebar.config @@ -14,7 +14,7 @@ {deps, [ {msgpack, ".*", {git, "git://github.com/msgpack/msgpack-erlang.git", "master"}}, - {cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {branch, "2.0"}}} + {cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {branch, "develop"}}} ]}. {port_env, [ From 029ae98ad33f98316983242f5b498fbe0584b749 Mon Sep 17 00:00:00 2001 From: erikleitch Date: Thu, 27 Aug 2015 11:10:07 -0700 Subject: [PATCH 15/22] Starting again, this time from gg/prototype/timeseries-range-scan-filter branch! First commit just changes leveldb VSN to point to corresponding feature/streaming-folds-filter branch --- c_src/build_deps.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 5620f2ff..a04705de 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,8 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -#LEVELDB_VSN="2.0.0" -LEVELDB_VSN="prototype/timeseries" +LEVELDB_VSN="feature/streaming-folds-filter" SNAPPY_VSN="1.0.4" From e7bee9f6cac03a4947dd10d7ca3035b5a3da3a09 Mon Sep 17 00:00:00 2001 From: erikleitch Date: Thu, 27 Aug 2015 11:20:20 -0700 Subject: [PATCH 16/22] Commented out macosx-version-min Opt from rebar.config.script, which prevents any of the C++11 features from compiling under Yosemite. Needs further work to resolve this --- rebar.config | 3 +-- rebar.config.script | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rebar.config b/rebar.config index d49b71b1..f25ae920 100644 --- a/rebar.config +++ b/rebar.config @@ -21,8 +21,7 @@ %% Make sure to set -fPIC when compiling leveldb {"CFLAGS", "$CFLAGS -Wall -O3 -fPIC"}, % try and make it build - % {"CXXFLAGS", "$CXXFLAGS -Wall -O3 -fPIC -std=c++11 --stdlib=libc++"}, - {"CXXFLAGS", "$CXXFLAGS -Wall -O3 -fPIC -std=gnu++11"}, + {"CXXFLAGS", "$CXXFLAGS -Wall -O3 -fPIC -std=c++0x"}, {"DRV_CFLAGS", "$DRV_CFLAGS -O3 -Wall -I c_src/leveldb/include"}, {"DRV_LDFLAGS", "$DRV_LDFLAGS c_src/leveldb/libleveldb.a c_src/system/lib/libsnappy.a -lstdc++"} ]}. diff --git a/rebar.config.script b/rebar.config.script index 60a64b9d..8515c12e 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -6,7 +6,8 @@ %% actually running. case os:type() of {unix,darwin} -> - Opt = " -mmacosx-version-min=10.8", +%% Opt = " -mmacosx-version-min=10.8", + Opt = " ", [Mjr|_] = string:tokens(os:cmd("/usr/bin/uname -r"), "."), Major = list_to_integer(Mjr), if From a895daca93ae3f05a7d25d382d81d8f585745cb1 Mon Sep 17 00:00:00 2001 From: erikleitch Date: Thu, 27 Aug 2015 13:49:51 -0700 Subject: [PATCH 17/22] Removed macosx-min flag from rebar.config.script which prevents C++11 code from compiling on Yosemite. Set leveldb VSN to end-to-end/timeseries for this branch --- c_src/build_deps.sh | 3 +-- rebar.config.script | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 5620f2ff..b0112517 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,8 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -#LEVELDB_VSN="2.0.0" -LEVELDB_VSN="prototype/timeseries" +LEVELDB_VSN="end-to-end/timeseries" SNAPPY_VSN="1.0.4" diff --git a/rebar.config.script b/rebar.config.script index 60a64b9d..8fb067c6 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -6,7 +6,8 @@ %% actually running. case os:type() of {unix,darwin} -> - Opt = " -mmacosx-version-min=10.8", +%% Opt = " -mmacosx-version-min=10.8", + Opt = "", [Mjr|_] = string:tokens(os:cmd("/usr/bin/uname -r"), "."), Major = list_to_integer(Mjr), if From 5c767c3b8da03d752e93dcf3fd3ce82448b443d3 Mon Sep 17 00:00:00 2001 From: erikleitch Date: Thu, 27 Aug 2015 14:23:28 -0700 Subject: [PATCH 18/22] First successful compile of merge of streaming folds + filtering code. NB: COMPILED -- not TESTED --- c_src/atoms.h | 4 + c_src/eleveldb.cc | 294 +++++++++++++++++++++++++++++---------------- c_src/eleveldb.h | 8 +- c_src/workitems.cc | 224 ++++++++++++++++++++++++++++++---- c_src/workitems.h | 44 +++++-- 5 files changed, 441 insertions(+), 133 deletions(-) diff --git a/c_src/atoms.h b/c_src/atoms.h index 1ae8a166..ff77c36f 100644 --- a/c_src/atoms.h +++ b/c_src/atoms.h @@ -70,6 +70,10 @@ extern ERL_NIF_TERM ATOM_RANGE_SCAN_END; extern ERL_NIF_TERM ATOM_NEEDS_REACK; extern ERL_NIF_TERM ATOM_TIME_SERIES; extern ERL_NIF_TERM ATOM_RANGE_FILTER; +extern ERL_NIF_TERM ATOM_STREAMING_BATCH; +extern ERL_NIF_TERM ATOM_STREAMING_END; +extern ERL_NIF_TERM ATOM_LIMIT; +extern ERL_NIF_TERM ATOM_UNDEFINED; } // namespace eleveldb diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index f28f12d8..23e1f07f 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -81,10 +81,13 @@ static ErlNifFunc nif_funcs[] = {"async_iterator", 3, eleveldb::async_iterator}, {"async_iterator", 4, eleveldb::async_iterator}, - {"async_iterator_move", 3, eleveldb::async_iterator_move}, - {"range_scan", 4, eleveldb::range_scan}, - {"range_scan_ack", 2, eleveldb::range_scan_ack} + + {"streaming_start", 4, eleveldb::streaming_start}, + {"streaming_ack", 2, eleveldb::streaming_ack}, + {"streaming_stop", 1, eleveldb::streaming_stop}, + + {"current_usec", 0, eleveldb::currentMicroSeconds}, }; @@ -139,6 +142,7 @@ ERL_NIF_TERM ATOM_BLOCK_CACHE_THRESHOLD; ERL_NIF_TERM ATOM_IS_INTERNAL_DB; ERL_NIF_TERM ATOM_LIMITED_DEVELOPER_MEM; ERL_NIF_TERM ATOM_ELEVELDB_THREADS; +ERL_NIF_TERM ATOM_ELEVELDB_STREAM_THREADS; ERL_NIF_TERM ATOM_FADVISE_WILLNEED; ERL_NIF_TERM ATOM_DELETE_THRESHOLD; ERL_NIF_TERM ATOM_TIERED_SLOW_LEVEL; @@ -154,6 +158,10 @@ ERL_NIF_TERM ATOM_NEEDS_REACK; ERL_NIF_TERM ATOM_TIME_SERIES; ERL_NIF_TERM ATOM_GLOBAL_DATA_DIR; ERL_NIF_TERM ATOM_RANGE_FILTER; +ERL_NIF_TERM ATOM_STREAMING_BATCH; +ERL_NIF_TERM ATOM_STREAMING_END; +ERL_NIF_TERM ATOM_LIMIT; +ERL_NIF_TERM ATOM_UNDEFINED; } // namespace eleveldb @@ -194,6 +202,7 @@ static ERL_NIF_TERM slice_to_binary(ErlNifEnv* env, leveldb::Slice s) struct EleveldbOptions { int m_EleveldbThreads; + int m_EleveldbStreamThreads; int m_LeveldbImmThreads; int m_LeveldbBGWriteThreads; int m_LeveldbOverlapThreads; @@ -207,7 +216,7 @@ struct EleveldbOptions std::string m_GlobalDataDir; EleveldbOptions() - : m_EleveldbThreads(71), + : m_EleveldbThreads(51), m_EleveldbStreamThreads(20), m_LeveldbImmThreads(0), m_LeveldbBGWriteThreads(0), m_LeveldbOverlapThreads(0), m_LeveldbGroomingThreads(0), m_TotalMemPercent(0), m_TotalMem(0), @@ -218,6 +227,7 @@ struct EleveldbOptions void Dump() { syslog(LOG_ERR, " m_EleveldbThreads: %d\n", m_EleveldbThreads); + syslog(LOG_ERR, " m_EleveldbStreamThreads: %d\n", m_EleveldbStreamThreads); syslog(LOG_ERR, " m_LeveldbImmThreads: %d\n", m_LeveldbImmThreads); syslog(LOG_ERR, " m_LeveldbBGWriteThreads: %d\n", m_LeveldbBGWriteThreads); syslog(LOG_ERR, " m_LeveldbOverlapThreads: %d\n", m_LeveldbOverlapThreads); @@ -240,11 +250,13 @@ class eleveldb_priv_data public: EleveldbOptions m_Opts; eleveldb::eleveldb_thread_pool thread_pool; + eleveldb::eleveldb_thread_pool stream_thread_pool; leveldb::DataDictionary data_dictionary; explicit eleveldb_priv_data(EleveldbOptions & Options) - : m_Opts(Options), thread_pool(Options.m_EleveldbThreads), - data_dictionary(Options.m_GlobalDataDir) + : m_Opts(Options), thread_pool(Options.m_EleveldbThreads), + stream_thread_pool(Options.m_EleveldbStreamThreads), + data_dictionary(Options.m_GlobalDataDir) {} private: @@ -313,6 +325,17 @@ ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOption } // if } // if } + else if (option[0] == eleveldb::ATOM_ELEVELDB_STREAM_THREADS) + { + unsigned long temp; + if (enif_get_ulong(env, option[1], &temp)) + { + if (temp != 0) + { + opts.m_EleveldbStreamThreads = temp; + } // if + } // if + } else if (option[0] == eleveldb::ATOM_FADVISE_WILLNEED) { opts.m_FadviseWillNeed = (option[1] == eleveldb::ATOM_TRUE); @@ -552,6 +575,39 @@ ERL_NIF_TERM parse_range_scan_option(ErlNifEnv* env, ERL_NIF_TERM item, return eleveldb::ATOM_OK; } +ERL_NIF_TERM parse_streaming_option(ErlNifEnv* env, ERL_NIF_TERM item, + eleveldb::RangeScanOptions & opts) +{ + int arity; + const ERL_NIF_TERM* option; + if (enif_get_tuple(env, item, &arity, &option) && 2 == arity) + { + if (option[0] == eleveldb::ATOM_START_INCLUSIVE) + opts.start_inclusive = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_END_INCLUSIVE) + opts.end_inclusive = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_FILL_CACHE) + opts.fill_cache = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_VERIFY_CHECKSUMS) + opts.verify_checksums = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_MAX_UNACKED_BYTES) { + unsigned max_unacked_bytes; + if (enif_get_uint(env, option[1], &max_unacked_bytes)) + opts.max_unacked_bytes = max_unacked_bytes; + } else if (option[0] == eleveldb::ATOM_MAX_BATCH_BYTES) { + unsigned max_batch_bytes; + if (enif_get_uint(env, option[1], &max_batch_bytes)) + opts.max_batch_bytes = max_batch_bytes; + } else if (option[0] == eleveldb::ATOM_LIMIT) { + unsigned limit; + if (enif_get_uint(env, option[1], &limit)) + opts.limit = limit; + } + } + + return eleveldb::ATOM_OK; +} + ERL_NIF_TERM parse_write_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::WriteOptions& opts) { int arity; @@ -988,101 +1044,6 @@ async_iterator( return submit_to_thread_queue(work_item, env, caller_ref); } // async_iterator -ERL_NIF_TERM -range_scan_ack(ErlNifEnv * env, - int argc, - const ERL_NIF_TERM argv[]) -{ - const ERL_NIF_TERM ref = argv[0]; - const ERL_NIF_TERM num_bytes_term = argv[1]; - uint32_t num_bytes; - - if (!enif_get_uint(env, num_bytes_term, &num_bytes)) - return enif_make_badarg(env); - - using eleveldb::RangeScanTask; - RangeScanTask::SyncHandle * sync_handle; - sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); - - if (!sync_handle || !sync_handle->sync_obj) - return enif_make_badarg(env); - - bool needs_reack = sync_handle->sync_obj->AckBytes(num_bytes); - return needs_reack ? eleveldb::ATOM_NEEDS_REACK : eleveldb::ATOM_OK; -} - -ERL_NIF_TERM -range_scan(ErlNifEnv * env, - int argc, - const ERL_NIF_TERM argv[]) -{ - const ERL_NIF_TERM db_ref = argv[0]; - const ERL_NIF_TERM start_key_term = argv[1]; - const ERL_NIF_TERM end_key_term = argv[2]; - const ERL_NIF_TERM options_list = argv[3]; - - ReferencePtr db_ptr; - db_ptr.assign(DbObject::RetrieveDbObject(env, db_ref)); - - if (NULL == db_ptr.get() - || !enif_is_binary(env, start_key_term) - || !enif_is_binary(env, end_key_term) - || !enif_is_list(env, options_list)) - { - return enif_make_badarg(env); - } - - if (NULL == db_ptr->m_Db) - return error_einval(env); - - const leveldb::Options & options = db_ptr->m_Db->GetOptions(); - leveldb::KeyTranslator * key_tx = options.translator; - - ERL_NIF_TERM reply_ref = enif_make_ref(env); - - ErlNifBinary start_key_bin; - enif_inspect_binary(env, start_key_term, &start_key_bin); - leveldb::Slice start_key_slice((const char *)start_key_bin.data, - start_key_bin.size); - std::string start_key; - start_key.resize(key_tx->GetInternalKeySize(start_key_slice)); - key_tx->TranslateExternalKey(start_key_slice, (char*)start_key.data()); - - ErlNifBinary end_key_bin; - enif_inspect_binary(env, end_key_term, &end_key_bin); - leveldb::Slice end_key_slice((const char *)end_key_bin.data, - end_key_bin.size); - std::string end_key; - end_key.resize(key_tx->GetInternalKeySize(end_key_slice)); - key_tx->TranslateExternalKey(end_key_slice, (char*)end_key.data()); - - RangeScanOptions opts; - fold(env, options_list, parse_range_scan_option, opts); - - using eleveldb::RangeScanTask; - RangeScanTask::SyncHandle * sync_handle = - RangeScanTask::CreateSyncHandle(opts); - - ERL_NIF_TERM sync_ref = enif_make_resource(env, sync_handle); - - RangeScanTask * task = - new RangeScanTask(env, reply_ref, db_ptr.get(), - start_key, end_key, opts, sync_handle->sync_obj); - - eleveldb_priv_data& priv = - *static_cast(enif_priv_data(env)); - - if (false == priv.thread_pool.submit(task)) - { - delete task; // TODO: May require fancier destruction. - // TODO: Add thread pool submit error atom - return enif_make_tuple2(env, eleveldb::ATOM_ERROR, reply_ref); - } - - return enif_make_tuple2(env, eleveldb::ATOM_OK, - enif_make_tuple2(env, reply_ref, sync_ref)); -} - ERL_NIF_TERM async_iterator_move( ErlNifEnv* env, @@ -1340,6 +1301,132 @@ async_iterator_close( } // else } // async_iterator_close +//======================================================================= +// Streaming version of iterator, from Engel's streaming-folds branch +//======================================================================= + +/**....................................................................... + * Erlang client ack receipt of a batch of data from streaming + */ +ERL_NIF_TERM +streaming_ack(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM ref = argv[0]; + const ERL_NIF_TERM num_bytes_term = argv[1]; + uint32_t num_bytes; + + if (!enif_get_uint(env, num_bytes_term, &num_bytes)) + return enif_make_badarg(env); + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle; + sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); + + if (!sync_handle || !sync_handle->sync_obj) + return enif_make_badarg(env); + + sync_handle->sync_obj->AckBytes(num_bytes); + + return eleveldb::ATOM_OK; +} + +/**....................................................................... + * Stop a stream that's currently in progress + */ +ERL_NIF_TERM +streaming_stop(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM ref = argv[0]; + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle; + sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); + + if (!sync_handle) + return enif_make_badarg(env); + + RangeScanTask::SyncHandleResourceCleanup(env, sync_handle); + + return eleveldb::ATOM_OK; +} + +/**....................................................................... + * Start streaming + */ +ERL_NIF_TERM +streaming_start(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM db_ref = argv[0]; + const ERL_NIF_TERM start_key_term = argv[1]; + const ERL_NIF_TERM end_key_term = argv[2]; + const ERL_NIF_TERM options_list = argv[3]; + + ReferencePtr db_ptr; + db_ptr.assign(DbObject::RetrieveDbObject(env, db_ref)); + + bool has_end_key = enif_is_binary(env, end_key_term); + + if (NULL == db_ptr.get() + || !enif_is_binary(env, start_key_term) + || (!has_end_key && eleveldb::ATOM_UNDEFINED != end_key_term) + || !enif_is_list(env, options_list)) + { + return enif_make_badarg(env); + } + + if (NULL == db_ptr->m_Db) + return error_einval(env); + + ERL_NIF_TERM reply_ref = enif_make_ref(env); + + ErlNifBinary start_key_bin; + enif_inspect_binary(env, start_key_term, &start_key_bin); + std::string start_key((const char*)start_key_bin.data, start_key_bin.size); + + std::string * end_key_ptr = NULL; + std::string end_key; + if (has_end_key) { + ErlNifBinary end_key_bin; + enif_inspect_binary(env, end_key_term, &end_key_bin); + end_key.assign((const char*)end_key_bin.data, end_key_bin.size); + end_key_ptr = &end_key; + } + + RangeScanOptions opts; + fold(env, options_list, parse_streaming_option, opts); + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle = + RangeScanTask::CreateSyncHandle(opts); + + ERL_NIF_TERM sync_ref = enif_make_resource(env, sync_handle); + // Release so it's destroyed on GC. + enif_release_resource(sync_handle); + + RangeScanTask * task = + new RangeScanTask(env, reply_ref, db_ptr.get(), + start_key, end_key_ptr, opts, sync_handle->sync_obj); + + eleveldb_priv_data& priv = + *static_cast(enif_priv_data(env)); + + if (false == priv.stream_thread_pool.submit(task)) + { + delete task; // TODO: May require fancier destruction. + // TODO: Add thread pool submit error atom + return enif_make_tuple2(env, eleveldb::ATOM_ERROR, reply_ref); + } + + return enif_make_tuple2(env, eleveldb::ATOM_OK, + enif_make_tuple2(env, reply_ref, sync_ref)); +} + } // namespace eleveldb @@ -1563,6 +1650,7 @@ try ATOM(eleveldb::ATOM_IS_INTERNAL_DB, "is_internal_db"); ATOM(eleveldb::ATOM_LIMITED_DEVELOPER_MEM, "limited_developer_mem"); ATOM(eleveldb::ATOM_ELEVELDB_THREADS, "eleveldb_threads"); + ATOM(eleveldb::ATOM_ELEVELDB_STREAM_THREADS, "eleveldb_stream_threads"); ATOM(eleveldb::ATOM_FADVISE_WILLNEED, "fadvise_willneed"); ATOM(eleveldb::ATOM_DELETE_THRESHOLD, "delete_threshold"); ATOM(eleveldb::ATOM_TIERED_SLOW_LEVEL, "tiered_slow_level"); @@ -1578,6 +1666,10 @@ try ATOM(eleveldb::ATOM_TIME_SERIES, "time_series"); ATOM(eleveldb::ATOM_GLOBAL_DATA_DIR, "global_data_dir"); ATOM(eleveldb::ATOM_RANGE_FILTER, "range_filter"); + ATOM(eleveldb::ATOM_STREAMING_BATCH, "streaming_batch"); + ATOM(eleveldb::ATOM_STREAMING_END, "streaming_end"); + ATOM(eleveldb::ATOM_LIMIT, "limit"); + ATOM(eleveldb::ATOM_UNDEFINED, "undefined"); #undef ATOM diff --git a/c_src/eleveldb.h b/c_src/eleveldb.h index 884b92d1..b26e9e69 100644 --- a/c_src/eleveldb.h +++ b/c_src/eleveldb.h @@ -52,8 +52,12 @@ ERL_NIF_TERM async_close_family(ErlNifEnv* env, int argc, const ERL_NIF_TERM arg ERL_NIF_TERM async_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -ERL_NIF_TERM range_scan(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -ERL_NIF_TERM range_scan_ack(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM streaming_start(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM streaming_ack(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM streaming_stop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM currentMicroSeconds(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); } // namespace eleveldb diff --git a/c_src/workitems.cc b/c_src/workitems.cc index 3359f519..dffe0f70 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -454,14 +454,18 @@ RangeScanTask::RangeScanTask(ErlNifEnv * caller_env, ERL_NIF_TERM caller_ref, DbObject * db_handle, const std::string & start_key, - const std::string & end_key, + const std::string * end_key, RangeScanOptions & options, SyncObject * sync_obj) : WorkTask(caller_env, caller_ref, db_handle), options_(options), - start_key_(start_key), end_key_(end_key), + start_key_(start_key), + has_end_key_(bool(end_key)), sync_obj_(sync_obj) { + if (end_key) { + end_key_ = *end_key; + } sync_obj_->RefInc(); } @@ -471,9 +475,11 @@ RangeScanTask::~RangeScanTask() } RangeScanTask::SyncObject::SyncObject(const RangeScanOptions & opts) -: max_bytes_(opts.max_unacked_bytes), num_bytes_(0), +: max_bytes_(opts.max_unacked_bytes), + low_bytes_(opts.low_bytes), + num_bytes_(0), producer_sleeping_(false), pending_signal_(false), consumer_dead_(false), - mutex_(NULL), cond_(NULL) + crossed_under_max_(false), mutex_(NULL), cond_(NULL) { mutex_ = enif_mutex_create(0); cond_ = enif_cond_create(0); @@ -488,56 +494,56 @@ RangeScanTask::SyncObject::~SyncObject() void RangeScanTask::SyncObject::AddBytes(uint32_t n) { uint32_t num_bytes = add_and_fetch(&num_bytes_, n); - // Block if max bytes reached. + // Block if buffer full. if (num_bytes >= max_bytes_) { enif_mutex_lock(mutex_); - if (!consumer_dead_) { + if (!consumer_dead_ && !pending_signal_) { producer_sleeping_ = true; while (producer_sleeping_) { enif_cond_wait(cond_, mutex_); } } + if (pending_signal_) + pending_signal_ = false; enif_mutex_unlock(mutex_); } } -bool RangeScanTask::SyncObject::AckBytes(uint32_t n) +void RangeScanTask::SyncObject::AckBytes(uint32_t n) { uint32_t num_bytes = sub_and_fetch(&num_bytes_, n); - bool ret; - const bool is_reack = n == 0; - const bool is_under_max = num_bytes < max_bytes_; - const bool was_over_max = num_bytes_ + n >= max_bytes_; - const bool went_under_max = is_under_max && was_over_max; + if (num_bytes < max_bytes_ && num_bytes_ + n >= max_bytes_) + crossed_under_max_ = true; - if (went_under_max || is_reack) { + // Detect if at some point buffer was full, but now we have + // acked enough bytes to go under the low watermark. + if (crossed_under_max_ && num_bytes < low_bytes_) { + crossed_under_max_ = false; enif_mutex_lock(mutex_); if (producer_sleeping_) { producer_sleeping_ = false; enif_cond_signal(cond_); - ret = false; } else { - // Producer crossed the threshold, but we caught it before it - // blocked. Pending a cond signal to wake it when it does. pending_signal_ = true; - ret = true; } enif_mutex_unlock(mutex_); - } else - ret = false; - - return ret; + } } void RangeScanTask::SyncObject::MarkConsumerDead() { enif_mutex_lock(mutex_); consumer_dead_ = true; - if (producer_sleeping_) + if (producer_sleeping_) { + producer_sleeping_ = false; enif_cond_signal(cond_); + } enif_mutex_unlock(mutex_); } +bool RangeScanTask::SyncObject::IsConsumerDead() const { + return consumer_dead_; +} void send_batch(ErlNifPid * pid, ErlNifEnv * msg_env, ERL_NIF_TERM ref_term, ErlNifBinary * bin) { @@ -576,6 +582,177 @@ work_result RangeScanTask::operator()() ErlNifEnv * env = local_env_; ErlNifEnv * msg_env = enif_alloc_env(); read_options.fill_cache = options_.fill_cache; + read_options.verify_checksums = options_.verify_checksums; + leveldb::Iterator * iter = m_DbPtr->m_Db->NewIterator(read_options); + const leveldb::Comparator * cmp = m_DbPtr->m_DbOptions->comparator; + + const leveldb::Slice skey_slice(start_key_); + const leveldb::Slice ekey_slice(end_key_); + + const leveldb::Options& db_options = m_DbPtr->m_Db->GetOptions(); + leveldb::KeyTranslator* translator = db_options.translator; + + iter->Seek(skey_slice); + + ErlNifPid pid; + enif_get_local_pid(env, caller_pid_term, &pid); + ErlNifBinary bin; + const size_t initial_bin_size = size_t(options_.max_batch_bytes * 1.1); + size_t out_offset = 0; + size_t num_read = 0; + + std::string key_buffer; + key_buffer.reserve(256); + + //------------------------------------------------------------ + // Skip if not including first key and first key exists + //------------------------------------------------------------ + + if (!options_.start_inclusive + && iter->Valid() + && cmp->Compare(iter->key(), skey_slice) == 0) { + iter->Next(); + } + + while (!sync_obj_->IsConsumerDead()) { + + //------------------------------------------------------------ + // If reached end (iter invalid) or we've reached the + // specified limit on number of items (options_.limit), or the + // current key is past end key, send the batch and break out of the loop + //------------------------------------------------------------ + + if (!iter->Valid() + || (options_.limit > 0 && num_read >= options_.limit) + || (has_end_key_ && + (options_.end_inclusive ? + cmp->Compare(iter->key(), ekey_slice) > 0 : + cmp->Compare(iter->key(), ekey_slice) >= 0 + ))) { + + // If data are present in the batch (ie, out_offset != 0), + // send the batch now + + if (out_offset) { + + // Shrink it to final size. + + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + + send_batch(&pid, msg_env, caller_ref_term, &bin); + out_offset = 0; + } + + break; + } + + //------------------------------------------------------------ + // Else keep going; shove the next entry in the batch, but + // only if it passes any user-specified filter + // ------------------------------------------------------------ + + leveldb::Slice key = iter->key(); + leveldb::Slice value = iter->value(); + + bool filter_passed = true; + if (options_.range_filter!=0) { + options_.extractor->extract(value.data(), value.size(), + options_.range_filter); + filter_passed = options_.range_filter->evaluate(); + } + + if (filter_passed) { + + key_buffer.resize(0); + translator->TranslateInternalKey(key, &key_buffer); + + const size_t ksz = key_buffer.size(); + const size_t vsz = value.size(); + + const size_t ksz_sz = VarintLength(ksz); + const size_t vsz_sz = VarintLength(vsz); + + const size_t esz = ksz + ksz_sz + vsz + vsz_sz; + const size_t next_offset = out_offset + esz; + + // Allocate the output data array if this is the first data + // (out_offset == 0) + + if (out_offset == 0) + enif_alloc_binary(initial_bin_size, &bin); + + //------------------------------------------------------------ + // If we need more space, allocate it exactly since that means we + // reached the batch max anyway and will send it right away + //------------------------------------------------------------ + + if (next_offset > bin.size) + enif_realloc_binary(&bin, next_offset); + + char * const out = (char*)bin.data + out_offset; + + EncodeVarint64(out, ksz); + memcpy(out + ksz_sz, key_buffer.data(), ksz); + + EncodeVarint64(out + ksz_sz + ksz, vsz); + memcpy(out + ksz_sz + ksz + vsz_sz, value.data(), vsz); + + out_offset = next_offset; + + // If we've reached the maximum number of bytes to include in + // the batch, possibly shrink the binary and send it + + if (out_offset >= options_.max_batch_bytes) { + + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + + send_batch(&pid, msg_env, caller_ref_term, &bin); + + // Maybe block if max reached. + + sync_obj_->AddBytes(out_offset); + + out_offset = 0; + } + + // Increment the number of keys read and step to the next key + + ++num_read; + } + + iter->Next(); + } + + //------------------------------------------------------------ + // If exiting work loop, send a streaming_end message to any + // waiting erlang threads + //------------------------------------------------------------ + + if (!sync_obj_->IsConsumerDead()) { + ERL_NIF_TERM ref_copy = enif_make_copy(msg_env, caller_ref_term); + ERL_NIF_TERM msg = + enif_make_tuple2(msg_env, ATOM_STREAMING_END, ref_copy); + enif_send(NULL, &pid, msg_env, msg); + } + + if (out_offset) + enif_release_binary(&bin); + + enif_free_env(msg_env); + return work_result(); + +} // RangeScanTask::operator() + +#if 0 +work_result RangeScanTask::operator()() +{ + leveldb::ReadOptions read_options; + ErlNifEnv * env = local_env_; + ErlNifEnv * msg_env = enif_alloc_env(); + read_options.fill_cache = options_.fill_cache; + read_options.verify_checksums = options_.verify_checksums; leveldb::Iterator * iter = m_DbPtr->m_Db->NewIterator(read_options); const leveldb::Comparator * cmp = m_DbPtr->m_DbOptions->comparator; const leveldb::Slice skey_slice(start_key_); @@ -590,6 +767,8 @@ work_result RangeScanTask::operator()() ErlNifBinary bin; const size_t initial_bin_size = size_t(options_.max_batch_bytes * 1.1); size_t out_offset = 0; + size_t num_read = 0; + std::string key_buffer; key_buffer.reserve(256); @@ -653,6 +832,7 @@ work_result RangeScanTask::operator()() enif_free_env(msg_env); return work_result(); } // RangeScanTask::operator() +#endif ErlNifResourceType * RangeScanTask::sync_handle_resource_ = NULL; diff --git a/c_src/workitems.h b/c_src/workitems.h index 98b0ab43..c6339c87 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -452,17 +452,40 @@ class ItrCloseTask : public WorkTask }; // class ItrCloseTask struct RangeScanOptions { + + // Byte-level controls for batching/ack + size_t max_unacked_bytes; + size_t low_bytes; size_t max_batch_bytes; + + // Max number of items to return. Zero means unlimited. + + size_t limit; + + // Include the start key in streaming iteration? + bool start_inclusive; + + // Include the end key in streaming iteration? + bool end_inclusive; + + // Read options + bool fill_cache; + bool verify_checksums; + ExpressionNode* range_filter = 0; Extractor* extractor = 0; RangeScanOptions() - : max_unacked_bytes(10 * 1024 * 1024), max_batch_bytes(1 * 1024 * 1024), - start_inclusive(true), end_inclusive(false), fill_cache(false) + : max_unacked_bytes(10 * 1024 * 1024), + low_bytes(2 * 1024 * 1024), + max_batch_bytes(1 * 1024 * 1024), + limit(0), + start_inclusive(true), end_inclusive(false), + fill_cache(false), verify_checksums(true) { } }; @@ -489,21 +512,25 @@ class RangeScanTask : public WorkTask // side to shut down. void AddBytes(uint32_t n); - // Returns true if num bytes was over the max, so that producer - // has gone or will go to sleep waiting on ack. Consumer *needs* - // to signal it or it will sleep forever. - bool AckBytes(uint32_t n); + void AckBytes(uint32_t n); // Should be called when the Erlang handle is garbage collected // so no process is there to consume the output. void MarkConsumerDead(); + bool IsConsumerDead() const; + private: const uint32_t max_bytes_; + const uint32_t low_bytes_; volatile uint32_t num_bytes_; - volatile bool producer_sleeping_; + bool producer_sleeping_; + // Set if producer filled up but consumer acked before + // producer went to sleep. Producer should abort going to + // sleep upon seeing this set. volatile bool pending_signal_; volatile bool consumer_dead_; + volatile bool crossed_under_max_; ErlNifMutex * mutex_; ErlNifCond * cond_; }; @@ -516,7 +543,7 @@ class RangeScanTask : public WorkTask ERL_NIF_TERM caller_ref, DbObject * db_handle, const std::string & start_key, - const std::string & end_key, + const std::string * end_key, RangeScanOptions & options, SyncObject * sync_obj); @@ -534,6 +561,7 @@ class RangeScanTask : public WorkTask RangeScanOptions options_; std::string start_key_; std::string end_key_; + bool has_end_key_; SyncObject * sync_obj_; }; // class RangeScanTask From 8f8d398129badb89ef898767dbe6b399c8537567 Mon Sep 17 00:00:00 2001 From: erikleitch Date: Thu, 27 Aug 2015 16:20:16 -0700 Subject: [PATCH 19/22] Added back range scan stuff for backwards compatibility (now supports both range scan and streaming folds --- c_src/eleveldb.cc | 135 +++++++++++++++++++++++++++++++++++++++++- c_src/eleveldb.h | 3 + c_src/workitems.cc | 130 +++++++++++++++++++++++++++++++++++++++++ c_src/workitems.h | 17 ++++++ src/eleveldb.erl | 143 ++++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 418 insertions(+), 10 deletions(-) diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index 23e1f07f..79e1828a 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -83,6 +83,9 @@ static ErlNifFunc nif_funcs[] = {"async_iterator", 4, eleveldb::async_iterator}, {"async_iterator_move", 3, eleveldb::async_iterator_move}, + {"range_scan", 4, eleveldb::range_scan}, + {"range_scan_ack", 2, eleveldb::range_scan_ack}, + {"streaming_start", 4, eleveldb::streaming_start}, {"streaming_ack", 2, eleveldb::streaming_ack}, {"streaming_stop", 1, eleveldb::streaming_stop}, @@ -174,6 +177,8 @@ class eleveldb_priv_data; static volatile uint64_t gCurrentTotalMemory=0; +int64_t getCurrentMicroSeconds(); + // Erlang helpers: ERL_NIF_TERM error_einval(ErlNifEnv* env) { @@ -1301,6 +1306,105 @@ async_iterator_close( } // else } // async_iterator_close +//======================================================================= +// Range scan iteration, to compare with streaming version +//======================================================================= + +ERL_NIF_TERM +range_scan_ack(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM ref = argv[0]; + const ERL_NIF_TERM num_bytes_term = argv[1]; + uint32_t num_bytes; + + if (!enif_get_uint(env, num_bytes_term, &num_bytes)) + return enif_make_badarg(env); + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle; + sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); + + if (!sync_handle || !sync_handle->sync_obj) + return enif_make_badarg(env); + + bool needs_reack = sync_handle->sync_obj->AckBytesRet(num_bytes); + return needs_reack ? eleveldb::ATOM_NEEDS_REACK : eleveldb::ATOM_OK; +} + +ERL_NIF_TERM +range_scan(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM db_ref = argv[0]; + const ERL_NIF_TERM start_key_term = argv[1]; + const ERL_NIF_TERM end_key_term = argv[2]; + const ERL_NIF_TERM options_list = argv[3]; + + ReferencePtr db_ptr; + db_ptr.assign(DbObject::RetrieveDbObject(env, db_ref)); + + if (NULL == db_ptr.get() + || !enif_is_binary(env, start_key_term) + || !enif_is_binary(env, end_key_term) + || !enif_is_list(env, options_list)) + { + return enif_make_badarg(env); + } + + if (NULL == db_ptr->m_Db) + return error_einval(env); + + const leveldb::Options & options = db_ptr->m_Db->GetOptions(); + leveldb::KeyTranslator * key_tx = options.translator; + + ERL_NIF_TERM reply_ref = enif_make_ref(env); + + ErlNifBinary start_key_bin; + enif_inspect_binary(env, start_key_term, &start_key_bin); + leveldb::Slice start_key_slice((const char *)start_key_bin.data, + start_key_bin.size); + std::string start_key; + start_key.resize(key_tx->GetInternalKeySize(start_key_slice)); + key_tx->TranslateExternalKey(start_key_slice, (char*)start_key.data()); + + ErlNifBinary end_key_bin; + enif_inspect_binary(env, end_key_term, &end_key_bin); + leveldb::Slice end_key_slice((const char *)end_key_bin.data, + end_key_bin.size); + + std::string end_key; + end_key.resize(key_tx->GetInternalKeySize(end_key_slice)); + key_tx->TranslateExternalKey(end_key_slice, (char*)end_key.data()); + + RangeScanOptions opts; + fold(env, options_list, parse_range_scan_option, opts); + + eleveldb::RangeScanTask::SyncHandle * sync_handle = + eleveldb::RangeScanTask::CreateSyncHandle(opts); + + ERL_NIF_TERM sync_ref = enif_make_resource(env, sync_handle); + + RangeScanTaskOld * task = + new RangeScanTaskOld(env, reply_ref, db_ptr.get(), + start_key, &end_key, opts, sync_handle->sync_obj); + + eleveldb_priv_data& priv = + *static_cast(enif_priv_data(env)); + + if (false == priv.thread_pool.submit(task)) + { + delete task; // TODO: May require fancier destruction. + // TODO: Add thread pool submit error atom + return enif_make_tuple2(env, eleveldb::ATOM_ERROR, reply_ref); + } + + return enif_make_tuple2(env, eleveldb::ATOM_OK, + enif_make_tuple2(env, reply_ref, sync_ref)); +} + //======================================================================= // Streaming version of iterator, from Engel's streaming-folds branch //======================================================================= @@ -1425,10 +1529,39 @@ streaming_start(ErlNifEnv * env, return enif_make_tuple2(env, eleveldb::ATOM_OK, enif_make_tuple2(env, reply_ref, sync_ref)); -} +} // streaming_start + +ERL_NIF_TERM +currentMicroSeconds( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + return enif_make_int64(env, getCurrentMicroSeconds()); +} // currentMicroSeconds } // namespace eleveldb +int64_t getCurrentMicroSeconds() +{ +#if _POSIX_TIMERS >= 200801L + +struct timespec ts; + +// this is rumored to be faster that gettimeofday(), and sometimes +// shift less ... someday use CLOCK_MONOTONIC_RAW + + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000 + ts.tv_nsec/1000; + +#else + +struct timeval tv; +gettimeofday(&tv, NULL); +return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + +#endif +} ERL_NIF_TERM eleveldb_status( diff --git a/c_src/eleveldb.h b/c_src/eleveldb.h index b26e9e69..195dc2ba 100644 --- a/c_src/eleveldb.h +++ b/c_src/eleveldb.h @@ -53,6 +53,9 @@ ERL_NIF_TERM async_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) ERL_NIF_TERM async_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM range_scan(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM range_scan_ack(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + ERL_NIF_TERM streaming_start(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM streaming_ack(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM streaming_stop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/workitems.cc b/c_src/workitems.cc index dffe0f70..85d40e5b 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -509,6 +509,35 @@ void RangeScanTask::SyncObject::AddBytes(uint32_t n) } } +bool RangeScanTask::SyncObject::AckBytesRet(uint32_t n) +{ + uint32_t num_bytes = sub_and_fetch(&num_bytes_, n); + bool ret; + + const bool is_reack = n == 0; + const bool is_under_max = num_bytes < max_bytes_; + const bool was_over_max = num_bytes_ + n >= max_bytes_; + const bool went_under_max = is_under_max && was_over_max; + + if (went_under_max || is_reack) { + enif_mutex_lock(mutex_); + if (producer_sleeping_) { + producer_sleeping_ = false; + enif_cond_signal(cond_); + ret = false; + } else { + // Producer crossed the threshold, but we caught it before it + // blocked. Pending a cond signal to wake it when it does. + pending_signal_ = true; + ret = true; + } + enif_mutex_unlock(mutex_); + } else + ret = false; + + return ret; +} + void RangeScanTask::SyncObject::AckBytes(uint32_t n) { uint32_t num_bytes = sub_and_fetch(&num_bytes_, n); @@ -878,4 +907,105 @@ void RangeScanTask::SyncHandleResourceCleanup(ErlNifEnv * env, void * arg) } } +//======================================================================= +// Backwards compatibility for range_scan operations +//======================================================================= + +RangeScanTaskOld::RangeScanTaskOld(ErlNifEnv * caller_env, + ERL_NIF_TERM caller_ref, + DbObject * db_handle, + const std::string & start_key, + const std::string * end_key, + RangeScanOptions & options, + SyncObject * sync_obj) : + RangeScanTask(caller_env, caller_ref, db_handle, start_key, end_key, options, sync_obj) +{ +} + +RangeScanTaskOld::~RangeScanTaskOld() {}; + +work_result RangeScanTaskOld::operator()() +{ + leveldb::ReadOptions read_options; + ErlNifEnv * env = local_env_; + ErlNifEnv * msg_env = enif_alloc_env(); + read_options.fill_cache = options_.fill_cache; + leveldb::Iterator * iter = m_DbPtr->m_Db->NewIterator(read_options); + const leveldb::Comparator * cmp = m_DbPtr->m_DbOptions->comparator; + const leveldb::Slice skey_slice(start_key_); + const leveldb::Slice ekey_slice(end_key_); + const leveldb::Options & db_options = m_DbPtr->m_Db->GetOptions(); + leveldb::KeyTranslator * translator = db_options.translator; + + iter->Seek(skey_slice); + + ErlNifPid pid; + enif_get_local_pid(env, caller_pid_term, &pid); + ErlNifBinary bin; + const size_t initial_bin_size = size_t(options_.max_batch_bytes * 1.1); + size_t out_offset = 0; + std::string key_buffer; + key_buffer.reserve(256); + + for (;;) { + // If reached end or key past end key. + if (!iter->Valid() + || cmp->Compare(iter->key(), ekey_slice) >= 0) { + // If data in batch + if (out_offset) { + // Shrink it to final size. + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + send_batch(&pid, msg_env, caller_ref_term, &bin); + } + break; + } + // Shove next entry in the batch. + leveldb::Slice key = iter->key(); + leveldb::Slice value = iter->value(); + bool filter_passed = true; + if (options_.range_filter!=0) { + options_.extractor->extract(value.data(), value.size(), options_.range_filter); + filter_passed = options_.range_filter->evaluate(); + } + if (filter_passed) { + key_buffer.resize(0); + translator->TranslateInternalKey(key, &key_buffer); + const size_t ksz = key_buffer.size(), vsz = value.size(); + const size_t ksz_sz = VarintLength(ksz); + const size_t vsz_sz = VarintLength(vsz); + const size_t esz = ksz + ksz_sz + vsz + vsz_sz; + const size_t next_offset = out_offset + esz; + if (out_offset == 0) + enif_alloc_binary(initial_bin_size, &bin); + // If we need more space, allocate it exactly since that means we + // reached the batch max anyway and will send it right away. + if (next_offset > bin.size) + enif_realloc_binary(&bin, next_offset); + char * const out = (char*)bin.data + out_offset; + EncodeVarint64(out, ksz); + memcpy(out + ksz_sz, key_buffer.data(), ksz); + EncodeVarint64(out + ksz_sz + ksz, vsz); + memcpy(out + ksz_sz + ksz + vsz_sz, value.data(), vsz); + out_offset = next_offset; + if (out_offset >= options_.max_batch_bytes) { + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + send_batch(&pid, msg_env, caller_ref_term, &bin); + // Maybe block if max reached. + sync_obj_->AddBytes(out_offset); + out_offset = 0; + } + } + iter->Next(); + } + + ERL_NIF_TERM ref_copy = enif_make_copy(msg_env, caller_ref_term); + ERL_NIF_TERM msg = + enif_make_tuple2(msg_env, ATOM_RANGE_SCAN_END, ref_copy); + enif_send(NULL, &pid, msg_env, msg); + enif_free_env(msg_env); + return work_result(); +} // RangeScanTask::operator() + } // namespace eleveldb diff --git a/c_src/workitems.h b/c_src/workitems.h index c6339c87..eba427ef 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -513,6 +513,7 @@ class RangeScanTask : public WorkTask void AddBytes(uint32_t n); void AckBytes(uint32_t n); + bool AckBytesRet(uint32_t n); // Should be called when the Erlang handle is garbage collected // so no process is there to consume the output. @@ -566,6 +567,22 @@ class RangeScanTask : public WorkTask }; // class RangeScanTask +class RangeScanTaskOld : public RangeScanTask { + public: + + RangeScanTaskOld(ErlNifEnv * caller_env, + ERL_NIF_TERM caller_ref, + DbObject * db_handle, + const std::string & start_key, + const std::string * end_key, + RangeScanOptions & options, + SyncObject * sync_obj); + + virtual ~RangeScanTaskOld(); + virtual work_result operator()(); + +}; // class RangeScanTaskOld + } // namespace eleveldb diff --git a/src/eleveldb.erl b/src/eleveldb.erl index 3212f3c9..d4019ead 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -34,6 +34,7 @@ write/3, write/4, fold/4, + foldtest1/4, fold_keys/4, status/2, destroy/2, @@ -44,7 +45,8 @@ ts_key/1, parse_string/1, is_empty/1, - encode/2]). + encode/2, + current_usec/0]). %% for testing -export([ @@ -62,6 +64,9 @@ range_scan_ack/2, range_scan_fold/6]). +-export([emlfold1/4, + emlfold2/1]). + -export_type([db_ref/0, itr_ref/0]). @@ -127,6 +132,13 @@ init() -> {tiered_fast_prefix, string()} | {tiered_slow_prefix, string()}]. +-type read_option() :: {verify_checksums, boolean()} | + {fill_cache, boolean()}. + +-type itr_option() :: {iterator_refresh, boolean()}. + +-type itr_options() :: [read_option() | itr_option()]. + -type read_options() :: [{verify_checksums, boolean()} | {fill_cache, boolean()} | {iterator_refresh, boolean()}]. @@ -143,12 +155,30 @@ init() -> {max_batch_size, pos_integer()} | {max_unacked_bytes, pos_integer()}]. +-type streaming_option() :: {max_batch_bytes, pos_integer()} | + {max_unacked_bytes, pos_integer()}. + +-type streaming_options() :: [streaming_option()]. + +-type fold_method() :: iterator | streaming. + +-type fold_options() :: [read_option() | + {fold_method, fold_method()} | + {first_key, binary()} | + {last_key, binary() | undefined} | + {start_inclusive, boolean()} | + {end_inclusive, boolean()} | + {limit, pos_integer()} | + streaming_option()]. + -type iterator_action() :: first | last | next | prev | prefetch | binary(). -opaque db_ref() :: binary(). -opaque itr_ref() :: binary(). +-type stream_ref() :: {reference(), binary()}. + encode(Val, timestamp) -> <>; encode(Val, binary) when is_binary(Val)-> @@ -239,21 +269,21 @@ write(Ref, Updates, Opts) -> async_write(_CallerRef, _Ref, _Updates, _Opts) -> erlang:nif_error({error, not_loaded}). --spec async_iterator(reference(), db_ref(), read_options()) -> ok. --spec async_iterator(reference(), db_ref(), read_options(), keys_only) -> ok. +-spec async_iterator(reference(), db_ref(), itr_options()) -> ok. async_iterator(_CallerRef, _Ref, _Opts) -> erlang:nif_error({error, not_loaded}). +-spec async_iterator(reference(), db_ref(), itr_options(), keys_only) -> ok. async_iterator(_CallerRef, _Ref, _Opts, keys_only) -> erlang:nif_error({error, not_loaded}). --spec iterator(db_ref(), read_options()) -> {ok, itr_ref()}. +-spec iterator(db_ref(), itr_options()) -> {ok, itr_ref()}. iterator(Ref, Opts) -> CallerRef = make_ref(), async_iterator(CallerRef, Ref, Opts), ?WAIT_FOR_REPLY(CallerRef). --spec iterator(db_ref(), read_options(), keys_only) -> {ok, itr_ref()}. +-spec iterator(db_ref(), itr_options(), keys_only) -> {ok, itr_ref()}. iterator(Ref, Opts, keys_only) -> CallerRef = make_ref(), async_iterator(CallerRef, Ref, Opts, keys_only), @@ -300,6 +330,20 @@ range_scan(_DBRef, _StartKey, _EndKey, _Opts) -> range_scan_ack(_Ref, _NumBytes) -> erlang:nif_error({error, not_loaded}). +-spec streaming_start(db_ref(), binary(), binary() | undefined, + streaming_options()) -> + {ok, stream_ref()} | {error, any()}. +streaming_start(_DBRef, _StartKey, _EndKey, _Opts) -> + erlang:nif_error({error, not_loaded}). + +-spec streaming_ack(binary(), pos_integer()) -> ok. +streaming_ack(_AckRef, _NumBytes) -> + erlang:nif_error({error, not_loaded}). + +-spec streaming_stop(binary()) -> ok. +streaming_stop(_AckRef) -> + erlang:nif_error({error, not_loaded}). + -type fold_fun() :: fun(({Key::binary(), Value::binary()}, any()) -> any()). range_scan_fold(Ref, Fun, Acc0, SKey, EKey, Opts) -> @@ -334,6 +378,38 @@ do_range_scan_fold(MsgRef, AckRef, Fun, Acc) -> lager:info("Range scan got unexpected message: ~p\n", [Msg]) end. +do_streaming_batch(<<>>, _Fun, Acc) -> + Acc; +do_streaming_batch(Bin, Fun, Acc) -> + {K, Bin2} = parse_string(Bin), + {V, Bin3} = parse_string(Bin2), + Acc2 = Fun({K, V}, Acc), + do_streaming_batch(Bin3, Fun, Acc2). + +do_streaming_fold(StreamRef = {MsgRef, AckRef}, Fun, Acc) -> + receive + {streaming_end, MsgRef} -> + Acc; + {streaming_batch, MsgRef, Batch} -> + Size = byte_size(Batch), + Acc2 = do_streaming_batch(Batch, Fun, Acc), + _ = streaming_ack(AckRef, Size), + do_streaming_fold(StreamRef, Fun, Acc2) + end. + +do_streaming_fold_test1(StreamRef = {MsgRef, AckRef}, Fun, Acc) -> + receive + {streaming_end, MsgRef} -> + Acc; + {streaming_batch, MsgRef, Batch} -> + Size = byte_size(Batch), + _ = streaming_ack(AckRef, Size), + do_streaming_fold(StreamRef, Fun, Acc) + end. + +current_usec() -> + erlang:nif_error({error, not_loaded}). + parse_string(Bin) -> parse_string(0, 0, Bin). @@ -347,10 +423,59 @@ parse_string(Size, Shift, <<0:1, N:7, Bin/binary>>) -> %% Fold over the keys and values in the database %% will throw an exception if the database is closed while the fold runs --spec fold(db_ref(), fold_fun(), any(), read_options()) -> any(). +-spec fold(db_ref(), fold_fun(), any(), fold_options()) -> any(). fold(Ref, Fun, Acc0, Opts) -> + case proplists:get_value(fold_method, Opts, iterator) of + iterator -> + {ok, Itr} = iterator(Ref, Opts), + do_itr_fold(Itr, Fun, Acc0, Opts); + streaming -> + SKey = proplists:get_value(first_key, Opts, <<>>), + EKey = proplists:get_value(last_key, Opts), + io:format("SKey = ~n"), + io:format(SKey), + io:format("~n"), + io:format("EKey = ~n"), + io:format(EKey), + io:format("~n"), + {ok, StreamRef} = streaming_start(Ref, SKey, EKey, Opts), + {_, AckRef} = StreamRef, + try + do_streaming_fold(StreamRef, Fun, Acc0) + after + %% Close early, do not wait for garbage collection. + streaming_stop(AckRef) + end + end. + +-spec foldtest1(db_ref(), fold_fun(), any(), fold_options()) -> any(). +foldtest1(Ref, Fun, Acc0, Opts) -> + case proplists:get_value(fold_method, Opts, iterator) of + iterator -> + {ok, Itr} = iterator(Ref, Opts), + do_itr_fold(Itr, Fun, Acc0, Opts); + streaming -> + SKey = proplists:get_value(first_key, Opts, <<>>), + EKey = proplists:get_value(last_key, Opts), + {ok, StreamRef} = streaming_start(Ref, SKey, EKey, Opts), + {_, AckRef} = StreamRef, + try + do_streaming_fold_test1(StreamRef, Fun, Acc0) + after + %% Close early, do not wait for garbage collection. + streaming_stop(AckRef) + end + end. + +emlfold1(Ref, _, _, Opts) -> {ok, Itr} = iterator(Ref, Opts), - do_fold(Itr, Fun, Acc0, Opts). + Start = proplists:get_value(first_key, Opts, first), + true = is_binary(Start) or (Start == first), + iterator_move(Itr, Start). + +emlfold2(Ref) -> + {ok, Itr} = iterator(Ref, []), + iterator_move(Itr, first). -type fold_keys_fun() :: fun((Key::binary(), any()) -> any()). @@ -359,7 +484,7 @@ fold(Ref, Fun, Acc0, Opts) -> -spec fold_keys(db_ref(), fold_keys_fun(), any(), read_options()) -> any(). fold_keys(Ref, Fun, Acc0, Opts) -> {ok, Itr} = iterator(Ref, Opts, keys_only), - do_fold(Itr, Fun, Acc0, Opts). + do_itr_fold(Itr, Fun, Acc0, Opts). -spec status(db_ref(), Key::binary()) -> {ok, binary()} | error. status(Ref, Key) -> @@ -460,7 +585,7 @@ add_open_defaults(Opts) -> end. -do_fold(Itr, Fun, Acc0, Opts) -> +do_itr_fold(Itr, Fun, Acc0, Opts) -> try %% Extract {first_key, binary()} and seek to that key as a starting %% point for the iteration. The folding function should use throw if it From 606fcccf1396ebc640c9c3674e6b81a1f3c0621f Mon Sep 17 00:00:00 2001 From: vagrant Date: Fri, 28 Aug 2015 18:37:15 +0000 Subject: [PATCH 20/22] Fixed initializations preventing the code from compiling on buildbot --- c_src/eleveldb.cc | 4 ++-- c_src/filter.h | 4 ++-- c_src/workitems.h | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index 79e1828a..5da88a29 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -291,7 +291,7 @@ ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOption { if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) @@ -394,7 +394,7 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio } else if (option[0] == eleveldb::ATOM_BLOCK_CACHE_THRESHOLD) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) diff --git a/c_src/filter.h b/c_src/filter.h index 4bc5a1bb..1938f1f4 100644 --- a/c_src/filter.h +++ b/c_src/filter.h @@ -109,10 +109,10 @@ struct ConstantValue: public ExpressionNode { template struct FieldValue: public ExpressionNode { const std::string field; - bool has_val=false; + bool has_val; T value; - FieldValue(const std::string fieldName): field(fieldName) { + FieldValue(const std::string fieldName): field(fieldName), has_val(false) { } inline virtual bool has_value() { diff --git a/c_src/workitems.h b/c_src/workitems.h index eba427ef..e78a6e52 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -476,8 +476,8 @@ struct RangeScanOptions { bool fill_cache; bool verify_checksums; - ExpressionNode* range_filter = 0; - Extractor* extractor = 0; + ExpressionNode* range_filter; + Extractor* extractor; RangeScanOptions() : max_unacked_bytes(10 * 1024 * 1024), @@ -485,7 +485,7 @@ struct RangeScanOptions { max_batch_bytes(1 * 1024 * 1024), limit(0), start_inclusive(true), end_inclusive(false), - fill_cache(false), verify_checksums(true) + fill_cache(false), verify_checksums(true), range_filter(0), extractor(0) { } }; From 925a9bd1352c2cd30eb87e8e5c2965762f8c6872 Mon Sep 17 00:00:00 2001 From: erikleitch Date: Fri, 28 Aug 2015 11:54:27 -0700 Subject: [PATCH 21/22] Fixed range_scan_ack declaration in eleveldb.erl that failed dialyzer test --- src/eleveldb.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eleveldb.erl b/src/eleveldb.erl index d4019ead..6d9317bd 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -326,7 +326,7 @@ async_iterator_close(_CallerRef, _IRef) -> range_scan(_DBRef, _StartKey, _EndKey, _Opts) -> erlang:nif_error({error, not_loaded}). --spec range_scan_ack(reference(), pos_integer()) -> ok. +-spec range_scan_ack(reference(), pos_integer()|0) -> ok | needs_reack. range_scan_ack(_Ref, _NumBytes) -> erlang:nif_error({error, not_loaded}). From 3c730839d1f9a58a4ea414ae93ed62f1dbb4ca23 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Thu, 3 Sep 2015 20:16:59 +0400 Subject: [PATCH 22/22] make decode_record return the thing, rather than {ok, Thing} --- src/eleveldb_ts.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/eleveldb_ts.erl b/src/eleveldb_ts.erl index b7d742aa..c6ae4af7 100644 --- a/src/eleveldb_ts.erl +++ b/src/eleveldb_ts.erl @@ -29,8 +29,10 @@ encode_k2([{binary, L} | T], Bin) when is_list(L) -> encode_record(Record) -> msgpack:pack(Record, [{format, jsx}]). -decode_record(Bin) when is_binary(Bin) -> - msgpack:unpack(Bin, [{format, jsx}]). +decode_record(Bin) when is_binary(Bin) -> + {ok, Record} = msgpack:unpack(Bin, [{format, jsx}]), + Record. + %% %% Internal Funs