\section{Spark integration}\label{sec:spark-integration}
\subsection{User interface} \label{ssec:user-interface}
\begin{listing}[H]
\centering
\inputminted{scala}{code/usage-example.scala}
\caption{Example usage of a \textsc{WCOJ} to find triangles in a graph.}
\label{lst:usage-example}
\end{listing}
As one can see in line 16 % LINE
of~\cref{lst:usage-example}, we support a clean and precise DSL to match patterns in graphs.
This DSL is inspired by GraphFrames~\cite{graphframe}, Spark's API for graphs.
The user defines a pattern by its edges: each edge is written as \textit{(a) - [] -> (b)}, where \textit{a} is the
source vertex and \textit{b} is the destination, and multiple edges are separated by semicolons.
A connected pattern is expressed by defining multiple edges that share a source or destination.
Note that differently named sources or destinations are not guaranteed to be distinct elements in the graph,
e.g. \textit{(a) - [] -> (b); (b) - [] -> (c)} could match a linear path of length two or a cycle between \textit{a} and
\textit{b}; in the second case \textit{a} and \textit{c} are the same element.
The reader might wonder why we chose to stay with the GraphFrames edge syntax
\textit{- [] ->}, although we could have gone with something simpler, like \textit{->}.
However, sticking to the more verbose syntax allows us to include label restrictions inside the square brackets
in future extensions, e.g. for the integration with Neo4j's \textsc{CAPS} system.
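
To make the edge syntax concrete, the following minimal sketch parses a pattern string into (source, destination) pairs.
It is an illustration only and not necessarily how the implementation parses patterns; the names \texttt{PatternSketch}, \texttt{EdgeSyntax} and
\texttt{parsePattern} are hypothetical.

\begin{minted}{scala}
object PatternSketch {
  // One edge of the form "(a) - [] -> (b)"; whitespace is optional.
  private val EdgeSyntax =
    """\(\s*(\w+)\s*\)\s*-\s*\[\s*\]\s*->\s*\(\s*(\w+)\s*\)""".r

  /** Splits a pattern at semicolons and returns its (source, destination) pairs. */
  def parsePattern(pattern: String): List[(String, String)] =
    pattern.split(";").toList.map(_.trim).filter(_.nonEmpty).map {
      case EdgeSyntax(src, dst) => (src, dst)
      case other =>
        throw new IllegalArgumentException(s"Cannot parse edge: $other")
    }
}
\end{minted}

For example, \texttt{PatternSketch.parsePattern("(a) - [] -> (b); (b) - [] -> (c)")} yields the two edges $(a, b)$ and $(b, c)$, which are
connected because they share the variable \textit{b}.
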
The second parameter to \textit{findPattern} allows the user to specify the variable ordering used in the \textsc{WCOJ} algorithm.
Furthermore, the user interface takes multiple optional arguments, e.g. to apply two common filters to the result.
The filters are \textit{distinctFilter}, which ensures that each vertex occurs as binding for at most one variable, and
\textit{smallerThanFilter}, which allows only output bindings whose values increase along the specified variable ordering,
e.g. the binding \textit{[1, 2, 3]} but not \textit{[2, 1, 3]} for the triangle query above.
In our experience, these filters are typical for graph queries, and performance benefits greatly from pushing
them into the join.
Pushing general filters into the join would be a valuable addition, but we decided against implementing it because
it is a pure engineering task.
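
The semantics of the two filters can be sketched as predicates over a single output binding, listed in variable order.
This is a minimal sketch under assumptions: the object \texttt{FilterSketch} and both function signatures are hypothetical, the comparison
is assumed to be strict, and the predicates are applied to finished bindings here, whereas the actual implementation pushes them into the join.

\begin{minted}{scala}
object FilterSketch {
  /** True if no vertex is bound to more than one variable. */
  def distinctFilter(binding: Seq[Int]): Boolean =
    binding.distinct.size == binding.size

  /** True if the values strictly increase along the variable ordering
    * (strictness is an assumption of this sketch). */
  def smallerThanFilter(binding: Seq[Int]): Boolean =
    binding.zip(binding.drop(1)).forall { case (a, b) => a < b }
}
\end{minted}

With these definitions, the binding \textit{[1, 2, 3]} passes both filters, \textit{[2, 1, 3]} passes only the distinct filter, and
\textit{[1, 1, 2]} passes neither.
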
\subsection{Integration with Catalyst} \label{ssec:integration-with-catalyst}
We integrated our \textsc{WCOJ} implementation into Spark such that it can be used as a function on \textit{Datasets}.
To this end, we build all components necessary to execute a \textsc{WCOJ} within Spark's structured queries, provided by Catalyst (see
\cref{subsubsec:catalyst}).
We start out with a logical plan that consists of a \textsc{WCOJ} operator.
Then we introduce a new strategy to convert this logical operator into multiple physical operators.
One physical operator executes the join.
Between this operator and the graph edge relationships, we execute another physical operator that materializes
the graph edge relationships into a data structure that supports a \textit{TrieIterator} interface
(see~\cref{subsubsec:leapfrog-triejoin}).
The materialized edge relationship can be cached, so it needs to be computed only once on startup.
The integration itself is quite straightforward due to Catalyst's extensibility.
We explain it in the next sections.
First, we highlight some limitations of our integration into Catalyst.
It is not in the scope of this work to integrate \textsc{WCOJ}s into the SQL parser of Spark.
Hence, a \textsc{WCOJ} can only be used through Spark's Scala functional interface and not through Spark's SQL queries.
We also do not integrate it into the query optimization components of Catalyst, e.g. we do not provide rules or cost-based strategies to
decide when to use a \textsc{WCOJ} or a binary join.
This choice is left to the user.
However, our integration allows the user to intermix both kinds of joins freely.
The reason for this limitation is that, at the time of writing, no published paper systematically studied which queries benefit from
\textsc{WCOJ}s in general, nor did research exist on the combination of \textsc{WCOJ}s and binary joins.
Only a month after we decided on our scope for the Spark integration, Salihoglu et al. published an arXiv paper~\cite{mhedhbi2019} that
tackled these problems for the first time.
The lack of peer-reviewed papers and the high complexity of the arXiv paper confirm that deeper integration with
Spark's optimizer is out of scope for this thesis.
\subsection{A sequential linear Leapfrog Triejoin} \label{subsec:spark-integration-seq}
For this section, we assume that the reader is familiar with the background sections on Catalyst (see \cref{subsubsec:catalyst}) and
\textsc{LFTJ} (see \cref{subsubsec:leapfrog-triejoin}), where we explain the components of Catalyst's planning phase and the requirements
of a Leapfrog Triejoin.
In the current section, we outline how to satisfy the requirements of \textsc{LFTJ} within, and with the help of, Catalyst's structured plans.
Our baseline implementation of the Leapfrog Triejoin is a sequential implementation, i.e. it is not distributed.
Therefore, all representations of the edge relationship have only a single partition which the join operates upon.
In Spark this partitioning is called \textit{AllTuples}.
We enforce sequential execution of the complete Spark plan in our experiments by setting the number of executors to 1.
In the first phases of the Catalyst query compilation process, the query plan is represented by logical operators.
Integration into this phase only requires us to build a logical operator to represent the \textsc{WCOJ} join.
The only thing that we need to describe for this logical operator is the number of children.
An \textsc{LFTJ} can have two or more children, one for each input relationship.
The logical and physical plans for the triangle query are shown in~\cref{fig:lftj-catalyst-plan}.
\begin{figure}
\subfloat[Logical plan]{\includesvg[width=0.5\textwidth]{svg/lftj-logical-catalyst-plan}}
\subfloat[Physical plan]{\includesvg[width=0.5\textwidth]{svg/lftj-physical-catalyst-plan}}
\caption{Catalyst plans for the triangle query using a Leapfrog Triejoin.}
\label{fig:lftj-catalyst-plan}
\end{figure}
The strategy to translate the logical plan into a physical plan has two tasks.
First, it translates the n-ary logical plan into an n-ary physical plan that executes the \textsc{LFTJ} with the children as input.
Second, it introduces a physical operator per child, which materializes the RDD into a sorted, columnar array representation to support a
\textit{TrieIterator} interface.
The first physical operator is straightforward to implement.
It simply executes an \textsc{LFTJ} over the \textit{TrieIterators} provided by the children.
Given that each child has only one partition and there are no parallel operations, the algorithm can be implemented exactly
as described in~\cref{subsubsec:leapfrog-triejoin}.
The second physical operator translates the linear iterator interface offered by Spark for RDDs into a \textit{TrieIterator} interface.
In particular, it needs to offer a \textit{seek} operation in $\mathcal{O}(\log N)$ time.
To support this interface, the operator requires its children to be sorted;
this requirement can be fulfilled by Catalyst's standard optimization rules.
Then it takes the sorted linear iterator and materializes it into a column-wise array structure.
Given this data structure, the \textit{TrieIterator} can be implemented using binary search (as described
in~\cref{subsubsec:leapfrog-triejoin}).
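
The logarithmic \textit{seek} can be illustrated for a single level of the trie, i.e. one sorted column restricted to a range of positions.
The following is a minimal sketch, not the thesis implementation: the class \texttt{SortedColumnIterator} and its members are hypothetical,
and the \textit{open} and \textit{up} operations of a full \textit{TrieIterator} are omitted.

\begin{minted}{scala}
/** Hypothetical linear iterator over values(start until end), which must be sorted. */
final class SortedColumnIterator(values: Array[Int], start: Int, end: Int) {
  private var pos = start

  def atEnd: Boolean = pos >= end
  def key: Int = values(pos)
  def next(): Unit = pos += 1

  /** Moves to the first position in [pos, end) whose value is >= target,
    * using binary search, i.e. O(log N) comparisons. */
  def seek(target: Int): Unit = {
    var lo = pos
    var hi = end
    while (lo < hi) {
      val mid = lo + (hi - lo) / 2
      if (values(mid) < target) lo = mid + 1 else hi = mid
    }
    pos = lo
  }
}
\end{minted}

The search starts at the current position rather than at \texttt{start}, so the iterator only ever moves forward, as a Leapfrog join expects.
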
\subsection{GraphWCOJ} \label{subsec:spark-integration-graphWCOJ}
The integration of GraphWCOJ is quite similar to that of \textsc{LFTJ}.
\Cref{fig:graphWCOJ-catalyst-plan} shows the logical and physical plans constructed by our integration.
There are three main differences: GraphWCOJ requires only two children for the input relationships, the children materialize the input
relationships in a CSR data structure (see~\cref{subsec:csr-background}), and we support parallel execution of the join (which requires
a third child) and broadcasting of the CSR data structure.
We address these differences in order.
\begin{figure}
\subfloat[Logical plan]{\includesvg[width=0.5\textwidth]{svg/graphWCOJ-logical-catalyst-plan}}
\subfloat[Physical plan]{\includesvg[width=0.5\textwidth]{svg/graphWCOJ-physical-catalyst-plan}}
\caption{Catalyst plans for the triangle query using GraphWCOJ.}
\label{fig:graphWCOJ-catalyst-plan}
\end{figure}
Our GraphWCOJ operators need only two materialized versions of the input relationship.
This is because in graph pattern matching the joins are self-joins on the edge relationship.
This relationship has two attributes.
The \textsc{LFTJ} requires that its input relationships are sorted lexicographically according to the variable ordering.
To support all possible variable orderings, we need the edge relationship sorted by $src, dst$ and by $dst, src$.
Hence, we need two separate, materialized versions of the edge relationship.
However, we never need more materialized relationships because all \textit{TrieIterators} can share the same underlying data structures.
GraphWCOJ uses a CSR representation of the edge relationship (see~\cref{sec:graphwcoj}).
Hence, we need to build two CSR representations:
one with its \textit{Indices} array built from the $src$ attribute and its \textit{AdjacencyLists} array built from the $dst$ attribute,
the other with \textit{Indices} from $dst$ and \textit{AdjacencyLists} from $src$.
Next, we describe how to build these CSRs from two linear, sorted, row-wise iterators as provided by Spark for the two child relationships.
First, we note that it is necessary to build both compressed sparse row data structures in tandem.
This is because some vertices in the graph might have no outgoing or incoming edges.
That means some vertex IDs do not occur in the $src$ or $dst$ attributes of any tuple.
Therefore, the \textit{Indices} arrays of the two CSR structures would differ if each were built independently from $src$ or $dst$, e.g.
they could have different lengths.
In that case, it is not possible to use both CSRs together in a single join.
To allow building the CSRs in tandem, we introduce an \textit{AlignedZippedIterator}.
The \textit{next} method of this iterator is shown in~\cref{alg:alignedZippedIterator}.
It zips two iterators of two-tuple elements and aligns them on the first component, e.g. edges with $src$ as first component and $dst$ as
second component.
The zipped iterator emits triples where the first component is the aligned first element of both underlying iterators and the other
two elements are the second components of both iterators.
If the two iterators have different numbers of elements with the same first component, we advance only one iterator and fill the missing
component in the emitted triple with a placeholder until the first component of both iterators aligns again.
\begin{algorithm}
\uIf{\upshape iter1.hasNext() $\land$ iter2.hasNext()} {
t1 $\leftarrow$ iter1.peek() \;
t2 $\leftarrow$ iter2.peek() \;
\uIf {\upshape t1[0] = t2[0]} {
t1 $\leftarrow$ iter1.next()\;
t2 $\leftarrow $ iter2.next()\;
\KwRet{\upshape (t1[0], t1[1], t2[1])}\;
} \uElseIf{\upshape t1[0] < t2[0]} {
t $\leftarrow$ iter1.next()\;
\KwRet{\upshape (t[0], t[1], -1)} \;
} \Else {
t $\leftarrow$ iter2.next()\;
\KwRet{\upshape (t[0], -1, t[1])} \;
}
} \uElseIf {\upshape iter1.hasNext()} {
t $\leftarrow$ iter1.next() \;
\KwRet{\upshape (t[0], t[1], -1)} \;
} \Else {
t $\leftarrow$ iter2.next()\;
\KwRet{\upshape (t[0], -1, t[1])} \;
}
\caption{\textit{next} method of an \textit{AlignedZippedIterator}.}
\label{alg:alignedZippedIterator}
\end{algorithm}
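
For readers who prefer code over pseudocode, \cref{alg:alignedZippedIterator} can be transcribed to Scala roughly as follows.
This is a sketch under assumptions: tuples are pairs of \texttt{Int}, the placeholder is $-1$ as in the algorithm, and peeking is
realized with the standard library's \texttt{BufferedIterator}; the actual implementation may differ in all of these points.

\begin{minted}{scala}
final class AlignedZippedIterator(
    left: Iterator[(Int, Int)],
    right: Iterator[(Int, Int)],
    placeholder: Int = -1
) extends Iterator[(Int, Int, Int)] {
  // buffered iterators offer `head`, which peeks without consuming.
  private val l = left.buffered
  private val r = right.buffered

  override def hasNext: Boolean = l.hasNext || r.hasNext

  override def next(): (Int, Int, Int) =
    if (l.hasNext && r.hasNext) {
      val lk = l.head._1
      val rk = r.head._1
      if (lk == rk) {
        // Aligned: consume both sides.
        val (k, lv) = l.next()
        val (_, rv) = r.next()
        (k, lv, rv)
      } else if (lk < rk) {
        val (k, lv) = l.next()
        (k, lv, placeholder)
      } else {
        val (k, rv) = r.next()
        (k, placeholder, rv)
      }
    } else if (l.hasNext) {
      val (k, lv) = l.next()
      (k, lv, placeholder)
    } else {
      // Throws NoSuchElementException if both sides are exhausted.
      val (k, rv) = r.next()
      (k, placeholder, rv)
    }
}
\end{minted}

For the edges $(1, 2), (1, 3), (2, 3)$, the first iterator yields these tuples sorted by $src$ and the second yields
$(2, 1), (3, 1), (3, 2)$, i.e. the same edges as $(dst, src)$ pairs.
The zipped iterator then emits $(1, 2, -1), (1, 3, -1), (2, 3, 1), (3, -1, 1), (3, -1, 2)$.
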
Given an \textit{AlignedZippedIterator} over both input relationships, it is straightforward to build two CSR data structures.
We consume the whole \textit{AlignedZippedIterator} and, for each element, append the second and third components to the \textit{AdjacencyLists} of
the respective CSRs,
skipping placeholders.
Whenever the first element of the triple changes, we append the current size of the \textit{AdjacencyLists} buffers to the
\textit{Indices} arrays.
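
The construction can be sketched as a single pass over the aligned triples.
The sketch below makes several assumptions that the text does not fix: the names \texttt{CsrSketch} and \texttt{CsrBuilderSketch} are hypothetical,
the placeholder is $-1$ as in \cref{alg:alignedZippedIterator}, an offset is recorded when a new vertex starts, a closing offset is appended at the end,
and position $i$ of \textit{Indices} refers to the $i$-th distinct vertex ID rather than to the vertex ID itself.

\begin{minted}{scala}
import scala.collection.mutable.ArrayBuffer

final case class CsrSketch(indices: Vector[Int], adjacencyLists: Vector[Int])

object CsrBuilderSketch {
  val Placeholder = -1

  /** Builds both CSRs in tandem from triples (vertex, neighbour1, neighbour2). */
  def build(aligned: Iterator[(Int, Int, Int)]): (CsrSketch, CsrSketch) = {
    val indices1 = ArrayBuffer.empty[Int]
    val indices2 = ArrayBuffer.empty[Int]
    val adjacency1 = ArrayBuffer.empty[Int]
    val adjacency2 = ArrayBuffer.empty[Int]
    var last: Option[Int] = None

    aligned.foreach { case (vertex, second, third) =>
      if (!last.contains(vertex)) {
        // A new vertex starts: record where its lists begin in both CSRs.
        indices1 += adjacency1.size
        indices2 += adjacency2.size
        last = Some(vertex)
      }
      if (second != Placeholder) adjacency1 += second
      if (third != Placeholder) adjacency2 += third
    }
    // Closing offsets, so that list i spans indices(i) until indices(i + 1).
    indices1 += adjacency1.size
    indices2 += adjacency2.size

    (CsrSketch(indices1.toVector, adjacency1.toVector),
     CsrSketch(indices2.toVector, adjacency2.toVector))
  }
}
\end{minted}

Because both \textit{Indices} buffers are extended in the same step, they always have the same length, even when a vertex has only outgoing or
only incoming edges; such a vertex simply gets an empty adjacency list in one of the two CSRs.
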
The final difference between the Spark integration for \textsc{LFTJ} and GraphWCOJ is that we build GraphWCOJ such that it can
be run in parallel.
As argued in previous chapters, we broadcast the edge relationship to all workers.
The broadcast is supported by Spark's broadcast variables (see~\cref{subsubsec:broadcast-variables}) and Catalyst's support for broadcasting
the execution of a physical operator.
Parallelism is introduced via the third child of our GraphWCOJ operators.
It is an empty RDD with as many partitions as the desired level of parallelism.
We schedule tasks by using the \textit{mapPartitions} function of this empty RDD.
For each partition, we run the \textsc{WCOJ}, backing its \textit{TrieIterators} with the broadcast CSRs, and partition the data
logically by one of the schemes described in~\cref{sec:worst-case-optimal-join-parallelization}.
One of the main advantages of broadcasting the edge relationship to all workers is that we can reuse the same broadcast for all queries
over the same graph.
To support this in Catalyst, we introduce one additional physical operator, which we call \textit{ReusedCSRBroadcast}, and a CSR broadcast
variable cache maintained on the Spark master node.
The CSR broadcast variable cache is a simple dictionary that maps RDD IDs to broadcast variables of CSR structures.
When our system builds a broadcast CSR structure, it registers the broadcast in the cache.
Every time we translate a logical \textsc{WCOJ} plan into a physical one, we check whether the CSR for the edge relationship has already been broadcast;
if so, our strategy reuses this broadcast.
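
The lookup-or-register behaviour of such a cache can be sketched without any Spark dependency.
In the following illustration, the class \texttt{CsrBroadcastCacheSketch} and its method \texttt{getOrBuild} are hypothetical, and the type
parameter \texttt{B} merely stands in for a Spark broadcast variable holding the CSR structures; the sketch also merges the lookup and the
registration into one call, which the actual strategy may handle separately.

\begin{minted}{scala}
import scala.collection.mutable

/** Maps RDD IDs to already built broadcasts of type B. */
final class CsrBroadcastCacheSketch[B] {
  private val broadcasts = mutable.Map.empty[Int, B]

  /** Returns the cached broadcast for rddId; otherwise evaluates `build`
    * once, registers the result and returns it. */
  def getOrBuild(rddId: Int)(build: => B): B = synchronized {
    broadcasts.getOrElseUpdate(rddId, build)
  }
}
\end{minted}

Since \texttt{build} is a by-name parameter, the expensive construction and broadcast of the CSRs would only run on a cache miss.
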