Skip to content

Commit

Permalink
[Feature-#1886][hbase] New features in sql mode
Browse files Browse the repository at this point in the history
[Feature-#1886][hbase] New features in sql mode
  • Loading branch information
libailin authored and zoudaokoulife committed May 29, 2024
1 parent bf9458e commit 3a3f015
Show file tree
Hide file tree
Showing 20 changed files with 630 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package com.dtstack.chunjun.connector.hbase;

import com.dtstack.chunjun.connector.hbase.config.HBaseConfig;

import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
Expand All @@ -34,6 +36,7 @@
import java.util.Map;
import java.util.Optional;

import static com.dtstack.chunjun.connector.hbase.config.HBaseConfigConstants.MULTI_VERSION_FIXED_COLUMN;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;

/** Helps to specify an HBase Table's schema. */
Expand Down Expand Up @@ -315,12 +318,91 @@ private static DataType getRowDataType(String[] fieldNames, DataType[] fieldType
return DataTypes.ROW(fields);
}

/**
* Converts this {@link HBaseTableSchema} to {@link DataType}, the fields are consisted of
* families and rowkey, the order is in the definition order (i.e. calling {@link
* #addColumn(String, String, Class)} and {@link #setRowKey(String, Class)}). The family field
* is a composite type which is consisted of qualifiers.
*
* @return the {@link DataType} derived from the {@link HBaseTableSchema}.
*/
public DataType convertToDataType() {
String[] familyNames = getFamilyNames();
if (rowKeyInfo != null) {
String[] fieldNames = new String[familyNames.length + 1];
DataType[] fieldTypes = new DataType[familyNames.length + 1];
for (int i = 0; i < fieldNames.length; i++) {
if (i == rowKeyInfo.rowKeyIndex) {
fieldNames[i] = rowKeyInfo.rowKeyName;
fieldTypes[i] = rowKeyInfo.rowKeyType;
} else {
int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1;
String family = familyNames[familyIndex];
fieldNames[i] = family;
fieldTypes[i] =
getRowDataType(
getQualifierNames(family), getQualifierDataTypes(family));
}
}
DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
for (int i = 0; i < fields.length; i++) {
fields[i] = DataTypes.FIELD(fieldNames[i], fieldTypes[i]);
}
return DataTypes.ROW(fields);
} else {
String[] fieldNames = new String[familyNames.length];
DataType[] fieldTypes = new DataType[familyNames.length];
for (int i = 0; i < fieldNames.length; i++) {
String family = familyNames[i];
fieldNames[i] = family;
fieldTypes[i] =
getRowDataType(getQualifierNames(family), getQualifierDataTypes(family));
}
DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
for (int i = 0; i < fields.length; i++) {
fields[i] = DataTypes.FIELD(fieldNames[i], fieldTypes[i]);
}
return DataTypes.ROW(fields);
}
}

/** Construct a {@link HBaseTableSchema} from a {@link DataType}. */
public static HBaseTableSchema fromDataType(DataType physicalRowType, HBaseConfig conf) {
HBaseTableSchema hbaseSchema = new HBaseTableSchema();
RowType rowType = (RowType) physicalRowType.getLogicalType();
for (RowType.RowField field : rowType.getFields()) {
LogicalType fieldType = field.getType();
if (conf.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) {
continue;
}
if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) {
RowType familyType = (RowType) fieldType;
String familyName = field.getName();
for (RowType.RowField qualifier : familyType.getFields()) {
hbaseSchema.addColumn(
familyName,
qualifier.getName(),
fromLogicalToDataType(qualifier.getType()));
}
} else if (fieldType.getChildren().size() == 0) {
hbaseSchema.setRowKey(field.getName(), fromLogicalToDataType(fieldType));
} else {
throw new IllegalArgumentException(
"Unsupported field type '" + fieldType + "' for HBase.");
}
}
return hbaseSchema;
}

/** Construct a {@link HBaseTableSchema} from a {@link TableSchema}. */
public static HBaseTableSchema fromTableSchema(TableSchema schema) {
public static HBaseTableSchema fromTableSchema(TableSchema schema, HBaseConfig conf) {
HBaseTableSchema hbaseSchema = new HBaseTableSchema();
RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
for (RowType.RowField field : rowType.getFields()) {
LogicalType fieldType = field.getType();
if (conf.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) {
continue;
}
if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) {
RowType familyType = (RowType) fieldType;
String familyName = field.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class HBaseConfig extends CommonConfig {
private boolean isBinaryRowkey;
private String table;
private int scanCacheSize = 1000;
private int scanBatchSize = -1;
private int maxVersion = Integer.MAX_VALUE;
private String mode = "normal";

// writer
private String nullMode = "SKIP";
Expand All @@ -66,6 +69,7 @@ public class HBaseConfig extends CommonConfig {
private long writeBufferSize;
private String rowkeyExpress;
private Integer versionColumnIndex;
private String versionColumnName;
private String versionColumnValue;
private Long ttl;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ public class HBaseConfigConstants {
public static final long DEFAULT_WRITE_BUFFER_SIZE = 8 * 1024 * 1024L;

public static final boolean DEFAULT_WAL_FLAG = false;

public static final String MULTI_VERSION_FIXED_COLUMN = "multiVersionFixedColumn";
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public Long getVersion(RowData record) {
"version column index out of range: " + versionColumnIndex);
}
if (record.isNullAt(versionColumnIndex)) {
throw new IllegalArgumentException("null verison column!");
throw new IllegalArgumentException("null version column!");
}

timeStampValue = ((ColumnRowData) record).getField(versionColumnIndex).getData();
Expand Down
Loading

0 comments on commit 3a3f015

Please sign in to comment.