Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

hashmap: move reserve() from base class to public method #937

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions examples/concurrent_hash_map/concurrent_hash_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@

using namespace pmem::obj;

/* In this example we will be using concurrent_hash_map with p<int> type for
* both keys and values */
/* In this example we use concurrent_hash_map with p<int> type as
* both keys and values. */
using hashmap_type = concurrent_hash_map<p<int>, p<int>>;

const int THREADS_NUM = 30;

/* This is basic example and we only need to use concurrent_hash_map. Hence we
* will correlate memory pool root object with single instance of persistent
* pointer to hasmap_type */
* correlate memory pool root object with a single instance of persistent
* pointer to hashmap_type. */
struct root {
persistent_ptr<hashmap_type> pptr;
};
Expand Down Expand Up @@ -73,7 +73,7 @@ main(int argc, char *argv[])

r->runtime_initialize();

/* defragment the whole pool at the beginning */
/* Defragment the whole pool at the beginning. */
try {
r->defragment();
} catch (const pmem::defrag_error &e) {
Expand All @@ -86,11 +86,15 @@ main(int argc, char *argv[])
auto &map = *r;
std::cout << map.size() << std::endl;

/* We expect around 10 * THREADS_NUM items, so we reserve
* hashmap's capacity to speed up insert operations. */
map.reserve(10 * THREADS_NUM);

std::vector<std::thread> threads;
threads.reserve(static_cast<size_t>(THREADS_NUM));

/* Insert THREADS_NUM / 3 key-value pairs to the hashmap. This
* operation is thread-safe. */
/* Start THREADS_NUM/3 threads to insert key-value pairs
* to the hashmap. This operation is thread-safe. */
for (int i = 0; i < THREADS_NUM / 3; ++i) {
threads.emplace_back([&]() {
for (int i = 0; i < 10 * THREADS_NUM; ++i) {
Expand All @@ -100,8 +104,8 @@ main(int argc, char *argv[])
});
}

/* Erase THREADS_NUM /3 key-value pairs from the hashmap. This
* operation is thread-safe. */
/* Start THREADS_NUM/3 threads to erase key-value pairs
* from the hashmap. This operation is thread-safe. */
for (int i = 0; i < THREADS_NUM / 3; ++i) {
threads.emplace_back([&]() {
for (int i = 0; i < 10 * THREADS_NUM; ++i) {
Expand All @@ -110,8 +114,9 @@ main(int argc, char *argv[])
});
}

/* Check if given key is in the hashmap. For the time of an
* accessor life, the read-write lock is taken on the item. */
/* Start THREADS_NUM/3 threads to check if given key is
* in the hashmap. For the time of an accessor life,
* the read-write lock is taken on the item. */
for (int i = 0; i < THREADS_NUM / 3; ++i) {
threads.emplace_back([&]() {
for (int i = 0; i < 10 * THREADS_NUM; ++i) {
Expand All @@ -132,7 +137,7 @@ main(int argc, char *argv[])
t.join();
}
try {
/* defragment the whole pool at the end */
/* Defragment the whole pool at the end. */
map.defragment();
} catch (const pmem::defrag_error &e) {
std::cerr << "Defragmentation exception: " << e.what()
Expand Down Expand Up @@ -176,8 +181,7 @@ main(int argc, char *argv[])
map.free_data();

/* map.clear() // WRONG
* After free_data() concurrent hash map cannot be used
* anymore! */
* After free_data() hash map cannot be used anymore! */

transaction::run(pop, [&] {
delete_persistent<hashmap_type>(r);
Expand Down
57 changes: 31 additions & 26 deletions include/libpmemobj++/container/concurrent_hash_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ struct hash_map_node {

/**
* The class provides the way to access certain properties of segments
* used by hash map
* used by hash map.
*/
template <typename Bucket>
class segment_traits {
Expand Down Expand Up @@ -1298,28 +1298,9 @@ class hash_map_base {
return false;
}

/**
* Prepare enough segments for number of buckets
*/
void
reserve(size_type buckets)
{
if (buckets == 0)
return;

--buckets;

bool is_initial = this->size() == 0;

for (size_type m = mask(); buckets > m; m = mask())
enable_segment(
segment_traits_t::segment_index_of(m + 1),
is_initial);
}

/**
* Swap hash_map_base
* @throws std::transaction_error in case of PMDK transaction failed
* @throw std::transaction_error in case of PMDK transaction failed
*/
void
internal_swap(hash_map_base<Key, T, mutex_t, scoped_t> &table)
Expand Down Expand Up @@ -1575,7 +1556,7 @@ operator!=(const hash_map_iterator<Container, M> &i,
* Persistent memory aware implementation of Intel TBB concurrent_hash_map.
* The implementation is based on a concurrent hash table algorithm
* (https://arxiv.org/ftp/arxiv/papers/1509/1509.02235.pdf) where elements
* assigned to buckets based on a hash code are calculated from a key.
* are assigned to buckets based on a hash code calculated from a key.
* In addition to concurrent find, insert, and erase operations, the algorithm
* employs resizing and on-demand per-bucket rehashing. The hash table consists
* of an array of buckets, and each bucket consists of a list of nodes and a
Expand Down Expand Up @@ -1661,6 +1642,7 @@ class concurrent_hash_map
using hash_map_base::check_growth;
using hash_map_base::check_mask_race;
using hash_map_base::embedded_buckets;
using hash_map_base::enable_segment;
using hash_map_base::FEATURE_CONSISTENT_SIZE;
using hash_map_base::get_bucket;
using hash_map_base::get_pool_base;
Expand All @@ -1669,7 +1651,6 @@ class concurrent_hash_map
using hash_map_base::internal_swap;
using hash_map_base::layout_features;
using hash_map_base::mask;
using hash_map_base::reserve;
using tls_t = typename hash_map_base::tls_t;
using node = typename hash_map_base::node;
using node_mutex_t = typename node::mutex_t;
Expand Down Expand Up @@ -2273,7 +2254,7 @@ class concurrent_hash_map
* Clear hash map content
* Not thread safe.
*
* @throws pmem::transaction_error in case of PMDK transaction failure
* @throw pmem::transaction_error in case of PMDK transaction failure
*/
void clear();

Expand Down Expand Up @@ -2800,7 +2781,7 @@ class concurrent_hash_map
* Remove element with corresponding key
*
* @return true if element was deleted by this call
* @throws pmem::transaction_free_error in case of PMDK unable to free
* @throw pmem::transaction_free_error in case of PMDK unable to free
* the memory
* @throw pmem::transaction_scope_error if called inside transaction
*/
Expand Down Expand Up @@ -2879,6 +2860,30 @@ class concurrent_hash_map
return my_defrag.run();
}

/**
* Prepare enough segments for number of buckets.
*
* XXX: fixme
*/
void
reserve(size_type buckets)
{
if (buckets == 0)
return;

--buckets;

bool is_initial = this->size() == 0;

pool_base pop = get_pool_base();
pmem::obj::transaction::manual tx(pop);
for (size_type m = mask(); buckets > m; m = mask())
enable_segment(
segment_traits_t::segment_index_of(m + 1),
is_initial);
pmem::obj::transaction::commit();
}

/**
* Remove element with corresponding key
*
Expand All @@ -2889,7 +2894,7 @@ class concurrent_hash_map
* this function without constructing an instance of Key
*
* @return true if element was deleted by this call
* @throws pmem::transaction_free_error in case of PMDK unable to free
* @throw pmem::transaction_free_error in case of PMDK unable to free
* the memory
* @throw pmem::transaction_scope_error if called inside transaction
*/
Expand Down
9 changes: 9 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,15 @@ if(TEST_CONCURRENT_HASHMAP)
build_test(concurrent_hash_map_defrag concurrent_hash_map_defrag/concurrent_hash_map_defrag.cpp)
add_test_generic(NAME concurrent_hash_map_defrag TRACERS none)

build_test(concurrent_hash_map_reserve concurrent_hash_map_reserve/concurrent_hash_map_reserve.cpp)
add_test_generic(NAME concurrent_hash_map_reserve TRACERS none) # memcheck pmemcheck

if (NOT WIN32)
build_test(concurrent_hash_map_reserve_mock concurrent_hash_map_reserve/concurrent_hash_map_reserve_mock.cpp common/mock_tx_alloc.cc)
target_link_libraries(concurrent_hash_map_reserve_mock ${CMAKE_DL_LIBS})
add_test_generic(NAME concurrent_hash_map_reserve_mock TRACERS none)
endif()

# This test can NOT be run under helgrind as it will report wrong lock ordering. Helgrind is right about
# possible deadlock situation but that could only happen in case of wrong API usage.
build_test(concurrent_hash_map_deadlock concurrent_hash_map_deadlock/concurrent_hash_map_deadlock.cpp)
Expand Down
41 changes: 41 additions & 0 deletions tests/common/mock_tx_alloc.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2017-2020, Intel Corporation */

#include <cstdlib>
#include <dlfcn.h>
#include <errno.h>
#include <libpmemobj/tx_base.h>

#include "mock_tx_alloc.h"

size_t test_alloc_counter;

extern "C" PMEMoid pmemobj_tx_alloc(size_t size, uint64_t type_num);

PMEMoid
pmemobj_tx_alloc(size_t size, uint64_t type_num)
{
static auto real = (decltype(pmemobj_tx_alloc) *)dlsym(
RTLD_NEXT, "pmemobj_tx_alloc");

if (real == nullptr)
abort();

test_alloc_counter++;

return real(size, type_num);
}

PMEMoid
pmemobj_tx_xalloc(size_t size, uint64_t type_num, uint64_t flags)
{
static auto real = (decltype(pmemobj_tx_xalloc) *)dlsym(
RTLD_NEXT, "pmemobj_tx_xalloc");

if (real == nullptr)
abort();

test_alloc_counter++;

return real(size, type_num, flags);
}
8 changes: 8 additions & 0 deletions tests/common/mock_tx_alloc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2017-2020, Intel Corporation */

#include <cstddef>

#pragma once

extern size_t test_alloc_counter;
65 changes: 65 additions & 0 deletions tests/concurrent_hash_map_reserve/concurrent_hash_map_reserve.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2020, Intel Corporation */

#include "../concurrent_hash_map/concurrent_hash_map_string_test.hpp"
#include "unittest.hpp"

#define LAYOUT "concurrent_hash_map"

namespace nvobj = pmem::obj;

/*
* reserve -- basic reserve test of
* pmem::obj::concurrent_hash_map<pmem::obj::string, pmem::obj::string>
*/
void
reserve_test(nvobj::pool<root> &pop)
{
const size_t RESERVE_COUNT = 5000;
auto map = pop.root()->cons;

UT_ASSERT(map != nullptr);

map->runtime_initialize();

UT_ASSERT(map->bucket_count() < RESERVE_COUNT);
UT_ASSERTeq(map->size(), 0);
map->reserve(RESERVE_COUNT);
UT_ASSERTeq(map->size(), 0);
UT_ASSERT(map->bucket_count() >= RESERVE_COUNT);

map->clear();
}

static void
test(int argc, char *argv[])
{
if (argc < 1) {
UT_FATAL("usage: %s file-name", argv[0]);
}

const char *path = argv[1];
nvobj::pool<root> pop;

try {
pop = nvobj::pool<root>::create(path, LAYOUT,
200 * PMEMOBJ_MIN_POOL,
S_IWUSR | S_IRUSR);
pmem::obj::transaction::run(pop, [&] {
pop.root()->cons =
nvobj::make_persistent<persistent_map_type>();
});
} catch (pmem::pool_error &pe) {
UT_FATAL("!pool::create: %s %s", pe.what(), path);
}

reserve_test(pop);

pop.close();
}

int
main(int argc, char *argv[])
{
return run_test([&] { test(argc, argv); });
}
Loading