\section{Worst-case optimal join parallelization}\label{sec:worst-case-optimal-join-parallelization}
Shares is an optimal partitioning scheme for $n$-ary joins in MapReduce-like systems~\cite{shares}, and
our analysis shows that it converges to a full broadcast of the graph edges (see \cref{subsubsec:shares}).
We therefore decided to forgo physical partitioning of the graph.
We cache the graph in memory such that each Spark task can access the whole graph.
Then, we experiment with multiple \textit{logical} partitioning schemes which ensure that each task processes
only some parts of the graph.
This design has a big advantage over physical partitionings.
Each worker holds the full edge relationship; therefore, it can answer any possible query without shuffling data or
materializing new data structures for the \textit{LeapfrogTriejoin}, e.g.\ sorted arrays or CSR representations.
Arranging the data into suitable data structures and shuffling data is a one-off action on system startup.
This design allows us to implement a new flavour of the Shares partitioning in which we filter the vertices of the
graph on-the-fly while processing it with our GraphWCOJ algorithm.
We describe this contribution in \cref{subsec:shares-logical}.
We also consider partitioning the work based on a single variable.
Here we use the values of the bindings for this variable to determine if a worker processes a specific part of the join.
Furthermore, we consider a work-stealing based partitioning which does not replicate any work and produces less
skew than Shares.
This comes at the price of implementing work-stealing on Spark.
The design of work-stealing in Spark is described in \cref{subsec:work-stealing}.
In \cref{subsec:statical-partitioning-experiment}, we compare \textit{logical} Shares, range-based \textit{logical} Shares
and single variable partitioning in terms of scalability and skew resilience.
\subsection{Single variable partitioning} \label{subsec:single-variable-partitioning}
As a first baseline, we implement partitioning along a single variable.
We partition the values of this variable into as many ranges as the desired level of parallelism.
Each variable can take $V$ values where $V$ is the number of vertices in the graph.
So, if we have $w$ workers and partition along the first variable, the first worker processes the first $\frac{V}{w}$ values for
bindings of the first variable and ignores the rest.
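As a sketch of this split, assume for illustration only that the vertex IDs are the integers $0, \dots, V-1$ and that the workers are numbered $i = 0, \dots, w-1$ (this indexing is an assumption, not something the scheme prescribes).
Worker $i$ then processes the bindings $v$ of the partitioning variable with
\[
  \left\lfloor \frac{i \cdot V}{w} \right\rfloor \;\le\; v \;<\; \left\lfloor \frac{(i+1) \cdot V}{w} \right\rfloor .
\]
For $V = 900$ and $w = 4$, the four ranges are $[0,225)$, $[225,450)$, $[450,675)$ and $[675,900)$; the floors only matter when $w$ does not divide $V$.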
We implement this partitioning as a range based filter on the \textit{TrieIterators} of the join.
A range-filtered \textit{TrieIterator} is easily implemented by changing the \textit{open} method to seek
to the first value that is not smaller than the lower bound of the range and by rewriting the \textit{atEnd} method to return true once the key
value is higher than the upper bound of the range.
We chose a range filter because it is easily pushed into the \textit{TrieIterator} interface and cheap to compute,
as opposed to a hash-based filter, which we found unsuitable for being pushed into the \textit{TrieIterator} interface (see
\cref{subsec:shares-logical}) and more expensive to compute.
The single variable partitioning is interesting because it allows us to trade duplicated work against skew resilience.
A partitioning on the first variable to bind in the join is free of any duplicated work.
Partitionings based on any other variable run the same work on each worker up to the depth of the variable to partition on but
are duplication-free afterwards.
Partitionings on later variables tend to be more skew resilient because there are fewer variables still to bind, and these are often
restricted by the bindings of earlier variables.
In particular, partitioning on the second variable is interesting because bindings for the first variable are cheap to compute:
they are the scan of the first level of a \textit{TrieIterator}.
\subsection{Logical Shares} \label{subsec:shares-logical}
We assume that the reader is familiar with~\cref{subsec:myria}, where we describe Shares and Myria in more detail.
The idea of Shares can also be used for a \textit{logical} partitioning scheme.
Instead of partitioning the graph before computing the join, we determine on-the-fly whether a tuple should be considered by the
join.
We do so by assigning a coordinate of a hypercube to each worker.
Then each worker is responsible for the tuples which match its coordinate as in the original Shares.
Filtering tuples on-the-fly in the \textsc{LFTJ} comes with a challenge: the \textit{LeapfrogTriejoin} does not consider whole tuples
but only a single attribute of a tuple at a time;
e.g.\ a \textit{LeapfrogJoin} only considers one attribute and cannot determine the whole tuple to which this attribute belongs.
Fortunately, a tuple matches only if all attributes match the coordinate of the worker.
Hence, we can filter out a tuple if any of its attributes do not match.
For example, we can exclude a value in a \textit{LeapfrogJoin} without knowing the whole tuple.
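This per-attribute check can be written down as follows, assuming the usual Shares setup in which every attribute $i$ has a hash function $h_i$ that maps values to $\{0, \dots, d_i - 1\}$, where $d_i$ is the size of hypercube dimension $i$ (the symbols $h_i$, $d_i$, $c_i$ and $A$ are notation introduced here for illustration).
A tuple with values $a_i$ for its attributes $i \in A$ matches the worker with coordinate $(c_1, \dots, c_k)$ if and only if
\[
  h_i(a_i) = c_i \quad \mbox{for all } i \in A ,
\]
so a single attribute $j$ with $h_j(a_j) \neq c_j$ is enough to discard the value $a_j$ before the remaining attributes are known.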
Integrating Shares and \textsc{LFTJ} comes with two important design decisions.
First, the \textit{LeapfrogTriejoin} operates on a complete copy of the edge relationship.
Hence, we need to filter out the values that do not match the coordinate of the worker.
Second, we need to compute the optimal Hypercube configuration.
We describe our solutions below.
The first design decision is where to filter the values.
The \textit{LeapfrogTriejoin} consists of multiple components which are composed as layers upon each other.
On top we have the \textit{LeapfrogTriejoin} which operates on one \textit{LeapfrogJoin} per attribute.
Each \textit{LeapfrogJoin} uses multiple \textit{TrieIterators}.
Our first instinct was to push the filter as deep as possible into these layers.
We built a \textit{TrieIterator} that never returns a value whose hash does not match the coordinate.
This is implemented by changing the \textit{next} and \textit{seek} methods such that, if the return value of the original method does not match, they linearly
consider further values until they find a matching one.
However, the resulting \textsc{LFTJ} was so slow that we abandoned this idea immediately.
We hypothesize that this is the case because each call to the original \textit{next} and \textit{seek} methods is now followed
by a linear search for a matching value.
Furthermore, many of these values are later dropped in the intersection of the \textit{LeapfrogJoin} which
can also be seen as a filter over the values of the \textit{TrieIterators}.
As we argue in~\cref{subsec:graphWCOJ-materalization}, the \textit{LeapfrogJoin} is a rather selective filter which
excludes most of its input in the process of building the intersection.
It does not make sense to push a less selective filter below a more selective filter.
With this idea in mind, we built a logical Shares implementation that filters the return values of the \textit{leapfrogNext}
method.
This is implemented as a decorator pattern around the original \textit{LeapfrogJoin}.
The use of the decorator pattern allows us to easily integrate Shares with the \textsc{LFTJ} while keeping it decoupled enough
to use other partitioning schemes.
The second design decision is how and where to compute the best hypercube configuration.
The how has been discussed extensively in prior literature~\cite{shares,myria-detailed,shares-proof,shares-skew}.
We implement the exhaustive search algorithm used in the Myria system~\cite{myria-detailed}.
%In their paper they conclude that this is a `practical and efficient' solution.
%Given that the computation of the hypercube configuration can cost up to 45s for 96 workers, this is only
%the case for quite long-running queries.
In the interest of a simple solution, we compute the best configuration on the master before starting the Spark
tasks for the join.
We note that the exhaustive algorithm could be optimized easily and it would be worthwhile to introduce
a cache for common configurations.
Due to time constraints, we leave this to future work and keep our focus on the scaling behaviour of Shares.
To conclude, we succeeded in integrating Shares with the \textit{LeapfrogTriejoin} and report our results in \cref{subsec:scaling-graphWCOJ}.
We cannot improve on the main weakness of Shares which is that it duplicates a lot of work.
Indeed, our design filters tuples only after the \textit{LeapfrogJoin}.
Therefore, all tuples are still considered by the \textit{TrieIterators} and their binary searches on the first variable.
This does not influence scaling much because only the correct logical partition of values for the first variable
is used as bindings in the \textit{LeapfrogTriejoin}.
This means the values are still filtered early enough, before most of the work happens.
We improve over a physical Shares by using the same CSR data structure for all \textit{TrieIterators}.
Therefore, we do not need to materialize a prefiltered data structure for each \textit{TrieIterator} and query, which saves time and memory
if the partitions become large for bigger queries.
\subsubsection{RangeShares} \label{subsubsec:range-shares}
In the last section, we raised the point that our Shares implementation only filters out values after the
\textit{LeapfrogJoins}.
This is because a hash-based filter needs to consider single values one-by-one.
In this section, we explore the possibility of using range-based filters, which can be pushed into the \textit{TrieIterators}.
However, we warn the reader that this is a negative result.
It leads to high skew which hinders good scaling of this idea.
We observe that the general idea behind Shares is to introduce a mapping per attribute from the value space into the space of possible
hypercube coordinates, e.g. so far all Shares variants use a hash function per attribute to map the values onto the hypercube.
We investigate the possibility to use ranges as mapping functions, e.g. in a three-dimensional hypercube with three workers per dimension,
we could divide the value space into three ranges; a value matches a coordinate if it is in the correct range.
Contrary to hash-based mappings, which are checked value by value until one matches, a range check is a single conditional after each
\textit{seek} and \textit{next} function call.
Furthermore, this conditional is predictable for the processor because, for all but one call, the value is in range and returned.
So, contrary to a hash-based filter, we can push a range-based filter into the \textit{TrieIterators}.
We implement this idea by dividing the vertex IDs per attribute into as many ranges as the size of the corresponding hypercube dimension.
For example, assume we have vertex IDs from 0 to 899, three attributes and hypercube dimensions of size 3, 2 and 2.
Then, we choose the ranges [0,300), [300,600) and [600,900) for the first attribute and the ranges [0,450) and [450,900) for the other
two attributes.
The worker with the coordinate (0, 0, 0) is then assigned the ranges [0,300), [0,450) and [0,450).
It configures its \textit{TrieIterators} accordingly such that they are limited to these ranges.
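The example generalizes as follows, assuming vertex IDs $0, \dots, V-1$ and ranges of equal width (a sketch only; the symbols $V$, $d$ and $c$ are introduced here for illustration).
A worker whose coordinate in a dimension of size $d$ is $c \in \{0, \dots, d-1\}$ restricts the corresponding attribute to
\[
  \left[ \left\lfloor \frac{c \cdot V}{d} \right\rfloor ,\; \left\lfloor \frac{(c+1) \cdot V}{d} \right\rfloor \right) .
\]
With $V = 900$ and dimension sizes 3, 2 and 2, the hypercube has $3 \cdot 2 \cdot 2 = 12$ workers, and the worker with the coordinate $(2, 1, 0)$ is assigned the ranges $[600,900)$, $[450,900)$ and $[0,450)$.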
We run first experiments to evaluate this idea.
We expect it to scale better than a hash-based Shares because it saves intersection work in the \textit{LeapfrogJoins}.
However, we find that high skew between the workers leads to much worse performance than a hash-based Shares.
The explanation is that if a worker is assigned the same range multiple times and this range turns out to take long to compute, it takes
much longer than all other workers.
We show the scaling behaviour of this scheme in the next section.
To mitigate this problem, we break down the vertex IDs into more ranges than there are workers in the hypercube dimension corresponding to
the attribute.
Then, we assign multiple ranges to each \textit{TrieIterator} in such a way that the overlap on the first two attributes equals the overlap
of a hash-based implementation and assign the ranges of the later attributes randomly such that all combinations are covered.
However, experiments still show a high skew: some workers find many more instances of the searched pattern in their ranges than others.
For the triangle query on \textit{LiveJournal}, we find that the fastest worker outputs only 0.4 times as many triangles as the slowest worker.
We conclude that the pattern instances are unevenly distributed over the ranges of vertex IDs, which leads to high skew in
a range-based solution.
We show performance measurements in the coming section and stop our investigation in this direction.
\subsection{Comparison of static partitioning schemes} \label{subsec:statical-partitioning-experiment}
In this section, we show how the static partitioning schemes implemented by us scale on the 3-clique query on the
\texttt{LiveJournal} dataset.
We partition the work according to logical Shares, on the first and second variable, and according to both range-based logical Shares schemes.
The first range based logical Shares scheme uses a single range per \textit{TrieIterator}, the second uses multiple ranges with improved
overlapping;
both have been described in the last section.
We call them \textit{SharesRange} and \textit{SharesRangeMulti}.
The speedup with up to 48 workers is shown in~\cref{fig:static-speedup}.
We measure the skew of a scheme as a relationship between the time it takes to compute the smallest and biggest partition.
\Cref{fig:static-skew} plots the skew for the different schemes, queries and levels of parallelism.
\begin{figure}
\subfloat[Speedup \label{fig:static-speedup}]{\includesvg[width=0.5\textwidth]{svg/graphWCOJ-scaling-other-schemes}}
\subfloat[Skew \label{fig:static-skew}]{\includesvg[width=0.5\textwidth]{svg/graphWCOJ-scaling-other-schemes-skew}}
\caption{3-clique on LiveJournal for logical Shares, range-based Shares and partitioning on the first and second variable.
Skew is measured as the relationship between the time it takes to compute the biggest and the smallest partition.
}
\end{figure}
We see that logical Shares scales better than all other schemes.
Partitioning on the second variable is slightly worse, while the other schemes are further behind.
Our explanation is that logical Shares and second variable partitioning incur the lowest skew.
There is a strong correlation between the skew shown in~\cref{fig:static-skew} and speedup of~\cref{fig:static-speedup}.
Except for \textit{SharesRangeMulti}, the amount of skew relates directly to the scaling behaviour.
\textit{SharesRangeMulti} is more skew resilient than first variable partitioning and \textit{SharesRange} but does not scale
better.
That is because the first variable partitioning does not replicate any work.
Therefore, its scaling is better even with higher skew between the workers.
The difference in achieved speedup between the two range-based Shares partitionings is minimal.
Most likely this is caused by the fact that \textit{SharesRangeMulti} is implemented as a decorator around the
\textit{TrieIterator} interfaces while \textit{SharesRange} is directly integrated into the \textit{TrieIterator} interface.
Also, we see that \textit{SharesRangeMulti} drastically reduces the skew compared to \textit{SharesRange} which confirms that our
optimization fulfils the intended goal.
However, the reduced skew does not translate into better scalability for the range-based Shares partitioning.
We conclude that various static partitioning schemes do not scale well.
In the end, we find that logical Shares is the best static partitioning scheme because it manages skew best, which lets it beat even
schemes that are free of work replication, such as partitioning on the first variable.
We tried to improve on hash-based logical Shares by pushing the filter deeper into the layers of the Leapfrog Triejoin algorithm but find that
range-based Shares cannot handle skew well enough to be a competitor for hash-based Shares.
Following these results, we look into dynamic parallelization schemes such as work-stealing.
\subsection{Work-stealing} \label{subsec:work-stealing}
Normally, Spark uses static, physical partitioning of the data.
As we learned in the last section, that can lead to a trade-off between the ability to handle skew and duplicated work.
A standard approach to handle skew and unbalanced workloads is work-stealing.
For this, the work is not statically partitioned beforehand but organized in many smaller tasks which can be
solved by all workers.
Workers are either assigned an equal split of tasks and steal tasks from other workers when they are out of work or all tasks are arranged
in a queue accessible for all workers, so that workers can poll tasks from it whenever they are out of work.
Either way, this results in a situation where no task is bound to a specific worker and each worker only finishes
when no free tasks are left in the system.
Hence, the maximum amount of skew is roughly bounded by the size of a single task (in the worst case, the largest one).
There is no duplicated work because different tasks should not overlap and each task exists only once in the whole system.
We first describe a work-stealing version designed for Spark's local mode, where all tasks are computed on a single machine.
Then we extend this design to the cluster mode of Spark.
Work-stealing requires two major design decisions from the developer.
First, how to organize the workload of a \textit{LeapfrogTriejoin} into tasks.
Second, how do workers get their tasks?
We address these questions in order by first describing our preferred solution and then its integration with Spark.
We conclude the section with an evaluation of the limitations of this integration.
For the local version, the Spark master and all executors are threads within the same JVM process.
This is important because it allows us to share data structures between multiple Spark tasks as normal JVM objects.
We discuss how the design can be extended to Spark's \textit{cluster-mode} in \cref{subsubsec:distributed-work-stealing}.
The first design decision is the definition of a work-stealing task.
It is not necessary to define the tasks such that they all have the same size.
However, it is important to choose the task size small enough to avoid skew.
Furthermore, the tasks should not overlap so that work is not duplicated.
We choose to define a task as the work necessary to find all possible tuples for a single binding of the first join variable.
This is non-overlapping.
The task size can vary widely and is query dependent.
However, given the large number of tasks (as many as there are vertices in the graph), we believe the individual tasks to be small enough.
We will evaluate this during our experiments.
The second design choice in work-stealing is how to hand tasks to workers.
For simplicity, we chose to use a shared, thread-safe queue that holds all tasks.
The main drawback of this solution is that the access to the queue has to be synchronized between all workers.
If there are too many workers contending for the critical section of polling a job from the queue, they can slow each other down.
However, the critical section is short because it includes only the call to the poll method of the queue.
Additionally, we decided to implement a batching scheme such that a single poll can assign multiple tasks to a worker.
This allows us to fine-tune how often a worker needs to return to the queue for new tasks.
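To make this trade-off concrete, assume a fixed batch size $b$ (a hypothetical tuning parameter; how batches are formed is not prescribed here) and $V$ tasks, one per vertex.
The queue then holds
\[
  \left\lceil \frac{V}{b} \right\rceil
\]
batches, which is also the number of successful polls summed over all workers.
For $V = 1\,000\,000$ and $b = 100$, this gives $10\,000$ polls instead of $1\,000\,000$; in exchange, the unit of work that can no longer be rebalanced between workers grows from one task to $b$ tasks.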
It turns out that the work-stealing scheme as described above is straightforward to integrate into Spark.
We chose a Scala object\footnote{Methods and fields defined on a Scala object are the Scala equivalent to static methods and fields in
Java. Most importantly they are shared between all threads of the same JVM.} to hold a dictionary which associates an ID for each
query with a thread-safe queue instance.
This queue can be accessed by each Spark thread.
Due to the association between query and queue, it is possible to run multiple queries in parallel without interference.
The queue for a query is filled by the first Spark task that accesses it.
This can be implemented by a short synchronized code section at the beginning of all tasks.
It checks if the queue is empty and if so pushes one task per possible binding (all graph vertices) or batch of possible bindings.
The synchronized section is fast and only runs once when the tasks start.
Hence, it comes at negligible performance costs.
Once the queue is filled, we run our normal \textit{LeapfrogTriejoin} with a filtered \textit{LeapfrogJoin} for the first attribute.
This filter is implemented as a decorator around the original \textit{LeapfrogJoin}.
The \textit{leapfrogNext} method of this decorator returns only values that were previously polled from the work-stealing queue.
Our integration of work-stealing in Spark comes with some limitations.
We see it more as a proof-of-concept that work-stealing is a good choice for the parallelization of worst-case optimal joins in Spark than
as a solid implementation of work-stealing in Spark.
The latter is not possible within the time-frame of this thesis.
In the following, we discuss the constraints of our integration.
Work-stealing leads to an unforeseeable partitioning of the results: it is not possible to foresee which bindings end up in a certain
partition nor can we guarantee a specific partition size.
If the user relies on any specific partitioning, they need to repartition the results afterwards.
Moreover, we cannot guarantee to construct an equal partitioning over multiple runs of the same query.
If the user depends on a stable partitioning per query, they should cache the query result after the worst-case optimal join execution.
We do not integrate our work-stealing scheme into the Spark scheduler, but we provide a best-effort implementation:
we use all resources assigned to us as soon as they become available.
We can handle all scheduler decisions.
The first worker assigned fills the queue.
The worker who takes the last element from the queue sets a boolean that this query has been completed.
Hence, tasks that are started after the query has been computed do not recompute the query.
We do not provide a fault-tolerant system.
We see two possibilities to make our system fault-tolerant.
First, one can stop all tasks if a single task fails and restart the computation with the last cached results before the worst-case
optimal join.
Second, one could extend the critical section of polling a queue value by the \textit{LeapfrogJoin} by two more operations: we peek
at the value from the queue without removing it, log the value in a set of values per task and then poll it and remove it from the queue.
With these operations, it is guaranteed that the master can reconstruct all values that a failed worker thread considered.
So, after a task failure, the master can add these values to the work queue again such that other tasks will redo the computations.
\subsubsection{Work-stealing in cluster mode} \label{subsubsec:distributed-work-stealing}
In this section, we describe a simple, yet promising design to integrate work-stealing with Spark's cluster mode.
We assume clusters in which one worker runs one executor.
This is the case in Databricks clusters.
The main problem with distributing work-stealing is that Spark's executors cannot communicate with each other.
Therefore, we choose a communication free approach in which the tasks share work only with other tasks on the same
executor.
The work is statically partitioned between the executors.
We cannot control how Spark schedules tasks on its executors.
The tasks for the join algorithm could be co-located on a single executor, balanced evenly between all of them or could be distributed
over multiple executors in any fashion between these two extremes\footnote{The default behaviour of Spark's standalone scheduler is to spread
the tasks as evenly over all executors as possible.
Alternatively, the standalone mode offers the possibility to consolidate all tasks as much as possible on a single worker.
This can be controlled by the setting \texttt{spark.deploy.spreadOut}.}.
We need to use the slots assigned to our tasks as well as possible.
To achieve this goal, we would like to know how many tasks were scheduled on each worker.
With this knowledge, we can split the workload between executors such that executor $e$ deals with $\frac{w}{t} \times t_e$,
where $w$ is the workload, $t$ the total number of tasks over all executors and $t_e$ the number of tasks on executor $e$.
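As a small worked instance of a split that is proportional to the number of tasks per executor, write $t = \sum_{e} t_e$ for the total number of tasks and take hypothetical numbers: a workload of $w = 900$ first-variable bindings and two executors running $t_1 = 4$ and $t_2 = 2$ tasks, so that $t = 6$.
The two executors then receive
\[
  w \cdot \frac{t_1}{t} = 900 \cdot \frac{4}{6} = 600
  \qquad \mbox{and} \qquad
  w \cdot \frac{t_2}{t} = 900 \cdot \frac{2}{6} = 300
\]
bindings.
The shares add up to $w$, and each task is responsible for $150$ bindings on average, regardless of the executor it runs on.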
Spark's scheduler offers the so-called \textit{Barrier Execution Mode}.
In this mode, it schedules all tasks of a stage together; either all of them are scheduled at the same time or
none are scheduled.
When tasks are scheduled in this mode they have access to the location of all other tasks for this stage.
Hence, we can determine how many tasks were scheduled on how many executors from within each task.
Furthermore, we can tell on which executor the current task runs.
We distribute our local work-stealing approach by requesting barrier scheduling for the worst-case optimal join
operator.
Then, we can use the work-stealing design as described for the local mode on each executor by partitioning the
queue that holds all bindings for the first variable.
We experimented with two different partitioning schemes for the queue: round-robin and range-based partitioning.
However, both schemes lead to similar run-times and behaviours.
We choose the round-robin partitioning because it is generally more skew resilient.
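One way to picture the two schemes, assuming purely for illustration that the $t$ task slots are numbered $s = 0, \dots, t-1$, that the first-variable bindings are indexed $j = 0, \dots, V-1$, and that the queue of an executor receives the bindings of all slots it hosts, is
\[
  \mbox{round-robin: } j \mapsto j \bmod t ,
  \qquad
  \mbox{range-based: } j \mapsto \left\lfloor \frac{j \cdot t}{V} \right\rfloor .
\]
Both maps give every slot roughly $V / t$ bindings, but round-robin interleaves neighbouring vertex IDs across executors, whereas the range-based map keeps contiguous ID ranges together.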
We distribute work-stealing in Spark by partitioning the work queue over all executors with respect to the
number of tasks assigned on each of them.
This approach has two drawbacks.
First, we use the barrier mode which requires Spark to find enough available resources to schedule all tasks at the same time.
This is not a huge issue in our experiments where we run one join at a time.
However, it could be difficult to find enough open slots in a busy production cluster,
in particular if it runs workloads with many small tasks.
We note that using the barrier mode is necessary because to distribute the queue we need to know how many executors are used and
how many tasks are on each of them.
We considered the alternative design of distributing the first variable bindings over the tasks by assigning a fraction of $\frac{1}{t}$ of the bindings,
with $t$ the number of tasks, to each of them.
Then tasks that are scheduled on the same worker could detect each other dynamically via a shared data structure and share work.
However, this design is not advisable because it cannot be determined when a single task should finish and return its resources, as
it cannot check if other tasks have not been scheduled yet.
Therefore, each task would terminate once it finishes its list and cannot find a \textit{currently} co-located task.
This leads to a situation where we could end up with highly suboptimal work-stealing when the tasks run one after another and do not
share work at all.
To summarize, this design makes work-stealing highly dependent on Spark's scheduler where no parallelism can be guaranteed.
Using the barrier mode, we can guarantee the level of parallelism by setting the number of tasks.
Second, this scheme does not manage skew between executors.
If it turns out that the part of the work-stealing queue assigned to one executor requires significantly more work than
the parts assigned to the other executors, then this executor will dominate the overall run-time of the whole algorithm.
We establish whether this is a problem in practice in our experiments (see \cref{subsec:distributed-work-stealing}).