207 rollup compaction #210

Closed
wants to merge 40 commits into from

Conversation

alexeiakimov
Contributor

Description

This is a draft PR for early reviews of the proposed changes while implementing #207. The reviewers are @cugni , @osopardo1 , @Jiaweihu08

@alexeiakimov alexeiakimov self-assigned this Aug 15, 2023
@alexeiakimov alexeiakimov marked this pull request as draft August 15, 2023 23:24
@codecov

codecov bot commented Aug 16, 2023

Codecov Report

Attention: 70 lines in your changes are missing coverage. Please review.

Comparison is base (f066acf) 92.00% compared to head (1f39ca8) 89.96%.

❗ Current head 1f39ca8 differs from pull request most recent head 03440b6. Consider uploading reports for the commit 03440b6 to get more accurate results

Files Patch % Lines
...beast/spark/delta/writer/LegacyWriteStrategy.scala 0.00% 21 Missing ⚠️
...ain/scala/io/qbeast/spark/table/IndexedTable.scala 28.00% 18 Missing ⚠️
...c/main/scala/io/qbeast/spark/delta/CubeIndex.scala 78.78% 7 Missing ⚠️
...main/scala/io/qbeast/spark/delta/BlocksCodec.scala 90.74% 5 Missing ⚠️
...qbeast/spark/delta/writer/IndexFileGenerator.scala 87.87% 4 Missing ⚠️
...rc/main/scala/io/qbeast/core/model/IndexFile.scala 80.00% 3 Missing ⚠️
...ast/spark/internal/sources/ColumnVectorSlice.scala 89.47% 2 Missing ⚠️
...internal/sources/RangedColumnarBatchIterator.scala 92.59% 2 Missing ⚠️
src/main/scala/io/qbeast/spark/utils/Params.scala 33.33% 2 Missing ⚠️
...in/scala/io/qbeast/spark/delta/writer/Rollup.scala 96.00% 1 Missing ⚠️
... and 5 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #210      +/-   ##
==========================================
- Coverage   92.00%   89.96%   -2.04%     
==========================================
  Files          88      109      +21     
  Lines        2214     2662     +448     
  Branches      168      195      +27     
==========================================
+ Hits         2037     2395     +358     
- Misses        177      267      +90     



@cugni cugni left a comment

It looks good to me; great work! I’ve just added a few comments about naming and other small things.

* @param revisionId the revision identifier
* @param blocks the index blocks
*/
final case class IndexFile(file: File, revisionId: Long, blocks: Seq[Block]) {
Member

If the relationship between files and IndexedFiles is 1:1, why do we need two classes and not just one?

Contributor Author

Probably the name IndexFile is misleading here. The File data structure represents the physical file storing the data; it is also used in QueryFile to report the query results.
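The separation might be sketched as follows; only IndexFile's signature comes from the diff, and the File and Block fields shown here are assumptions for illustration.

```scala
// File models only the physical data file on storage (fields assumed).
final case class File(path: String, size: Long, modificationTime: Long)

// A block of rows belonging to one cube (fields assumed).
final case class Block(cubeId: String, elementCount: Long)

// IndexFile, as in the diff, attaches index metadata to the physical file,
// so File can be reused elsewhere (e.g. in QueryFile) without that metadata.
final case class IndexFile(file: File, revisionId: Long, blocks: Seq[Block])
```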

@alexeiakimov
Contributor Author

alexeiakimov commented Aug 20, 2023

The commit 6ef20b1 introduces the idea of how the multi-block files are written. It is still a work in progress because there are a lot of errors in the tests.

* functions that are Delta-specific and therefore cannot be defined directly
* in the IndexFile or its companion object because they are part of the core.
*/
private[delta] object IndexFiles {
Member

If this object is only used to provide methods from other packages, could we call it IndexFileUtils?

Contributor Author

Good question. This is a pitfall of Scala 2.x, where functions are not first-class citizens, like in Java. I followed the modern Java idiom of naming utility classes, i.e. classes with static methods, as nouns in plural form, like Files, Paths, Executors, etc. Honestly, I do not know what the Scala tradition is.
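The plural-noun idiom being referred to, as in java.nio.file.Files or java.util.concurrent.Executors, might look like this in Scala; the helper shown is hypothetical and assumes the IndexFile and Block definitions from the diff.

```scala
// A singleton object acting as a utility class: related helper
// functions grouped under a plural noun. The helper is hypothetical.
object IndexFiles {
  def totalElementCount(file: IndexFile): Long =
    file.blocks.map(_.elementCount).sum
}
```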

.collect()
.foreach(row => builder += row.cubeId -> row)
builder.result()
.iterator
Member

Instead of calling collect() on each revisionFiles, could we do a single DeltaLog.snapshot.allFiles.collect()?

I do not know what the benefit would be in terms of performance, but I assume it would be more efficient to do a single collect() if we are going to process everything on the driver anyway.

Member

The other possibility is to keep a DataFrame with all the pre-processed Qbeast classes and filter the files directly with Spark (which keeps it very tight to the query engine).

Contributor Author

Honestly, I do not know why Delta uses a Dataset to return the log entries; do they think there can be enough entries to make a Dataset more efficient than a simple collection from the standard library? @osopardo1 do you mean making revisionFiles something like a lazily evaluated field that collects DeltaLog.snapshot.allFiles just once?

Contributor Author

I have changed revisionFiles to a private lazy val of type Array[AddFile]; the tests pass, so it seems to work. Thank you.
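The resolution described above might look roughly like this; DeltaLog, Snapshot.allFiles, and AddFile are from the Delta Lake API, while the class and method names here are assumptions.

```scala
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.AddFile

class RevisionFiles(deltaLog: DeltaLog) {
  // Collected from the log once, on first access, instead of
  // calling collect() separately for every revision.
  private lazy val allAddFiles: Array[AddFile] =
    deltaLog.snapshot.allFiles.collect()

  // Hypothetical helper: filter the cached entries for one revision,
  // assuming the revision id is stored in the file tags.
  def forRevision(revisionId: Long): Array[AddFile] =
    allAddFiles.filter { f =>
      Option(f.tags).flatMap(_.get("revision")).contains(revisionId.toString)
    }
}
```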

@@ -97,6 +94,19 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging {
Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats))
}

private def queryFileToFileStatus(queryFile: QueryFile): FileStatus = {
Member

What is the difference between a File and a QueryFile?

Contributor Author

A QueryFile is a physical file, represented by File, together with the row ranges to read. Instances of QueryFile are returned by the QueryExecutor.
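Under that description, the relationship could be sketched like this; the field names are assumptions for illustration.

```scala
// A range of row indices within the file (assumed shape).
final case class RowRange(from: Long, to: Long)

// QueryFile pairs the physical File with the row ranges the query
// engine has to read from it; File itself stays range-free.
final case class QueryFile(file: File, ranges: Seq[RowRange])
```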

Member

Oh, I understand. Should the File then contain nothing related to row ranges (as a strong constraint)?

@Qbeast-io Qbeast-io deleted a comment from alexeiakimov Sep 19, 2023
@@ -165,7 +179,7 @@ private[table] class IndexedTableImpl(
})
}

isNewCubeSize || isNewSpace
isNewCubeSize || isNewFileSize || isNewSpace
Member

Do we need to create a new Revision if the desiredFileSize changes?

Contributor Author

Honestly, I do not know; the approach was borrowed from the preferred cube size.

@@ -146,6 +158,8 @@ private[table] class IndexedTableImpl(
checkColumnsToMatchSchema(latestRevision)
// Checks if the desiredCubeSize is different from the existing one
val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize
// Checks if the desiredFileSize is different from the existing one
val isNewFileSize = latestRevision.desiredFileSize != qbeastOptions.fileSize
Member

When fileSize is not provided during an append, its value should default to that of the existing revision. This is handled by the method addRequiredParams, but at the moment it is not doing so.
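A minimal sketch of the suggested behaviour, assuming addRequiredParams works over a plain options map; the fileSize key and the desiredFileSize field come from the surrounding discussion, everything else is hypothetical.

```scala
// Fall back to the latest revision's desiredFileSize when the append
// does not provide a fileSize option (sketch, not the actual
// addRequiredParams implementation).
def addRequiredParams(
    options: Map[String, String],
    latestRevision: Revision): Map[String, String] =
  if (options.contains("fileSize")) options
  else options + ("fileSize" -> latestRevision.desiredFileSize.toString)
```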

@osopardo1 osopardo1 mentioned this pull request Oct 23, 2023
@cdelfosse cdelfosse closed this Nov 27, 2023
@cdelfosse cdelfosse reopened this Nov 27, 2023
@cdelfosse
Contributor

Draft that is dropped

@cdelfosse cdelfosse closed this Nov 27, 2023