Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve multithreaded performance with memory prefetching #861

Merged
merged 7 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,212 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
return v;
}


/* -------------------------- Dict Prefetching ------------------------------ */

/* Per-key states of the prefetch state machine (see diagram below).
 * Each key in a batch advances through these states independently. */
typedef enum {
    PrefetchBucket,    /* Initial state: determines which hash table to use, and prefetches the table's bucket */
    PrefetchEntry,     /* Prefetches entries associated with the given key's hash */
    PrefetchValue,     /* Prefetches the value object of the entry found in the previous step */
    PrefetchValueData, /* Prefetches the value object's data (if applicable) */
    PrefetchDone       /* Indicates that prefetching for this key is complete */
} PrefetchState;

/************************************ State machine diagram for the prefetch operation. ********************************
start
┌────────▼─────────┐
┌─────────►│ PrefetchBucket ├────►────────┐
│ └────────┬─────────┘ no more tables -> done
| bucket|found |
│ | │
entry not found - goto next table ┌────────▼────────┐ │
└────◄─────┤ PrefetchEntry | ▼
┌────────────►└────────┬────────┘ │
| Entry│found │
│ | │
value not found - goto next entry ┌───────▼────────┐ |
└───────◄──────┤ PrefetchValue | ▼
└───────┬────────┘ │
Value│found │
| |
┌───────────▼──────────────┐ │
│ PrefetchValueData │ ▼
└───────────┬──────────────┘ │
| │
┌───────-─▼─────────────┐ │
│ PrefetchDone │◄────────┘
└───────────────────────┘
**********************************************************************************************************************/

/* Progress of the prefetch state machine for a single key in the batch. */
typedef struct {
    PrefetchState state;      /* Current state of the prefetch operation */
    int ht_idx;               /* Index of the current hash table (0 or 1 for rehashing);
                               * -1 until PrefetchBucket selects the first table */
    uint64_t bucket_idx;      /* Index of the bucket in the current hash table */
    uint64_t key_hash;        /* Hash value of the key being prefetched */
    dictEntry *current_entry; /* Pointer to the current entry being processed */
} PrefetchInfo;

/* A batch of keys being prefetched together. cur_idx rotates round-robin
 * over the slots so that each key's memory latency overlaps with work on
 * the other keys. */
typedef struct {
    PrefetchInfo prefetch_info[DictMaxPrefetchSize];
    size_t current_batch_size; /* Number of keys in the current batch */
    size_t cur_idx;            /* Index of the current key being prefetched */
    size_t keys_done;          /* Number of keys that have been processed */
} PrefetchBatch;

/* Global prefetch batch - holds the current batch of keys being prefetched.
 * NOTE(review): file-scope mutable state - assumes dictPrefetch() is never
 * invoked reentrantly or from multiple threads at once; confirm at call sites. */
static PrefetchBatch prefetchBatch;

/* Advance cur_idx to the next slot of the batch, wrapping back to 0 once
 * the end of the batch is reached. */
static void incrCurIdx(void) {
    size_t next = prefetchBatch.cur_idx + 1;
    prefetchBatch.cur_idx = (next < prefetchBatch.current_batch_size) ? next : 0;
}

/* Issue a prefetch hint for addr, then rotate to the next key in the batch.
 * The fetch proceeds in the background while we work on another key. */
static void prefetch(void *addr) {
    __builtin_prefetch(addr);
    incrCurIdx();
}

/* Move a key to its terminal state and account for it in the batch total. */
static void markDone(PrefetchInfo *info) {
    prefetchBatch.keys_done++;
    info->state = PrefetchDone;
}

static PrefetchInfo *getNextPrefetchInfo(void) {
while (prefetchBatch.prefetch_info[prefetchBatch.cur_idx].state == PrefetchDone) {
incrCurIdx();
}
return &prefetchBatch.prefetch_info[prefetchBatch.cur_idx];
}

/* Reset the global batch and seed one PrefetchInfo per key. Keys whose dict
 * is missing or empty have nothing to prefetch and are marked done up front;
 * all others start in PrefetchBucket with their hash precomputed. */
