\section{Related Work} \label{sec:related-work}
We summarize three closely related papers and point out similarities and differences with our
work.
The first paper describes another distributed \textsc{WCOJ} on Timely Dataflow.
The second compares multiple parallelization approaches for worst-case optimal join algorithms
to determine the best one for a given use-case.
Third, we describe a general graph pattern mining system built on Spark which also
employs work-stealing.
\subsection{\textsc{WCOJ} on Timely Dataflow}\label{subsec:wcoj-timely-data-flow}
Ammar et al. published a distributed worst-case optimal join based on Timely Dataflow in 2018~\cite{ammar2018distributed,naiad}.
In their paper they introduce three algorithms: \textit{BigJoin}, \textit{Delta-BigJoin} and \textit{BigJoin-S}.
They implement only the first two algorithms.
\textit{BigJoin-S} is described but not implemented; however, it comes with stronger theoretical guarantees.
Namely, it is worst-case optimal in computation and communication with respect to the output size of the query
given by the AGM bound.
\textit{BigJoin-S} can guarantee work balance.
Moreover, it achieves optimality and work-balance while using low amounts of memory on all workers;
the memory usage per worker is in $\mathcal{O} (\frac{IN}{w})$, where $IN$ is the size of the input relationships and $w$ the number of workers.
The other two join algorithms are only worst-case optimal in computation and communication costs;
they guarantee neither work-balance nor the same low memory usage.
In practice, however, they achieve both on many real-world datasets.
\textit{Delta-BigJoin} is an incremental algorithm which computes the new instances of the subgraph given a batch of new edges.
Hence, it operates in a different setting than our work.
We assume static graphs while they operate on graphs with the ability to find new instances caused by insertions.
\textit{BigJoin} is closest to our work.
It has been implemented and is a worst-case optimal join for static graphs.
In the following paragraphs, we describe \textit{BigJoin}, analyse why it is not likely to be a good fit for Spark,
discuss and compare the index structures used in their work to represent the input relationships, and compare the
guarantees given by their approach and ours.
\subsubsection{The \textit{BigJoin} algorithm}
\textit{BigJoin} encodes a \textit{Generic Join} (see~\cref{subsec:worst-case-optimal-join-algorithm}) into multiple
timely dataflow operators.
In short, Timely Dataflow operators are distributed over multiple workers and each of them takes a stream of input data, operates
on it and sends it to the next operator which can be processed on a different worker.
Examples of operators are \textit{map} functions, \textit{filters}, \textit{count} or \textit{min}.
It is important to note that sending the output to an operator on a different worker is a fast, streaming operation, as opposed to
Spark's shuffles, which are synchronous and slow because they involve disk writes and reads.
For the \textit{BigJoin} the authors require each worker to hold an index for each input relationship which maps prefixes
of the global variable order to the possible bindings for the next variable.
In the use-case of graph pattern matching, this means that each worker holds an index into the forward and backward adjacency lists.
Their algorithm runs in multiple rounds;
one per variable in the join query.
In each round, they bind one variable.
So each round takes the prefixes as input and fixes one more binding.
A single round starts with all prefixes from the former round distributed among all workers arbitrarily.
Then they find the join relation that offers the smallest set of extensions for each prefix.
This is done in steps with one step per relationship.
In each step, the prefix is sent to a worker by the hash of the attributes bound in the relationship of that step.
When the relationship offers fewer possible values for the new binding than the current minimum, i.e. its matching adjacency
list is smaller, it is remembered as the new minimum for the given prefix.
Next, they hash the values of the prefix which are defined in the relationship with the least extensions and use these hashes to
distribute the tuples over all workers.
Then, each worker produces all possible extensions for each prefix.
Finally, each round ends with filtering out all extensions that are not in the intersection of extensions offered by each relationship.
This again takes one filtering step per relationship in the join.
This is a simple instance of the \textit{Generic Join} implemented in data flow operators.
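As a compact restatement of a single round, we can write it in symbols; the notation is introduced here only for illustration and is not taken from~\cite{ammar2018distributed}.
Let $P_{j-1}$ be the set of prefixes that bind the first $j-1$ variables, let $\mathcal{R}_j$ be the relationships that contain the $j$-th variable, and let $\mathrm{ext}_R(p)$ be the set of values that relationship $R$ offers for the $j$-th variable given prefix $p$.
For every $p \in P_{j-1}$, the round then computes
\[
  R^{*}(p) = \arg\min_{R \in \mathcal{R}_j} |\mathrm{ext}_R(p)|,
  \qquad
  E(p) = \bigcap_{R \in \mathcal{R}_j} \mathrm{ext}_R(p),
  \qquad
  P_j = \{\, (p, v) \mid p \in P_{j-1},\ v \in E(p) \,\}.
\]
Only $R^{*}(p)$ proposes candidates, so at most $|\mathrm{ext}_{R^{*}(p)}(p)|$ candidate extensions are materialized per prefix before the remaining relationships filter them down to $E(p)$.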
The algorithm described so far can build a large number of possible extensions in each round.
This prevents it from giving worst-case optimal guarantees for memory usage.
The authors fix this problem by batching the prefixes:
they allow only a certain number of prefixes in the system at all times.
They defer building new prefixes until the current batch of prefixes has been completed.
This is natively supported by Timely Dataflow.
\subsubsection{Applicability to Spark and comparison to GraphWCOJ}
\textit{BigJoin} is not suitable for Spark.
This has multiple reasons.
Most importantly, it uses too many shuffle rounds.
Each round and each step in a round requires communication and therefore a shuffle.
In total, the algorithm uses $2R \times V$ rounds for a query with $R$ relations and $V$ variables.
As pointed out before, this is not a big problem in Timely Dataflow because shuffles are fast and asynchronous.
However, in Spark, this is not the case.
We would like to point out that binary join plans can solve the same queries in $R - 1$ shuffle rounds and
that our solution does not require any communication rounds.
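To make these counts concrete, consider the triangle query as an illustrative example of our choosing, with $R = 3$ edge relations and $V = 3$ variables.
Taking the formulas above at face value, \textit{BigJoin} would need
\[
  2R \times V = 2 \cdot 3 \cdot 3 = 18
\]
shuffle rounds, a binary join plan $R - 1 = 2$, and our solution none.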
Second, unlike Timely Dataflow, Spark does not natively support batching queries.
Building support for batching into Spark would be an engineering effort.
Additionally, it would be hard to define a good user interface over a batched query in Spark.
\textsc{GraphWCOJ} does not require batching because it processes at most as many prefixes in parallel as there are workers in the system.
Therefore, we do not have the problem of memory pressure to remember prefixes.
This is because the \textsc{LFTJ} algorithm is a non-materialized representation of the join.
When the Leapfrog Triejoin is executed, it changes its state such that the state always represents the non-materialized
part of the join;
the state is encoded in the positions of the \textit{TrieIterators}.
In other words, the \textsc{LFTJ} performs a depth-first search of all possible bindings while the \textit{BigJoin}
performs a batched breadth-first search.
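A rough way to quantify this difference is to count only the bound values held as algorithm state; this is a simplification of ours, and the symbols $M_{\mathrm{DFS}}$, $M_{\mathrm{BFS}}$ and $B$ are hypothetical names introduced for illustration.
Each of the $w$ workers running \textsc{LFTJ} keeps a single partial binding of at most $V$ values, while a batched breadth-first evaluation that admits at most $B$ prefixes into the system keeps up to $B$ prefixes of at most $V$ values each:
\[
  M_{\mathrm{DFS}} \le w \cdot V,
  \qquad
  M_{\mathrm{BFS}} \le B \cdot V .
\]
Under this count, the depth-first state is bounded by the number of workers and the query size alone, whereas the breadth-first state is bounded only by the chosen batch size.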
\subsubsection{Indices used by \textit{BigJoin} and GraphWCOJ}
The index structures used in their and our solutions are the same: one forward and one backward index over the whole graph on each
worker.
It is possible to distribute the index of \textit{BigJoin} such that each worker holds only a part
of the index.
This is because each worker needs to hold only the possible extensions for the prefixes that map to it for each relationship.
We analyse this in the next paragraph.
The prefixes are mapped to workers by the hash of the attributes already bound.
For graph pattern matching this is one or zero attributes;
the edge relationship has two attributes and one is a new, yet unbound variable in the prefix.
We can reach a distribution of the indices such that each worker holds $\frac{I}{w}$ with $I$ the size of the indices.
For that, we choose the same hash function for each variable so that the same value always maps to the same worker.
However, this solution is likely to lead to high skew and work imbalance because if a value is a heavy hitter the
worker needs to process it for each binding over and over.
It is better to use different hash functions per variable.
In this case, we can estimate the fraction of the whole index held by each worker using the binomial distribution.
This distribution models the probability that exactly $k$ out of $N$ independent trials succeed when a single trial
succeeds with probability $p$.
We model the event of a key from the index being assigned to a worker as a trial.
The success probability is $\frac{1}{w}$.
We have as many trials as variables in the join: $N = V$.
We are interested in the case that the tuple is not assigned by any of the variables, so $k = 0$.
Then we have the likelihood that a tuple is not assigned given by $\mathcal{B} (V, 0, \frac{1}{w})$;
so the fraction of the indices assigned to each worker is $1 - \mathcal{B} (V, 0, \frac{1}{w})$.
We plot this function for different numbers of workers and variables in~\cref{fig:big-join-indices}.
The fraction of the indices held by each worker decreases drastically with the number of workers in
the system.
Hence, this partitioning scheme scales relatively well.
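For reference, the binomial term has a simple closed form.
The following uses only the definitions above and additionally assumes that the per-variable hash functions are independent and uniform.
Since $k = 0$ leaves only the failure term of each trial,
\[
  \mathcal{B} (V, 0, \frac{1}{w}) = \left(1 - \frac{1}{w}\right)^{V},
  \qquad
  1 - \mathcal{B} (V, 0, \frac{1}{w}) = 1 - \left(1 - \frac{1}{w}\right)^{V} \le \frac{V}{w},
\]
where the inequality follows from Bernoulli's inequality $(1 - x)^{V} \ge 1 - Vx$ for $0 \le x \le 1$.
As a worked example, $w = 10$ workers and $V = 3$ variables give $1 - 0.9^{3} = 1 - 0.729 = 0.271$, i.e. each worker holds roughly $27\%$ of the indices, just below the bound $\frac{V}{w} = 0.3$.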
\begin{figure}
\centering
\includesvg[width=0.5\textwidth]{svg/big-join-indices}
\caption{Expected fraction of the indices held on each worker for different numbers of workers used
and variables in the query.
}
\label{fig:big-join-indices}
\end{figure}
\subsubsection{Theoretical guarantees}
\textit{BigJoin} guarantees computational and communication worst-case optimality.
However, the communication optimality does not take into account how the indices are generated on each worker.
If they are sent to each worker, this would not be worst-case optimal.
The extensions of \textit{BigJoin-S} additionally give work-balance and low memory usage in $\mathcal{O} (\frac{IN}{w})$.
GraphWCOJ guarantees computational worst-case optimality which it inherits from \textsc{LFTJ}.
Worst-case optimal communication is given by the fact that we do not communicate.
This is if we do not take the distribution of the indices into account which is in line with the analysis of the discussed paper.
If we take the distribution of indices into account, our algorithm is not worst-case optimal.
During our setup, we broadcast the indices used.
This is not optimal for a single query;
Shares would be optimal.
However, it amortizes quickly over multiple queries, while Shares converges to broadcasting for big queries.
We cannot guarantee work-balance.
However, we get close to it by using work-stealing.
With work-stealing, we are optimal within the size of a single task.
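One way to read this statement is the following sketch, which rests on our own simplifying assumptions and is not a result proven in this work.
Suppose the query is split into tasks with running times $t_1, \dots, t_n$, an idle worker always obtains a remaining task immediately, and stealing itself is free.
Then no worker idles while tasks remain, which is the classic greedy list-scheduling setting, and the completion time $T$ on $w$ workers satisfies
\[
  \frac{1}{w} \sum_{i=1}^{n} t_i \;\le\; T \;\le\; \frac{1}{w} \sum_{i=1}^{n} t_i + \max_{i} t_i .
\]
The left-hand side is a lower bound for any schedule on $w$ workers, so $T$ exceeds the optimum by at most the running time of the largest single task.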
GraphWCOJ's memory footprint depends neither on the size of the input nor on the size of the output of the join.
Its memory usage is given by the size of the Java objects used, which depends on the query.
However, this size should be negligibly small for all but embedded use-cases.
\subsubsection{Conclusion}
We conclude that our approach is the better fit for Spark because it requires less shuffling and no batching.
GraphWCOJ gives nearly the same theoretical guarantees as \textit{BigJoin}.
While they can distribute their indices we cannot;
we rely on the fact that each worker holds the complete index.
Finally, we would like to point out that~\cite{ammar2018distributed} does not report any numbers on the amount
of network traffic caused by their algorithm.
Given that it sends many prefixes via the network, this could be a bottleneck in many deployments, e.g. on cheaper instances
in the Amazon cloud.
An analysis of the network traffic would be beneficial for a better understanding of the advantages and disadvantages
of their approach.
% General comparision?
% rather not
% but if so, state tracked, number of prefixes in the system at any time,
% our system is monolith, theirs is built of simple operators
% Experiments
% Scaling of BigJoin not given, cannot be compared
% Single threaded on twitter graph (big one) (need to double check which) and LJ, LJ needs 6.5s
% BigJoin 8 workers 16 cores each, takes 3.4 s to find all triangles in LJ
% BigJoin 10 machines 16 cores each 4-clique, house, 5-clique. they do not report dataset, maybe i can find the dataset int
% seed paper
% no experiment regarding communication costs
% Implemenation: https://github.com/frankmcsherry/dataflow-join
\subsection{Survey and experimental analysis of distributed subgraph matching}
On 28 July 2019, L. Lai et al. published a survey with experiments for multiple
distributed graph pattern matching algorithms~\cite{longbin}\footnote{The survey was published on arXiv 5 months after we started our
thesis in February.}.
Here, we focus on four of the strategies they tested: \textit{BigJoin} (see \cref{subsec:wcoj-timely-data-flow}),
Shares, fully replicated graph and binary joins.
All of their algorithms are implemented in Timely Dataflow;
so far they are not open-source.
They ran all algorithms on 9 different queries over 8 datasets, mostly on a cluster of 10 machines
and 3 workers per machine.
Below we first summarize the most important design decisions for each algorithm, then
highlight their most interesting findings and finally compare their results with ours.
\textit{BigJoin} is implemented as described above but uses a \textsc{CSR} data structure,
triangle indexing and a specific form of compression as optimizations.
The Shares algorithm is configured as described in~\cref{subsubsec:shares} and uses
\textit{DualSim} as the local algorithm.
\textit{DualSim} is a specialized subgraph matching algorithm.
The authors show that it beats the worst-case optimal join used in \textit{EmptyHeaded}~\cite{emptyheaded}
which is a form of the \textit{Generic Join} (see \cref{subsec:worst-case-optimal-join-algorithm}).
The survey also covers our strategy of fully replicating the graph on all machines.
They choose \textit{DualSim} as a local algorithm and a round-robin partitioning on the
second join variable.
Finally, they implement the binary joins with hash joins and use a sophisticated query optimizer
to devise the best join order.
The most important finding of this work is that fully replicating the graph on all machines
is the best option if the graph fits into memory even in Timely Dataflow with its deeply
optimized and asynchronous communication routines.
They establish that fully replicating the graph is nearly always the fastest strategy,
has the lowest memory footprint\footnote{Shares replicates too much data,
\textit{BigJoin} needs to hold many prefixes in memory and binary joins incur intermediary results.},
no further communication costs and scales better than all other strategies up to 60 workers.
In line with our argument against Shares, they find that this strategy is nearly always
beaten by most other strategies.
They establish that it takes longer than the other strategies on nearly all queries and datasets.
Furthermore, it shows the weakest ability to scale.
They report that \textit{BigJoin} or binary joins are the best option if fully
replicating the graph is out of the question.
Binary joins can be used for star and clique joins if it is possible to index all triangles in
the graph and keep this index in memory.
Otherwise, \textit{BigJoin} is preferable in most cases.
Finally, they study the communication costs of the binary joins, Shares and \textit{BigJoin}.
They find that graph pattern matching is a computation-bound problem when 10 GB switches are used
for networking but communication costs dominate in 1 GB switched networks.
They draw their conclusions from experiments run with 10 GB network infrastructure.
Interestingly, Shares incurs lower communication costs than \textit{BigJoin}.
Their paper differs from our thesis in multiple ways.
We implement our system in Spark, which has widespread usage in industry and a surrounding ecosystem
of graph pattern matching systems (see \cref{subsec:graphs-on-spark}).
We give a comparison between a column store, binary search based Leapfrog Triejoin and
our \textsc{CSR} based GraphWCOJ.
They do not report on the benefits of \textsc{CSR} in the context of \textsc{WCOJ}.
Their implementation of the fully replicated strategy differs from ours in two important factors.
First, they use a different local algorithm called \textit{DualSim}~\cite{dualsim}.
Second, they use a different partitioning scheme.
Their scheme replicates the work of finding bindings for the first and second variables in a query and does not actively counter skew.
The skew-resilience of their scheme is based on the fact that it partitions the work on the second
binding.
Hence, it distributes skew of the first binding equally.
However, as we see with our work-stealing approach, this does not guarantee freedom from skew for
bigger queries (see \cref{subsec:scaling-graphWCOJ}).
Their scheme could be applied to our system.
It is simpler than work-stealing but less resilient to skew.
However, they come to the same conclusions, namely that fully replicating is the preferred strategy
when the graph fits into main memory and that Shares is not a good strategy for graph pattern matching.
\subsection{Fractal: a graph pattern mining system on Spark} \label{subsec:fractal}
Fractal is a general-purpose graph pattern mining system built on top of Spark published at SIGMOD'19~\cite{fractal}.
We first describe the relevant aspects of their system.
Then we compare it to our approach.
Graph pattern mining includes the problem of graph pattern matching (as defined in~\cref{subsec:graph-pattern-matching}).
Additionally, it includes problems such as frequent subgraph mining or keyword-based subgraph search.
To support all these problems in a single system, the authors describe their own programming interface made
of initialization operators, workflow operators and output operators.
Each workflow is described as a sequence of these operators.
All workflows are based around extending a subgraph starting from a single edge, vertex or a user-described
pattern.
This yields three initialization operators: one each to start from a vertex, an edge or a pattern.
The workflow operators process the subgraphs induced by the initialization operators.
They can expand the subgraph, e.g. if the subgraph is vertex induced, one expand step adds all neighbouring
vertices to the subgraph.
Another workflow operator is to filter the subgraph instances.
Then it is possible to aggregate subgraphs by computing a key, a value and possibly a reduction.
Finally, these workflow steps can be looped to be repeated multiple times.
To execute the workflow the user can use one out of two output operators: \textit{subgraphs} and
\textit{aggregation} to list all matching subgraphs or aggregate all matching subgraphs respectively.
For example, the workflow \texttt{vertexInduced().filter(sg => fully connected).subgraphs()} enumerates
all cliques in a graph.
Fractal maps these workflows to Spark by splitting them into \textit{fractal steps} on
synchronization points, e.g. an aggregation whose result is required in the next step.
Each step maps to a Spark job which is scheduled by the Spark scheduler.
Fractal schedules the \textit{fractal steps} in the correct order and waits for them to complete
before starting the next one.
A typical problem of graph pattern mining is the large amount of memory needed to keep partial matches,
i.e. the state of the algorithm.
Fractal counters this problem by enumerating subgraphs with a depth-first strategy.
Furthermore, it starts computing all subgraphs from scratch for each step, instead of keeping them in
memory in between the steps.
They only keep the results of the aggregations to be used by the next step.
Another problem in graph pattern mining is work-balance because some parts of the graph are
more work-intensive than others.
Fractal tackles this problem with work-stealing.
They use a hierarchical work-stealing approach.
First, each thread tries to steal work from another thread within the same Spark executor.
Only if this is not possible, they request work from another machine.
The local work-stealing is implemented by sharing the same subgraph enumerators;
an iterator-like data structure that saves the state of the subgraph matching algorithm in a prefix
match.
The subgraph enumerator offers a thread-safe method to generate the next prefix.
Hence, a thread can steal work simply by using the enumerator of another thread.
The second hierarchy of work-stealing is between multiple Spark workers.
The authors support that by using Akka to implement a simple message passing interface between
all Spark workers.
Their experiments show near-linear scaling for the described work-stealing strategy up to 280
execution threads over 10 machines.
Fractal is similar to our system in some aspects.
They also solve graph pattern matching on Spark, inspired our approach to work-stealing, and
choose a depth-first subgraph enumeration approach.
We discuss these similarities below and outline the differences.
Like us, Fractal solves the problem of graph pattern matching.
However, they offer the ability to solve multiple other common subgraph related problems as well.
The biggest difference is that they directly support aggregation over subgraphs, which, for example,
allows them to solve frequent subgraph mining.
They build an independent system on top of Spark's infrastructure which comes with their own
imperative query language.
We integrate a single algorithm deeply into Spark's query optimizer.
Therefore, our contribution can be easily integrated into other graph systems building on Spark, e.g.
G-Core~\cite{gcore} or CAPS~\cite{caps}.
These systems would offer a declarative interface to our worst-case optimal joins.
The work-stealing approach of Fractal inspired our solution.
We also use a shared object to steal work within a single Spark executor.
However, our approach is simpler and less fine-grained.
They allow stealing work at every level of the depth-first enumeration of all subgraphs.
We only share work on the first level.
This makes their solution strictly more fine-grained and likely to perform better on big
queries.
We discuss this issue in our future work section~\ref{subsubsec:finer-grained-work-stealing}.
Fractal is built for Spark in cluster mode.
Hence, they allow processes to steal work from different workers.
GraphWCOJ is currently limited to a single worker.
However, their message-passing based solution is directly applicable to our system if
we extend to multiple workers.
Again, we talk about this in more depth in future work (\ref{subsubsec:cluster-mode}).
In short, we could use the same message-passing implementation but instead of stealing
from a subgraph enumerator, the work would be taken from the queue on each worker.
Both systems enumerate the subgraphs in a depth-first fashion.
In both systems, this is highly beneficial to memory usage;
the problem of breadth-first algorithms has been discussed for \textit{BigJoin} before (\cref{subsec:wcoj-timely-data-flow}).
To conclude, Fractal is a complete system with its own query interface.
This makes it more powerful than our system but also less integrated into Spark.
Hence, it forces the user to adapt to their imperative language and hinders
integration with declarative graph query languages.
We used a similar work-stealing algorithm in our work.
Both systems recognize and demonstrate the advantages of a depth-first approach.
%\subsection{Adaptive Query Exectution}
% Code generation
%A novel development in Spark is the ability to generate code to execute queries on the fly, called WholeStage code-generation, based on a technique used in the Hyper database~\cite{hyper,jira-whole-stage,1m-rows-laptop}.
%Compiled queries have been shown to be multiple magnitudes faster than interpreted queries traditionally used in most database systems.
%Interpreted queries are most commonly implemented using the Volcano model~\cite{volcano}.
%This model provides a simple and composable interface for algebraic operators; basically, every operator would provide an iterator interface.
%This interface would be used by a query by calling next on the root operator, who in turn calls next on each of its children and so on until the next calls reach the scanning operators at the bottom of the query execution tree.
%These would provide a single tuple which would then be "pulled" upwards through the query tree and processed by all operators.
%When it reaches the root operator, the result is delivered to the user.
%This happens for every tuple; hence, the approach can be described as tuple-at-a-time.
%Although, this interface is simple yet powerful due to its composability, it is also quite computation intensive mainly due to the high number of calls to the next function, which is often a virtual function call.
%This high number of virtual function call is not only CPU intensive but also makes bad use of CPU registers (they are spilled on every function call) and hinders compiler optimizations.
%Compiled queries avoid these costs by generating code specific to each query consisting mainly out of multiple, tight for-loops following each other.
%This speeds up processing by keeping data in the CPU registers as long as possible and avoiding materilization and function calls.
%Furthermore, it allows compiler optimizations, such as loop unrolling or ~\cite{hyper}
%We are not aware of any published efforts to speed up worst-case optimal joins via code generation.
%We aim to combine the research on worst-case optimal join algorithm and Spark's extensible optimizer Catalyst to speed up graph processing for all Spark users.
%In particular, this work will be based on either of the two distributed versions of worst-case optimal join algorithms mentioned above.
%We hope to further their work by evaluating which approach (shuffle + local join or timely data flow) works best on a MapReduce based processing engine as well as proving that worst-case optimal join algorithms
%can improve performance on a complex, optimized, existing platform that has not been built with them in mind originally, albeit their high additional cost (e.g. for sorting and need for special data structures).