Skip to content

Commit

Permalink
[BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235)
Browse files Browse the repository at this point in the history
Signed-off-by: Dejun Xia <[email protected]>
(cherry picked from commit d97d273)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/catalog/DynamicPartitionProperty.java
#	fe/fe-core/src/main/java/com/starrocks/clone/DynamicPartitionScheduler.java
#	fe/fe-core/src/main/java/com/starrocks/common/util/DynamicPartitionUtil.java
#	fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java
  • Loading branch information
nshangyiming authored and mergify[bot] committed May 8, 2024
1 parent 0479c9f commit 0c22807
Show file tree
Hide file tree
Showing 11 changed files with 772 additions and 72 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.starrocks.persist.DropInfo;
import com.starrocks.server.CatalogMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -903,6 +904,10 @@ public boolean isSystemDatabase() {
fullQualifiedName.equalsIgnoreCase(SysDb.DATABASE_NAME);
}

public boolean isStatisticsDatabase() {
return fullQualifiedName.equalsIgnoreCase(StatsConstants.STATISTICS_DB_NAME);
}

// the invoker should hold db's writeLock
public void setExist(boolean exist) {
this.exist = exist;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public class DynamicPartitionProperty {
public static final int NOT_SET_HISTORY_PARTITION_NUM = 0;
public static final String NOT_SET_PREFIX = "p";

private boolean exist;
private final boolean exists;

private boolean enable;
private boolean enabled;
private String timeUnit;
private int start;
private int end;
Expand All @@ -77,8 +77,8 @@ public class DynamicPartitionProperty {
private int historyPartitionNum;
public DynamicPartitionProperty(Map<String, String> properties) {
if (properties != null && !properties.isEmpty()) {
this.exist = true;
this.enable = Boolean.parseBoolean(properties.get(ENABLE));
this.exists = true;
this.enabled = Boolean.parseBoolean(properties.get(ENABLE));
this.timeUnit = properties.get(TIME_UNIT);
this.tz = TimeUtils.getOrSystemTimeZone(properties.get(TIME_ZONE));
// In order to compatible dynamic add partition version
Expand All @@ -92,7 +92,7 @@ public DynamicPartitionProperty(Map<String, String> properties) {
HISTORY_PARTITION_NUM, String.valueOf(NOT_SET_HISTORY_PARTITION_NUM)));
createStartOfs(properties);
} else {
this.exist = false;
this.exists = false;
}
}

Expand All @@ -112,8 +112,8 @@ private void createStartOfs(Map<String, String> properties) {
}
}

public boolean isExist() {
return exist;
public boolean isExists() {
return exists;
}

public String getTimeUnit() {
Expand All @@ -136,8 +136,8 @@ public int getBuckets() {
return buckets;
}

public boolean getEnable() {
return enable;
public boolean isEnabled() {
return enabled;
}

public StartOfDate getStartOfWeek() {
Expand Down Expand Up @@ -171,6 +171,7 @@ public int getHistoryPartitionNum() {
return historyPartitionNum;
}

<<<<<<< HEAD
public String getPropString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
Expand All @@ -180,6 +181,16 @@ public String getPropString() {
sb.append(START + ":" + start + ",");
sb.append(END + ":" + end + ",");
sb.append(PREFIX + ":" + prefix + ",");
=======
public Map<String, String> getProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(ENABLE, String.valueOf(enabled));
properties.put(TIME_UNIT, timeUnit);
properties.put(TIME_ZONE, tz.getID());
properties.put(START, String.valueOf(start));
properties.put(END, String.valueOf(end));
properties.put(PREFIX, prefix);
>>>>>>> d97d27382e ([BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235))
if (buckets > 0) {
sb.append(BUCKETS + ":" + buckets + ",");
}
Expand All @@ -203,7 +214,7 @@ public void setTimeUnit(String timeUnit) {

@Override
public String toString() {
String res = ",\n\"" + ENABLE + "\" = \"" + enable + "\""
String res = ",\n\"" + ENABLE + "\" = \"" + enabled + "\""
+ ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\""
+ ",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\""
+ ",\n\"" + START + "\" = \"" + start + "\""
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public TableProperty getTableProperty() {
public boolean dynamicPartitionExists() {
return tableProperty != null
&& tableProperty.getDynamicPartitionProperty() != null
&& tableProperty.getDynamicPartitionProperty().isExist();
&& tableProperty.getDynamicPartitionProperty().isExists();
}

public void setBaseIndexId(long baseIndexId) {
Expand Down Expand Up @@ -2481,7 +2481,7 @@ public void onCreate(Database db) {

DynamicPartitionUtil.registerOrRemovePartitionScheduleInfo(db.getId(), this);

if (Config.dynamic_partition_enable && getTableProperty().getDynamicPartitionProperty().getEnable()) {
if (Config.dynamic_partition_enable && getTableProperty().getDynamicPartitionProperty().isEnabled()) {
new Thread(() -> {
try {
GlobalStateMgr.getCurrentState().getDynamicPartitionScheduler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@
import com.starrocks.sql.ast.PartitionValue;
import com.starrocks.sql.ast.RandomDistributionDesc;
import com.starrocks.sql.ast.SingleRangePartitionDesc;
import com.starrocks.statistic.StatsConstants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.threeten.extra.PeriodDuration;

import java.time.LocalDate;
Expand Down Expand Up @@ -116,15 +116,18 @@ public class DynamicPartitionScheduler extends FrontendDaemon {
// (DbId, TableId) for a collection of objects marked with partition_ttl_number > 0 on the table
private final Set<Pair<Long, Long>> ttlPartitionInfo = Sets.newConcurrentHashSet();

private boolean initialize;
private long lastFindingTime = -1;

public enum State {
NORMAL, ERROR
}

public boolean isInScheduler(long dbId, long tableId) {
return dynamicPartitionTableInfo.contains(new Pair<>(dbId, tableId));
}

public DynamicPartitionScheduler(String name, long intervalMs) {
super(name, intervalMs);
this.initialize = false;
}

public void registerDynamicPartitionTable(Long dbId, Long tableId) {
Expand Down Expand Up @@ -272,6 +275,7 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab
addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, null, null, false));
} else {
// construct distribution desc
<<<<<<< HEAD
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
DistributionDesc distributionDesc = null;
if (distributionInfo instanceof HashDistributionInfo) {
Expand All @@ -285,6 +289,9 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab
} else if (distributionInfo instanceof RandomDistributionInfo) {
distributionDesc = new RandomDistributionDesc(dynamicPartitionProperty.getBuckets());
}
=======
DistributionDesc distributionDesc = createDistributionDesc(olapTable, dynamicPartitionProperty);
>>>>>>> d97d27382e ([BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235))

// add partition according to partition desc and distribution desc
addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
Expand All @@ -293,6 +300,25 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab
return addPartitionClauses;
}

@Nullable
private static DistributionDesc createDistributionDesc(OlapTable olapTable,
DynamicPartitionProperty dynamicPartitionProperty) {
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
DistributionDesc distributionDesc = null;
if (distributionInfo instanceof HashDistributionInfo) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
distColumnNames.add(distributionColumn.getName());
}
distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(),
distColumnNames);
} else if (distributionInfo instanceof RandomDistributionInfo) {
distributionDesc = new RandomDistributionDesc(dynamicPartitionProperty.getBuckets());
}
return distributionDesc;
}

/**
* 1. get the range of [start, 0) as a reserved range.
* 2. get DropPartitionClause of partitions which range are before this reserved range.
Expand Down Expand Up @@ -347,7 +373,7 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapT
return dropPartitionClauses;
}

private void executeDynamicPartition() {
private void scheduleDynamicPartition() {
Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfo.iterator();
while (iterator.hasNext()) {
Pair<Long, Long> tableInfo = iterator.next();
Expand Down Expand Up @@ -377,7 +403,7 @@ public boolean executeDynamicPartitionForTable(Long dbId, Long tableId) {
olapTable = (OlapTable) db.getTable(tableId);
// Only OlapTable has DynamicPartitionProperty
if (olapTable == null || !olapTable.dynamicPartitionExists() ||
!olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) {
!olapTable.getTableProperty().getDynamicPartitionProperty().isEnabled()) {
if (olapTable == null) {
LOG.warn("Automatically removes the schedule because table does not exist, " +
"tableId: {}", tableId);
Expand Down Expand Up @@ -456,7 +482,7 @@ public boolean executeDynamicPartitionForTable(Long dbId, Long tableId) {
return false;
}

private void executePartitionTimeToLive() {
private void scheduleTTLPartition() {
Iterator<Pair<Long, Long>> iterator = ttlPartitionInfo.iterator();
while (iterator.hasNext()) {
Pair<Long, Long> tableInfo = iterator.next();
Expand All @@ -474,8 +500,8 @@ private void executePartitionTimeToLive() {
olapTable = (OlapTable) table;
} else {
iterator.remove();
LOG.warn("database={}, table={}. is not olap table. remove it from scheduler",
dbId, tableId);
LOG.warn("database={}-{}, table={}. is not olap table. remove it from scheduler",
db.getFullName(), dbId, tableId);
continue;
}

Expand Down Expand Up @@ -507,12 +533,13 @@ private void executePartitionTimeToLive() {
ArrayList<DropPartitionClause> dropPartitionClauses = null;
try {
if (!ttlDuration.isZero()) {
dropPartitionClauses = buildDropPartitionClauseByTTL(olapTable, ttlDuration);
dropPartitionClauses = buildDropPartitionClauseByTTLDuration(olapTable, ttlDuration);
} else {
dropPartitionClauses = getDropPartitionClauseByTTL(olapTable, ttlNumber);
dropPartitionClauses = buildDropPartitionClauseByTTLNumber(olapTable, ttlNumber);
}
} catch (AnalysisException e) {
LOG.warn("database={}, table={} Failed to generate drop statement.", dbId, tableId, e);
LOG.warn("database={}-{}, table={}-{} failed to build drop partition statement.",
db.getFullName(), dbId, table.getName(), tableId, e);
}
if (dropPartitionClauses == null) {
continue;
Expand All @@ -538,8 +565,8 @@ private void executePartitionTimeToLive() {
* Build drop partitions by TTL.
* Drop the partition if partition upper endpoint less than TTL lower bound
*/
private ArrayList<DropPartitionClause> buildDropPartitionClauseByTTL(OlapTable olapTable,
PeriodDuration ttlDuration)
private ArrayList<DropPartitionClause> buildDropPartitionClauseByTTLDuration(OlapTable olapTable,
PeriodDuration ttlDuration)
throws AnalysisException {
if (ttlDuration.isZero()) {
return Lists.newArrayList();
Expand All @@ -549,7 +576,7 @@ private ArrayList<DropPartitionClause> buildDropPartitionClauseByTTL(OlapTable o
List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns();
Preconditions.checkArgument(partitionColumns.size() == 1);
Type partitionType = partitionColumns.get(0).getType();
PartitionKey ttlLowerBound = null;
PartitionKey ttlLowerBound;

LocalDateTime ttlTime = LocalDateTime.now().minus(ttlDuration);
if (partitionType.isDatetime()) {
Expand Down Expand Up @@ -579,9 +606,8 @@ private ArrayList<DropPartitionClause> buildDropPartitionClauseByTTL(OlapTable o
return dropPartitionClauses;
}

private ArrayList<DropPartitionClause> getDropPartitionClauseByTTL(OlapTable olapTable, int ttlNumber)
private ArrayList<DropPartitionClause> buildDropPartitionClauseByTTLNumber(OlapTable olapTable, int ttlNumber)
throws AnalysisException {

ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) (olapTable.getPartitionInfo());
List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns();
Expand All @@ -595,6 +621,7 @@ private ArrayList<DropPartitionClause> getDropPartitionClauseByTTL(OlapTable ola
if (partitionType.isDateType()) {
PartitionKey currentPartitionKey = partitionType.isDatetime() ?
PartitionKey.ofDateTime(LocalDateTime.now()) : PartitionKey.ofDate(LocalDate.now());
// For expr partitioning table, always has a shadow partition, we should avoid deleting it.
PartitionKey shadowPartitionKey = PartitionKey.createShadowPartitionKey(partitionColumns);

Map<Long, Range<PartitionKey>> idToRange = rangePartitionInfo.getIdToRange(false);
Expand All @@ -618,16 +645,15 @@ private ArrayList<DropPartitionClause> getDropPartitionClauseByTTL(OlapTable ola
candidatePartitionList.sort(Comparator.comparing(o -> o.getValue().upperEndpoint()));

int allPartitionNumber = candidatePartitionList.size();
if (allPartitionNumber <= ttlNumber) {
return dropPartitionClauses;
} else {
if (allPartitionNumber > ttlNumber) {
int dropSize = allPartitionNumber - ttlNumber;
for (int i = 0; i < dropSize; i++) {
Long checkDropPartitionId = candidatePartitionList.get(i).getKey();
Partition partition = olapTable.getPartition(checkDropPartitionId);
if (partition != null) {
String dropPartitionName = partition.getName();
dropPartitionClauses.add(new DropPartitionClause(false, dropPartitionName, false, true));
dropPartitionClauses.add(new DropPartitionClause(false, dropPartitionName,
false, true));
}
}
}
Expand Down Expand Up @@ -656,13 +682,21 @@ private void clearDropPartitionFailedMsg(String tableName) {
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
}

<<<<<<< HEAD
private void initDynamicPartitionTable() {
for (Long dbId : GlobalStateMgr.getCurrentState().getDbIds()) {
=======
private void findSchedulableTables() {
Map<String, List<String>> dynamicPartitionTables = new HashMap<>();
Map<String, List<String>> ttlPartitionTables = new HashMap<>();
long start = System.currentTimeMillis();
for (Long dbId : GlobalStateMgr.getCurrentState().getLocalMetastore().getDbIds()) {
>>>>>>> d97d27382e ([BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235))
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
if (db == null) {
continue;
}
if (db.isSystemDatabase() || db.getFullName().equals(StatsConstants.STATISTICS_DB_NAME)) {
if (db.isSystemDatabase() || db.isStatisticsDatabase()) {
continue;
}

Expand All @@ -671,16 +705,25 @@ private void initDynamicPartitionTable() {
for (Table table : GlobalStateMgr.getCurrentState().getDb(dbId).getTables()) {
if (DynamicPartitionUtil.isDynamicPartitionTable(table)) {
registerDynamicPartitionTable(db.getId(), table.getId());
}
if (DynamicPartitionUtil.isTTLPartitionTable(table)) {
dynamicPartitionTables.computeIfAbsent(db.getFullName(), k -> new ArrayList<>())
.add(table.getName());
} else if (DynamicPartitionUtil.isTTLPartitionTable(table)) {
// Table(MV) with dynamic partition enabled should not specify partition_ttl_number(MV) or
// partition_live_number property.
registerTtlPartitionTable(db.getId(), table.getId());
ttlPartitionTables.computeIfAbsent(db.getFullName(), k -> new ArrayList<>())
.add(table.getName());
}
}
} finally {
db.readUnlock();
}
}
initialize = true;
LOG.info("finished to find all schedulable tables, cost: {}ms, " +
"dynamic partition tables: {}, ttl partition tables: {}, scheduler enabled: {}, scheduler interval: {}s",
System.currentTimeMillis() - start, dynamicPartitionTables, ttlPartitionTables,
Config.dynamic_partition_enable, Config.dynamic_partition_check_interval_seconds);
lastFindingTime = System.currentTimeMillis();
}

@VisibleForTesting
Expand All @@ -690,14 +733,25 @@ public void runOnceForTest() {

@Override
protected void runAfterCatalogReady() {
if (!initialize) {
// check Dynamic Partition tables only when FE start
initDynamicPartitionTable();
// Find all tables that need to be scheduled.
long now = System.currentTimeMillis();
if ((now - lastFindingTime) > Math.max(300000, Config.dynamic_partition_check_interval_seconds)) {
findSchedulableTables();
}

// Update scheduler interval.
setInterval(Config.dynamic_partition_check_interval_seconds * 1000L);

// Schedule tables with dynamic partition enabled(only works for base table).
if (Config.dynamic_partition_enable) {
executeDynamicPartition();
scheduleDynamicPartition();
}
executePartitionTimeToLive();

// Schedule tables(mvs) with ttl partition enabled.
// For now, partition_live_number works for base table with
// single column range partitioning(including expr partitioning, e.g. ... partition by date_trunc('month', col).
// partition_ttl_number and partition_ttl work for mv with
// single column range partitioning(including expr partitioning).
scheduleTTLPartition();
}
}
Loading

0 comments on commit 0c22807

Please sign in to comment.