[feature](Nereids) Add minidump replay and refactor user feature of minidump (apache#20716)

### Two main changes:
1. Add minidump replay.
2. Change the minidump serialization of statistics messages and some of the interfaces between the main logic of the Nereids optimizer and minidump.

### Use of nereids ut:
1. Save minidump files. In mysql-client, run:
```
set enable_nereids_planner=true;
set enable_minidump=true;
```
then execute SQL statements in mysql-client; each query writes a minidump file.
2. Use the nereids_ut script to replay a directory of minidump files:
```
cp -r ${DORIS_HOME}/minidump ${DORIS_HOME}/output/fe && cd ${DORIS_HOME}/output/fe
./nereids_ut --d ${directory_of_minidump_files}
```

### Refactor of minidump
- Move serialization of the statistics used by the planner into the input serialization, so statistics are serialized together with the catalogs.
- Generate the minidump file only when the enable_minidump flag is set; the minidump module now interacts with the main optimizer only through serializeInputsToDumpFile(catalog, statistics, query) and serializeOutputToDumpFile(outputPlan), as sketched below.
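A minimal sketch of the narrowed interface, assuming simplified surrounding types (only the two MinidumpUtils entry points are taken from this commit; the other names are illustrative stand-ins):

```java
// Sketch only: how the planner touches the minidump module after this refactor.
public PhysicalPlan planWithMinidump(LogicalPlan parsedPlan) throws IOException {
    // Inputs (catalog tables, statistics, query text) are written in one call;
    // internally this is a no-op unless enable_minidump is set.
    MinidumpUtils.serializeInputsToDumpFile(parsedPlan, cascadesContext.getTables());

    PhysicalPlan physicalPlan = optimize(parsedPlan); // stand-in for the cascades pipeline

    // The output section is appended only after optimization succeeds, so a dump
    // file without a "ResultPlan" entry marks a failed optimization.
    MinidumpUtils.serializeOutputToDumpFile(physicalPlan);
    return physicalPlan;
}
```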
LiBinfeng-01 authored Jul 25, 2023
1 parent fc2b9db commit f84af95
Showing 23 changed files with 1,416 additions and 263 deletions.
1 change: 1 addition & 0 deletions build.sh
@@ -585,6 +585,7 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then
cp -r -p "${DORIS_HOME}/fe/fe-core/target/lib"/* "${DORIS_OUTPUT}/fe/lib"/
cp -r -p "${DORIS_HOME}/fe/fe-core/target/doris-fe.jar" "${DORIS_OUTPUT}/fe/lib"/
cp -r -p "${DORIS_HOME}/docs/build/help-resource.zip" "${DORIS_OUTPUT}/fe/lib"/
cp -r -p "${DORIS_HOME}/minidump" "${DORIS_OUTPUT}/fe"/
cp -r -p "${DORIS_HOME}/webroot/static" "${DORIS_OUTPUT}/fe/webroot"/

cp -r -p "${DORIS_THIRDPARTY}/installed/webroot"/* "${DORIS_OUTPUT}/fe/webroot/static"/
16 changes: 16 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -72,6 +72,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -114,32 +115,45 @@ public enum OlapTableState {
WAITING_STABLE
}

@SerializedName(value = "state")
private volatile OlapTableState state;

// index id -> index meta
@SerializedName("indexIdToMeta")
private Map<Long, MaterializedIndexMeta> indexIdToMeta = Maps.newHashMap();
// index name -> index id
@SerializedName("indexNameToId")
private Map<String, Long> indexNameToId = Maps.newHashMap();

@SerializedName("keysType")
private KeysType keysType;
@SerializedName("partitionInfo")
private PartitionInfo partitionInfo;
@SerializedName("idToPartition")
private Map<Long, Partition> idToPartition = new HashMap<>();
private Map<String, Partition> nameToPartition = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

@SerializedName(value = "distributionInfo")
private DistributionInfo defaultDistributionInfo;

// all info about temporary partitions are save in "tempPartitions"
@SerializedName(value = "tempPartitions")
private TempPartitions tempPartitions = new TempPartitions();

// bloom filter columns
@SerializedName(value = "bfColumns")

private Set<String> bfColumns;
@SerializedName(value = "bfFpp")
private double bfFpp;
@SerializedName(value = "colocateGroup")

private String colocateGroup;

private boolean hasSequenceCol;
private Type sequenceType;

@SerializedName(value = "indexes")
private TableIndexes indexes;

// In former implementation, base index id is same as table id.
@@ -148,8 +162,10 @@ public enum OlapTableState {
// So we add this 'baseIndexId' to explicitly specify the base index id,
// which should be different with table id.
// The init value is -1, which means there is not partition and index at all.
@SerializedName(value = "baseIndexId")
private long baseIndexId = -1;

@SerializedName(value = "tableProperty")
private TableProperty tableProperty;

private AutoIncrementGenerator autoIncrementGenerator;
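These Gson annotations pin the JSON key of each persisted field. A minimal illustration outside Doris (class and values are made up):

```java
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

class TableStateDemo {
    // The JSON key comes from the annotation, not the field name, so the dump
    // format stays stable even if a Java field is later renamed.
    @SerializedName(value = "state")
    String tableState = "NORMAL";
    @SerializedName(value = "baseIndexId")
    long baseIdx = -1;
}

public class GsonDemo {
    public static void main(String[] args) {
        System.out.println(new Gson().toJson(new TableStateDemo()));
        // prints: {"state":"NORMAL","baseIndexId":-1}
    }
}
```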
fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -33,6 +33,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -53,6 +54,7 @@
public class PartitionInfo implements Writable {
private static final Logger LOG = LogManager.getLogger(PartitionInfo.class);

@SerializedName("Type")
protected PartitionType type;
// partition columns for list and range partitions
protected List<Column> partitionColumns = Lists.newArrayList();
@@ -61,14 +63,17 @@ public class PartitionInfo implements Writable {
// temp partition id -> partition item
protected Map<Long, PartitionItem> idToTempItem = Maps.newHashMap();
// partition id -> data property
@SerializedName("IdToDataProperty")
protected Map<Long, DataProperty> idToDataProperty;
// partition id -> storage policy
protected Map<Long, String> idToStoragePolicy;
// partition id -> replication allocation
@SerializedName("IdToReplicaAllocation")
protected Map<Long, ReplicaAllocation> idToReplicaAllocation;
// true if the partition has multi partition columns
protected boolean isMultiColumnPartition = false;

@SerializedName("IdToInMemory")
protected Map<Long, Boolean> idToInMemory;

// partition id -> tablet type
16 changes: 16 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -36,10 +36,12 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import java.io.DataInput;
import java.io.DataOutput;
@@ -66,10 +68,14 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
public volatile boolean isDropped = false;

private boolean hasCompoundKey = false;
@SerializedName(value = "id")
protected long id;
@SerializedName(value = "name")
protected volatile String name;
protected volatile String qualifiedDbName;
@SerializedName(value = "type")
protected TableType type;
@SerializedName(value = "createTime")
protected long createTime;
protected QueryableReentrantReadWriteLock rwLock;

@@ -93,6 +99,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
* <p>
* If you want to get the mv columns, you should call getIndexToSchema in Subclass OlapTable.
*/
@SerializedName(value = "fullSchema")
protected List<Column> fullSchema;
// tree map for case-insensitive lookup.
/**
@@ -103,6 +110,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
// DO NOT persist this variable.
protected boolean isTypeRead = false;
// table(view)'s comment
@SerializedName(value = "comment")
protected String comment = "";
// sql for creating this table, default is "";
protected String ddlSql = "";
@@ -491,6 +499,14 @@ public String toString() {
return "Table [id=" + id + ", name=" + name + ", type=" + type + "]";
}

public JSONObject toSimpleJson() {
JSONObject table = new JSONObject();
table.put("Type", type.toEngineName());
table.put("Id", Long.toString(id));
table.put("Name", name);
return table;
}

/*
* 1. Only schedule OLAP table.
* 2. If table is colocate with other table, not schedule it.
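toSimpleJson gives a compact identity record for a table, plausibly what the minidump's "Tables" entries build on (an assumption; serializeTables itself is not shown in this diff). A hedged usage sketch, with the table name and id invented and assuming TableType.toEngineName() returns "olap" for OLAP tables:

```java
// Hypothetical call against an OLAP table with id 10001 named "lineitem".
JSONObject json = table.toSimpleJson();
System.out.println(json.toString());
// expected shape: {"Type":"olap","Id":"10001","Name":"lineitem"}
```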
fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -24,6 +24,7 @@
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.jobs.Job;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.executor.Analyzer;
@@ -341,13 +342,16 @@ public void extractTables(LogicalPlan logicalPlan) {
}

/** get table by table name, trying the information recorded in the dump file first */
- public TableIf getTableByName(String tableName) {
+ public TableIf getTableInMinidumpCache(String tableName) {
Preconditions.checkState(tables != null);
for (TableIf table : tables) {
if (table.getName().equals(tableName)) {
return table;
}
}
+ if (ConnectContext.get().getSessionVariable().isPlayNereidsDump()) {
+ throw new AnalysisException("Minidump cache can not find table:" + tableName);
+ }
return null;
}

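The rename makes the contract explicit: during replay, name resolution must be answerable from the dump alone. A hypothetical session where the dump only recorded table "t1":

```java
// Replay mode (isPlayNereidsDump() == true):
TableIf hit = cascadesContext.getTableInMinidumpCache("t1");  // returns the cached table
TableIf miss = cascadesContext.getTableInMinidumpCache("t2");
// throws AnalysisException("Minidump cache can not find table:t2");
// outside replay mode the same miss just returns null
```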
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -20,10 +20,8 @@
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.StatementBase;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -41,7 +39,6 @@
import org.apache.doris.nereids.processor.pre.PlanPreprocessors;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
-import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -50,7 +47,6 @@
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.ScanNode;
-import org.apache.doris.qe.ConnectContext;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -59,10 +55,7 @@
import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.json.JSONArray;
-import org.json.JSONObject;

-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -193,15 +186,10 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, Explain
}

// the minidump input must be serialized first; this ensures the minidump string is not null
- if (!statementContext.getConnectContext().getSessionVariable().isPlayNereidsDump()
- && statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
- MinidumpUtils.init();
- String queryId = DebugUtil.printId(statementContext.getConnectContext().queryId());
- try {
- statementContext.getConnectContext().setMinidump(serializeInputsToDumpFile(plan, queryId));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ try {
+ MinidumpUtils.serializeInputsToDumpFile(plan, cascadesContext.getTables());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}

if (statementContext.getConnectContext().getExecutor() != null) {
Expand Down Expand Up @@ -251,11 +239,7 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, Explain
optimizedPlan = physicalPlan;
}
// serialize the optimized plan to the dump file; a dump file without this part means optimization failed
- serializeOutputToDumpFile(physicalPlan, statementContext.getConnectContext());
- if (statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
- MinidumpUtils.saveMinidumpString(statementContext.getConnectContext().getMinidump(),
- DebugUtil.printId(statementContext.getConnectContext().queryId()));
- }
+ MinidumpUtils.serializeOutputToDumpFile(physicalPlan);
NereidsTracer.output(statementContext.getConnectContext());
timeoutExecutor.ifPresent(ExecutorService::shutdown);

@@ -297,42 +281,6 @@ private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}

- private JSONObject serializeInputsToDumpFile(Plan parsedPlan, String dumpName) throws IOException {
- String dumpPath = MinidumpUtils.DUMP_PATH + "/" + dumpName;
- File minidumpFileDir = new File(dumpPath);
- if (!minidumpFileDir.exists()) {
- minidumpFileDir.mkdirs();
- }
- // Create a JSON object
- JSONObject jsonObj = new JSONObject();
- jsonObj.put("Sql", statementContext.getOriginStatement().originStmt);
- // add session variable
- jsonObj.put("SessionVariable", cascadesContext.getConnectContext().getSessionVariable().toJson());
- // add tables
- String dbAndCatalogName = "/" + cascadesContext.getConnectContext().getDatabase() + "-"
- + cascadesContext.getConnectContext().getCurrentCatalog().getName() + "-";
- jsonObj.put("CatalogName", cascadesContext.getConnectContext().getCurrentCatalog().getName());
- jsonObj.put("DbName", cascadesContext.getConnectContext().getDatabase());
- JSONArray tablesJson = MinidumpUtils.serializeTables(dumpPath, dbAndCatalogName, cascadesContext.getTables());
- jsonObj.put("Tables", tablesJson);
- // add colocate table index, used to indicate grouping of table distribution
- String colocateTableIndexPath = dumpPath + "/ColocateTableIndex";
- MinidumpUtils.serializeColocateTableIndex(colocateTableIndexPath, Env.getCurrentColocateIndex());
- jsonObj.put("ColocateTableIndex", "/ColocateTableIndex");
- // add original sql, parsed plan and optimized plan
- jsonObj.put("ParsedPlan", ((AbstractPlan) parsedPlan).toJson());
- // Write the JSON object to a string and put it into file
- return jsonObj;
- }
-
- private void serializeOutputToDumpFile(Plan resultPlan, ConnectContext connectContext) {
- if (connectContext.getSessionVariable().isPlayNereidsDump()
- || !statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
- return;
- }
- connectContext.getMinidump().put("ResultPlan", ((AbstractPlan) resultPlan).toJson());
- }
-
@Override
public List<ScanNode> getScanNodes() {
return scanNodeList;
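Pieced together from the removed private serializers above, one dump file carries roughly these top-level keys; the exact layout now lives in MinidumpUtils, so treat this as an approximation with placeholder variables:

```java
// Approximate reconstruction of the dump layout (keys taken from the removed
// code; MinidumpUtils may organize them differently after this commit).
JSONObject dump = new JSONObject();
dump.put("Sql", originStmt);                            // original query text
dump.put("SessionVariable", sessionVariableJson);       // settings to replay, e.g. beNumber
dump.put("CatalogName", catalogName);
dump.put("DbName", dbName);
dump.put("Tables", tablesJsonArray);                    // serialized catalog tables
dump.put("ColocateTableIndex", "/ColocateTableIndex");  // table distribution grouping
dump.put("ParsedPlan", parsedPlanJson);
dump.put("ColumnStatistics", columnStatisticsJson);     // moved into input serialization
dump.put("Histogram", histogramJson);                   // moved into input serialization
dump.put("ResultPlan", resultPlanJson);                 // only written when optimize succeeds
```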
fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java
@@ -65,7 +65,15 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
// the penalty factor is no more than BROADCAST_JOIN_SKEW_PENALTY_LIMIT
static final double BROADCAST_JOIN_SKEW_RATIO = 30.0;
static final double BROADCAST_JOIN_SKEW_PENALTY_LIMIT = 2.0;
- private int beNumber = Math.max(1, ConnectContext.get().getEnv().getClusterInfo().getBackendsNumber(true));
+ private int beNumber = 1;
+
+ public CostModelV1() {
+ if (ConnectContext.get().getSessionVariable().isPlayNereidsDump()) {
+ beNumber = ConnectContext.get().getSessionVariable().getBeNumber();
+ } else {
+ beNumber = Math.max(1, ConnectContext.get().getEnv().getClusterInfo().getBackendsNumber(true));
+ }
+ }

public static Cost addChildCost(Plan plan, Cost planCost, Cost childCost, int index) {
Preconditions.checkArgument(childCost instanceof CostV1 && planCost instanceof CostV1);
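During replay the backend count is taken from the recorded session variable instead of a live ClusterInfo lookup, so cost-based decisions that depend on cluster size (such as the broadcast-join skew penalty above) reproduce deterministically without a running cluster.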
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Optimizer.java
@@ -22,13 +22,8 @@
import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
import org.apache.doris.nereids.jobs.joinorder.JoinOrderJob;
import org.apache.doris.nereids.memo.Group;
-import org.apache.doris.nereids.minidump.MinidumpUtils;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

-import org.json.JSONArray;
-import org.json.JSONObject;
-
import java.util.Objects;

/**
@@ -54,7 +49,6 @@ public void execute() {
cascadesContext.pushJob(new DeriveStatsJob(cascadesContext.getMemo().getRoot().getLogicalExpression(),
cascadesContext.getCurrentJobContext()));
cascadesContext.getJobScheduler().executeJobPool(cascadesContext);
- serializeStatUsed(cascadesContext.getConnectContext());
// DPHyp optimize
int maxJoinCount = cascadesContext.getMemo().countMaxContinuousJoin();
cascadesContext.getStatementContext().setMaxContinuousJoin(maxJoinCount);
@@ -83,21 +77,6 @@ private void dpHypOptimize() {
cascadesContext.getStatementContext().setOtherJoinReorder(true);
}

- private void serializeStatUsed(ConnectContext connectContext) {
- if (connectContext.getSessionVariable().isPlayNereidsDump()
- || !connectContext.getSessionVariable().isEnableMinidump()) {
- return;
- }
- JSONObject jsonObj = connectContext.getMinidump();
- // add column statistics
- JSONArray columnStatistics = MinidumpUtils.serializeColumnStatistic(
- cascadesContext.getConnectContext().getTotalColumnStatisticMap());
- jsonObj.put("ColumnStatistics", columnStatistics);
- JSONArray histogramArray = MinidumpUtils.serializeHistogram(
- cascadesContext.getConnectContext().getTotalHistogramMap());
- jsonObj.put("Histogram", histogramArray);
- }
-
private SessionVariable getSessionVariable() {
return cascadesContext.getConnectContext().getSessionVariable();
}
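With serializeStatUsed removed, column statistics and histograms are captured once as part of the input dump (the "ColumnStatistics" and "Histogram" keys in the layout sketched earlier) instead of being appended to the minidump mid-optimization.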
@@ -51,4 +51,5 @@ public void execute(JobContext jobContext) {
public boolean isOnce() {
return true;
}

}