diff --git a/libursa/SortedRun.cpp b/libursa/SortedRun.cpp index 8881867..bcde0f1 100644 --- a/libursa/SortedRun.cpp +++ b/libursa/SortedRun.cpp @@ -6,7 +6,7 @@ #include "Utils.h" -// Read element currently under pos. +// Read VLE integer stored under pos. uint32_t run_read(uint8_t *pos) { uint64_t acc = 0; uint32_t shift = 0; @@ -20,7 +20,7 @@ uint32_t run_read(uint8_t *pos) { } } -// Move pos to the next element. +// Return a pointer to the next encoded integer. uint8_t *run_forward(uint8_t *pos) { for (;; pos++) { if ((*pos & 0x80) == 0) { @@ -71,7 +71,8 @@ void IntersectionHelper::step_single() { seq_it_++; } -// Read 8 bytes under run_it_. If all are small, handle them all. +// Read 8 bytes under run_it_. If all are small, intersect them all. +// Returns true if the method can continue, and false if a large int was found. bool IntersectionHelper::step_by_8() { constexpr int BATCH_SIZE = 8; constexpr uint64_t VLE_MASK = 0x8080808080808080UL; @@ -79,18 +80,22 @@ bool IntersectionHelper::step_by_8() { uint64_t *as_qword = (uint64_t *)run_it_; uint64_t hit = (*as_qword & VLE_MASK); if (hit != 0) { + // A large byte (>0x80) was found, handle them in a slow path. return false; } uint32_t after_batch = prev_ + BATCH_SIZE; after_batch += std::accumulate(run_it_, run_it_ + BATCH_SIZE, 0); + // Fast-fast path. Maybe we can just add all 8 bytes and still are + // below the next sequence byte (i.e. nothing to do in intersection). if (after_batch < *seq_it_) { run_it_ += BATCH_SIZE; prev_ = after_batch; return true; } + // Regular batch: like step_single but we know are only dealing with bytes. for (uint8_t *end = run_it_ + BATCH_SIZE; run_it_ < end && seq_it_ < seq_end_;) { uint32_t next = prev_ + *run_it_ + 1; @@ -109,6 +114,7 @@ bool IntersectionHelper::step_by_8() { return true; } +// Do the intersection in batches of 8 bytes at once. void IntersectionHelper::intersect_by_8() { while (run_it_ < run_end_ - 8 && seq_it_ < seq_end_) { if (step_by_8()) { @@ -118,6 +124,8 @@ void IntersectionHelper::intersect_by_8() { } } +// This function is basically std::set_intersection, but optimized as +// much as possible (since sometimes almost 50% of time is spent here). void IntersectionHelper::intersect() { intersect_by_8(); while (run_it_ < run_end_ && seq_it_ < seq_end_) {