diff --git a/c_src/atoms.h b/c_src/atoms.h index 1ae8a166..ff77c36f 100644 --- a/c_src/atoms.h +++ b/c_src/atoms.h @@ -70,6 +70,10 @@ extern ERL_NIF_TERM ATOM_RANGE_SCAN_END; extern ERL_NIF_TERM ATOM_NEEDS_REACK; extern ERL_NIF_TERM ATOM_TIME_SERIES; extern ERL_NIF_TERM ATOM_RANGE_FILTER; +extern ERL_NIF_TERM ATOM_STREAMING_BATCH; +extern ERL_NIF_TERM ATOM_STREAMING_END; +extern ERL_NIF_TERM ATOM_LIMIT; +extern ERL_NIF_TERM ATOM_UNDEFINED; } // namespace eleveldb diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 5620f2ff..f5301648 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,9 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -#LEVELDB_VSN="2.0.0" -LEVELDB_VSN="prototype/timeseries" - +LEVELDB_VSN="feature/streaming-folds-filter" SNAPPY_VSN="1.0.4" set -e @@ -48,6 +46,7 @@ case "$1" in export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include" export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" export LD_LIBRARY_PATH="$BASEDIR/system/lib:$LD_LIBRARY_PATH" + export LEVELDB_VSN="$LEVELDB_VSN" (cd leveldb && $MAKE check) @@ -72,6 +71,7 @@ case "$1" in export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include" export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" export LD_LIBRARY_PATH="$BASEDIR/system/lib:$LD_LIBRARY_PATH" + export LEVELDB_VSN="$LEVELDB_VSN" if [ ! 
-d leveldb ]; then git clone git://github.com/basho/leveldb diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index f28f12d8..3c44c6fa 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -66,7 +66,7 @@ static ErlNifFunc nif_funcs[] = {"async_close", 2, eleveldb::async_close}, {"async_iterator_close", 2, eleveldb::async_iterator_close}, {"status", 2, eleveldb_status}, - {"destroy", 2, eleveldb_destroy}, + {"async_destroy", 3, eleveldb::async_destroy}, {"repair", 2, eleveldb_repair}, {"is_empty", 1, eleveldb_is_empty}, @@ -81,10 +81,16 @@ static ErlNifFunc nif_funcs[] = {"async_iterator", 3, eleveldb::async_iterator}, {"async_iterator", 4, eleveldb::async_iterator}, - {"async_iterator_move", 3, eleveldb::async_iterator_move}, + {"range_scan", 4, eleveldb::range_scan}, - {"range_scan_ack", 2, eleveldb::range_scan_ack} + {"range_scan_ack", 2, eleveldb::range_scan_ack}, + + {"streaming_start", 4, eleveldb::streaming_start}, + {"streaming_ack", 2, eleveldb::streaming_ack}, + {"streaming_stop", 1, eleveldb::streaming_stop}, + + {"current_usec", 0, eleveldb::currentMicroSeconds}, }; @@ -139,6 +145,7 @@ ERL_NIF_TERM ATOM_BLOCK_CACHE_THRESHOLD; ERL_NIF_TERM ATOM_IS_INTERNAL_DB; ERL_NIF_TERM ATOM_LIMITED_DEVELOPER_MEM; ERL_NIF_TERM ATOM_ELEVELDB_THREADS; +ERL_NIF_TERM ATOM_ELEVELDB_STREAM_THREADS; ERL_NIF_TERM ATOM_FADVISE_WILLNEED; ERL_NIF_TERM ATOM_DELETE_THRESHOLD; ERL_NIF_TERM ATOM_TIERED_SLOW_LEVEL; @@ -154,6 +161,10 @@ ERL_NIF_TERM ATOM_NEEDS_REACK; ERL_NIF_TERM ATOM_TIME_SERIES; ERL_NIF_TERM ATOM_GLOBAL_DATA_DIR; ERL_NIF_TERM ATOM_RANGE_FILTER; +ERL_NIF_TERM ATOM_STREAMING_BATCH; +ERL_NIF_TERM ATOM_STREAMING_END; +ERL_NIF_TERM ATOM_LIMIT; +ERL_NIF_TERM ATOM_UNDEFINED; } // namespace eleveldb @@ -166,6 +177,8 @@ class eleveldb_priv_data; static volatile uint64_t gCurrentTotalMemory=0; +int64_t getCurrentMicroSeconds(); + // Erlang helpers: ERL_NIF_TERM error_einval(ErlNifEnv* env) { @@ -194,6 +207,7 @@ static ERL_NIF_TERM slice_to_binary(ErlNifEnv* env, 
leveldb::Slice s) struct EleveldbOptions { int m_EleveldbThreads; + int m_EleveldbStreamThreads; int m_LeveldbImmThreads; int m_LeveldbBGWriteThreads; int m_LeveldbOverlapThreads; @@ -207,7 +221,7 @@ struct EleveldbOptions std::string m_GlobalDataDir; EleveldbOptions() - : m_EleveldbThreads(71), + : m_EleveldbThreads(51), m_EleveldbStreamThreads(20), m_LeveldbImmThreads(0), m_LeveldbBGWriteThreads(0), m_LeveldbOverlapThreads(0), m_LeveldbGroomingThreads(0), m_TotalMemPercent(0), m_TotalMem(0), @@ -218,6 +232,7 @@ struct EleveldbOptions void Dump() { syslog(LOG_ERR, " m_EleveldbThreads: %d\n", m_EleveldbThreads); + syslog(LOG_ERR, " m_EleveldbStreamThreads: %d\n", m_EleveldbStreamThreads); syslog(LOG_ERR, " m_LeveldbImmThreads: %d\n", m_LeveldbImmThreads); syslog(LOG_ERR, " m_LeveldbBGWriteThreads: %d\n", m_LeveldbBGWriteThreads); syslog(LOG_ERR, " m_LeveldbOverlapThreads: %d\n", m_LeveldbOverlapThreads); @@ -240,11 +255,13 @@ class eleveldb_priv_data public: EleveldbOptions m_Opts; eleveldb::eleveldb_thread_pool thread_pool; + eleveldb::eleveldb_thread_pool stream_thread_pool; leveldb::DataDictionary data_dictionary; explicit eleveldb_priv_data(EleveldbOptions & Options) - : m_Opts(Options), thread_pool(Options.m_EleveldbThreads), - data_dictionary(Options.m_GlobalDataDir) + : m_Opts(Options), thread_pool(Options.m_EleveldbThreads), + stream_thread_pool(Options.m_EleveldbStreamThreads), + data_dictionary(Options.m_GlobalDataDir) {} private: @@ -274,7 +291,7 @@ ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOption { if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) @@ -313,6 +330,17 @@ ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOption } // if } // if } + else if (option[0] == eleveldb::ATOM_ELEVELDB_STREAM_THREADS) + { + unsigned long temp; + if (enif_get_ulong(env, option[1], &temp)) + { + 
if (temp != 0) + { + opts.m_EleveldbStreamThreads = temp; + } // if + } // if + } else if (option[0] == eleveldb::ATOM_FADVISE_WILLNEED) { opts.m_FadviseWillNeed = (option[1] == eleveldb::ATOM_TRUE); @@ -366,7 +394,7 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio } else if (option[0] == eleveldb::ATOM_BLOCK_CACHE_THRESHOLD) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) @@ -552,6 +580,39 @@ ERL_NIF_TERM parse_range_scan_option(ErlNifEnv* env, ERL_NIF_TERM item, return eleveldb::ATOM_OK; } +ERL_NIF_TERM parse_streaming_option(ErlNifEnv* env, ERL_NIF_TERM item, + eleveldb::RangeScanOptions & opts) +{ + int arity; + const ERL_NIF_TERM* option; + if (enif_get_tuple(env, item, &arity, &option) && 2 == arity) + { + if (option[0] == eleveldb::ATOM_START_INCLUSIVE) + opts.start_inclusive = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_END_INCLUSIVE) + opts.end_inclusive = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_FILL_CACHE) + opts.fill_cache = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_VERIFY_CHECKSUMS) + opts.verify_checksums = (option[1] == eleveldb::ATOM_TRUE); + else if (option[0] == eleveldb::ATOM_MAX_UNACKED_BYTES) { + unsigned max_unacked_bytes; + if (enif_get_uint(env, option[1], &max_unacked_bytes)) + opts.max_unacked_bytes = max_unacked_bytes; + } else if (option[0] == eleveldb::ATOM_MAX_BATCH_BYTES) { + unsigned max_batch_bytes; + if (enif_get_uint(env, option[1], &max_batch_bytes)) + opts.max_batch_bytes = max_batch_bytes; + } else if (option[0] == eleveldb::ATOM_LIMIT) { + unsigned limit; + if (enif_get_uint(env, option[1], &limit)) + opts.limit = limit; + } + } + + return eleveldb::ATOM_OK; +} + ERL_NIF_TERM parse_write_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::WriteOptions& opts) { int arity; @@ -988,101 +1049,6 @@ async_iterator( return 
submit_to_thread_queue(work_item, env, caller_ref); } // async_iterator -ERL_NIF_TERM -range_scan_ack(ErlNifEnv * env, - int argc, - const ERL_NIF_TERM argv[]) -{ - const ERL_NIF_TERM ref = argv[0]; - const ERL_NIF_TERM num_bytes_term = argv[1]; - uint32_t num_bytes; - - if (!enif_get_uint(env, num_bytes_term, &num_bytes)) - return enif_make_badarg(env); - - using eleveldb::RangeScanTask; - RangeScanTask::SyncHandle * sync_handle; - sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); - - if (!sync_handle || !sync_handle->sync_obj) - return enif_make_badarg(env); - - bool needs_reack = sync_handle->sync_obj->AckBytes(num_bytes); - return needs_reack ? eleveldb::ATOM_NEEDS_REACK : eleveldb::ATOM_OK; -} - -ERL_NIF_TERM -range_scan(ErlNifEnv * env, - int argc, - const ERL_NIF_TERM argv[]) -{ - const ERL_NIF_TERM db_ref = argv[0]; - const ERL_NIF_TERM start_key_term = argv[1]; - const ERL_NIF_TERM end_key_term = argv[2]; - const ERL_NIF_TERM options_list = argv[3]; - - ReferencePtr db_ptr; - db_ptr.assign(DbObject::RetrieveDbObject(env, db_ref)); - - if (NULL == db_ptr.get() - || !enif_is_binary(env, start_key_term) - || !enif_is_binary(env, end_key_term) - || !enif_is_list(env, options_list)) - { - return enif_make_badarg(env); - } - - if (NULL == db_ptr->m_Db) - return error_einval(env); - - const leveldb::Options & options = db_ptr->m_Db->GetOptions(); - leveldb::KeyTranslator * key_tx = options.translator; - - ERL_NIF_TERM reply_ref = enif_make_ref(env); - - ErlNifBinary start_key_bin; - enif_inspect_binary(env, start_key_term, &start_key_bin); - leveldb::Slice start_key_slice((const char *)start_key_bin.data, - start_key_bin.size); - std::string start_key; - start_key.resize(key_tx->GetInternalKeySize(start_key_slice)); - key_tx->TranslateExternalKey(start_key_slice, (char*)start_key.data()); - - ErlNifBinary end_key_bin; - enif_inspect_binary(env, end_key_term, &end_key_bin); - leveldb::Slice end_key_slice((const char *)end_key_bin.data, - 
end_key_bin.size); - std::string end_key; - end_key.resize(key_tx->GetInternalKeySize(end_key_slice)); - key_tx->TranslateExternalKey(end_key_slice, (char*)end_key.data()); - - RangeScanOptions opts; - fold(env, options_list, parse_range_scan_option, opts); - - using eleveldb::RangeScanTask; - RangeScanTask::SyncHandle * sync_handle = - RangeScanTask::CreateSyncHandle(opts); - - ERL_NIF_TERM sync_ref = enif_make_resource(env, sync_handle); - - RangeScanTask * task = - new RangeScanTask(env, reply_ref, db_ptr.get(), - start_key, end_key, opts, sync_handle->sync_obj); - - eleveldb_priv_data& priv = - *static_cast(enif_priv_data(env)); - - if (false == priv.thread_pool.submit(task)) - { - delete task; // TODO: May require fancier destruction. - // TODO: Add thread pool submit error atom - return enif_make_tuple2(env, eleveldb::ATOM_ERROR, reply_ref); - } - - return enif_make_tuple2(env, eleveldb::ATOM_OK, - enif_make_tuple2(env, reply_ref, sync_ref)); -} - ERL_NIF_TERM async_iterator_move( ErlNifEnv* env, @@ -1340,9 +1306,302 @@ async_iterator_close( } // else } // async_iterator_close +//======================================================================= +// Range scan iteration, to compare with streaming version +//======================================================================= + +ERL_NIF_TERM +range_scan_ack(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM ref = argv[0]; + const ERL_NIF_TERM num_bytes_term = argv[1]; + uint32_t num_bytes; + + if (!enif_get_uint(env, num_bytes_term, &num_bytes)) + return enif_make_badarg(env); + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle; + sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); + + if (!sync_handle || !sync_handle->sync_obj) + return enif_make_badarg(env); + + bool needs_reack = sync_handle->sync_obj->AckBytesRet(num_bytes); + return needs_reack ? 
eleveldb::ATOM_NEEDS_REACK : eleveldb::ATOM_OK; +} + +ERL_NIF_TERM +range_scan(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM db_ref = argv[0]; + const ERL_NIF_TERM start_key_term = argv[1]; + const ERL_NIF_TERM end_key_term = argv[2]; + const ERL_NIF_TERM options_list = argv[3]; + + ReferencePtr db_ptr; + db_ptr.assign(DbObject::RetrieveDbObject(env, db_ref)); + + if (NULL == db_ptr.get() + || !enif_is_binary(env, start_key_term) + || !enif_is_binary(env, end_key_term) + || !enif_is_list(env, options_list)) + { + return enif_make_badarg(env); + } + + if (NULL == db_ptr->m_Db) + return error_einval(env); + + const leveldb::Options & options = db_ptr->m_Db->GetOptions(); + leveldb::KeyTranslator * key_tx = options.translator; + + ERL_NIF_TERM reply_ref = enif_make_ref(env); + + ErlNifBinary start_key_bin; + enif_inspect_binary(env, start_key_term, &start_key_bin); + leveldb::Slice start_key_slice((const char *)start_key_bin.data, + start_key_bin.size); + std::string start_key; + start_key.resize(key_tx->GetInternalKeySize(start_key_slice)); + key_tx->TranslateExternalKey(start_key_slice, (char*)start_key.data()); + + ErlNifBinary end_key_bin; + enif_inspect_binary(env, end_key_term, &end_key_bin); + leveldb::Slice end_key_slice((const char *)end_key_bin.data, + end_key_bin.size); + + std::string end_key; + end_key.resize(key_tx->GetInternalKeySize(end_key_slice)); + key_tx->TranslateExternalKey(end_key_slice, (char*)end_key.data()); + + RangeScanOptions opts; + fold(env, options_list, parse_range_scan_option, opts); + + eleveldb::RangeScanTask::SyncHandle * sync_handle = + eleveldb::RangeScanTask::CreateSyncHandle(opts); + + ERL_NIF_TERM sync_ref = enif_make_resource(env, sync_handle); + + RangeScanTaskOld * task = + new RangeScanTaskOld(env, reply_ref, db_ptr.get(), + start_key, &end_key, opts, sync_handle->sync_obj); + + eleveldb_priv_data& priv = + *static_cast(enif_priv_data(env)); + + if (false == 
priv.thread_pool.submit(task)) + { + delete task; // TODO: May require fancier destruction. + // TODO: Add thread pool submit error atom + return enif_make_tuple2(env, eleveldb::ATOM_ERROR, reply_ref); + } + + return enif_make_tuple2(env, eleveldb::ATOM_OK, + enif_make_tuple2(env, reply_ref, sync_ref)); +} + +//======================================================================= +// Streaming version of iterator, from Engel's streaming-folds branch +//======================================================================= + +/**....................................................................... + * Erlang client ack receipt of a batch of data from streaming + */ +ERL_NIF_TERM +streaming_ack(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM ref = argv[0]; + const ERL_NIF_TERM num_bytes_term = argv[1]; + uint32_t num_bytes; + + if (!enif_get_uint(env, num_bytes_term, &num_bytes)) + return enif_make_badarg(env); + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle; + sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); + + if (!sync_handle || !sync_handle->sync_obj) + return enif_make_badarg(env); + + sync_handle->sync_obj->AckBytes(num_bytes); + + return eleveldb::ATOM_OK; +} + +/**....................................................................... + * Stop a stream that's currently in progress + */ +ERL_NIF_TERM +streaming_stop(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM ref = argv[0]; + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle; + sync_handle = RangeScanTask::RetrieveSyncHandle(env, ref); + + if (!sync_handle) + return enif_make_badarg(env); + + RangeScanTask::SyncHandleResourceCleanup(env, sync_handle); + + return eleveldb::ATOM_OK; +} + +/**....................................................................... 
+ * Start streaming + */ +ERL_NIF_TERM +streaming_start(ErlNifEnv * env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM db_ref = argv[0]; + const ERL_NIF_TERM start_key_term = argv[1]; + const ERL_NIF_TERM end_key_term = argv[2]; + const ERL_NIF_TERM options_list = argv[3]; + + ReferencePtr db_ptr; + db_ptr.assign(DbObject::RetrieveDbObject(env, db_ref)); + + bool has_end_key = enif_is_binary(env, end_key_term); + + if (NULL == db_ptr.get() + || !enif_is_binary(env, start_key_term) + || (!has_end_key && eleveldb::ATOM_UNDEFINED != end_key_term) + || !enif_is_list(env, options_list)) + { + return enif_make_badarg(env); + } + + if (NULL == db_ptr->m_Db) + return error_einval(env); + + ERL_NIF_TERM reply_ref = enif_make_ref(env); + + ErlNifBinary start_key_bin; + enif_inspect_binary(env, start_key_term, &start_key_bin); + std::string start_key((const char*)start_key_bin.data, start_key_bin.size); + + std::string * end_key_ptr = NULL; + std::string end_key; + if (has_end_key) { + ErlNifBinary end_key_bin; + enif_inspect_binary(env, end_key_term, &end_key_bin); + end_key.assign((const char*)end_key_bin.data, end_key_bin.size); + end_key_ptr = &end_key; + } + + RangeScanOptions opts; + fold(env, options_list, parse_streaming_option, opts); + + using eleveldb::RangeScanTask; + RangeScanTask::SyncHandle * sync_handle = + RangeScanTask::CreateSyncHandle(opts); + + ERL_NIF_TERM sync_ref = enif_make_resource(env, sync_handle); + // Release so it's destroyed on GC. + enif_release_resource(sync_handle); + + RangeScanTask * task = + new RangeScanTask(env, reply_ref, db_ptr.get(), + start_key, end_key_ptr, opts, sync_handle->sync_obj); + + eleveldb_priv_data& priv = + *static_cast(enif_priv_data(env)); + + if (false == priv.stream_thread_pool.submit(task)) + { + delete task; // TODO: May require fancier destruction. 
+ // TODO: Add thread pool submit error atom + return enif_make_tuple2(env, eleveldb::ATOM_ERROR, reply_ref); + } + + return enif_make_tuple2(env, eleveldb::ATOM_OK, + enif_make_tuple2(env, reply_ref, sync_ref)); +} // streaming_start + +ERL_NIF_TERM +currentMicroSeconds( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + return enif_make_int64(env, getCurrentMicroSeconds()); +} // currentMicroSeconds + + +ERL_NIF_TERM +async_destroy( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + char db_name[4096]; + + if(!enif_get_string(env, argv[1], db_name, sizeof(db_name), ERL_NIF_LATIN1) || + !enif_is_list(env, argv[2])) + { + return enif_make_badarg(env); + } // if + + ERL_NIF_TERM caller_ref = argv[0]; + + eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); + + leveldb::Options *opts = new leveldb::Options; + fold(env, argv[2], parse_open_option, *opts); + + eleveldb::WorkTask *work_item = new eleveldb::DestroyTask(env, caller_ref, + db_name, opts); + + if(false == priv.thread_pool.submit(work_item)) + { + delete work_item; + return send_reply(env, caller_ref, + enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); + } + + return eleveldb::ATOM_OK; + +} // async_destroy + } // namespace eleveldb +int64_t getCurrentMicroSeconds() +{ +#if _POSIX_TIMERS >= 200801L + +struct timespec ts; + +// this is rumored to be faster that gettimeofday(), and sometimes +// shift less ... someday use CLOCK_MONOTONIC_RAW + + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000 + ts.tv_nsec/1000; + +#else +struct timeval tv; +gettimeofday(&tv, NULL); +return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + +#endif +} + +/** + * HEY YOU ... please make async + */ ERL_NIF_TERM eleveldb_status( ErlNifEnv* env, @@ -1384,6 +1643,9 @@ eleveldb_status( } // eleveldb_status +/** + * HEY YOU ... 
please make async + */ ERL_NIF_TERM eleveldb_repair( ErlNifEnv* env, @@ -1412,40 +1674,6 @@ eleveldb_repair( } } // eleveldb_repair -/** - * HEY YOU ... please make async - */ -ERL_NIF_TERM -eleveldb_destroy( - ErlNifEnv* env, - int argc, - const ERL_NIF_TERM argv[]) -{ - char name[4096]; - if (enif_get_string(env, argv[0], name, sizeof(name), ERL_NIF_LATIN1) && - enif_is_list(env, argv[1])) - { - // Parse out the options - leveldb::Options opts; - fold(env, argv[1], parse_open_option, opts); - - leveldb::Status status = leveldb::DestroyDB(name, opts); - if (!status.ok()) - { - return error_tuple(env, eleveldb::ATOM_ERROR_DB_DESTROY, status); - } - else - { - return eleveldb::ATOM_OK; - } - } - else - { - return enif_make_badarg(env); - } - -} // eleveldb_destroy - ERL_NIF_TERM eleveldb_is_empty( @@ -1563,6 +1791,7 @@ try ATOM(eleveldb::ATOM_IS_INTERNAL_DB, "is_internal_db"); ATOM(eleveldb::ATOM_LIMITED_DEVELOPER_MEM, "limited_developer_mem"); ATOM(eleveldb::ATOM_ELEVELDB_THREADS, "eleveldb_threads"); + ATOM(eleveldb::ATOM_ELEVELDB_STREAM_THREADS, "eleveldb_stream_threads"); ATOM(eleveldb::ATOM_FADVISE_WILLNEED, "fadvise_willneed"); ATOM(eleveldb::ATOM_DELETE_THRESHOLD, "delete_threshold"); ATOM(eleveldb::ATOM_TIERED_SLOW_LEVEL, "tiered_slow_level"); @@ -1578,6 +1807,10 @@ try ATOM(eleveldb::ATOM_TIME_SERIES, "time_series"); ATOM(eleveldb::ATOM_GLOBAL_DATA_DIR, "global_data_dir"); ATOM(eleveldb::ATOM_RANGE_FILTER, "range_filter"); + ATOM(eleveldb::ATOM_STREAMING_BATCH, "streaming_batch"); + ATOM(eleveldb::ATOM_STREAMING_END, "streaming_end"); + ATOM(eleveldb::ATOM_LIMIT, "limit"); + ATOM(eleveldb::ATOM_UNDEFINED, "undefined"); #undef ATOM diff --git a/c_src/eleveldb.h b/c_src/eleveldb.h index 884b92d1..7482a9df 100644 --- a/c_src/eleveldb.h +++ b/c_src/eleveldb.h @@ -45,6 +45,7 @@ ERL_NIF_TERM async_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM 
async_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM async_destroy(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_open_family(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_close_family(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); @@ -52,9 +53,16 @@ ERL_NIF_TERM async_close_family(ErlNifEnv* env, int argc, const ERL_NIF_TERM arg ERL_NIF_TERM async_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + ERL_NIF_TERM range_scan(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM range_scan_ack(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM streaming_start(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM streaming_ack(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM streaming_stop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM currentMicroSeconds(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + } // namespace eleveldb diff --git a/c_src/filter.h b/c_src/filter.h index 4bc5a1bb..1938f1f4 100644 --- a/c_src/filter.h +++ b/c_src/filter.h @@ -109,10 +109,10 @@ struct ConstantValue: public ExpressionNode { template struct FieldValue: public ExpressionNode { const std::string field; - bool has_val=false; + bool has_val; T value; - FieldValue(const std::string fieldName): field(fieldName) { + FieldValue(const std::string fieldName): field(fieldName), has_val(false) { } inline virtual bool has_value() { diff --git a/c_src/workitems.cc b/c_src/workitems.cc index 3359f519..a5892713 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -454,14 +454,18 @@ RangeScanTask::RangeScanTask(ErlNifEnv * caller_env, ERL_NIF_TERM 
caller_ref, DbObject * db_handle, const std::string & start_key, - const std::string & end_key, + const std::string * end_key, RangeScanOptions & options, SyncObject * sync_obj) : WorkTask(caller_env, caller_ref, db_handle), options_(options), - start_key_(start_key), end_key_(end_key), + start_key_(start_key), + has_end_key_(bool(end_key)), sync_obj_(sync_obj) { + if (end_key) { + end_key_ = *end_key; + } sync_obj_->RefInc(); } @@ -470,10 +474,13 @@ RangeScanTask::~RangeScanTask() sync_obj_->RefDec(); } + RangeScanTask::SyncObject::SyncObject(const RangeScanOptions & opts) -: max_bytes_(opts.max_unacked_bytes), num_bytes_(0), +: max_bytes_(opts.max_unacked_bytes), + low_bytes_(opts.low_bytes), + num_bytes_(0), producer_sleeping_(false), pending_signal_(false), consumer_dead_(false), - mutex_(NULL), cond_(NULL) + crossed_under_max_(false), mutex_(NULL), cond_(NULL) { mutex_ = enif_mutex_create(0); cond_ = enif_cond_create(0); @@ -488,20 +495,22 @@ RangeScanTask::SyncObject::~SyncObject() void RangeScanTask::SyncObject::AddBytes(uint32_t n) { uint32_t num_bytes = add_and_fetch(&num_bytes_, n); - // Block if max bytes reached. + // Block if buffer full. 
if (num_bytes >= max_bytes_) { enif_mutex_lock(mutex_); - if (!consumer_dead_) { + if (!consumer_dead_ && !pending_signal_) { producer_sleeping_ = true; while (producer_sleeping_) { enif_cond_wait(cond_, mutex_); } } + if (pending_signal_) + pending_signal_ = false; enif_mutex_unlock(mutex_); } } -bool RangeScanTask::SyncObject::AckBytes(uint32_t n) +bool RangeScanTask::SyncObject::AckBytesRet(uint32_t n) { uint32_t num_bytes = sub_and_fetch(&num_bytes_, n); bool ret; @@ -530,14 +539,67 @@ bool RangeScanTask::SyncObject::AckBytes(uint32_t n) return ret; } +void RangeScanTask::SyncObject::AckBytes(uint32_t n) +{ + uint32_t num_bytes = sub_and_fetch(&num_bytes_, n); + + if (num_bytes < max_bytes_ && num_bytes_ + n >= max_bytes_) + crossed_under_max_ = true; + + // Detect if at some point buffer was full, but now we have + // acked enough bytes to go under the low watermark. + if (crossed_under_max_ && num_bytes < low_bytes_) { + crossed_under_max_ = false; + enif_mutex_lock(mutex_); + if (producer_sleeping_) { + producer_sleeping_ = false; + enif_cond_signal(cond_); + } else { + pending_signal_ = true; + } + enif_mutex_unlock(mutex_); + } +} + void RangeScanTask::SyncObject::MarkConsumerDead() { enif_mutex_lock(mutex_); consumer_dead_ = true; - if (producer_sleeping_) + if (producer_sleeping_) { + producer_sleeping_ = false; enif_cond_signal(cond_); + } enif_mutex_unlock(mutex_); } +bool RangeScanTask::SyncObject::IsConsumerDead() const { + return consumer_dead_; +} + +/** + * DestroyTask functions + */ + +DestroyTask::DestroyTask( + ErlNifEnv* caller_env, + ERL_NIF_TERM& _caller_ref, + const std::string& db_name_, + leveldb::Options *open_options_) + : WorkTask(caller_env, _caller_ref), + db_name(db_name_), open_options(open_options_) +{ +} // DestroyTask::DestroyTask + +work_result +DestroyTask::operator()() +{ + leveldb::Status status = leveldb::DestroyDB(db_name, *open_options); + + if(!status.ok()) + return error_tuple(local_env(), ATOM_ERROR_DB_DESTROY, 
status); + + return work_result(ATOM_OK); + +} // DestroyTask::operator() void send_batch(ErlNifPid * pid, ErlNifEnv * msg_env, ERL_NIF_TERM ref_term, ErlNifBinary * bin) { @@ -576,6 +638,177 @@ work_result RangeScanTask::operator()() ErlNifEnv * env = local_env_; ErlNifEnv * msg_env = enif_alloc_env(); read_options.fill_cache = options_.fill_cache; + read_options.verify_checksums = options_.verify_checksums; + leveldb::Iterator * iter = m_DbPtr->m_Db->NewIterator(read_options); + const leveldb::Comparator * cmp = m_DbPtr->m_DbOptions->comparator; + + const leveldb::Slice skey_slice(start_key_); + const leveldb::Slice ekey_slice(end_key_); + + const leveldb::Options& db_options = m_DbPtr->m_Db->GetOptions(); + leveldb::KeyTranslator* translator = db_options.translator; + + iter->Seek(skey_slice); + + ErlNifPid pid; + enif_get_local_pid(env, caller_pid_term, &pid); + ErlNifBinary bin; + const size_t initial_bin_size = size_t(options_.max_batch_bytes * 1.1); + size_t out_offset = 0; + size_t num_read = 0; + + std::string key_buffer; + key_buffer.reserve(256); + + //------------------------------------------------------------ + // Skip if not including first key and first key exists + //------------------------------------------------------------ + + if (!options_.start_inclusive + && iter->Valid() + && cmp->Compare(iter->key(), skey_slice) == 0) { + iter->Next(); + } + + while (!sync_obj_->IsConsumerDead()) { + + //------------------------------------------------------------ + // If reached end (iter invalid) or we've reached the + // specified limit on number of items (options_.limit), or the + // current key is past end key, send the batch and break out of the loop + //------------------------------------------------------------ + + if (!iter->Valid() + || (options_.limit > 0 && num_read >= options_.limit) + || (has_end_key_ && + (options_.end_inclusive ? 
+ cmp->Compare(iter->key(), ekey_slice) > 0 : + cmp->Compare(iter->key(), ekey_slice) >= 0 + ))) { + + // If data are present in the batch (ie, out_offset != 0), + // send the batch now + + if (out_offset) { + + // Shrink it to final size. + + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + + send_batch(&pid, msg_env, caller_ref_term, &bin); + out_offset = 0; + } + + break; + } + + //------------------------------------------------------------ + // Else keep going; shove the next entry in the batch, but + // only if it passes any user-specified filter + // ------------------------------------------------------------ + + leveldb::Slice key = iter->key(); + leveldb::Slice value = iter->value(); + + bool filter_passed = true; + if (options_.range_filter!=0) { + options_.extractor->extract(value.data(), value.size(), + options_.range_filter); + filter_passed = options_.range_filter->evaluate(); + } + + if (filter_passed) { + + key_buffer.resize(0); + translator->TranslateInternalKey(key, &key_buffer); + + const size_t ksz = key_buffer.size(); + const size_t vsz = value.size(); + + const size_t ksz_sz = VarintLength(ksz); + const size_t vsz_sz = VarintLength(vsz); + + const size_t esz = ksz + ksz_sz + vsz + vsz_sz; + const size_t next_offset = out_offset + esz; + + // Allocate the output data array if this is the first data + // (out_offset == 0) + + if (out_offset == 0) + enif_alloc_binary(initial_bin_size, &bin); + + //------------------------------------------------------------ + // If we need more space, allocate it exactly since that means we + // reached the batch max anyway and will send it right away + //------------------------------------------------------------ + + if (next_offset > bin.size) + enif_realloc_binary(&bin, next_offset); + + char * const out = (char*)bin.data + out_offset; + + EncodeVarint64(out, ksz); + memcpy(out + ksz_sz, key_buffer.data(), ksz); + + EncodeVarint64(out + ksz_sz + ksz, vsz); + memcpy(out + ksz_sz + ksz + 
vsz_sz, value.data(), vsz); + + out_offset = next_offset; + + // If we've reached the maximum number of bytes to include in + // the batch, possibly shrink the binary and send it + + if (out_offset >= options_.max_batch_bytes) { + + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + + send_batch(&pid, msg_env, caller_ref_term, &bin); + + // Maybe block if max reached. + + sync_obj_->AddBytes(out_offset); + + out_offset = 0; + } + + // Increment the number of keys read and step to the next key + + ++num_read; + } + + iter->Next(); + } + + //------------------------------------------------------------ + // If exiting work loop, send a streaming_end message to any + // waiting erlang threads + //------------------------------------------------------------ + + if (!sync_obj_->IsConsumerDead()) { + ERL_NIF_TERM ref_copy = enif_make_copy(msg_env, caller_ref_term); + ERL_NIF_TERM msg = + enif_make_tuple2(msg_env, ATOM_STREAMING_END, ref_copy); + enif_send(NULL, &pid, msg_env, msg); + } + + if (out_offset) + enif_release_binary(&bin); + + enif_free_env(msg_env); + return work_result(); + +} // RangeScanTask::operator() + +#if 0 +work_result RangeScanTask::operator()() +{ + leveldb::ReadOptions read_options; + ErlNifEnv * env = local_env_; + ErlNifEnv * msg_env = enif_alloc_env(); + read_options.fill_cache = options_.fill_cache; + read_options.verify_checksums = options_.verify_checksums; leveldb::Iterator * iter = m_DbPtr->m_Db->NewIterator(read_options); const leveldb::Comparator * cmp = m_DbPtr->m_DbOptions->comparator; const leveldb::Slice skey_slice(start_key_); @@ -590,6 +823,8 @@ work_result RangeScanTask::operator()() ErlNifBinary bin; const size_t initial_bin_size = size_t(options_.max_batch_bytes * 1.1); size_t out_offset = 0; + size_t num_read = 0; + std::string key_buffer; key_buffer.reserve(256); @@ -653,6 +888,7 @@ work_result RangeScanTask::operator()() enif_free_env(msg_env); return work_result(); } // RangeScanTask::operator() +#endif 
ErlNifResourceType * RangeScanTask::sync_handle_resource_ = NULL; @@ -698,4 +934,107 @@ void RangeScanTask::SyncHandleResourceCleanup(ErlNifEnv * env, void * arg) } } +//======================================================================= +// Backwards compatibility for range_scan operations +//======================================================================= + +RangeScanTaskOld::RangeScanTaskOld(ErlNifEnv * caller_env, + ERL_NIF_TERM caller_ref, + DbObject * db_handle, + const std::string & start_key, + const std::string * end_key, + RangeScanOptions & options, + SyncObject * sync_obj) : + RangeScanTask(caller_env, caller_ref, db_handle, start_key, end_key, options, sync_obj) +{ +} + +RangeScanTaskOld::~RangeScanTaskOld() {}; + +work_result RangeScanTaskOld::operator()() +{ + leveldb::ReadOptions read_options; + ErlNifEnv * env = local_env_; + ErlNifEnv * msg_env = enif_alloc_env(); + read_options.fill_cache = options_.fill_cache; + leveldb::Iterator * iter = m_DbPtr->m_Db->NewIterator(read_options); + const leveldb::Comparator * cmp = m_DbPtr->m_DbOptions->comparator; + const leveldb::Slice skey_slice(start_key_); + const leveldb::Slice ekey_slice(end_key_); + const leveldb::Options & db_options = m_DbPtr->m_Db->GetOptions(); + leveldb::KeyTranslator * translator = db_options.translator; + + iter->Seek(skey_slice); + + ErlNifPid pid; + enif_get_local_pid(env, caller_pid_term, &pid); + ErlNifBinary bin; + const size_t initial_bin_size = size_t(options_.max_batch_bytes * 1.1); + size_t out_offset = 0; + std::string key_buffer; + key_buffer.reserve(256); + + for (;;) { + // If reached end or key past end key. + if (!iter->Valid() + || cmp->Compare(iter->key(), ekey_slice) >= 0) { + // If data in batch + if (out_offset) { + // Shrink it to final size. + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + send_batch(&pid, msg_env, caller_ref_term, &bin); + } + break; + } + // Shove next entry in the batch. 
+ leveldb::Slice key = iter->key(); + leveldb::Slice value = iter->value(); + bool filter_passed = true; + if (options_.range_filter!=0) { + options_.extractor->extract(value.data(), value.size(), options_.range_filter); + filter_passed = options_.range_filter->evaluate(); + } + if (filter_passed) { + key_buffer.resize(0); + translator->TranslateInternalKey(key, &key_buffer); + const size_t ksz = key_buffer.size(), vsz = value.size(); + const size_t ksz_sz = VarintLength(ksz); + const size_t vsz_sz = VarintLength(vsz); + const size_t esz = ksz + ksz_sz + vsz + vsz_sz; + const size_t next_offset = out_offset + esz; + if (out_offset == 0) + enif_alloc_binary(initial_bin_size, &bin); + // If we need more space, allocate it exactly since that means we + // reached the batch max anyway and will send it right away. + if (next_offset > bin.size) + enif_realloc_binary(&bin, next_offset); + char * const out = (char*)bin.data + out_offset; + EncodeVarint64(out, ksz); + memcpy(out + ksz_sz, key_buffer.data(), ksz); + EncodeVarint64(out + ksz_sz + ksz, vsz); + memcpy(out + ksz_sz + ksz + vsz_sz, value.data(), vsz); + out_offset = next_offset; + if (out_offset >= options_.max_batch_bytes) { + if (out_offset != bin.size) + enif_realloc_binary(&bin, out_offset); + send_batch(&pid, msg_env, caller_ref_term, &bin); + // Maybe block if max reached. 
+ sync_obj_->AddBytes(out_offset); + out_offset = 0; + } + } + iter->Next(); + } + + ERL_NIF_TERM ref_copy = enif_make_copy(msg_env, caller_ref_term); + ERL_NIF_TERM msg = + enif_make_tuple2(msg_env, ATOM_RANGE_SCAN_END, ref_copy); + enif_send(NULL, &pid, msg_env, msg); + // The iterator was created by this task via NewIterator; release it now that the scan is done. + delete iter; + enif_free_env(msg_env); + return work_result(); +} // RangeScanTaskOld::operator() + } // namespace eleveldb diff --git a/c_src/workitems.h b/c_src/workitems.h index 98b0ab43..49b75696 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -452,17 +452,40 @@ class ItrCloseTask : public WorkTask }; // class ItrCloseTask struct RangeScanOptions { + + // Byte-level controls for batching/ack + size_t max_unacked_bytes; + size_t low_bytes; size_t max_batch_bytes; + + // Max number of items to return. Zero means unlimited. + + size_t limit; + + // Include the start key in streaming iteration? + bool start_inclusive; + + // Include the end key in streaming iteration? + bool end_inclusive; + + // Read options + bool fill_cache; - ExpressionNode* range_filter = 0; - Extractor* extractor = 0; + bool verify_checksums; + + ExpressionNode* range_filter; + Extractor* extractor; RangeScanOptions() - : max_unacked_bytes(10 * 1024 * 1024), max_batch_bytes(1 * 1024 * 1024), - start_inclusive(true), end_inclusive(false), fill_cache(false) + : max_unacked_bytes(10 * 1024 * 1024), + low_bytes(2 * 1024 * 1024), + max_batch_bytes(1 * 1024 * 1024), + limit(0), + start_inclusive(true), end_inclusive(false), + fill_cache(false), verify_checksums(true), range_filter(0), extractor(0) { } }; @@ -489,21 +512,26 @@ class RangeScanTask : public WorkTask // side to shut down. void AddBytes(uint32_t n); - // Returns true if num bytes was over the max, so that producer - // has gone or will go to sleep waiting on ack. Consumer *needs* - // to signal it or it will sleep forever. 
- bool AckBytes(uint32_t n); + void AckBytes(uint32_t n); + bool AckBytesRet(uint32_t n); // Should be called when the Erlang handle is garbage collected // so no process is there to consume the output. void MarkConsumerDead(); + bool IsConsumerDead() const; + private: const uint32_t max_bytes_; + const uint32_t low_bytes_; volatile uint32_t num_bytes_; - volatile bool producer_sleeping_; + bool producer_sleeping_; + // Set if producer filled up but consumer acked before + // producer went to sleep. Producer should abort going to + // sleep upon seeing this set. volatile bool pending_signal_; volatile bool consumer_dead_; + volatile bool crossed_under_max_; ErlNifMutex * mutex_; ErlNifCond * cond_; }; @@ -516,7 +544,7 @@ class RangeScanTask : public WorkTask ERL_NIF_TERM caller_ref, DbObject * db_handle, const std::string & start_key, - const std::string & end_key, + const std::string * end_key, RangeScanOptions & options, SyncObject * sync_obj); @@ -534,10 +562,52 @@ class RangeScanTask : public WorkTask RangeScanOptions options_; std::string start_key_; std::string end_key_; + bool has_end_key_; SyncObject * sync_obj_; }; // class RangeScanTask +class RangeScanTaskOld : public RangeScanTask { + public: + + RangeScanTaskOld(ErlNifEnv * caller_env, + ERL_NIF_TERM caller_ref, + DbObject * db_handle, + const std::string & start_key, + const std::string * end_key, + RangeScanOptions & options, + SyncObject * sync_obj); + + virtual ~RangeScanTaskOld(); + virtual work_result operator()(); + +}; // class RangeScanTaskOld + +/** + * Background object for async destroy of a leveldb instance + */ + +class DestroyTask : public WorkTask +{ +protected: + std::string db_name; + leveldb::Options *open_options; // associated with db handle, we don't free it + +public: + DestroyTask(ErlNifEnv* caller_env, ERL_NIF_TERM& _caller_ref, + const std::string& db_name_, leveldb::Options *open_options_); + + virtual ~DestroyTask() {}; + + virtual work_result operator()(); + +private: + 
DestroyTask(); + DestroyTask(const DestroyTask &); + DestroyTask & operator=(const DestroyTask &); + +}; // class DestroyTask + } // namespace eleveldb diff --git a/rebar.config b/rebar.config index d49b71b1..61472df9 100644 --- a/rebar.config +++ b/rebar.config @@ -14,15 +14,14 @@ {deps, [ {msgpack, ".*", {git, "git://github.com/msgpack/msgpack-erlang.git", "master"}}, - {cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {branch, "2.0"}}} + {cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {branch, "develop"}}} ]}. {port_env, [ %% Make sure to set -fPIC when compiling leveldb {"CFLAGS", "$CFLAGS -Wall -O3 -fPIC"}, % try and make it build - % {"CXXFLAGS", "$CXXFLAGS -Wall -O3 -fPIC -std=c++11 --stdlib=libc++"}, - {"CXXFLAGS", "$CXXFLAGS -Wall -O3 -fPIC -std=gnu++11"}, + {"CXXFLAGS", "$CXXFLAGS -Wall -O3 -fPIC -std=c++0x"}, {"DRV_CFLAGS", "$DRV_CFLAGS -O3 -Wall -I c_src/leveldb/include"}, {"DRV_LDFLAGS", "$DRV_LDFLAGS c_src/leveldb/libleveldb.a c_src/system/lib/libsnappy.a -lstdc++"} ]}. diff --git a/rebar.config.script b/rebar.config.script index 60a64b9d..8515c12e 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -6,7 +6,8 @@ %% actually running. case os:type() of {unix,darwin} -> - Opt = " -mmacosx-version-min=10.8", +%% Opt = " -mmacosx-version-min=10.8", + Opt = " ", [Mjr|_] = string:tokens(os:cmd("/usr/bin/uname -r"), "."), Major = list_to_integer(Mjr), if diff --git a/src/eleveldb.app.src b/src/eleveldb.app.src index 991a8d61..cc2b4343 100644 --- a/src/eleveldb.app.src +++ b/src/eleveldb.app.src @@ -8,11 +8,11 @@ stdlib ]}, {env, [ - %% what percent of total memory should go to + %% what percent of total memory should go to %% leveldb. Default is 15% on the basis of - %% a single development machine running 5 + %% a single development machine running 5 %% Riak instances would therefore claim 75%. - %% REAL users will want this at 75 to 80 + %% REAL users will want this at 70%. 
{total_leveldb_mem_percent, 15}, %% Use bloom filter support by default diff --git a/src/eleveldb.erl b/src/eleveldb.erl index a254eefa..022f9446 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -29,11 +29,13 @@ get/4, put/4, put/5, + async_put/5, delete/3, delete/4, write/3, write/4, fold/4, + foldtest1/4, fold_keys/4, status/2, destroy/2, @@ -43,7 +45,14 @@ ts_batch_to_binary/1, ts_key/1, parse_string/1, - is_empty/1]). + is_empty/1, + encode/2, + current_usec/0]). + +%% for testing +-export([ + ts_key_TEST/1 + ]). -export([option_types/1, validate_options/2]). @@ -56,6 +65,9 @@ range_scan_ack/2, range_scan_fold/6]). +-export([emlfold1/4, + emlfold2/1]). + -export_type([db_ref/0, itr_ref/0]). @@ -121,6 +133,13 @@ init() -> {tiered_fast_prefix, string()} | {tiered_slow_prefix, string()}]. +-type read_option() :: {verify_checksums, boolean()} | + {fill_cache, boolean()}. + +-type itr_option() :: {iterator_refresh, boolean()}. + +-type itr_options() :: [read_option() | itr_option()]. + -type read_options() :: [{verify_checksums, boolean()} | {fill_cache, boolean()} | {iterator_refresh, boolean()}]. @@ -137,12 +156,37 @@ init() -> {max_batch_size, pos_integer()} | {max_unacked_bytes, pos_integer()}]. +-type streaming_option() :: {max_batch_bytes, pos_integer()} | + {max_unacked_bytes, pos_integer()}. + +-type streaming_options() :: [streaming_option()]. + +-type fold_method() :: iterator | streaming. + +-type fold_options() :: [read_option() | + {fold_method, fold_method()} | + {first_key, binary()} | + {last_key, binary() | undefined} | + {start_inclusive, boolean()} | + {end_inclusive, boolean()} | + {limit, pos_integer()} | + streaming_option()]. + -type iterator_action() :: first | last | next | prev | prefetch | binary(). -opaque db_ref() :: binary(). -opaque itr_ref() :: binary(). +-type stream_ref() :: {reference(), binary()}. + +encode(Val, timestamp) -> + <>; +encode(Val, binary) when is_binary(Val)-> + Val; +encode(Val, _) -> + term_to_binary(Val). 
+ -spec async_open(reference(), string(), open_options()) -> ok. async_open(_CallerRef, _Name, _Opts) -> erlang:nif_error({error, not_loaded}). @@ -222,25 +266,31 @@ write(Ref, Updates, Opts) -> async_write(CallerRef, Ref, Updates, Opts), ?WAIT_FOR_REPLY(CallerRef). +-spec async_put(db_ref(), reference(), binary(), binary(), write_options()) -> ok. +async_put(Ref, Context, Key, Value, Opts) -> + Updates = [{put, Key, Value}], + async_write(Context, Ref, Updates, Opts), + ok. + -spec async_write(reference(), db_ref(), write_actions(), write_options()) -> ok. async_write(_CallerRef, _Ref, _Updates, _Opts) -> erlang:nif_error({error, not_loaded}). --spec async_iterator(reference(), db_ref(), read_options()) -> ok. --spec async_iterator(reference(), db_ref(), read_options(), keys_only) -> ok. +-spec async_iterator(reference(), db_ref(), itr_options()) -> ok. async_iterator(_CallerRef, _Ref, _Opts) -> erlang:nif_error({error, not_loaded}). +-spec async_iterator(reference(), db_ref(), itr_options(), keys_only) -> ok. async_iterator(_CallerRef, _Ref, _Opts, keys_only) -> erlang:nif_error({error, not_loaded}). --spec iterator(db_ref(), read_options()) -> {ok, itr_ref()}. +-spec iterator(db_ref(), itr_options()) -> {ok, itr_ref()}. iterator(Ref, Opts) -> CallerRef = make_ref(), async_iterator(CallerRef, Ref, Opts), ?WAIT_FOR_REPLY(CallerRef). --spec iterator(db_ref(), read_options(), keys_only) -> {ok, itr_ref()}. +-spec iterator(db_ref(), itr_options(), keys_only) -> {ok, itr_ref()}. iterator(Ref, Opts, keys_only) -> CallerRef = make_ref(), async_iterator(CallerRef, Ref, Opts, keys_only), @@ -278,15 +328,29 @@ iterator_close(IRef) -> async_iterator_close(_CallerRef, _IRef) -> erlang:nif_error({error, not_loaded}). --spec range_scan(db_ref(), binary(), binary(), range_scan_options()) -> +-spec range_scan(db_ref(), binary(), binary()|undefined, range_scan_options()) -> {ok, {itr_ref(), reference()}} | {error, any()}. 
range_scan(_DBRef, _StartKey, _EndKey, _Opts) -> erlang:nif_error({error, not_loaded}). --spec range_scan_ack(reference(), pos_integer()) -> ok. +-spec range_scan_ack(reference(), pos_integer()|0) -> ok | needs_reack. range_scan_ack(_Ref, _NumBytes) -> erlang:nif_error({error, not_loaded}). +-spec streaming_start(db_ref(), binary(), binary() | undefined, + streaming_options()) -> + {ok, stream_ref()} | {error, any()}. +streaming_start(_DBRef, _StartKey, _EndKey, _Opts) -> + erlang:nif_error({error, not_loaded}). + +-spec streaming_ack(binary(), pos_integer()) -> ok. +streaming_ack(_AckRef, _NumBytes) -> + erlang:nif_error({error, not_loaded}). + +-spec streaming_stop(binary()) -> ok. +streaming_stop(_AckRef) -> + erlang:nif_error({error, not_loaded}). + -type fold_fun() :: fun(({Key::binary(), Value::binary()}, any()) -> any()). range_scan_fold(Ref, Fun, Acc0, SKey, EKey, Opts) -> @@ -321,6 +385,38 @@ do_range_scan_fold(MsgRef, AckRef, Fun, Acc) -> lager:info("Range scan got unexpected message: ~p\n", [Msg]) end. +do_streaming_batch(<<>>, _Fun, Acc) -> + Acc; +do_streaming_batch(Bin, Fun, Acc) -> + {K, Bin2} = parse_string(Bin), + {V, Bin3} = parse_string(Bin2), + Acc2 = Fun({K, V}, Acc), + do_streaming_batch(Bin3, Fun, Acc2). + +do_streaming_fold(StreamRef = {MsgRef, AckRef}, Fun, Acc) -> + receive + {streaming_end, MsgRef} -> + Acc; + {streaming_batch, MsgRef, Batch} -> + Size = byte_size(Batch), + Acc2 = do_streaming_batch(Batch, Fun, Acc), + _ = streaming_ack(AckRef, Size), + do_streaming_fold(StreamRef, Fun, Acc2) + end. + +do_streaming_fold_test1(StreamRef = {MsgRef, AckRef}, Fun, Acc) -> + receive + {streaming_end, MsgRef} -> + Acc; + {streaming_batch, MsgRef, Batch} -> + Size = byte_size(Batch), + _ = streaming_ack(AckRef, Size), + do_streaming_fold(StreamRef, Fun, Acc) + end. + +current_usec() -> + erlang:nif_error({error, not_loaded}). + parse_string(Bin) -> parse_string(0, 0, Bin). 
@@ -334,10 +430,53 @@ parse_string(Size, Shift, <<0:1, N:7, Bin/binary>>) -> %% Fold over the keys and values in the database %% will throw an exception if the database is closed while the fold runs --spec fold(db_ref(), fold_fun(), any(), read_options()) -> any(). +-spec fold(db_ref(), fold_fun(), any(), fold_options()) -> any(). fold(Ref, Fun, Acc0, Opts) -> + case proplists:get_value(fold_method, Opts, iterator) of + iterator -> + {ok, Itr} = iterator(Ref, Opts), + do_itr_fold(Itr, Fun, Acc0, Opts); + streaming -> + SKey = proplists:get_value(first_key, Opts, <<>>), + EKey = proplists:get_value(last_key, Opts), + {ok, StreamRef} = streaming_start(Ref, SKey, EKey, Opts), + {_, AckRef} = StreamRef, + try + do_streaming_fold(StreamRef, Fun, Acc0) + after + %% Close early, do not wait for garbage collection. + streaming_stop(AckRef) + end + end. + +-spec foldtest1(db_ref(), fold_fun(), any(), fold_options()) -> any(). +foldtest1(Ref, Fun, Acc0, Opts) -> + case proplists:get_value(fold_method, Opts, iterator) of + iterator -> + {ok, Itr} = iterator(Ref, Opts), + do_itr_fold(Itr, Fun, Acc0, Opts); + streaming -> + SKey = proplists:get_value(first_key, Opts, <<>>), + EKey = proplists:get_value(last_key, Opts), + {ok, StreamRef} = streaming_start(Ref, SKey, EKey, Opts), + {_, AckRef} = StreamRef, + try + do_streaming_fold_test1(StreamRef, Fun, Acc0) + after + %% Close early, do not wait for garbage collection. + streaming_stop(AckRef) + end + end. + +emlfold1(Ref, _, _, Opts) -> {ok, Itr} = iterator(Ref, Opts), - do_fold(Itr, Fun, Acc0, Opts). + Start = proplists:get_value(first_key, Opts, first), + true = is_binary(Start) or (Start == first), + iterator_move(Itr, Start). + +emlfold2(Ref) -> + {ok, Itr} = iterator(Ref, []), + iterator_move(Itr, first). -type fold_keys_fun() :: fun((Key::binary(), any()) -> any()). 
@@ -346,7 +485,7 @@ fold(Ref, Fun, Acc0, Opts) -> -spec fold_keys(db_ref(), fold_keys_fun(), any(), read_options()) -> any(). fold_keys(Ref, Fun, Acc0, Opts) -> {ok, Itr} = iterator(Ref, Opts, keys_only), - do_fold(Itr, Fun, Acc0, Opts). + do_itr_fold(Itr, Fun, Acc0, Opts). -spec status(db_ref(), Key::binary()) -> {ok, binary()} | error. status(Ref, Key) -> @@ -356,13 +495,16 @@ status(Ref, Key) -> status_int(_Ref, _Key) -> erlang:nif_error({error, not_loaded}). +-spec async_destroy(reference(), string(), open_options()) -> ok. +async_destroy(_CallerRef, _Name, _Opts) -> + erlang:nif_error({error, not_loaded}). + -spec destroy(string(), open_options()) -> ok | {error, any()}. destroy(Name, Opts) -> - eleveldb_bump:big(), - destroy_int(Name, Opts). - -destroy_int(_Name, _Opts) -> - erlang:nif_error({erlang, not_loaded}). + CallerRef = make_ref(), + Opts2 = add_open_defaults(Opts), + async_destroy(CallerRef, Name, Opts2), + ?WAIT_FOR_REPLY(CallerRef). repair(Name, Opts) -> eleveldb_bump:big(), @@ -447,7 +589,7 @@ add_open_defaults(Opts) -> end. -do_fold(Itr, Fun, Acc0, Opts) -> +do_itr_fold(Itr, Fun, Acc0, Opts) -> try %% Extract {first_key, binary()} and seek to that key as a starting %% point for the iteration. The folding function should use throw if it @@ -507,7 +649,15 @@ ts_batch_to_binary({ts_batch, Family, Series, Points}) -> B3 = append_string(Series, B2), append_points(Points, B3). -ts_key({Family, Series, Time}) -> +ts_key(List) when is_list(List) -> + ts_l2(lists:reverse(List), <<>>). + +ts_l2([], Acc) -> + Acc; +ts_l2([H | T], Acc) -> + ts_l2(T, append_string(H, Acc)). + +ts_key_TEST({Family, Series, Time}) -> B1 = <>, B2 = append_string(Family, B1), append_string(Series, B2). @@ -625,13 +775,23 @@ values() -> eqc_gen:non_empty(list(binary())). ops(Keys, Values) -> - {oneof([put, delete]), oneof(Keys), oneof(Values)}. + {oneof([put, async_put, delete]), oneof(Keys), oneof(Values)}. 
apply_kv_ops([], _Ref, Acc0) -> Acc0; apply_kv_ops([{put, K, V} | Rest], Ref, Acc0) -> ok = eleveldb:put(Ref, K, V, []), apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); +apply_kv_ops([{async_put, K, V} | Rest], Ref, Acc0) -> + MyRef = make_ref(), + Context = {my_context, MyRef}, + ok = eleveldb:async_put(Ref, Context, K, V, []), + receive + {Context, ok} -> + apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); + Msg -> + error({unexpected_msg, Msg}) + end; apply_kv_ops([{delete, K, _} | Rest], Ref, Acc0) -> ok = eleveldb:delete(Ref, K, []), apply_kv_ops(Rest, Ref, orddict:store(K, deleted, Acc0)). diff --git a/src/eleveldb_ts.erl b/src/eleveldb_ts.erl new file mode 100644 index 00000000..c6ae4af7 --- /dev/null +++ b/src/eleveldb_ts.erl @@ -0,0 +1,82 @@ +-module(eleveldb_ts). + +-export([ + encode_key/1, + encode_record/1, + decode_record/1 + ]). + +-ifdef(TEST). +-compile(export_all). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +encode_key(Elements) when is_list(Elements) -> + encode_k2(Elements, <<>>). + +%% TODO recheck this packing +%% binary list stuff is shocking here too +encode_k2([], Bin) -> Bin; +encode_k2([{timestamp, Ts} | T], Bin) -> encode_k2(T, append(<<Ts:64>>, Bin)); +encode_k2([{float, F} | T], Bin) -> encode_k2(T, append(<<F:64/float>>, Bin)); +encode_k2([{int, I} | T], Bin) -> encode_k2(T, append(<<I:64>>, Bin)); +encode_k2([{binary, B} | T], Bin) when is_binary(B) -> + encode_k2(T, append(B, Bin)); +encode_k2([{binary, L} | T], Bin) when is_list(L) -> + B = list_to_binary(L), + encode_k2(T, append(B, Bin)). + +encode_record(Record) -> + msgpack:pack(Record, [{format, jsx}]). + +decode_record(Bin) when is_binary(Bin) -> + {ok, Record} = msgpack:unpack(Bin, [{format, jsx}]), + Record. + + +%% +%% Internal Funs +%% + +append_varint(N, Bin) -> + N2 = N bsr 7, + case N2 of + 0 -> + C = N rem 128, + <<Bin/binary, C:8>>; + _ -> + C = (N rem 128) + 128, + append_varint(N2, <<Bin/binary, C:8>>) + end. 
+ +append(S, Bin) when is_binary(S) andalso + is_binary(Bin) -> + L = byte_size(S), + B2 = append_varint(L, Bin), + <<B2/binary, S/binary>>. + +%% =================================================================== +%% EUnit tests +%% =================================================================== +-ifdef(TEST). + +simple_encode_key_test() -> + Key = [{timestamp, 1}, {binary, <<"abc">>}, {float, 1.0}, {int, 9}], + Got = encode_key(Key), + Exp = <<8,0,0,0,0,0,0,0,01,3,97,98,99,8,63,240,0,0,0,0,0,0,8,0,0,0,0,0,0,0,9>>, + ?assertEqual(Exp, Got). + +simple_encode_record_test() -> + Rec = [{<<"field_1">>, 123}, {<<"field_2">>, "abdce"}], + Got = encode_record(Rec), + Exp = <<130,167,102,105,101,108,100,95,49,123,167,102,105,101,108,100,95,50,149,97,98,100,99,101>>, + ?assertEqual(Exp, Got). + +simple_decode_record_test() -> + Rec = <<130,167,102,105,101,108,100,95,49,123,167,102,105,101,108,100,95,50,149,97,98,100,99,101>>, + Got = decode_record(Rec), + Exp = [{<<"field_1">>, 123}, {<<"field_2">>, "abdce"}], + ?assertEqual(Exp, Got). + + +-endif. diff --git a/test/range_scan.erl b/test/range_scan.erl index 68b07179..8fc26214 100644 --- a/test/range_scan.erl +++ b/test/range_scan.erl @@ -59,9 +59,12 @@ test_range_query(Ref) -> end, lists:seq(0,9)) end}. +get_ts_key(Key) -> + eleveldb:ts_key_TEST({?FAMILY, ?SERIES, Key}). + read_items_with_level_filter(Ref, Start0, End0, F1, F2) -> - Start = eleveldb:ts_key({?FAMILY, ?SERIES, Start0}), - End = eleveldb:ts_key({?FAMILY, ?SERIES, End0}), + Start = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, Start0}), + End = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, End0}), Opts = [{range_filter, {"and", [ {">=", [{field, "field_1"}, {const, F1}]}, {"<=", [{field, "field_2"}, {const, F2}]} @@ -76,8 +79,8 @@ calc_expected_rows(End0, F2, Start0, F1) -> lists:min([End0, F2]) - lists:max([Start0, F1]) + 1. 
read_items_with_erlang_filter(Ref, Start0, End0, F1, F2) -> - Start = eleveldb:ts_key({?FAMILY, ?SERIES, Start0}), - End = eleveldb:ts_key({?FAMILY, ?SERIES, End0}), + Start = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, Start0}), + End = eleveldb:ts_key_TEST({?FAMILY, ?SERIES, End0}), Opts = [], ExpectedRows = calc_expected_rows(End0, F2, Start0, F1), time_result("With Erlang Filtering", fun() ->