static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) {
    assert(num_keys <= DictMaxPrefetchSize);

    prefetchBatch.current_batch_size = num_keys;
    prefetchBatch.cur_idx = 0;
    prefetchBatch.keys_done = 0;

    for (size_t slot = 0; slot < num_keys; slot++) {
        PrefetchInfo *info = &prefetchBatch.prefetch_info[slot];
        if (keys_dicts[slot] == NULL || dictSize(keys_dicts[slot]) == 0) {
            /* Nothing to fetch for this key - retire the slot immediately. */
            markDone(info);
            continue;
        }
        info->state = PrefetchBucket;
        info->ht_idx = -1; /* Not attached to a table yet. */
        info->current_entry = NULL;
        info->key_hash = dictHashKey(keys_dicts[slot], keys[slot]);
    }
}

/* dictPrefetch - Prefetches dictionary data for an array of keys
 *
 * This function takes an array of dictionaries and keys, attempting to bring
 * data closer to the L1 cache that might be needed for dictionary operations
 * on those keys.
 *
 * dictFind Algorithm:
 * 1. Evaluate the hash of the key
 * 2. Access the index in the first table
 * 3. Walk the linked list until the key is found
 *    If the key hasn't been found and the dictionary is in the middle of
 *    rehashing, access the index on the second table and repeat step 3
 *
 * dictPrefetch executes the same algorithm as dictFind, but one step at a
 * time for each key (driven by the PrefetchState machine above). Instead of
 * waiting for data to arrive from memory, it issues a prefetch for one key
 * and then moves on to the next key, overlapping the memory latencies of all
 * keys in the batch. Prefetching is a hint only; the walk itself still
 * dereferences buckets and entries, so behavior does not depend on the hints.
 *
 * dictPrefetch can be invoked with a callback function, get_val_data_func,
 * to bring the key's value data closer to the L1 cache as well. */
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) {
    initBatch(keys_dicts, num_keys, keys);

    while (prefetchBatch.keys_done < prefetchBatch.current_batch_size) {
        PrefetchInfo *info = getNextPrefetchInfo();
        /* Snapshot this key's batch index now: each prefetch() call below
         * advances cur_idx to another key, but the rest of this iteration
         * keeps operating on key i. */
        size_t i = prefetchBatch.cur_idx;
        switch (info->state) {
        case PrefetchBucket:
            /* Determine which hash table to use */
            if (info->ht_idx == -1) {
                info->ht_idx = 0;
            } else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) {
                info->ht_idx = 1;
            } else {
                /* No more tables left - mark as done. */
                markDone(info);
                break;
            }

            /* Prefetch the bucket */
            info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]);
            prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
            info->current_entry = NULL;
            info->state = PrefetchEntry;
            break;

        case PrefetchEntry:
            /* Prefetch the next entry of the bucket's chain */
            if (info->current_entry) {
                /* We already found an entry in the bucket - move to the next entry */
                info->current_entry = dictGetNext(info->current_entry);
            } else {
                /* Find the first entry in the bucket */
                info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
            }

            if (info->current_entry) {
                prefetch(info->current_entry);
                info->state = PrefetchValue;
            } else {
                /* No entry found in the bucket - try the bucket in the next table */
                info->state = PrefetchBucket;
            }
            break;

        case PrefetchValue: {
            /* Prefetch the entry's value. */
            void *value = dictGetVal(info->current_entry);

            if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) {
                /* If this is the last element we assume a hit and don't compare the keys */
                prefetch(value);
                info->state = PrefetchValueData;
                break;
            }

            void *current_entry_key = dictGetKey(info->current_entry);
            if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) {
                /* If the key is found, prefetch the value */
                prefetch(value);
                info->state = PrefetchValueData;
            } else {
                /* Key mismatch - move to the next entry in the chain */
                info->state = PrefetchEntry;
            }
            break;
        }

        case PrefetchValueData: {
            /* Prefetch value data if available */
            if (get_val_data_func) {
                void *value_data = get_val_data_func(dictGetVal(info->current_entry));
                if (value_data) prefetch(value_data);
            }
            markDone(info);
            break;
        }

        default: assert(0); /* Unreachable: every live state is handled above */
        }
    }
}

