Skip to content

Commit

Permalink
HDDS-11680. Enhance Recon Metrics For Improved Observability (#7517)
Browse files Browse the repository at this point in the history
  • Loading branch information
devabhishekpal authored Jan 19, 2025
1 parent c9a530a commit f90e625
Show file tree
Hide file tree
Showing 49 changed files with 1,200 additions and 408 deletions.
125 changes: 125 additions & 0 deletions hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

*** Settings ***
Documentation Test to validate the recon task status API works properly
Library OperatingSystem
Library String
Library BuiltIn
Library Collections
Resource ../ozone-lib/freon.robot
Resource ../commonlib.robot
Test Timeout 5 minutes

*** Variables ***
# Recon HTTP address and the REST endpoints exercised by this suite
${BASE_URL}                 http://recon:9888
${TASK_STATUS_ENDPOINT}     ${BASE_URL}/api/v1/task/status
${TRIGGER_SYNC_ENDPOINT}    ${BASE_URL}/api/v1/triggerdbsync/om
# Task names expected in the task status response
${TASK_NAME_1}              ContainerHealthTask
${TASK_NAME_2}              OmDeltaRequest
# Volume/bucket/key path handed to Freon when generating data
${BUCKET}                   testbucket
${VOLUME}                   testvolume
${KEYPATH}                  ${VOLUME}/${BUCKET}/testkey

*** Keywords ***

Kinit as ozone admin
    [Documentation]    Run `Kinit test user` for testuser, but only when SECURITY_ENABLED is 'true'.
    IF    '${SECURITY_ENABLED}' == 'true'
        Kinit test user    testuser    testuser.keytab
    END

Sync OM Data
    [Documentation]    Call the OM DB sync trigger endpoint with curl and return the raw response text.
    Log To Console    Sending CURL request to ${TRIGGER_SYNC_ENDPOINT}
    ${sync_response} =    Execute    curl --negotiate -u : -LSs ${TRIGGER_SYNC_ENDPOINT}
    [Return]    ${sync_response}

Fetch Task Status
    [Documentation]    Query the Recon task status endpoint with curl and return the JSON response parsed into a list of task entries.
    Log To Console    Sending CURL request to ${TASK_STATUS_ENDPOINT}
    # "Accept" (not "Accepts") is the standard HTTP request header for content negotiation
    ${result} =    Execute    curl -H "Accept: application/json" --negotiate -u : -LSs ${TASK_STATUS_ENDPOINT}
    # Hand the response to Python as an object ($result) instead of pasting its text into a
    # triple-quoted literal, so quotes and backslashes inside the JSON cannot break parsing
    ${parsed_response} =    Evaluate    json.loads($result)
    # Copy into a plain list without round-tripping through the value's repr
    ${tasks} =    Evaluate    list($parsed_response)
    [Return]    ${tasks}

*** Test Cases ***

Prepopulate Data and Trigger OM DB Sync
    [Documentation]    Use Freon to prepopulate the OM DB with data and trigger OM DB sync.
    Kinit as ozone admin
    Freon DFSG    n=100    path=${KEYPATH}    size=100

    # Sync should return true if successful
    ${sync_response} =    Sync OM Data
    Should Contain    ${sync_response}    true

Validate Task Status After Sync
    [Documentation]    Validate that task status is updated after triggering the OM DB sync.
    ${tasks} =    Fetch Task Status
    Should Not Be Empty    ${tasks}

    # Keys every task entry in the response must expose
    @{required_keys} =    Create List    taskName    lastUpdatedSeqNumber    lastUpdatedTimestamp
    ...    isCurrentTaskRunning    lastTaskRunStatus
    FOR    ${task}    IN    @{tasks}
        FOR    ${key}    IN    @{required_keys}
            Dictionary Should Contain Key    ${task}    ${key}
        END
    END

Validate Stats for Specific Task
    [Documentation]    Validate response for a specific task after OM DB sync.
    ${tasks} =    Fetch Task Status

    # Exactly one entry must carry the expected task name
    ${matching_tasks} =    Evaluate    [task for task in ${tasks} if task["taskName"] == "${TASK_NAME_1}"]
    Length Should Be    ${matching_tasks}    1
    ${task} =    Get From List    ${matching_tasks}    0

    # Validate table fields: each one must hold a non-null value
    FOR    ${field}    IN    lastUpdatedTimestamp    lastUpdatedSeqNumber    isCurrentTaskRunning    lastTaskRunStatus
        Should Be True    $task[$field] != None
    END

Validate All Tasks Updated After Sync
    [Documentation]    Ensure all tasks have been updated after an OM DB sync operation.
    ${tasks} =    Fetch Task Status
    Should Not Be Empty    ${tasks}

    # Each task must report both a timestamp and a sequence number
    FOR    ${task}    IN    @{tasks}
        FOR    ${field}    IN    lastUpdatedTimestamp    lastUpdatedSeqNumber
            Should Be True    $task[$field] != None
        END
    END

Validate Sequence number is updated after sync
    [Documentation]    Trigger another OM DB sync and check that every OM task that is not currently running reports the same sequence number as the OM delta task.
    Log To Console    Triggering OM DB sync for updates
    Sync OM Data
    ${tasks} =    Fetch Task Status
    Should Not Be Empty    ${tasks}

    # Look the delta task up through the suite variable instead of repeating the name literal
    ${om_delta_task_list} =    Evaluate    [task for task in ${tasks} if task["taskName"] == "${TASK_NAME_2}"]
    ${list_length} =    Get Length    ${om_delta_task_list}
    Should Be Equal As Integers    ${list_length}    1

    ${om_delta_task} =    Get From List    ${om_delta_task_list}    0
    ${om_delta_task_seq_num} =    Evaluate    int(${om_delta_task["lastUpdatedSeqNumber"]})
    ${om_task_names} =    Evaluate    ["NSSummaryTask", "ContainerKeyMapperTask", "FileSizeCountTask", "OmTableInsightTask"]
    ${om_tasks} =    Evaluate    [task for task in ${tasks} if task["taskName"] in ${om_task_names}]

    # Only compare tasks whose isCurrentTaskRunning flag is 0
    FOR    ${task}    IN    @{om_tasks}
        IF    ${task["isCurrentTaskRunning"]} == 0
            Should Be Equal As Integers    ${task["lastUpdatedSeqNumber"]}    ${om_delta_task_seq_num}
        END
    END
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,4 @@ private void addPropertiesNotInXml() {
));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.slf4j.event.Level.INFO;
Expand Down Expand Up @@ -255,8 +256,7 @@ public void testOmDBSyncing() throws Exception {
"lastUpdatedTimestamp");

