Skip to content

Commit

Permalink
Enhance Session Job Handling and Heartbeat Mechanism (#168)
Browse files Browse the repository at this point in the history
* Enhance Session Job Handling and Heartbeat Mechanism

This commit introduces two key improvements:
1. An initial delay is added to start the heartbeat mechanism, reducing the risk of concurrent update conflicts.
2. The 'jobId' in the session store is updated at the beginning to properly manage and evict old jobs for the same session.

Testing Performed:
1. Manual sanity testing was conducted.
2. It was ensured that old jobs eventually exit after a new job for the same session starts.
3. The functionality of the heartbeat mechanism was verified to still work effectively.

Signed-off-by: Kaituo Li <[email protected]>

* Override only when --conf excludeJobIds is not empty and excludeJobIds does not contain currentJobId.

Signed-off-by: Kaituo Li <[email protected]>

* reformat code

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Nov 19, 2023
1 parent 8dc9584 commit 6f16801
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,50 @@ object FlintInstance {
maybeError)
}

def serialize(job: FlintInstance, currentTime: Long): String = {
// jobId is only readable by spark, thus we don't override jobId
Serialization.write(
Map(
"type" -> "session",
"sessionId" -> job.sessionId,
"error" -> job.error.getOrElse(""),
"applicationId" -> job.applicationId,
"state" -> job.state,
// update last update time
"lastUpdateTime" -> currentTime,
// Convert a Seq[String] into a comma-separated string, such as "id1,id2".
// This approach is chosen over serializing to an array format (e.g., ["id1", "id2"])
// because it simplifies client-side processing. With a comma-separated string,
// clients can easily ignore this field if it's not in use, avoiding the need
// for array parsing logic. This makes the serialized data more straightforward to handle.
"excludeJobIds" -> job.excludedJobIds.mkString(","),
"jobStartTime" -> job.jobStartTime))
/**
* After the initial setup, the 'jobId' is only readable by Spark, and it should not be
* overridden. We use 'jobId' to ensure that only one job can run per session. In the case of a
* new job for the same session, it will override the 'jobId' in the session document. The old
* job will periodically check the 'jobId.' If the read 'jobId' does not match the current
* 'jobId,' the old job will exit early. Therefore, it is crucial that old jobs do not overwrite
* the session store's 'jobId' field after the initial setup.
*
* @param job
* Flint session object
* @param currentTime
* current timestamp in milliseconds
* @param includeJobId
* flag indicating whether to include the "jobId" field in the serialization
* @return
* serialized Flint session
*/
def serialize(job: FlintInstance, currentTime: Long, includeJobId: Boolean = true): String = {
val baseMap = Map(
"type" -> "session",
"sessionId" -> job.sessionId,
"error" -> job.error.getOrElse(""),
"applicationId" -> job.applicationId,
"state" -> job.state,
// update last update time
"lastUpdateTime" -> currentTime,
// Convert a Seq[String] into a comma-separated string, such as "id1,id2".
// This approach is chosen over serializing to an array format (e.g., ["id1", "id2"])
// because it simplifies client-side processing. With a comma-separated string,
// clients can easily ignore this field if it's not in use, avoiding the need
// for array parsing logic. This makes the serialized data more straightforward to handle.
"excludeJobIds" -> job.excludedJobIds.mkString(","),
"jobStartTime" -> job.jobStartTime)

val resultMap = if (includeJobId) {
baseMap + ("jobId" -> job.jobId)
} else {
baseMap
}

Serialization.write(resultMap)
}

def serializeWithoutJobId(job: FlintInstance, currentTime: Long): String = {
serialize(job, currentTime, includeJobId = false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class FlintInstanceTest extends SparkFunSuite with Matchers {
1620000001000L,
excludedJobIds)
val currentTime = System.currentTimeMillis()
val json = FlintInstance.serialize(instance, currentTime)
val json = FlintInstance.serializeWithoutJobId(instance, currentTime)

json should include(""""applicationId":"app-123"""")
json should not include (""""jobId":"job-456"""")
Expand Down Expand Up @@ -80,7 +80,7 @@ class FlintInstanceTest extends SparkFunSuite with Matchers {
Seq.empty[String],
Some("Some error occurred"))
val currentTime = System.currentTimeMillis()
val json = FlintInstance.serialize(instance, currentTime)
val json = FlintInstance.serializeWithoutJobId(instance, currentTime)

json should include(""""error":"Some error occurred"""")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
private val MAPPING_CHECK_TIMEOUT = Duration(1, MINUTES)
private val DEFAULT_QUERY_EXECUTION_TIMEOUT = Duration(10, MINUTES)
private val DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS = 10 * 60 * 1000
val INITIAL_DELAY_MILLIS = 3000L

def update(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = {
updater.update(flintCommand.statementId, FlintCommand.serialize(flintCommand))
Expand Down Expand Up @@ -121,7 +122,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
sessionId.get,
threadPool,
osClient,
sessionIndex.get)
sessionIndex.get,
INITIAL_DELAY_MILLIS)

if (setupFlintJobWithExclusionCheck(
conf,
Expand Down Expand Up @@ -322,18 +324,25 @@ object FlintREPL extends Logging with FlintJobExecutor {
sessionIndex: String,
jobStartTime: Long,
excludeJobIds: Seq[String] = Seq.empty[String]): Unit = {
val flintJob =
new FlintInstance(
applicationId,
jobId,
sessionId,
"running",
currentTimeProvider.currentEpochMillis(),
jobStartTime,
excludeJobIds)
flintSessionIndexUpdater.upsert(
val includeJobId = !excludeJobIds.isEmpty && !excludeJobIds.contains(jobId)
val currentTime = currentTimeProvider.currentEpochMillis()
val flintJob = new FlintInstance(
applicationId,
jobId,
sessionId,
FlintInstance.serialize(flintJob, currentTimeProvider.currentEpochMillis()))
"running",
currentTime,
jobStartTime,
excludeJobIds)

val serializedFlintInstance = if (includeJobId) {
FlintInstance.serialize(flintJob, currentTime, true)
} else {
FlintInstance.serializeWithoutJobId(flintJob, currentTime)
}

flintSessionIndexUpdater.upsert(sessionId, serializedFlintInstance)

logDebug(
s"""Updated job: {"jobid": ${flintJob.jobId}, "sessionId": ${flintJob.sessionId}} from $sessionIndex""")
}
Expand Down Expand Up @@ -391,7 +400,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
val currentTime = currentTimeProvider.currentEpochMillis()
flintSessionIndexUpdater.upsert(
sessionId,
FlintInstance.serialize(flintInstance, currentTime))
FlintInstance.serializeWithoutJobId(flintInstance, currentTime))
}

/**
Expand Down Expand Up @@ -816,7 +825,9 @@ object FlintREPL extends Logging with FlintJobExecutor {

flintSessionIndexUpdater.updateIf(
sessionId,
FlintInstance.serialize(flintInstance, currentTimeProvider.currentEpochMillis()),
FlintInstance.serializeWithoutJobId(
flintInstance,
currentTimeProvider.currentEpochMillis()),
getResponse.getSeqNo,
getResponse.getPrimaryTerm)
}
Expand All @@ -833,14 +844,17 @@ object FlintREPL extends Logging with FlintJobExecutor {
* the thread pool.
* @param osClient
* the OpenSearch client.
* @param initialDelayMillis
*   the initial delay, in milliseconds, before the first heartbeat update
*/
def createHeartBeatUpdater(
currentInterval: Long,
flintSessionUpdater: OpenSearchUpdater,
sessionId: String,
threadPool: ScheduledExecutorService,
osClient: OSClient,
sessionIndex: String): Unit = {
sessionIndex: String,
initialDelayMillis: Long): Unit = {

threadPool.scheduleAtFixedRate(
new Runnable {
Expand All @@ -853,7 +867,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
flintInstance.state = "running"
flintSessionUpdater.updateIf(
sessionId,
FlintInstance.serialize(flintInstance, currentTimeProvider.currentEpochMillis()),
FlintInstance.serializeWithoutJobId(
flintInstance,
currentTimeProvider.currentEpochMillis()),
getResponse.getSeqNo,
getResponse.getPrimaryTerm)
}
Expand All @@ -867,7 +883,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
}
}
},
0L,
initialDelayMillis,
currentInterval,
java.util.concurrent.TimeUnit.MILLISECONDS)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class FlintREPLTest
"session1",
threadPool,
osClient,
"sessionIndex")
"sessionIndex",
0)

// Verifications
verify(osClient, atLeastOnce()).getDoc("sessionIndex", "session1")
Expand Down

0 comments on commit 6f16801

Please sign in to comment.