-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathexec.h
178 lines (149 loc) · 10 KB
/
exec.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Please refer to https://github.com/phaistos-networks/Trinity/wiki/Query-Execution-Engine-Internals
#pragma once
#include "docidupdates.h"
#include "index_source.h"
#include "matches.h"
#include "queries.h"
#include "similarity.h"
#include <future>
namespace Trinity {
// Bit flags controlling how exec_query()/exec_query_par() run a query; combine them into the
// `flags` argument as a uint32_t bitmask. DocumentsOnly and AccumulatedScoreScheme each select a
// non-default execution mode and must not be combined (validate_flags() rejects that).
enum class ExecFlags : uint32_t {
    // Documents-only mode.
    // When set, your MatchedIndexDocumentsFilter subclass receives only the IDs of matching documents,
    // through its consider(const docid_t) overload. In the default mode, consider(matched_document &)
    // is invoked instead, carrying rich information about any and all matched tokens etc.
    //
    // Useful when you only need to e.g. count or collect the documents that match a query and don't care
    // which of the terms (in case of ORs) matched each one -- only which documents (IDs) match. The
    // trade-off is that you can't e.g. compute a Trinity/query score from the matched terms.
    //
    // Also a good fit for e.g. a prefix-search people-search system (like LinkedIn's), where you want
    // every user matching the query and don't care which terms (or their hits) produced the match.
    DocumentsOnly = 1u << 0,

    // Accumulated-score mode, which mirrors Lucene's execution model.
    // Useful for very specific use cases (e.g. visual search) and, more generally, whenever you prefer
    // faster execution over the higher relevance made possible by the rich information Trinity tracks
    // and provides via matched_document in the default mode.
    //
    // When set, the scores of the various iterators are accumulated into a single score, and
    // MatchedIndexDocumentsFilter's subclass consider(const docid_t, const double score) is invoked.
    //
    // If your Similarity scorer depends on index sources' field statistics, see the
    // disableOptimizations parameter of Trinity::merge().
    AccumulatedScoreScheme = 1u << 1,

