207 rollup compaction #210
It looks good to me; great work! I’ve just added a few comments about naming and other small things.
 * @param revisionId the revision identifier
 * @param blocks the index blocks
 */
final case class IndexFile(file: File, revisionId: Long, blocks: Seq[Block]) {
If the relationship between files and IndexedFiles is 1:1, why do we need two classes and not just one?
The name IndexFile is probably misleading here. The File data structure represents the physical file storing the data; it is also used in QueryFile to report the query results.
…ck support and new file metadata format.
The commit 6ef20b1 introduces the idea of how the multi-block files are written. It is still a work in progress, because there are a lot of errors in the tests.
 * functions that are Delta-specific and therefore cannot be defined directly
 * in the IndexFile or its companion object because they are parts of the core.
 */
private[delta] object IndexFiles {
If this object is only used to provide methods from other packages, could we call it IndexFileUtils?
Good question. This is a pitfall of Scala 2.x where functions are not first class citizens, like in Java. I followed the modern Java idiom of naming utility classes, i.e. classes with static methods, as nouns in plural form, like Files, Paths, Executors, etc. Honestly, I do not know what the Scala tradition is.
.collect()
.foreach(row => builder += row.cubeId -> row)
builder.result()
.iterator
Instead of calling collect() on each revisionFiles, could we do a single DeltaLog.snapshot.allFiles.collect()?
I do not know how large the performance benefit would be, but I assume a single collect() is more efficient if we are going to process everything after sending it to the driver anyway.
The other possibility is to keep a DataFrame with all pre-processed Qbeast classes and filter the files directly with Spark (which couples it tightly to the query engine).
Honestly, I do not know why Delta uses Dataset to return the log entries. Do they think there can be enough entries to make Dataset more efficient than a simple collection from the standard library? @osopardo1 do you mean making revisionFiles something like a lazily evaluated field which collects DeltaLog.snapshot.allFiles just once?
I have changed revisionFiles to a private lazy val of type Array[AddFile]; the tests pass, so it seems to work. Thank you.
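The effect of that change can be illustrated with a minimal, self-contained sketch. It does not use Delta or Spark: FakeSnapshot and RevisionFilesHolder are hypothetical stand-ins (plain strings replace AddFile), and the only point shown is that a lazy val runs its initializer once, on first access.

```scala
// Hypothetical stand-in for the Delta snapshot; it only counts collect() calls.
final class FakeSnapshot(files: Seq[String]) {
  var collectCalls: Int = 0

  def collect(): Array[String] = {
    collectCalls += 1
    files.toArray
  }
}

// Hypothetical holder that mirrors the idea of a private lazy val field.
final class RevisionFilesHolder(snapshot: FakeSnapshot) {
  // The initializer runs on first access only; later accesses reuse the array.
  private lazy val revisionFiles: Array[String] = snapshot.collect()

  // Filtering happens on the already collected array, not on the snapshot.
  def filesWithPrefix(prefix: String): Seq[String] =
    revisionFiles.filter(_.startsWith(prefix)).toSeq
}
```

Calling filesWithPrefix several times on the same holder should trigger a single collect() on the stand-in snapshot, which is the behaviour the reviewers were asking for.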
@@ -97,6 +94,19 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging {
Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats))
}

private def queryFileToFileStatus(queryFile: QueryFile): FileStatus = {
What is the difference between a File and a QueryFile?
QueryFile is a physical file represented by File with row ranges to read. Instances of QueryFile are returned by QueryExecutor.
Oh, I understand. Should the File not contain anything related to row ranges, then (as a strong constraint)?
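The distinction discussed in this thread could be sketched as follows. This is an illustration only: the field names, the RowRange type and the half-open range convention are assumptions and are not taken from the PR. The physical file carries no row-range information, and the query result wraps it together with the ranges to read.

```scala
// Hypothetical physical file: only storage-level metadata, no row ranges.
final case class File(path: String, size: Long, modificationTime: Long)

// Hypothetical half-open row range [from, to).
final case class RowRange(from: Long, to: Long) {
  require(from <= to, "from must not exceed to")

  def length: Long = to - from
}

// Hypothetical query result: a physical file plus the row ranges to read.
final case class QueryFile(file: File, ranges: Seq[RowRange]) {
  // Total number of rows selected from the file.
  def rowCount: Long = ranges.map(_.length).sum
}
```

Keeping the ranges out of File means the same File value can be shared between the index metadata and any number of query results.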
…at and RangedColumnarBatchIterator
src/main/scala/io/qbeast/spark/delta/writer/RollupWriteStrategy.scala
@@ -165,7 +179,7 @@ private[table] class IndexedTableImpl(
  })
  }

- isNewCubeSize || isNewSpace
+ isNewCubeSize || isNewFileSize || isNewSpace
Do we need to create a new Revision if the desiredFileSize changes?
Honestly, I do not know; the approach was borrowed from the preferred cube size.
@@ -146,6 +158,8 @@ private[table] class IndexedTableImpl(
  checkColumnsToMatchSchema(latestRevision)
  // Checks if the desiredCubeSize is different from the existing one
  val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize
+ // Checks if the desiredFileSize is different from the existing one
+ val isNewFileSize = latestRevision.desiredFileSize != qbeastOptions.fileSize
When fileSize is not provided during an append, its value should be that of the existing revision. This should be handled by the method addRequiredParams, but at the moment it does not do so.
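A minimal sketch of the fallback described in this comment, under stated assumptions: RevisionSketch, AppendOptions, the "fileSize" option key and the Long type are hypothetical and do not reflect the actual addRequiredParams implementation. A missing option resolves to the value stored in the latest revision, so an append without fileSize does not look like a size change.

```scala
// Hypothetical, simplified view of a revision with only the relevant fields.
final case class RevisionSketch(desiredCubeSize: Int, desiredFileSize: Long)

object AppendOptions {
  // Assumed option key; the real key used by the project may differ.
  val FileSizeKey = "fileSize"

  // Uses the provided value if present, otherwise the latest revision's value.
  def resolveFileSize(options: Map[String, String], latest: RevisionSketch): Long =
    options.get(FileSizeKey).map(_.toLong).getOrElse(latest.desiredFileSize)

  // True only when an explicitly provided size differs from the existing one.
  def isNewFileSize(options: Map[String, String], latest: RevisionSketch): Boolean =
    resolveFileSize(options, latest) != latest.desiredFileSize
}
```

With this resolution step applied before the comparison, an append that omits fileSize would not be flagged as needing a new revision on that account.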
Draft that is dropped
Description
This is a draft PR for early reviews of the proposed changes while implementing #207. The reviewers are @cugni, @osopardo1, @Jiaweihu08.