Skip to content

Commit

Permalink
rename row_number to line_num and add file_name column in the table…
Browse files Browse the repository at this point in the history
… of app_logs
  • Loading branch information
naive-zhang committed Dec 20, 2024
1 parent 94a570d commit 2efb0df
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class YarnAppPartitionReader(yarnAppPartition: YarnAppPartition)
case validState if validYarnStateSet.contains(validState) =>
yarnClient.getApplications(
java.util.EnumSet.of(YarnApplicationState.valueOf(validState)))
case _ => java.util.Collections.EMPTY_LIST[ApplicationReport]
case _ => Seq.empty[ApplicationReport].asJava
}
case EqualTo("type", appType: String) =>
yarnClient.getApplications(java.util.Collections.singleton(appType))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package org.apache.kyuubi.spark.connector.yarn

import java.io.{BufferedReader, InputStreamReader}

import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
Expand All @@ -30,6 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.unsafe.types.UTF8String

import java.io.{BufferedReader, InputStreamReader}
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex

class YarnLogPartitionReader(yarnLogPartition: YarnLogPartition)
extends PartitionReader[InternalRow] {

Expand All @@ -50,7 +49,8 @@ class YarnLogPartitionReader(yarnLogPartition: YarnLogPartition)
UTF8String.fromString(yarnLog.user),
UTF8String.fromString(yarnLog.host),
UTF8String.fromString(yarnLog.containerId),
yarnLog.rowNumber,
yarnLog.lineNumber,
UTF8String.fromString(yarnLog.fileName),
UTF8String.fromString(yarnLog.message)))
}

Expand Down Expand Up @@ -101,6 +101,7 @@ class YarnLogPartitionReader(yarnLogPartition: YarnLogPartition)
s"${containerHost}_${containerSuffix}",
containerHost,
lineNumber,
path.getName,
line)
}
logEntries
Expand All @@ -119,5 +120,6 @@ case class LogEntry(
user: String,
containerId: String,
host: String,
rowNumber: Int,
lineNumber: Int,
fileName: String,
message: String)
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class YarnLogTable extends Table with SupportsRead {
StructField("user", StringType, nullable = false),
StructField("host", StringType, nullable = false),
StructField("container_id", StringType, nullable = false),
StructField("row_number", IntegerType, nullable = false),
StructField("line_num", IntegerType, nullable = false),
StructField("file_name", StringType, nullable = false),
StructField("message", StringType, nullable = true)))

override def capabilities(): util.Set[TableCapability] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ trait WithKyuubiServerAndYarnMiniCluster extends KyuubiFunSuite with WithKyuubiS
}

def submitMockTasksInParallelTreeTimes(): Unit = {
val threads = (1 to 100).map { i =>
val threads = (1 to 5).map { i =>
new Thread(() => {
info(s"Starting submission in thread $i")
try {
Expand Down

0 comments on commit 2efb0df

Please sign in to comment.