// verify only Delta updates were added to recon after restart.
assertEquals(beforeRestartSnapShotTimeStamp,
afterRestartSnapShotTimeStamp);
assertThat(afterRestartSnapShotTimeStamp).isGreaterThanOrEqualTo(beforeRestartSnapShotTimeStamp);

//verify sequence number after Delta Updates
assertEquals(omLatestSeqNumber, reconLatestSeqNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ private void createReconTaskStatusTable(Connection conn) {
.column("task_name", SQLDataType.VARCHAR(766).nullable(false))
.column("last_updated_timestamp", SQLDataType.BIGINT)
.column("last_updated_seq_number", SQLDataType.BIGINT)
.column("last_task_run_status", SQLDataType.INTEGER)
.column("is_current_task_running", SQLDataType.INTEGER)
.constraint(DSL.constraint("pk_task_name")
.primaryKey("task_name"))
.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.ClientId;
import org.hadoop.ozone.recon.codegen.ReconSqlDbConfig;
Expand Down Expand Up @@ -110,6 +111,7 @@ protected void configure() {

install(new ReconOmTaskBindingModule());
install(new ReconDaoBindingModule());
bind(ReconTaskStatusUpdaterManager.class).in(Singleton.class);

bind(ReconTaskController.class)
.to(ReconTaskControllerImpl.class).in(Singleton.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ public class TaskStatusService {
private ReconTaskStatusDao reconTaskStatusDao;

/**
* Return the list of Recon Tasks and the last successful timestamp and
* sequence number.
* Return the list of Recon Tasks and their related stats from RECON_TASK_STATUS table.
* @return {@link Response}
*/
@GET
@Path("status")
public Response getTaskTimes() {
public Response getTaskStats() {
List<ReconTaskStatus> resultSet = reconTaskStatusDao.findAll();
return Response.ok(resultSet).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
import org.apache.hadoop.util.Time;
import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers;
import org.hadoop.ozone.recon.schema.tables.records.UnhealthyContainersRecord;
import org.jooq.Cursor;
Expand Down Expand Up @@ -86,42 +87,47 @@ public class ContainerHealthTask extends ReconScmTask {

private final OzoneConfiguration conf;

private final ReconTaskStatusUpdater taskStatusUpdater;

@SuppressWarnings("checkstyle:ParameterNumber")
public ContainerHealthTask(
ContainerManager containerManager,
StorageContainerServiceProvider scmClient,
ReconTaskStatusDao reconTaskStatusDao,
ContainerHealthSchemaManager containerHealthSchemaManager,
PlacementPolicy placementPolicy,
ReconTaskConfig reconTaskConfig,
ReconContainerMetadataManager reconContainerMetadataManager,
OzoneConfiguration conf) {
super(reconTaskStatusDao);
OzoneConfiguration conf, ReconTaskStatusUpdaterManager taskStatusUpdaterManager) {
super(taskStatusUpdaterManager);
this.scmClient = scmClient;
this.containerHealthSchemaManager = containerHealthSchemaManager;
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.placementPolicy = placementPolicy;
this.containerManager = containerManager;
this.conf = conf;
interval = reconTaskConfig.getMissingContainerTaskInterval().toMillis();
this.taskStatusUpdater = getTaskStatusUpdater();
}

@Override
public void run() {
try {
while (canRun()) {
triggerContainerHealthCheck();
initializeAndRunTask();
Thread.sleep(interval);
}
} catch (Throwable t) {
LOG.error("Exception in Missing Container task Thread.", t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
taskStatusUpdater.setLastTaskRunStatus(-1);
taskStatusUpdater.recordRunCompletion();
}
}

public void triggerContainerHealthCheck() {
@Override
protected void runTask() throws Exception {
lock.writeLock().lock();
// Map contains all UNHEALTHY STATES as keys and value is another map
// with 3 keys (CONTAINER_COUNT, TOTAL_KEYS, TOTAL_USED_BYTES) and value
Expand All @@ -144,7 +150,11 @@ public void triggerContainerHealthCheck() {
" process {} existing database records.",
Time.monotonicNow() - start, existingCount);

start = Time.monotonicNow();
checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
LOG.debug("Container Health Task thread took {} milliseconds to process containers",
Time.monotonicNow() - start);
taskStatusUpdater.setLastTaskRunStatus(0);
processedContainers.clear();
} finally {
lock.writeLock().unlock();
Expand All @@ -165,7 +175,6 @@ private void checkAndProcessContainers(
.filter(c -> !processedContainers.contains(c))
.forEach(c -> processContainer(c, currentTime,
unhealthyContainerStateStatsMap));
recordSingleRunCompletion();
LOG.debug("Container Health task thread took {} milliseconds for" +
" processing {} containers.", Time.monotonicNow() - start,
containers.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
Expand All @@ -34,8 +33,9 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
import org.apache.hadoop.util.Time;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,36 +55,40 @@ public class PipelineSyncTask extends ReconScmTask {

private ReadWriteLock lock = new ReentrantReadWriteLock(true);
private final long interval;
private final ReconTaskStatusUpdater taskStatusUpdater;

public PipelineSyncTask(ReconPipelineManager pipelineManager,
ReconNodeManager nodeManager,
StorageContainerServiceProvider scmClient,
ReconTaskStatusDao reconTaskStatusDao,
ReconTaskConfig reconTaskConfig) {
super(reconTaskStatusDao);
ReconTaskConfig reconTaskConfig,
ReconTaskStatusUpdaterManager taskStatusUpdaterManager) {
super(taskStatusUpdaterManager);
this.scmClient = scmClient;
this.reconPipelineManager = pipelineManager;
this.nodeManager = nodeManager;
this.interval = reconTaskConfig.getPipelineSyncTaskInterval().toMillis();
this.taskStatusUpdater = getTaskStatusUpdater();
}

@Override
public void run() {
try {
while (canRun()) {
triggerPipelineSyncTask();
initializeAndRunTask();
Thread.sleep(interval);
}
} catch (Throwable t) {
LOG.error("Exception in Pipeline sync Thread.", t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
taskStatusUpdater.setLastTaskRunStatus(-1);
taskStatusUpdater.recordRunCompletion();
}
}

public void triggerPipelineSyncTask()
throws IOException, TimeoutException, NodeNotFoundException {
@Override
protected void runTask() throws IOException, NodeNotFoundException {
lock.writeLock().lock();
try {
long start = Time.monotonicNow();
Expand All @@ -93,7 +97,7 @@ public void triggerPipelineSyncTask()
syncOperationalStateOnDeadNodes();
LOG.debug("Pipeline sync Thread took {} milliseconds.",
Time.monotonicNow() - start);
recordSingleRunCompletion();
taskStatusUpdater.setLastTaskRunStatus(0);
} finally {
lock.writeLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void onMessage(final DatanodeDetails datanodeDetails,
LOG.warn("Node {} has reached DEAD state, but SCM does not have " +
"information about it.", datanodeDetails);
}
containerHealthTask.triggerContainerHealthCheck();
pipelineSyncTask.triggerPipelineSyncTask();
containerHealthTask.initializeAndRunTask();
pipelineSyncTask.initializeAndRunTask();
} catch (Exception ioEx) {
LOG.error("Error trying to verify Node operational state from SCM.",
ioEx);
Expand Down
Loading

0 comments on commit f90e625

Please sign in to comment.