Skip to content

Commit

Permalink
[optimize](cache) Refine cache expiration granularity, from table to …
Browse files Browse the repository at this point in the history
…partition granularity
  • Loading branch information
sunjiangwei committed Jan 16, 2025
1 parent fc1f683 commit 3c162ff
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
Expand All @@ -50,7 +51,7 @@
* Mostly contains static type instances and helper methods for convenience, as well
* as abstract methods that subclasses must implement.
*/
public abstract class Type {
public abstract class Type implements Serializable {
// Currently only support Array type with max 9 depths.
public static int MAX_NESTING_DEPTH = 9;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
* This def is used for a column whose default value is an expression.
*/
public class DefaultValueExprDef implements Writable, GsonPostProcessable {
public class DefaultValueExprDef implements Writable, GsonPostProcessable, Serializable {
private static final Logger LOG = LogManager.getLogger(DefaultValueExprDef.class);
@SerializedName("exprName")
private String exprName;
Expand Down
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -82,7 +83,7 @@
/**
* Root of the expr node hierarchy.
*/
public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneable, ExprStats {
public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneable, ExprStats, Serializable {

// Name of the function that needs to be implemented by every Expr that
// supports negation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@

import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public enum AggregateType {
public enum AggregateType implements Serializable {
SUM("SUM"),
MIN("MIN"),
MAX("MAX"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.io.DataInput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -59,7 +60,7 @@
/**
* This class represents the column-related metadata.
*/
public class Column implements GsonPostProcessable {
public class Column implements GsonPostProcessable, Serializable {
private static final Logger LOG = LogManager.getLogger(Column.class);
// NOTE: you should name hidden column start with '__DORIS_' !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Serializable;
import java.util.Objects;

/**
* Statistics for a single column.
*/
public class ColumnStats {
public class ColumnStats implements Serializable {
private static final Logger LOG = LogManager.getLogger(ColumnStats.class);

@SerializedName(value = "avgSerializedSize")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.gson.annotations.SerializedName;
import jline.internal.Nullable;

import java.io.Serializable;

/**GeneratedColumnInfo*/
public class GeneratedColumnInfo {
public class GeneratedColumnInfo implements Serializable {
/**GeneratedColumnType*/
public enum GeneratedColumnType {
VIRTUAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.common.cache;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
Expand Down Expand Up @@ -74,10 +75,13 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* NereidsSqlCacheManager
Expand Down Expand Up @@ -332,14 +336,6 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) {
return true;
}

OlapTable olapTable = (OlapTable) tableIf;
long currentTableVersion = olapTable.getVisibleVersion();
long cacheTableVersion = tableVersion.version;
// some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition?
if (currentTableVersion != cacheTableVersion) {
return true;
}
}

// check partition version
Expand All @@ -350,13 +346,41 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
return true;
}
OlapTable olapTable = (OlapTable) tableIf;
Collection<Long> partitionIds = scanTable.getScanPartitions();
olapTable.getVersionInBatchForCloudMode(partitionIds);

for (Long scanPartitionId : scanTable.getScanPartitions()) {
Partition partition = olapTable.getPartition(scanPartitionId);
List<Column> currentFullSchema = olapTable.getFullSchema();
List<Column> cacheFullSchema = scanTable.latestColumns;
if (currentFullSchema.size() != cacheFullSchema.size()) {
return true;
}
for (int i = 0; i < currentFullSchema.size(); i++) {
Column currentColumn = currentFullSchema.get(i);
Column cacheColumn = cacheFullSchema.get(i);
if ((currentColumn == null || cacheColumn == null)
|| (currentColumn.hashCode() != cacheColumn.hashCode())) {
return true;
}
if (!Objects.equals(currentColumn.getName(), cacheColumn.getName())) {
return true;
}
if (!Objects.equals(currentColumn.getType(), cacheColumn.getType())) {
return true;
}
if (!Objects.equals(currentColumn.getDefaultValue(), cacheColumn.getDefaultValue())) {
return true;
}
if (currentColumn.getType().getLength() != cacheColumn.getType().getLength()) {
return true;
}
}
Map<Long, Partition> partitionMap = olapTable.getAllPartitions().stream()
.collect(Collectors.toMap(Partition::getId, Function.identity()));
Map<Long, Long> scanPartitionIdToVersion = scanTable.scanPartitionIdToVersion;
for (Entry<Long, Long> entry : scanPartitionIdToVersion.entrySet()) {
Long scanPartitionId = entry.getKey();
Long cachePartitionVersion = entry.getValue();
Partition currentPartition = partitionMap.get(scanPartitionId);
// partition == null: is this partition truncated?
if (partition == null) {
if (currentPartition == null
|| currentPartition.getVisibleVersion() > cachePartitionVersion) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.SerializationUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
Expand Down Expand Up @@ -462,9 +464,20 @@ public static class ScanTable {
public final FullTableName fullTableName;
public final long latestVersion;
public final List<Long> scanPartitions = Lists.newArrayList();
public final List<Column> latestColumns = Lists.newArrayList();
public final Map<Long, Long> scanPartitionIdToVersion = Maps.newHashMap();

public void addScanPartition(Long partitionId) {

/**
 * Registers a scanned partition together with the version supplied by the caller.
 *
 * <p>The id is appended to {@code scanPartitions} and the id-to-version pair is stored in
 * {@code scanPartitionIdToVersion}; a later call with the same id overwrites the stored
 * version but still appends the id to the list again.
 *
 * @param partitionId id of the partition that was scanned
 * @param partitionVersion version to remember for that partition
 */
public void addScanPartition(Long partitionId, Long partitionVersion) {
    // The two collections are independent, so the map can be updated first.
    scanPartitionIdToVersion.put(partitionId, partitionVersion);
    scanPartitions.add(partitionId);
}

/**
 * Records a snapshot of the given columns in {@code latestColumns}.
 *
 * <p>Each column is copied with {@code SerializationUtils.clone} before being stored, so the
 * stored element is the clone rather than the instance passed in. Copies are appended in
 * iteration order; existing entries of {@code latestColumns} are not removed.
 *
 * @param columns columns to copy; must not be {@code null}
 */
public void setLatestColumns(List<Column> columns) {
    columns.forEach(source -> latestColumns.add(SerializationUtils.clone(source)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,14 +699,15 @@ private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) {
ScanTable scanTable = new ScanTable(
new FullTableName(catalog.getName(), database.getFullName(), olapTable.getName()),
olapTable.getVisibleVersion());
scanTable.setLatestColumns(olapTable.getFullSchema());
scanTables.add(scanTable);

Collection<Long> partitionIds = node.getSelectedPartitionIds();
olapTable.getVersionInBatchForCloudMode(partitionIds);

for (Long partitionId : node.getSelectedPartitionIds()) {
Partition partition = olapTable.getPartition(partitionId);
scanTable.addScanPartition(partitionId);
scanTable.addScanPartition(partitionId, partition.getVisibleVersion(true));
if (partition.getVisibleVersionTime() >= cacheTable.latestPartitionTime) {
cacheTable.latestPartitionId = partition.getId();
cacheTable.latestPartitionTime = partition.getVisibleVersionTime();
Expand Down

0 comments on commit 3c162ff

Please sign in to comment.