Qbeast Spark is an Apache Spark extension that enhances data processing in Data Lakehouses. It provides advanced multi-dimensional filtering and efficient data sampling, enabling faster and more accurate queries. The extension also maintains ACID properties for data integrity and reliability, making it ideal for handling large-scale data efficiently.
- Data Lakehouse - Data lake with ACID properties, thanks to the underlying Delta Lake architecture.
- Multi-column indexing - Filter your data on multiple columns using the Qbeast Format.
- Improved Sampling operator - Read statistically significant subsets of files.
- Table Tolerance - Model for the sampling fraction and query accuracy trade-off.
Qbeast Spark enables faster queries with statistically accurate sampling, as the benchmark below shows.
| Format | Execution Time | Result |
|---|---|---|
| Delta | ~ 151.3 sec. | 37.869383 |
| Qbeast | ~ 6.6 sec. | 37.856333 |
In this example, 1% sampling returns the result roughly 22x faster than the Delta format, with an error of 0.034%.
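Below is a minimal sketch of the kind of sampled aggregation behind such numbers; the table path and the price column are illustrative placeholders, not the actual benchmark code.

import org.apache.spark.sql.functions.avg

// Hypothetical sketch: an average over a 1% sample of a qbeast table.
// "/tmp/qbeast-benchmark" and "price" are placeholder names.
val benchmarkDF = spark.read.format("qbeast").load("/tmp/qbeast-benchmark")

// sample(0.01) is pushed down to the qbeast source, so only the files
// needed for a statistically significant 1% subset are read.
benchmarkDF.sample(0.01).agg(avg("price")).show()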
Explore the documentation for more details:
- Quickstart for Qbeast-Spark
- Data Lakehouse with Qbeast Format
- OTree Algorithm
- QbeastTable
- Columns To Index Selector
- Recommendations for different Cloud Storage systems
- Advanced configurations
- Qbeast Metadata
- FAQ: Frequently Asked Questions
You can run the qbeast-spark application locally on your computer, or use a Docker image we have prepared with the dependencies; you can find it in the Packages section.
Download Spark 3.5.0 with Hadoop 3.3.4, extract it, and set the SPARK_HOME environment variable:
ℹ️ Note: You can use Hadoop 2.7 if desired, but you may run into issues with some cloud providers' storage; read more about it here.
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzvf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-3.5.0-bin-hadoop3
Inside the project folder, launch a Spark shell with the required dependencies:
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
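If you are building a standalone application instead of using the shell, the same settings can be passed through the SparkSession builder. A minimal sketch, assuming the qbeast-spark and delta-spark artifacts are already on the application's classpath (the app name is illustrative):

import org.apache.spark.sql.SparkSession

// Minimal sketch: the same Qbeast configuration set in application code
// rather than via spark-shell flags. Assumes qbeast-spark and delta-spark
// are on the classpath.
val spark = SparkSession.builder()
  .appName("qbeast-example") // illustrative name
  .config("spark.sql.extensions",
    "io.qbeast.spark.internal.QbeastSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()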
Read the CSV source file included in the project:
val csvDF = spark.read.format("csv").
option("header", "true").
option("inferSchema", "true").
load("./src/test/resources/ecommerce100K_2019_Oct.csv")
Index the dataset by writing it into the qbeast format, specifying the columns to index:
val tmpDir = "/tmp/qbeast-spark"
csvDF.write.
mode("overwrite").
format("qbeast").
option("columnsToIndex", "user_id,product_id").
save(tmpDir)
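Once written, filters on the indexed columns can be pruned at the file level. A minimal sketch (not from the original docs; the literal values are illustrative):

// Minimal sketch: filters on the indexed columns (user_id, product_id)
// are pushed down, letting Qbeast skip files that cannot contain
// matching rows. The literal values below are illustrative.
val indexedDF = spark.read.format("qbeast").load(tmpDir)

indexedDF
  .filter("user_id >= 536000000 AND product_id = 1003461")
  .explain() // inspect the plan to see the pushed-down filters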
You can create a table with Qbeast with the help of the QbeastCatalog:
spark.sql(
"CREATE TABLE student (id INT, name STRING, age INT) " +
"USING qbeast OPTIONS ('columnsToIndex'='id')")
Use INSERT INTO to add records to the new table. The index is updated dynamically as new data is inserted:
val studentsDF = Seq((1, "Alice", 34), (2, "Bob", 36)).toDF("id", "name", "age")
studentsDF.write.mode("overwrite").saveAsTable("visitor_students")
// AS SELECT FROM
spark.sql("INSERT INTO table student SELECT * FROM visitor_students")
// VALUES
spark.sql("INSERT INTO table student VALUES (3, 'Charlie', 37)")
// SHOW
spark.sql("SELECT * FROM student").show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 1| Alice| 34|
| 2| Bob| 36|
| 3|Charlie| 37|
+---+-------+---+
Load the newly indexed dataset.
val qbeastDF =
spark.
read.
format("qbeast").
load(tmpDir)
Sample the data, and notice how the sampler is converted into filters and pushed down to the source:
qbeastDF.sample(0.1).explain(true)
Go to the Quickstart or notebook for more details.
Get insights into the data using the QbeastTable interface:
import io.qbeast.spark.QbeastTable
val qbeastTable = QbeastTable.forPath(spark, tmpDir)
qbeastTable.getIndexMetrics()
Optimize is an expensive operation that consists of rewriting part of the files to achieve a better layout and improve query performance.
To minimize the write amplification of this command, it is executed on subsets of the table, such as Revision IDs or specific files.
Read more about Revision and find an example here.
There are three ways of executing the optimize operation:
qbeastTable.optimize() // Optimizes the latest available Revision.
// This does NOT include optimizations of previous Revisions.
qbeastTable.optimize(2L) // Optimizes Revision number 2.
qbeastTable.optimize(Seq("file1", "file2")) // Optimizes the specified files.
If you want to optimize the full table, you must loop through the revisions:
val revisions = qbeastTable.revisionsIDs() // Get all the Revision IDs available in the table.
revisions.foreach(revision =>
qbeastTable.optimize(revision)
)
Go to QbeastTable documentation for more detailed information.
Use the Python index visualizer on your indexed table to visually examine the index structure and gather sampling metrics.
| Version | Spark | Hadoop | Delta Lake |
|---|---|---|---|
| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
| 0.3.x | 3.2.x | 3.3.x | 1.2.x |
| 0.4.x | 3.3.x | 3.3.x | 2.1.x |
| 0.5.x | 3.4.x | 3.3.x | 2.4.x |
| 0.6.x | 3.5.x | 3.3.x | 3.1.x |
| 0.7.x | 3.5.x | 3.3.x | 3.1.x |
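If you manage dependencies with a build tool, pick the artifact versions that match your Spark version from the table above. A minimal build.sbt sketch for the 0.7.x row, assuming Scala 2.12:

// build.sbt: minimal sketch for the 0.7.x row (Spark 3.5.x / Delta Lake 3.1.x)
scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  "io.qbeast" %% "qbeast-spark" % "0.7.0",
  "io.delta" %% "delta-spark" % "3.1.0"
)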
Check here for Delta Lake and Apache Spark version compatibility.
See Contribution Guide for more information.
See LICENSE.
See Code of Conduct.