    // By default, MatchedIndexDocumentsFilter::queryIndicesTerms tracks unique
    // (termID, toNextSpan, flags) triplets. When this flag is set, unique (termID, toNextSpan) pairs are
    // tracked instead -- i.e. the older semantics -- and query_index_term::flags is set to 0.
    // Use it if you care about the unique pairs rather than the triplets.
    //
    // Only relevant in the default execution mode, i.e. when neither DocumentsOnly nor
    // AccumulatedScoreScheme is set in the flags passed to exec_query().
    DisregardTokenFlagsForQueryIndicesTerms = 1u << 2
};
// Rejects flag combinations that select more than one execution mode.
// DocumentsOnly and AccumulatedScoreScheme are alternative modes, so at most one of them may be
// present in `f`; all other bits are ignored here.
// Throws Switch::invalid_argument when both mode bits are set.
static inline void validate_flags(const uint32_t f) {
    constexpr uint32_t modeBits = unsigned(ExecFlags::DocumentsOnly) | unsigned(ExecFlags::AccumulatedScoreScheme);
    const bool bothModesRequested = (f & modeBits) == modeBits;

    if (bothModesRequested)
        throw Switch::invalid_argument("DocumentsOnly and AccumulatedScoreScheme are mutually exclusive modes");
}
// Executes query `in` against a single index source. Only declared here; the definition lives
// elsewhere. Matches are presumably reported to `matchesFilter` -- confirm against the definition.
//
// maskedDocumentsRegistry : the callers in this header pass collection->scanner_registry_for(i)
//                           for the source being executed.
// f                       : optional IndexDocumentsFilter; may be nullptr (the default).
// flags                   : ExecFlags bitmask (see validate_flags() for the allowed combinations).
// scorer                  : optional per-source scorer; nullptr by default. exec_query_par() supplies one
//                           (from IndexSourcesCollectionTermsScorer::new_source_scorer()) only when
//                           ExecFlags::AccumulatedScoreScheme is set.
void exec_query(const query &in,
                IndexSource *source,
                masked_documents_registry *const maskedDocumentsRegistry,
                MatchedIndexDocumentsFilter *matchesFilter,
                IndexDocumentsFilter *const f = nullptr,
                const uint32_t flags = 0,
                Similarity::IndexSourceTermsScorer *scorer = nullptr);
// Handy utility function: executes the query against every index source in `collection`, one after
// another on the calling thread, and returns one match filter (a T instance) per source, in source order.
//
// You are expected to merge/reduce/blend the returned filters yourself.
// Execution of a source does not depend on the state of any other source -- they are isolated -- so
// running them in parallel (e.g. one exec_query() per source via std::async() or any other scheduling
// means) requires no coordination. See exec_query_par() for such an implementation.
//
// T     : a MatchedIndexDocumentsFilter subclass; a fresh instance is constructed per source from `args`.
// f     : optional IndexDocumentsFilter, passed through to every per-source exec_query() call.
// flags : ExecFlags bitmask; checked with validate_flags() (throws on invalid combinations).
//
// Notes:
// - No IndexSourceTermsScorer is passed to the per-source exec_query() here; if you need
//   ExecFlags::AccumulatedScoreScheme, use exec_query_par(), which accepts an IndexSourcesCollectionTermsScorer.
// - Unlike exec_query_par(), sources whose index_empty() is true are not skipped, so the result holds
//   exactly collection->sources.size() entries.
// - `args` are handed to each T constructor as lvalues. One T is built per source, so forwarding them
//   (and thereby possibly moving from them) would leave every source after the first with moved-from
//   constructor arguments.
template <typename T, typename... Arg>
std::vector<std::unique_ptr<T>> exec_query(const query &in, IndexSourcesCollection *collection, IndexDocumentsFilter *f, const uint32_t flags, Arg &&... args) {
    static_assert(std::is_base_of<MatchedIndexDocumentsFilter, T>::value, "Expected a MatchedIndexDocumentsFilter subclass");

    const auto                      n = collection->sources.size();
    std::vector<std::unique_ptr<T>> out;

    validate_flags(flags);
    out.reserve(n);

    for (size_t i{0}; i != n; ++i) {
        auto source  = collection->sources[i];
        auto scanner = collection->scanner_registry_for(i);
        // Deliberately NOT std::forward<Arg>(args)...: this runs once per source.
        auto filter = std::make_unique<T>(args...);

        exec_query(in, source, scanner.get(), filter.get(), f, flags);
        out.push_back(std::move(filter));
    }
    return out;
}
// Parallel query execution using std::async(): every non-empty index source except the first runs on
// its own std::launch::async task, while the first source runs on the calling thread. Returns one T
// (a MatchedIndexDocumentsFilter subclass) per non-empty source, in ascending source order; sources
// whose index_empty() is true are skipped.
//
// Unlike exec_query(), this variant also supports ExecFlags::AccumulatedScoreScheme. In that mode `cs`
// must be non-null (Switch::invalid_argument is thrown otherwise): it is reset() for the collection and
// then asked for a per-source scorer via new_source_scorer(). `cs` is not used in the other modes.
// NOTE(review): new_source_scorer() is invoked concurrently from the worker tasks -- presumably cs
// implementations must be thread-safe for that call; confirm.
//
// f     : optional IndexDocumentsFilter, passed through to every per-source exec_query() call.
// flags : ExecFlags bitmask; checked with validate_flags() (throws on invalid combinations).
// args  : constructor arguments for each T. They are read as lvalues (possibly from several threads at
//         once) and never forwarded, because one T is built per source. Moving from them would hand
//         later sources moved-from values and race between tasks.
template <typename T, typename... Arg>
std::vector<std::unique_ptr<T>> exec_query_par(const query &in,
                                               IndexSourcesCollection *collection,
                                               IndexDocumentsFilter *f,
                                               const uint32_t flags,
                                               Trinity::Similarity::IndexSourcesCollectionTermsScorer *cs,
                                               Arg &&... args) {
    static_assert(std::is_base_of<MatchedIndexDocumentsFilter, T>::value, "Expected a MatchedIndexDocumentsFilter subclass");

    const auto                      n = collection->sources.size();
    std::vector<std::unique_ptr<T>> out;

    validate_flags(flags);
    if (!n) {
        return out;
    }

    const bool accumScoreScheme = flags & unsigned(ExecFlags::AccumulatedScoreScheme);

    if (accumScoreScheme) {
        if (!cs) {
            throw Switch::invalid_argument("IndexSourcesCollectionTermsScorer not set");
        }
        // May or may not do something here
        cs->reset(collection);
    }

    // Executes the query against collection->sources[i] and returns the filter holding its matches.
    // Invoked for distinct i both on the calling thread and on std::async() tasks; sources are
    // isolated from each other, so no coordination is needed.
    const auto run_source = [&](const size_t i) {
        auto                                                source  = collection->sources[i];
        auto                                                scanner = collection->scanner_registry_for(i);
        auto                                                filter  = std::make_unique<T>(args...);
        std::unique_ptr<Similarity::IndexSourceTermsScorer> scorer;

        if (accumScoreScheme) {
            scorer.reset(cs->new_source_scorer(source));
        }
        exec_query(in, source, scanner.get(), filter.get(), f, flags, scorer.get());
        return filter;
    };

    // Declared after everything run_source captures by reference: if anything below throws, these
    // futures are destroyed first, and a std::async() future's destructor waits for its task, so no
    // task can outlive the state it references.
    std::vector<std::future<std::unique_ptr<T>>> futures;

    futures.reserve(n - 1);
    // Schedule all non-empty sources but the first; we handle the first one here meanwhile.
    for (size_t i{1}; i != n; ++i) {
        if (false == collection->sources[i]->index_empty()) {
            futures.emplace_back(std::async(std::launch::async, run_source, i));
        }
    }

    out.reserve(n);
    if (false == collection->sources[0]->index_empty()) {
        out.push_back(run_source(0));
    }

    // Collect the remaining results in source order; get() rethrows any exception a task raised.
    for (auto &pending : futures) {
        out.push_back(pending.get());
    }
    return out;
}
}; // namespace Trinity