From 3c162ff4b0c2e16ab053af05a21fc0109134f530 Mon Sep 17 00:00:00 2001 From: sunjiangwei Date: Wed, 15 Jan 2025 19:34:55 +0800 Subject: [PATCH] [optimize](cache) Refine cache expiration granularity, from table to partition granularity --- .../java/org/apache/doris/catalog/Type.java | 3 +- .../doris/analysis/DefaultValueExprDef.java | 3 +- .../java/org/apache/doris/analysis/Expr.java | 3 +- .../apache/doris/catalog/AggregateType.java | 3 +- .../java/org/apache/doris/catalog/Column.java | 3 +- .../org/apache/doris/catalog/ColumnStats.java | 3 +- .../doris/catalog/GeneratedColumnInfo.java | 4 +- .../common/cache/NereidsSqlCacheManager.java | 52 ++++++++++++++----- .../apache/doris/nereids/SqlCacheContext.java | 15 +++++- .../apache/doris/qe/cache/CacheAnalyzer.java | 3 +- 10 files changed, 69 insertions(+), 23 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java index d3218203bf6462a..429f0801850f6b7 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; import java.net.InetAddress; @@ -50,7 +51,7 @@ * Mostly contains static type instances and helper methods for convenience, as well * as abstract methods that subclasses must implement. */ -public abstract class Type { +public abstract class Type implements Serializable { // Currently only support Array type with max 9 depths. public static int MAX_NESTING_DEPTH = 9; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DefaultValueExprDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DefaultValueExprDef.java index 44bc766ee47b5e7..3eb2928651cf746 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DefaultValueExprDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DefaultValueExprDef.java @@ -36,12 +36,13 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import java.util.List; /** * This def used for column which defaultValue is an expression. */ -public class DefaultValueExprDef implements Writable, GsonPostProcessable { +public class DefaultValueExprDef implements Writable, GsonPostProcessable, Serializable { private static final Logger LOG = LogManager.getLogger(DefaultValueExprDef.class); @SerializedName("exprName") private String exprName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 0b5e68e1f30346f..78b3003fe2a7daf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -67,6 +67,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -82,7 +83,7 @@ /** * Root of the expr node hierarchy. */ -public abstract class Expr extends TreeNode implements ParseNode, Cloneable, ExprStats { +public abstract class Expr extends TreeNode implements ParseNode, Cloneable, ExprStats, Serializable { // Name of the function that needs to be implemented by every Expr that // supports negation. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java index 94a788f0b9afe0f..d166c645090b652 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java @@ -21,13 +21,14 @@ import com.google.common.collect.Lists; +import java.io.Serializable; import java.util.EnumMap; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; -public enum AggregateType { +public enum AggregateType implements Serializable { SUM("SUM"), MIN("MIN"), MAX("MAX"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index fa7c7feb7f06700..eb5a92bb0505dfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -49,6 +49,7 @@ import java.io.DataInput; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -59,7 +60,7 @@ /** * This class represents the column-related metadata. */ -public class Column implements GsonPostProcessable { +public class Column implements GsonPostProcessable, Serializable { private static final Logger LOG = LogManager.getLogger(Column.class); // NOTE: you should name hidden column start with '__DORIS_' !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnStats.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnStats.java index 198ca02f51b5c83..f288662b82e43b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnStats.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnStats.java @@ -26,12 +26,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.Serializable; import java.util.Objects; /** * Statistics for a single column. */ -public class ColumnStats { +public class ColumnStats implements Serializable { private static final Logger LOG = LogManager.getLogger(ColumnStats.class); @SerializedName(value = "avgSerializedSize") diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/GeneratedColumnInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/GeneratedColumnInfo.java index b14343a5c9f5f89..cf80ab7a3a8dbd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/GeneratedColumnInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/GeneratedColumnInfo.java @@ -22,8 +22,10 @@ import com.google.gson.annotations.SerializedName; import jline.internal.Nullable; +import java.io.Serializable; + /**GeneratedColumnInfo*/ -public class GeneratedColumnInfo { +public class GeneratedColumnInfo implements Serializable { /**GeneratedColumnType*/ public enum GeneratedColumnType { VIRTUAL, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java index bf82795450048a6..dd1d018702b6ce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java @@ -18,6 +18,7 @@ package org.apache.doris.common.cache; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; @@ -74,10 +75,13 @@ import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; /** * NereidsSqlCacheManager @@ -332,14 +336,6 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) { if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) { return true; } - - OlapTable olapTable = (OlapTable) tableIf; - long currentTableVersion = olapTable.getVisibleVersion(); - long cacheTableVersion = tableVersion.version; - // some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition? - if (currentTableVersion != cacheTableVersion) { - return true; - } } // check partition version @@ -350,13 +346,41 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) { return true; } OlapTable olapTable = (OlapTable) tableIf; - Collection partitionIds = scanTable.getScanPartitions(); - olapTable.getVersionInBatchForCloudMode(partitionIds); - - for (Long scanPartitionId : scanTable.getScanPartitions()) { - Partition partition = olapTable.getPartition(scanPartitionId); + List currentFullSchema = olapTable.getFullSchema(); + List cacheFullSchema = scanTable.latestColumns; + if (currentFullSchema.size() != cacheFullSchema.size()) { + return true; + } + for (int i = 0; i < currentFullSchema.size(); i++) { + Column currentColumn = currentFullSchema.get(i); + Column cacheColumn = cacheFullSchema.get(i); + if ((currentColumn == null || cacheColumn == null) + || (currentColumn.hashCode() != cacheColumn.hashCode())) { + return true; + } + if (!Objects.equals(currentColumn.getName(), cacheColumn.getName())) { + return true; + } + if (!Objects.equals(currentColumn.getType(), cacheColumn.getType())) { + return true; + } + if (!Objects.equals(currentColumn.getDefaultValue(), cacheColumn.getDefaultValue())) { + return true; + } + if (currentColumn.getType().getLength() != cacheColumn.getType().getLength()) { + return true; + } + } + Map partitionMap = olapTable.getAllPartitions().stream() + .collect(Collectors.toMap(Partition::getId, Function.identity())); + Map scanPartitionIdToVersion = scanTable.scanPartitionIdToVersion; + for (Entry entry : scanPartitionIdToVersion.entrySet()) { + Long scanPartitionId = entry.getKey(); + Long cachePartitionVersion = entry.getValue(); + Partition currentPartition = partitionMap.get(scanPartitionId); // partition == null: is this partition truncated? - if (partition == null) { + if (currentPartition == null + || currentPartition.getVisibleVersion() > cachePartitionVersion) { return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java index 2278436888b2f67..0f4f793bc43ffbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java @@ -19,11 +19,13 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.SerializationUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.mysql.FieldInfo; import org.apache.doris.mysql.privilege.DataMaskPolicy; @@ -462,9 +464,20 @@ public static class ScanTable { public final FullTableName fullTableName; public final long latestVersion; public final List scanPartitions = Lists.newArrayList(); + public final List latestColumns = Lists.newArrayList(); + public final Map scanPartitionIdToVersion = Maps.newHashMap(); - public void addScanPartition(Long partitionId) { + + public void addScanPartition(Long partitionId, Long partitionVersion) { this.scanPartitions.add(partitionId); + this.scanPartitionIdToVersion.put(partitionId, partitionVersion); + } + + public void setLatestColumns(List columns) { + for (Column column : columns) { + Column cloneColumn = SerializationUtils.clone(column); + this.latestColumns.add(cloneColumn); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index a6fd3bc8fcbfe6e..a57719672e38fd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -699,6 +699,7 @@ private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) { ScanTable scanTable = new ScanTable( new FullTableName(catalog.getName(), database.getFullName(), olapTable.getName()), olapTable.getVisibleVersion()); + scanTable.setLatestColumns(olapTable.getFullSchema()); scanTables.add(scanTable); Collection partitionIds = node.getSelectedPartitionIds(); @@ -706,7 +707,7 @@ private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) { for (Long partitionId : node.getSelectedPartitionIds()) { Partition partition = olapTable.getPartition(partitionId); - scanTable.addScanPartition(partitionId); + scanTable.addScanPartition(partitionId, partition.getVisibleVersion(true)); if (partition.getVisibleVersionTime() >= cacheTable.latestPartitionTime) { cacheTable.latestPartitionId = partition.getId(); cacheTable.latestPartitionTime = partition.getVisibleVersionTime();