Skip to content

Commit

Permalink
generate end pass task
Browse files Browse the repository at this point in the history
  • Loading branch information
zeyu10 committed Oct 17, 2024
1 parent 558d063 commit 0bf14af
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;

@Getter
@Setter
@NoArgsConstructor
public class DAG {
private String workspace;
Expand All @@ -48,6 +49,8 @@ public class DAG {
@Setter
private Map<String, List<Mapping>> commonMapping;
private String inputSchema;
private Map<String, Object> output;
private String endTaskName;

@JsonCreator
public DAG(@JsonProperty("workspace") String workspace,
Expand All @@ -62,7 +65,8 @@ public DAG(@JsonProperty("workspace") String workspace,
@JsonProperty("commonMapping") Map<String, List<Mapping>> commonMapping,
@JsonProperty("namespace") String namespace,
@JsonProperty("service") String service,
@JsonProperty("inputSchema") String inputSchema) {
@JsonProperty("inputSchema") String inputSchema,
@JsonProperty("output") Map<String, Object> output) {
this.workspace = StringUtils.isBlank(workspace) ? namespace : workspace;
this.dagName = StringUtils.isBlank(dagName) ? service : dagName;
this.version = version;
Expand All @@ -74,5 +78,6 @@ public DAG(@JsonProperty("workspace") String workspace,
this.defaultContext = defaultContext;
this.commonMapping = commonMapping;
this.inputSchema = inputSchema;
this.output = output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.ArrayList;
Expand All @@ -35,6 +36,7 @@
@Getter
@Setter
@JsonTypeName("pass")
@NoArgsConstructor
public class PassTask extends BaseTask {

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map<Strin
log.info("pass task begin to run executionId:{}, taskInfoName:{}", executionId, taskInfo.getName());
if (CollectionUtils.isNotEmpty(taskInfo.getTask().getOutputMappings())) {
Map<String, Object> context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo);
outputMappings(context, new HashMap<>(), new HashMap<>(), taskInfo.getTask().getOutputMappings());
outputMappings(context, input, new HashMap<>(), taskInfo.getTask().getOutputMappings());
saveContext(executionId, context, Sets.newHashSet(taskInfo));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.weibo.rill.flow.service.manager;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
Expand All @@ -33,6 +32,7 @@
import com.weibo.rill.flow.interfaces.model.resource.BaseResource;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import com.weibo.rill.flow.olympicene.core.model.dag.DAG;
import com.weibo.rill.flow.olympicene.core.model.task.PassTask;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser;
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
Expand Down Expand Up @@ -229,10 +229,20 @@ private String processInputOutputMappingsWhenGetDescriptor(String descriptor) {
if (!taskExistsInput) {
return descriptor;
}
List<BaseTask> tasks = new ArrayList<>();
for (BaseTask task : taskMap.values()) {
processOutputMappingsWhenGetDescriptor(task);
processInputMappingsWhenGetDescriptor(task);
if (!task.getName().equals(dag.getEndTaskName())) {
String next = task.getNext();
Set<String> nextSet = new LinkedHashSet<>(Arrays.asList(next.split(",")));
nextSet.remove(dag.getEndTaskName());
next = String.join(",", nextSet);
task.setNext(next);
tasks.add(task);
}
}
dag.setTasks(tasks);
return dagParser.serialize(dag);
}

Expand Down Expand Up @@ -541,11 +551,12 @@ public String createDAGDescriptor(String businessId, String featureName, String
return buildDescriptorId(businessId, featureName, MD5_PREFIX + md5);
}

private boolean processInputToGenerateInputMappings(DAG dag) {
private boolean processInputToGenerateInputMappings(DAG dag, Map<String, BaseTask> taskMap) {
if (CollectionUtils.isEmpty(dag.getTasks())) {
return false;
}
boolean taskExistsInput = false;
PassTask endPassTask = generateEndPassTask(dag, taskMap);
boolean taskExistsInput = endPassTask != null;
for (BaseTask task : dag.getTasks()) {
Map<String, Object> taskInput = task.getInput();
if (MapUtils.isEmpty(taskInput)) {
Expand All @@ -572,6 +583,27 @@ private boolean processInputToGenerateInputMappings(DAG dag) {
return taskExistsInput;
}

private PassTask generateEndPassTask(DAG dag, Map<String, BaseTask> taskMap) {
if (MapUtils.isEmpty(dag.getOutput())) {
return null;
}
String endTaskName = DigestUtils.md5Hex(dag.getWorkspace() + "_" + dag.getDagName());
dag.setEndTaskName(endTaskName);
PassTask endPassTask = new PassTask();
endPassTask.setName(endTaskName);
endPassTask.setInput(dag.getOutput());
endPassTask.setCategory("pass");
List<Mapping> outputMappings = Lists.newArrayList();
for (String key : dag.getOutput().keySet()) {
outputMappings.add(new Mapping("$.input." + key, "$.context." + key));
}
endPassTask.setOutputMappings(outputMappings);

taskMap.put(endTaskName, endPassTask);
dag.getTasks().add(endPassTask);
return endPassTask;
}

private boolean containsEmpty(String... member) {
return member == null || Arrays.stream(member).anyMatch(StringUtils::isEmpty);
}
Expand Down Expand Up @@ -620,34 +652,42 @@ private String buildDescriptorId(String businessId, String featureName, String t
* @param dag DAG对象
*/
private void generateOutputMappings(DAG dag) {
boolean taskExistsInput = processInputToGenerateInputMappings(dag);
Map<String, BaseTask> taskMap = getTaskMapByDag(dag);
boolean taskExistsInput = processInputToGenerateInputMappings(dag, taskMap);
if (!taskExistsInput) {
return;
}
Map<String, List<List<String>>> taskPathsMap = new HashMap<>();
Map<String, BaseTask> taskMap = getTaskMapByDag(dag);

for (BaseTask task: dag.getTasks()) {
List<Mapping> inputMappings = task.getInputMappings();
for (Mapping inputMapping : inputMappings) {
// 获取源路径元素,如 source: $.taskA.output.key 转换为 elements: [$, taskA, output, key]
String[] elements = getSourcePathElementsByMapping(inputMapping);
if (elements.length < 2) {
continue;
}
String taskName = elements[1];
if (taskMap.containsKey(taskName)) {
// 更新输入映射的源
String outputTaskName = elements[1];
if (taskMap.containsKey(outputTaskName)) {
inputMapping.setSource("$.context" + inputMapping.getSource().substring(1));

// 将路径添加到任务路径映射中
taskPathsMap.computeIfAbsent(taskName, k -> new ArrayList<>())
taskPathsMap.computeIfAbsent(outputTaskName, k -> new ArrayList<>())
.add(Arrays.asList(elements).subList(2, elements.length));

if (task.getName().equalsIgnoreCase(dag.getEndTaskName())) {
BaseTask outputTask = taskMap.get(outputTaskName);
String next = outputTask.getNext();
if (next == null) {
next = outputTaskName;
} else {
Set<String> nextSet = new LinkedHashSet<>(Arrays.asList(next.split(",")));
nextSet.add(dag.getEndTaskName());
next = String.join(",", nextSet);
}
outputTask.setNext(next);
}
}
}
}

// 获取输出映射并生成到任务中

LinkedHashMultimap<String, String> outputMappingsMultimap = getOutputMappingsByPaths(taskPathsMap);
generateOutputMappingsIntoTasks(outputMappingsMultimap, taskMap);
}
Expand Down

0 comments on commit 0bf14af

Please sign in to comment.