Skip to content

Commit

Permalink
[fix](agenttask) failed tasks and remove them from queue if an error
Browse files Browse the repository at this point in the history
happens
  • Loading branch information
dataroaring committed Oct 22, 2024
1 parent df4c7f2 commit 0a23f8a
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,13 @@ public void run() {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
continue;
}
List<AgentTask> tasks = this.backendIdToTasks.get(backendId).subList(0, Config.schedule_batch_size);

try {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
continue;
}
List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
// create AgentClient
String host = FeConstants.runningUnitTest ? "127.0.0.1" : backend.getHost();
address = new TNetworkAddress(host, backend.getBePort());
Expand All @@ -172,7 +173,6 @@ public void run() {
try {
agentTaskRequests.add(toAgentTaskRequest(task));
} catch (Exception e) {
task.failed();
throw e;
}
}
Expand All @@ -187,6 +187,10 @@ public void run() {
}
ok = true;
} catch (Exception e) {
for (AgentTask task : tasks) {
task.failed();
AgentTaskQueue.remove(task, );
}
LOG.warn("task exec error. backend[{}]", backendId, e);
} finally {
if (ok) {
Expand Down

0 comments on commit 0a23f8a

Please sign in to comment.