/* ------------------------- private functions ------------------------------ */

/* Because we may need to allocate huge memory chunk at once when dict
Expand Down
4 changes: 3 additions & 1 deletion src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
#define DICT_ERR 1

/* Hash table parameters */
#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */
#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */
#define DictMaxPrefetchSize 16 /* Limit of maximum number of dict entries to prefetch */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did we reach to this number? Should we provide a config for this to modify (mutable/immutable) based on the workload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a heuristic, as is the entire feature. We found it to be optimal for GET commands with a value size of 512 bytes. However, the optimal setting may vary depending on the client load, command execution duration, and number of keys per command. I'm not sure about making this configuration modifiable, as determining an optimal value is complex.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general concern I have is how much this might vary between different architectures. I know some of this comes from us fitting to the AWS graviton instances, but it could be very different from intel/amd/etc in addition to the workload specifics you mentioned. I guess I would like to see us at least set up a benchmark and test to see how widely it can vary, If the difference between most workloads is 2x, i.e. the optimal value is between 8-32, picking 16 is probably okay. If the optimal value goes between like 2 - 100, then maybe we should reconsider.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will run some benchmarks, but I believe the optimal batch size depends much more on the specific workload. For example, if we run expensive commands against one big hashmap with expensive operations, we may want to decrease the batch size, as the prefetched items for the next commands may be pushed out of memory. Conversely, with MSET involving many small keys, the batch size can be larger. Maybe we should consider giving users the option to fine-tune this parameter and set the default to 16?


typedef struct dictEntry dictEntry; /* opaque */
typedef struct dict dict;
Expand Down Expand Up @@ -247,6 +248,7 @@ unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val));

size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
dictStats *dictGetStatsHt(dict *d, int htidx, int full);
Expand Down
6 changes: 4 additions & 2 deletions src/fmtargs.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
/* Everything below this line is automatically generated by
* generate-fmtargs.py. Do not manually edit. */

#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, N, ...) N

#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
#define RSEQ_N() 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0

#define COMPACT_FMT_2(fmt, value) fmt
#define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__)
Expand Down Expand Up @@ -108,6 +108,7 @@
#define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__)
#define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__)
#define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__)
#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__)

#define COMPACT_VALUES_2(fmt, value) value
#define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__)
Expand Down Expand Up @@ -169,5 +170,6 @@
#define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__)
#define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__)
#define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__)
#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__)

#endif
12 changes: 12 additions & 0 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -828,3 +828,15 @@ int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) {
}
return ret;
}

/* Prefetches dictionary data for an array of keys spread across kvstore
 * slots: resolves each (kvstore, slot) pair to its underlying dict and
 * delegates to dictPrefetch(). See dictPrefetch() for the algorithm and the
 * semantics of get_val_data_func.
 *
 * NOTE(review): `dicts` is a VLA sized by the caller. dictPrefetch() asserts
 * keys_count <= DictMaxPrefetchSize, but only after this array has been
 * allocated on the stack - consider a fixed dict *[DictMaxPrefetchSize]
 * array with an up-front bound check instead. */
void kvstoreDictPrefetch(kvstore **kvs,
                         int *slots,
                         const void **keys,
                         size_t keys_count,
                         void *(*get_val_data_func)(const void *val)) {
    dict *dicts[keys_count];
    for (size_t i = 0; i < keys_count; i++) {
        dicts[i] = kvstoreGetDict(kvs[i], slots[i]);
    }
    dictPrefetch(dicts, keys_count, keys, get_val_data_func);
}
5 changes: 5 additions & 0 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ int kvstoreNumNonEmptyDicts(kvstore *kvs);
int kvstoreNumAllocatedDicts(kvstore *kvs);
int kvstoreNumDicts(kvstore *kvs);
uint64_t kvstoreGetHash(kvstore *kvs, const void *key);
void kvstoreDictPrefetch(kvstore **kvs,
int *slots,
const void **keys,
size_t keys_count,
void *(*get_val_data_func)(const void *val));

/* kvstore iterator specific functions */
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
Expand Down
Loading
Loading