Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging in DeltaSharingLogFileSystem for delta sharing queries #4207

Merged
merged 3 commits into from
Mar 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem with Logging
val iterator =
SparkEnv.get.blockManager.get[String](getDeltaSharingLogBlockId(f.toString)) match {
case Some(block) => block.data.asInstanceOf[Iterator[String]]
case _ => throw new FileNotFoundException(s"Cannot find block for delta log file: $f.")
case _ => throw new FileNotFoundException(s"Failed to open delta log file: $f.")
}
// Explicitly call hasNext to allow the reader lock on the block to be released.
val arrayBuilder = Array.newBuilder[Byte]
Expand All @@ -72,7 +72,7 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem with Logging
// This still exposes the risk of OOM.
new FSDataInputStream(new SeekableByteArrayInputStream(arrayBuilder.result()))
} else {
val content = getBlockAndReleaseLockHelper[String](f, None)
val content = getBlockAndReleaseLockHelper[String](f, None, "open")
new FSDataInputStream(new SeekableByteArrayInputStream(
content.getBytes(StandardCharsets.UTF_8)
))
Expand Down Expand Up @@ -102,7 +102,7 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem with Logging
modificationTime = 0L
)
} else {
getBlockAndReleaseLockHelper[DeltaSharingLogFileStatus](f, Some("_status"))
getBlockAndReleaseLockHelper[DeltaSharingLogFileStatus](f, Some("_status"), "getFileStatus")
}

new FileStatus(
Expand All @@ -128,7 +128,7 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem with Logging
SparkEnv.get.blockManager
.get[DeltaSharingLogFileStatus](getDeltaSharingLogBlockId(f.toString)) match {
case Some(block) => block.data.asInstanceOf[Iterator[DeltaSharingLogFileStatus]]
case _ => throw new FileNotFoundException(s"Failed to list files for path: $f.")
case _ => throw new FileNotFoundException(s"Failed to listStatus for path: $f.")
}

// Explicitly call hasNext to allow the reader lock on the block to be released.
Expand Down Expand Up @@ -186,10 +186,11 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem with Logging
super.close()
}

private def getBlockAndReleaseLockHelper[T: ClassTag](f: Path, suffix: Option[String]): T = {
private def getBlockAndReleaseLockHelper[T: ClassTag](
f: Path, suffix: Option[String], caller: String): T = {
val blockId = getDeltaSharingLogBlockId(suffix.foldLeft(f.toString)(_ + _))
val result = SparkEnv.get.blockManager.getSingle[T](blockId).getOrElse {
throw new FileNotFoundException(f.toString)
throw new FileNotFoundException(s"Failed to $caller for $f.")
}
releaseLockHelper(blockId)

Expand Down Expand Up @@ -839,7 +840,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
)
logInfo(
s"It takes ${(System.currentTimeMillis() - startTime) / 1000.0}s to construct delta" +
s" log for $customTablePath with ${idToUrl.toMap.size} urls."
s" log for $customTablePath with ${jsonLogSize} bytes for ${idToUrl.toMap.size} urls."
)
ConstructedDeltaLogMetadata(
idToUrl = idToUrl.toMap,
Expand Down