From 3a3f015bbd74791880c3c5cf099afe2c2bd97fd6 Mon Sep 17 00:00:00 2001 From: libailin Date: Tue, 26 Mar 2024 16:55:59 +0800 Subject: [PATCH] [Feature-#1886][hbase] New features in sql mode [Feature-#1886][hbase] New features in sql mode --- .../connector/hbase/HBaseTableSchema.java | 84 +++++++- .../connector/hbase/config/HBaseConfig.java | 4 + .../hbase/config/HBaseConfigConstants.java | 2 + .../converter/HBaseFlatRowConverter.java | 2 +- .../connector/hbase/converter/HBaseSerde.java | 199 +++++++++++++++++- .../hbase/converter/HBaseSqlConverter.java | 52 ++++- .../hbase/source/HBaseInputFormat.java | 70 ++++++ .../hbase/source/HBaseInputFormatBuilder.java | 12 ++ .../hbase/source/HBaseSourceFactoryBase.java | 3 +- .../table/HBaseDynamicTableFactoryBase.java | 98 ++++++++- .../hbase/table/HBaseDynamicTableSink.java | 11 +- .../hbase/table/HBaseDynamicTableSource.java | 41 +++- .../connector/hbase/table/HBaseOptions.java | 58 +++++ .../table/lookup/HBaseAllTableFunction.java | 4 +- .../table/lookup/HBaseLruTableFunction.java | 2 +- .../connector/hbase/util/HBaseHelper.java | 10 + .../connector/hbase/HBaseTableSchemaTest.java | 5 +- .../hbase/converter/HBaseSerdeTest.java | 5 +- .../chunjun-connector-hbase2/pom.xml | 4 + .../org.apache.flink.table.factories.Factory | 16 ++ 20 files changed, 630 insertions(+), 52 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-hbase2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java index 81d21c3762..80036fc1cd 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java @@ -18,6 +18,8 @@ package com.dtstack.chunjun.connector.hbase; +import com.dtstack.chunjun.connector.hbase.config.HBaseConfig; + import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; @@ -34,6 +36,7 @@ import java.util.Map; import java.util.Optional; +import static com.dtstack.chunjun.connector.hbase.config.HBaseConfigConstants.MULTI_VERSION_FIXED_COLUMN; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; /** Helps to specify an HBase Table's schema. */ @@ -315,12 +318,91 @@ private static DataType getRowDataType(String[] fieldNames, DataType[] fieldType return DataTypes.ROW(fields); } + /** + * Converts this {@link HBaseTableSchema} to {@link DataType}, the fields are consisted of + * families and rowkey, the order is in the definition order (i.e. calling {@link + * #addColumn(String, String, Class)} and {@link #setRowKey(String, Class)}). The family field + * is a composite type which is consisted of qualifiers. + * + * @return the {@link DataType} derived from the {@link HBaseTableSchema}. + */ + public DataType convertToDataType() { + String[] familyNames = getFamilyNames(); + if (rowKeyInfo != null) { + String[] fieldNames = new String[familyNames.length + 1]; + DataType[] fieldTypes = new DataType[familyNames.length + 1]; + for (int i = 0; i < fieldNames.length; i++) { + if (i == rowKeyInfo.rowKeyIndex) { + fieldNames[i] = rowKeyInfo.rowKeyName; + fieldTypes[i] = rowKeyInfo.rowKeyType; + } else { + int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1; + String family = familyNames[familyIndex]; + fieldNames[i] = family; + fieldTypes[i] = + getRowDataType( + getQualifierNames(family), getQualifierDataTypes(family)); + } + } + DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length]; + for (int i = 0; i < fields.length; i++) { + fields[i] = DataTypes.FIELD(fieldNames[i], fieldTypes[i]); + } + return DataTypes.ROW(fields); + } else { + String[] fieldNames = new String[familyNames.length]; + DataType[] fieldTypes = new DataType[familyNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + String family = familyNames[i]; + fieldNames[i] = family; + fieldTypes[i] = + getRowDataType(getQualifierNames(family), getQualifierDataTypes(family)); + } + DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length]; + for (int i = 0; i < fields.length; i++) { + fields[i] = DataTypes.FIELD(fieldNames[i], fieldTypes[i]); + } + return DataTypes.ROW(fields); + } + } + + /** Construct a {@link HBaseTableSchema} from a {@link DataType}. */ + public static HBaseTableSchema fromDataType(DataType physicalRowType, HBaseConfig conf) { + HBaseTableSchema hbaseSchema = new HBaseTableSchema(); + RowType rowType = (RowType) physicalRowType.getLogicalType(); + for (RowType.RowField field : rowType.getFields()) { + LogicalType fieldType = field.getType(); + if (conf.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + continue; + } + if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) { + RowType familyType = (RowType) fieldType; + String familyName = field.getName(); + for (RowType.RowField qualifier : familyType.getFields()) { + hbaseSchema.addColumn( + familyName, + qualifier.getName(), + fromLogicalToDataType(qualifier.getType())); + } + } else if (fieldType.getChildren().size() == 0) { + hbaseSchema.setRowKey(field.getName(), fromLogicalToDataType(fieldType)); + } else { + throw new IllegalArgumentException( + "Unsupported field type '" + fieldType + "' for HBase."); + } + } + return hbaseSchema; + } + /** Construct a {@link HBaseTableSchema} from a {@link TableSchema}. */ - public static HBaseTableSchema fromTableSchema(TableSchema schema) { + public static HBaseTableSchema fromTableSchema(TableSchema schema, HBaseConfig conf) { HBaseTableSchema hbaseSchema = new HBaseTableSchema(); RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType(); for (RowType.RowField field : rowType.getFields()) { LogicalType fieldType = field.getType(); + if (conf.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + continue; + } if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) { RowType familyType = (RowType) fieldType; String familyName = field.getName(); diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfig.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfig.java index 11afe8fc7a..02a692364e 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfig.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfig.java @@ -58,6 +58,9 @@ public class HBaseConfig extends CommonConfig { private boolean isBinaryRowkey; private String table; private int scanCacheSize = 1000; + private int scanBatchSize = -1; + private int maxVersion = Integer.MAX_VALUE; + private String mode = "normal"; // writer private String nullMode = "SKIP"; @@ -66,6 +69,7 @@ public class HBaseConfig extends CommonConfig { private long writeBufferSize; private String rowkeyExpress; private Integer versionColumnIndex; + private String versionColumnName; private String versionColumnValue; private Long ttl; } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfigConstants.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfigConstants.java index db374b73ff..de93f57404 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfigConstants.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/config/HBaseConfigConstants.java @@ -36,4 +36,6 @@ public class HBaseConfigConstants { public static final long DEFAULT_WRITE_BUFFER_SIZE = 8 * 1024 * 1024L; public static final boolean DEFAULT_WAL_FLAG = false; + + public static final String MULTI_VERSION_FIXED_COLUMN = "multiVersionFixedColumn"; } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseFlatRowConverter.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseFlatRowConverter.java index 90d9fb2111..969c2d7573 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseFlatRowConverter.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseFlatRowConverter.java @@ -343,7 +343,7 @@ public Long getVersion(RowData record) { "version column index out of range: " + versionColumnIndex); } if (record.isNullAt(versionColumnIndex)) { - throw new IllegalArgumentException("null verison column!"); + throw new IllegalArgumentException("null version column!"); } timeStampValue = ((ColumnRowData) record).getField(versionColumnIndex).getData(); diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerde.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerde.java index 625f15adb0..18b1bc9e5b 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerde.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerde.java @@ -18,18 +18,26 @@ package com.dtstack.chunjun.connector.hbase.converter; +import com.dtstack.chunjun.connector.hbase.FunctionParser; +import com.dtstack.chunjun.connector.hbase.FunctionTree; import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; +import com.dtstack.chunjun.connector.hbase.config.HBaseConfig; +import com.dtstack.chunjun.constants.ConstantValue; +import com.dtstack.chunjun.util.DateUtil; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.binary.BinaryStringData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -42,12 +50,21 @@ import java.io.Serializable; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static com.dtstack.chunjun.connector.hbase.config.HBaseConfigConstants.DEFAULT_NULL_MODE; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; import static org.apache.flink.util.Preconditions.checkArgument; /** Utilities for HBase serialization and deserialization. */ +@Slf4j public class HBaseSerde implements Serializable { private static final long serialVersionUID = -4326665856298124101L; @@ -80,7 +97,23 @@ public class HBaseSerde implements Serializable { protected final FieldDecoder[][] qualifierDecoders; private final GenericRowData rowWithRowKey; - public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) { + private final String versionColumnName; + + private final String versionColumnValue; + + private final SimpleDateFormat timeSecondFormat = + getSimpleDateFormat(ConstantValue.TIME_SECOND_SUFFIX); + private final SimpleDateFormat timeMillisecondFormat = + getSimpleDateFormat(ConstantValue.TIME_MILLISECOND_SUFFIX); + private final HBaseConfig hBaseConfig; + private List rowKeyColumnIndex; + private List rowKeyColumns; + private final List columnNames = new ArrayList<>(); + private FunctionTree functionTree; + private final String nullMode; + + public HBaseSerde(HBaseTableSchema hbaseSchema, HBaseConfig hBaseConfig) { + final String nullStringLiteral = hBaseConfig.getNullStringLiteral(); this.families = hbaseSchema.getFamilyKeys(); this.rowkeyIndex = hbaseSchema.getRowKeyIndex(); LogicalType rowkeyType = @@ -120,8 +153,17 @@ public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) .map(t -> createNullableFieldDecoder(t, nullStringBytes)) .toArray(FieldDecoder[]::new); this.reusedFamilyRows[f] = new GenericRowData(dataTypes.length); + String[] qualifierNames = hbaseSchema.getQualifierNames(familyNames[f]); + for (String qualifierName : qualifierNames) { + columnNames.add(familyNames[f] + ":" + qualifierName); + } } this.rowWithRowKey = new GenericRowData(1); + this.hBaseConfig = hBaseConfig; + initRowKeyConfig(); + this.versionColumnName = hBaseConfig.getVersionColumnName(); + this.versionColumnValue = hBaseConfig.getVersionColumnValue(); + this.nullMode = hBaseConfig.getNullMode(); } /** @@ -129,15 +171,26 @@ public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) * * @return The appropriate instance of Put for this use case. */ - public @Nullable Put createPutMutation(RowData row) { + public @Nullable Put createPutMutation(RowData row) throws Exception { checkArgument(keyEncoder != null, "row key is not set."); - byte[] rowkey = keyEncoder.encode(row, rowkeyIndex); + byte[] rowkey; + if (StringUtils.isNotBlank(hBaseConfig.getRowkeyExpress())) { + rowkey = getRowkey(row); + } else { + rowkey = keyEncoder.encode(row, rowkeyIndex); + } if (rowkey.length == 0) { // drop dirty records, rowkey shouldn't be zero length return null; } + Long version = getVersion(row); // upsert - Put put = new Put(rowkey); + Put put; + if (version == null) { + put = new Put(rowkey); + } else { + put = new Put(rowkey, version); + } for (int i = 0; i < fieldLength; i++) { if (i != rowkeyIndex) { int f = i > rowkeyIndex ? i - 1 : i; @@ -149,6 +202,10 @@ public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) byte[] qualifier = qualifiers[f][q]; // serialize value byte[] value = qualifierEncoders[f][q].encode(familyRow, q); + if (nullMode.equalsIgnoreCase(DEFAULT_NULL_MODE) + && Arrays.equals(value, EMPTY_BYTES)) { + continue; + } put.addColumn(familyKey, qualifier, value); } } @@ -525,4 +582,138 @@ public byte[] getRowKey(Object rowKey) { public Object getRowKey(byte[] rowKey) { return keyDecoder.decode(rowKey); } + + public Long getVersion(RowData record) { + if (StringUtils.isBlank(versionColumnName) && StringUtils.isBlank(versionColumnValue)) { + return null; + } + + Object timeStampValue = versionColumnValue; + if (StringUtils.isNotBlank(versionColumnName)) { + // 指定列作为版本,long/doubleColumn直接record.aslong, + // 其它类型尝试用yyyy-MM-dd HH:mm:ss,yyyy-MM-dd HH:mm:ss SSS去format + String[] cfAndQualifier = versionColumnName.split(":"); + byte[] columnFamiliesName; + byte[] columnName; + if (cfAndQualifier.length == 2 + && StringUtils.isNotBlank(cfAndQualifier[0]) + && StringUtils.isNotBlank(cfAndQualifier[1])) { + columnFamiliesName = Bytes.toBytes(cfAndQualifier[0]); + columnName = Bytes.toBytes(cfAndQualifier[1]); + } else { + throw new IllegalArgumentException( + "version-column-name 参数配置格式应该是:列族:列名. 您配置的列错误:" + versionColumnName); + } + boolean isExist = false; + for (int i = 0; i < fieldLength; i++) { + if (i != rowkeyIndex) { + int f = i > rowkeyIndex ? i - 1 : i; + // get family key + byte[] familyKey = families[f]; + RowData familyRow = record.getRow(i, qualifiers[f].length); + for (int q = 0; q < this.qualifiers[f].length; q++) { + // get quantifier key + byte[] qualifier = qualifiers[f][q]; + // serialize value + byte[] value = qualifierEncoders[f][q].encode(familyRow, q); + if (Bytes.equals(columnFamiliesName, familyKey) + && Bytes.equals(columnName, qualifier)) { + isExist = true; + timeStampValue = qualifierDecoders[f][q].decode(value); + } + } + } + } + if (!isExist) { + throw new IllegalArgumentException( + "version column name not exist in record: " + versionColumnName); + } + } + + if (timeStampValue instanceof Long) { + return (Long) timeStampValue; + } else if (timeStampValue instanceof Double) { + return ((Double) timeStampValue).longValue(); + } else if (timeStampValue instanceof String || timeStampValue instanceof BinaryStringData) { + try { + return Long.valueOf(timeStampValue.toString()); + } catch (Exception e) { + // ignore + } + Date date; + try { + date = timeMillisecondFormat.parse(timeStampValue.toString()); + } catch (ParseException e) { + try { + date = timeSecondFormat.parse(timeStampValue.toString()); + } catch (ParseException e1) { + log.info( + String.format( + "您指定第[%s]列作为hbase写入版本,但在尝试用yyyy-MM-dd HH:mm:ss 和 yyyy-MM-dd HH:mm:ss SSS 去解析为Date时均出错,请检查并修改", + versionColumnName)); + throw new RuntimeException(e1); + } + } + return date.getTime(); + } else if (timeStampValue instanceof Date) { + return ((Date) timeStampValue).getTime(); + } else if (timeStampValue instanceof TimestampData) { + String timeStampStr = ((TimestampData) timeStampValue).toString(); + return DateUtil.getTimestampFromStr(timeStampStr).getTime(); + } else { + throw new RuntimeException("version 类型不兼容: " + timeStampValue.getClass()); + } + } + + private static SimpleDateFormat getSimpleDateFormat(String sign) { + SimpleDateFormat format; + if (ConstantValue.TIME_SECOND_SUFFIX.equals(sign)) { + format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + } else { + format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS"); + } + return format; + } + + private byte[] getRowkey(RowData record) throws Exception { + Map nameValueMap = new HashMap<>((rowKeyColumnIndex.size() << 2) / 3); + for (int i = 0; i < fieldLength; i++) { + if (i != rowkeyIndex) { + int f = i > rowkeyIndex ? i - 1 : i; + // get family key + byte[] familyKey = families[f]; + RowData familyRow = record.getRow(i, qualifiers[f].length); + for (int q = 0; q < this.qualifiers[f].length; q++) { + // get quantifier key + byte[] qualifier = qualifiers[f][q]; + // serialize value + byte[] value = qualifierEncoders[f][q].encode(familyRow, q); + String cfAndQualifier = + Bytes.toString(familyKey) + ":" + Bytes.toString(qualifier); + if (columnNames.contains(cfAndQualifier)) { + nameValueMap.put(cfAndQualifier, qualifierDecoders[f][q].decode(value)); + } + } + } + } + + String rowKeyStr = functionTree.evaluate(nameValueMap); + return rowKeyStr.getBytes(StandardCharsets.UTF_8); + } + + private void initRowKeyConfig() { + if (StringUtils.isNotBlank(hBaseConfig.getRowkeyExpress())) { + this.functionTree = FunctionParser.parse(hBaseConfig.getRowkeyExpress()); + this.rowKeyColumns = FunctionParser.parseRowKeyCol(hBaseConfig.getRowkeyExpress()); + this.rowKeyColumnIndex = new ArrayList<>(rowKeyColumns.size()); + for (String rowKeyColumn : rowKeyColumns) { + int index = columnNames.indexOf(rowKeyColumn); + if (index == -1) { + throw new RuntimeException( + "Can not get row key column from columns:" + rowKeyColumn); + } + rowKeyColumnIndex.add(index); + } + } + } } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSqlConverter.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSqlConverter.java index a73f60bdf8..e284714f00 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSqlConverter.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSqlConverter.java @@ -19,50 +19,82 @@ package com.dtstack.chunjun.connector.hbase.converter; import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; +import com.dtstack.chunjun.connector.hbase.config.HBaseConfig; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import java.math.BigDecimal; +import static com.dtstack.chunjun.connector.hbase.config.HBaseConfigConstants.MULTI_VERSION_FIXED_COLUMN; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; public class HBaseSqlConverter - extends AbstractRowConverter { + extends AbstractRowConverter { private static final long serialVersionUID = -8935215591844851238L; private static final int MIN_TIMESTAMP_PRECISION = 0; private static final int MAX_TIMESTAMP_PRECISION = 3; private static final int MIN_TIME_PRECISION = 0; private static final int MAX_TIME_PRECISION = 3; - private final HBaseTableSchema schema; - private final String nullStringLiteral; + private HBaseTableSchema schema; + private HBaseConfig hBaseConfig; private transient HBaseSerde serde; - public HBaseSqlConverter(HBaseTableSchema schema, String nullStringLiteral) { + public HBaseSqlConverter(HBaseTableSchema schema, HBaseConfig hBaseConfig) { this.schema = schema; - this.nullStringLiteral = nullStringLiteral; + this.hBaseConfig = hBaseConfig; + } + + public HBaseSqlConverter(RowType rowType, HBaseConfig hBaseConfig) { + super(rowType); + this.hBaseConfig = hBaseConfig; + for (int i = 0; i < rowType.getFieldCount(); i++) { + toInternalConverters.add( + wrapIntoNullableInternalConverter( + createInternalConverter(rowType.getTypeAt(i)))); + } } @Override - public RowData toInternal(Result input) { + public RowData toInternal(Object input) throws Exception { + + if (hBaseConfig.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + Cell current = (Cell) input; + GenericRowData rowData = new GenericRowData(this.rowType.getFieldCount()); + String family = new String(CellUtil.cloneFamily(current)); + String qualifier = new String(CellUtil.cloneQualifier(current)); + String familyAndQualifier = family + ":" + qualifier; + rowData.setField( + 0, toInternalConverters.get(0).deserialize(CellUtil.cloneRow(current))); + rowData.setField( + 1, toInternalConverters.get(1).deserialize(familyAndQualifier.getBytes())); + rowData.setField(2, current.getTimestamp()); + rowData.setField( + 3, StringData.fromString(Bytes.toStringBinary(CellUtil.cloneValue(current)))); + return rowData; + } if (serde == null) { - this.serde = new HBaseSerde(schema, nullStringLiteral); + this.serde = new HBaseSerde(schema, hBaseConfig); } - return serde.convertToReusedRow(input); + return serde.convertToReusedRow((Result) input); } @Override @@ -137,9 +169,9 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { } @Override - public Mutation toExternal(RowData rowData, Mutation output) { + public Mutation toExternal(RowData rowData, Mutation output) throws Exception { if (serde == null) { - this.serde = new HBaseSerde(schema, nullStringLiteral); + this.serde = new HBaseSerde(schema, hBaseConfig); } RowKind kind = rowData.getRowKind(); if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) { diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormat.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormat.java index 65b199ea8e..9810465c75 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormat.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormat.java @@ -22,11 +22,14 @@ import com.dtstack.chunjun.connector.hbase.util.HBaseHelper; import com.dtstack.chunjun.connector.hbase.util.ScanBuilder; import com.dtstack.chunjun.source.format.BaseRichInputFormat; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; +import com.dtstack.chunjun.util.ExceptionUtil; import org.apache.flink.core.io.InputSplit; import org.apache.flink.table.data.RowData; import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -41,8 +44,16 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.dtstack.chunjun.connector.hbase.config.HBaseConfigConstants.MULTI_VERSION_FIXED_COLUMN; /** The InputFormat Implementation used for HbaseReader */ @Slf4j @@ -59,11 +70,22 @@ public class HBaseInputFormat extends BaseRichInputFormat { protected boolean isBinaryRowkey; /** 客户端每次 rpc fetch 的行数 */ protected int scanCacheSize = 1000; + /** 客户端每次 rpc 从服务器端读取的列数 */ + protected int scanBatchSize = -1; + + protected int maxVersion = Integer.MAX_VALUE; + /** 读取HBase的模式,支持normal模式和multiVersionFixedColumn模式。 */ + protected String mode = "normal"; private transient Connection connection; private transient Scan scan; private transient ResultScanner resultScanner; private transient Result next; + private transient Cell currentCell; + /** 是否读取结束 */ + private transient AtomicBoolean hasNext; + + private transient BlockingQueue queue; private final ScanBuilder scanBuilder; @@ -222,6 +244,8 @@ private String getStartKey(byte[] startRowkeyByte, byte[] regionStarKey) { @Override public void openInternal(InputSplit inputSplit) throws IOException { + this.queue = new LinkedBlockingQueue<>(4096); + this.hasNext = new AtomicBoolean(true); HBaseInputSplit hbaseInputSplit = (HBaseInputSplit) inputSplit; byte[] startRow = Bytes.toBytesBinary(hbaseInputSplit.getStartkey()); byte[] stopRow = Bytes.toBytesBinary(hbaseInputSplit.getEndKey()); @@ -236,17 +260,63 @@ public void openInternal(InputSplit inputSplit) throws IOException { scan.setStartRow(startRow); scan.setStopRow(stopRow); scan.setCaching(scanCacheSize); + scan.setBatch(scanBatchSize); + if (mode.equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + scan.setMaxVersions(maxVersion); + } resultScanner = table.getScanner(scan); + if (mode.equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + Iterator iterator = resultScanner.iterator(); + try { + while (iterator.hasNext()) { + Result result = iterator.next(); + while (result.advance()) { + currentCell = result.current(); + queue.put(currentCell); + } + } + hasNext.set(false); + } catch (InterruptedException e) { + hasNext.set(false); + log.error( + "put cell data : {} , interrupted error:{}", + currentCell.toString(), + ExceptionUtil.getErrorMessage(e)); + throw new ChunJunRuntimeException( + "because the current thread was interrupted, adding data to the queue failed", + e); + } catch (Exception e) { + hasNext.set(false); + log.error("read data failed , error:{}", ExceptionUtil.getErrorMessage(e)); + throw new ChunJunRuntimeException("read data failed", e); + } + } } @Override public boolean reachedEnd() throws IOException { + if (mode.equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + return !hasNext.get() && queue.isEmpty(); + } next = resultScanner.next(); return next == null; } @Override public RowData nextRecordInternal(RowData rawRow) { + if (mode.equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + RowData row; + try { + Cell data = queue.poll(5, TimeUnit.SECONDS); + if (Objects.isNull(data)) { + return null; + } + row = rowConverter.toInternal(data); + } catch (Exception e) { + throw new RuntimeException(e); + } + return row; + } try { return rowConverter.toInternal(next); } catch (Exception e) { diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormatBuilder.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormatBuilder.java index 5022672061..5c5a510e7b 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseInputFormatBuilder.java @@ -79,6 +79,18 @@ public void setScanCacheSize(int scanCacheSize) { format.scanCacheSize = scanCacheSize; } + public void setScanBatchSize(int scanBatchSize) { + format.scanBatchSize = scanBatchSize; + } + + public void setMode(String mode) { + format.mode = mode; + } + + public void setMaxVersion(int maxVersion) { + format.maxVersion = maxVersion; + } + @Override protected void checkFormat() { Preconditions.checkArgument( diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseSourceFactoryBase.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseSourceFactoryBase.java index 01d96e30ef..eb421eec2b 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseSourceFactoryBase.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/source/HBaseSourceFactoryBase.java @@ -114,7 +114,6 @@ public DataStream createSource() { rowConverter = new HBaseSyncConverter(hBaseConfig, rowType); scanBuilder = ScanBuilder.forSync(fieldConfList); } else { - String nullStringLiteral = hBaseConfig.getNullStringLiteral(); Set columnSet = new LinkedHashSet<>(); for (FieldConfig fieldConfig : fieldConfList) { String fieldName = fieldConfig.getName(); @@ -129,7 +128,7 @@ public DataStream createSource() { HBaseTableSchema hBaseTableSchema = buildHBaseTableSchema(hBaseConfig.getTable(), fieldConfList); syncConfig.getReader().setFieldNameList(new ArrayList<>(columnSet)); - rowConverter = new HBaseSqlConverter(hBaseTableSchema, nullStringLiteral); + rowConverter = new HBaseSqlConverter(hBaseTableSchema, hBaseConfig); scanBuilder = ScanBuilder.forSql(hBaseTableSchema); } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableFactoryBase.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableFactoryBase.java index f37c2f916a..d7b0acc81c 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableFactoryBase.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableFactoryBase.java @@ -30,6 +30,7 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.hadoop.hbase.HConstants; @@ -41,11 +42,23 @@ import java.util.Set; import java.util.stream.Collectors; +import static com.dtstack.chunjun.connector.hbase.config.HBaseConfigConstants.MULTI_VERSION_FIXED_COLUMN; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.END_ROW_KEY; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.IS_BINARY_ROW_KEY; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.MAX_VERSION; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.MODE; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.NULL_MODE; import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.NULL_STRING_LITERAL; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.ROWKEY_EXPRESS; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SCAN_BATCH_SIZE; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SCAN_CACHE_SIZE; import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL; import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.START_ROW_KEY; import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.TABLE_NAME; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.VERSION_COLUMN_NAME; +import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.VERSION_COLUMN_VALUE; import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.ZOOKEEPER_QUORUM; import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.ZOOKEEPER_ZNODE_PARENT; import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_ASYNC_TIMEOUT; @@ -102,6 +115,18 @@ public Set> optionalOptions() { set.add(PRINCIPAL); set.add(KEYTAB); set.add(KRB5_CONF); + + set.add(START_ROW_KEY); + set.add(END_ROW_KEY); + set.add(IS_BINARY_ROW_KEY); + set.add(SCAN_CACHE_SIZE); + set.add(VERSION_COLUMN_NAME); + set.add(VERSION_COLUMN_VALUE); + set.add(ROWKEY_EXPRESS); + set.add(SCAN_BATCH_SIZE); + set.add(MODE); + set.add(MAX_VERSION); + set.add(NULL_MODE); return set; } @@ -113,19 +138,21 @@ public DynamicTableSource createDynamicTableSource(Context context) { helper.validateExcept("properties."); TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); - validatePrimaryKey(physicalSchema); Map options = context.getCatalogTable().getOptions(); HBaseConfig conf = getHbaseConf(config, options); LookupConfig lookupConfig = getLookupConf(config, context.getObjectIdentifier().getObjectName()); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(physicalSchema); - String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL); - return new HBaseDynamicTableSource( - conf, physicalSchema, lookupConfig, hbaseSchema, nullStringLiteral); + HBaseTableSchema hbaseSchema = + validatePrimaryKey( + context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes(), conf); + return new HBaseDynamicTableSource(conf, physicalSchema, lookupConfig, hbaseSchema); } - private static void validatePrimaryKey(TableSchema schema) { - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema); + private static HBaseTableSchema validatePrimaryKey(TableSchema schema, HBaseConfig conf) { + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema, conf); + if (conf.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + return hbaseSchema; + } if (!hbaseSchema.getRowKeyName().isPresent()) { throw new IllegalArgumentException( "HBase table requires to define a row key field. A row key field is defined as an atomic type, column families and qualifiers are defined as ROW type."); @@ -145,6 +172,45 @@ private static void validatePrimaryKey(TableSchema schema) { } }); } + return hbaseSchema; + } + + /** + * Checks that the HBase table have row key defined. A row key is defined as an atomic type, and + * column families and qualifiers are defined as ROW type. There shouldn't be multiple atomic + * type columns in the schema. The PRIMARY KEY constraint is optional, if exist, the primary key + * constraint must be defined on the single row key field. + */ + public static HBaseTableSchema validatePrimaryKey( + DataType dataType, int[] primaryKeyIndexes, HBaseConfig conf) { + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType, conf); + if (conf.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + return hbaseSchema; + } + if (!hbaseSchema.getRowKeyName().isPresent()) { + throw new IllegalArgumentException( + "HBase table requires to define a row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."); + } + if (primaryKeyIndexes.length == 0) { + return hbaseSchema; + } + if (primaryKeyIndexes.length > 1) { + throw new IllegalArgumentException( + "HBase table doesn't support a primary Key on multiple columns. " + + "The primary key of HBase table must be defined on row key field."); + } + if (!hbaseSchema + .getRowKeyName() + .get() + .equals(DataType.getFieldNames(dataType).get(primaryKeyIndexes[0]))) { + throw new IllegalArgumentException( + "Primary key of HBase table must be defined on the row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."); + } + return hbaseSchema; } private LookupConfig getLookupConf(ReadableConfig readableConfig, String tableName) { @@ -168,6 +234,17 @@ private HBaseConfig getHbaseConf(ReadableConfig config, Map opti conf.setTable(hTableName); String nullStringLiteral = config.get(NULL_STRING_LITERAL); conf.setNullStringLiteral(nullStringLiteral); + conf.setStartRowkey(config.get(START_ROW_KEY)); + conf.setEndRowkey(config.get(END_ROW_KEY)); + conf.setBinaryRowkey(config.get(IS_BINARY_ROW_KEY)); + conf.setScanCacheSize(config.get(SCAN_CACHE_SIZE)); + conf.setVersionColumnName(config.get(VERSION_COLUMN_NAME)); + conf.setVersionColumnValue(config.get(VERSION_COLUMN_VALUE)); + conf.setRowkeyExpress(config.get(ROWKEY_EXPRESS)); + conf.setScanBatchSize(config.get(SCAN_BATCH_SIZE)); + conf.setMode(config.get(MODE)); + conf.setMaxVersion(config.get(MAX_VERSION)); + conf.setNullMode(config.get(NULL_MODE)); return conf; } @@ -209,7 +286,6 @@ public DynamicTableSink createDynamicTableSink(Context context) { helper.validateExcept("properties."); TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(physicalSchema); Map options = context.getCatalogTable().getOptions(); HBaseConfig conf = getHbaseConf(config, options); @@ -219,10 +295,10 @@ public DynamicTableSink createDynamicTableSink(Context context) { conf.setFlushIntervalMills(millis); long bufferFlushMaxSizeInBytes = config.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes(); conf.setWriteBufferSize(bufferFlushMaxSizeInBytes); - - conf.setRowkeyExpress(generateRowKey(hbaseSchema)); String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL); - return new HBaseDynamicTableSink(conf, physicalSchema, hbaseSchema, nullStringLiteral); + conf.setNullStringLiteral(nullStringLiteral); + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(physicalSchema, conf); + return new HBaseDynamicTableSink(conf, physicalSchema, hbaseSchema); } private String generateRowKey(HBaseTableSchema hbaseSchema) { diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSink.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSink.java index a1d7b04291..f7eddc6aa6 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSink.java @@ -33,17 +33,12 @@ public class HBaseDynamicTableSink implements DynamicTableSink { private final HBaseConfig config; private final TableSchema tableSchema; private final HBaseTableSchema hbaseSchema; - protected final String nullStringLiteral; public HBaseDynamicTableSink( - HBaseConfig config, - TableSchema tableSchema, - HBaseTableSchema hbaseSchema, - String nullStringLiteral) { + HBaseConfig config, TableSchema tableSchema, HBaseTableSchema hbaseSchema) { this.config = config; this.tableSchema = tableSchema; this.hbaseSchema = hbaseSchema; - this.nullStringLiteral = nullStringLiteral; } @Override @@ -58,7 +53,7 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { builder.setHbaseConfig(config.getHbaseConfig()); builder.setTableName(config.getTable()); builder.setWriteBufferSize(config.getWriteBufferSize()); - HBaseSqlConverter hbaseSqlConverter = new HBaseSqlConverter(hbaseSchema, nullStringLiteral); + HBaseSqlConverter hbaseSqlConverter = new HBaseSqlConverter(hbaseSchema, config); builder.setRowConverter(hbaseSqlConverter); builder.setConfig(config); return SinkFunctionProvider.of( @@ -67,7 +62,7 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { @Override public DynamicTableSink copy() { - return new HBaseDynamicTableSink(config, tableSchema, hbaseSchema, nullStringLiteral); + return new HBaseDynamicTableSink(config, tableSchema, hbaseSchema); } @Override diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSource.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSource.java index 9fff3d4390..a866c15331 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseDynamicTableSource.java @@ -31,35 +31,37 @@ import com.dtstack.chunjun.source.format.BaseRichInputFormatBuilder; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; + +import static com.dtstack.chunjun.connector.hbase.config.HBaseConfigConstants.MULTI_VERSION_FIXED_COLUMN; public class HBaseDynamicTableSource extends BaseHBaseDynamicTableSource { private final HBaseConfig hBaseConfig; private final TableSchema tableSchema; private final LookupConfig lookupConfig; - private final HBaseTableSchema hbaseSchema; - protected final String nullStringLiteral; + private HBaseTableSchema hbaseSchema; public HBaseDynamicTableSource( HBaseConfig conf, TableSchema tableSchema, LookupConfig lookupConfig, - HBaseTableSchema hbaseSchema, - String nullStringLiteral) { + HBaseTableSchema hbaseSchema) { super(tableSchema, hbaseSchema, conf, lookupConfig); this.hBaseConfig = conf; this.tableSchema = tableSchema; this.lookupConfig = lookupConfig; this.hbaseSchema = hbaseSchema; this.hbaseSchema.setTableName(hBaseConfig.getTable()); - this.nullStringLiteral = nullStringLiteral; } @Override public DynamicTableSource copy() { return new HBaseDynamicTableSource( - this.hBaseConfig, tableSchema, lookupConfig, hbaseSchema, nullStringLiteral); + this.hBaseConfig, tableSchema, lookupConfig, hbaseSchema); } @Override @@ -75,15 +77,30 @@ protected BaseRichInputFormatBuilder getBaseRichInputFormatBuilder() { builder.setColumnMetaInfos(hBaseConfig.getColumnMetaInfos()); builder.setConfig(hBaseConfig); builder.setHbaseConfig(hBaseConfig.getHbaseConfig()); + builder.setStartRowKey(hBaseConfig.getStartRowkey()); + builder.setEndRowKey(hBaseConfig.getEndRowkey()); + builder.setIsBinaryRowkey(hBaseConfig.isBinaryRowkey()); + builder.setScanCacheSize(hBaseConfig.getScanCacheSize()); + builder.setScanBatchSize(hBaseConfig.getScanBatchSize()); + builder.setMode(hBaseConfig.getMode()); + builder.setMaxVersion(hBaseConfig.getMaxVersion()); // 投影下推后, hbaseSchema 会被过滤无用的字段,而 tableSchema 不变, 后面根据 hbaseSchema 生成 hbase scan - AbstractRowConverter rowConverter = new HBaseSqlConverter(hbaseSchema, nullStringLiteral); + AbstractRowConverter rowConverter = new HBaseSqlConverter(hbaseSchema, hBaseConfig); + if (hBaseConfig.getMode().equalsIgnoreCase(MULTI_VERSION_FIXED_COLUMN)) { + rowConverter = + new HBaseSqlConverter( + InternalTypeInfo.of( + tableSchema.toPhysicalRowDataType().getLogicalType()) + .toRowType(), + hBaseConfig); + } builder.setRowConverter(rowConverter); return builder; } @Override protected AbstractLruTableFunction getAbstractLruTableFunction() { - AbstractRowConverter rowConverter = new HBaseSqlConverter(hbaseSchema, nullStringLiteral); + AbstractRowConverter rowConverter = new HBaseSqlConverter(hbaseSchema, hBaseConfig); return new HBaseLruTableFunction(lookupConfig, hbaseSchema, hBaseConfig, rowConverter); } @@ -91,4 +108,12 @@ protected AbstractLruTableFunction getAbstractLruTableFunction() { protected AbstractAllTableFunction getAbstractAllTableFunction() { return new HBaseAllTableFunction(lookupConfig, hbaseSchema, hBaseConfig); } + + @Override + public void applyProjection(int[][] projectedFields, DataType producedDataType) { + this.hbaseSchema = + HBaseTableSchema.fromDataType( + Projection.of(projectedFields).project(hbaseSchema.convertToDataType()), + hBaseConfig); + } } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseOptions.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseOptions.java index ba0f554cf0..13ff48921c 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseOptions.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseOptions.java @@ -66,4 +66,62 @@ public class HBaseOptions extends BaseFileOptions { .defaultValue(Duration.ofSeconds(1L)) .withDescription( "Writing option, the interval to flush any buffered rows. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions."); + public static final ConfigOption START_ROW_KEY = + ConfigOptions.key("start-row-key") + .stringType() + .noDefaultValue() + .withDescription("start row key"); + public static final ConfigOption END_ROW_KEY = + ConfigOptions.key("end-row-key") + .stringType() + .noDefaultValue() + .withDescription("end row key"); + public static final ConfigOption IS_BINARY_ROW_KEY = + ConfigOptions.key("is-binary-row-key") + .booleanType() + .defaultValue(false) + .withDescription("is binary row key"); + public static final ConfigOption SCAN_CACHE_SIZE = + ConfigOptions.key("scan-cache-size") + .intType() + .defaultValue(1000) + .withDescription("scan cache size"); + public static final ConfigOption VERSION_COLUMN_NAME = + ConfigOptions.key("version-column-name") + .stringType() + .noDefaultValue() + .withDescription("version column name"); + public static final ConfigOption VERSION_COLUMN_VALUE = + ConfigOptions.key("version-column-value") + .stringType() + .noDefaultValue() + .withDescription("version column value"); + public static final ConfigOption ROWKEY_EXPRESS = + ConfigOptions.key("rowkey-express") + .stringType() + .noDefaultValue() + .withDescription("rowkey express"); + public static final ConfigOption SCAN_BATCH_SIZE = + ConfigOptions.key("scan-batch-size") + .intType() + .defaultValue(-1) + .withDescription("scan batch size"); + + public static final ConfigOption MAX_VERSION = + ConfigOptions.key("max-version") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription("max version"); + + public static final ConfigOption MODE = + ConfigOptions.key("mode") + .stringType() + .defaultValue("normal") + .withDescription("Support normal and multiVersionFixedColumn mode"); + + public static final ConfigOption NULL_MODE = + ConfigOptions.key("null-mode") + .stringType() + .defaultValue("SKIP") + .withDescription("Write mode when field value is empty"); } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseAllTableFunction.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseAllTableFunction.java index aed2a0dec4..dd5ebff7b1 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseAllTableFunction.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseAllTableFunction.java @@ -57,7 +57,6 @@ public class HBaseAllTableFunction extends AbstractAllTableFunction { private final HBaseTableSchema hbaseTableSchema; private transient HBaseSerde serde; - private final String nullStringLiteral; private final HBaseConfig hBaseConfig; public HBaseAllTableFunction( @@ -65,12 +64,11 @@ public HBaseAllTableFunction( super(null, null, lookupConfig, null); this.hbaseTableSchema = hbaseTableSchema; this.hBaseConfig = hBaseConfig; - this.nullStringLiteral = hBaseConfig.getNullStringLiteral(); } @Override public void open(FunctionContext context) throws Exception { - this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral); + this.serde = new HBaseSerde(hbaseTableSchema, hBaseConfig); super.open(context); } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseLruTableFunction.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseLruTableFunction.java index 12aebfc30c..52206beae1 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseLruTableFunction.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/lookup/HBaseLruTableFunction.java @@ -81,7 +81,7 @@ public HBaseLruTableFunction( @Override public void open(FunctionContext context) throws Exception { super.open(context); - this.serde = new HBaseSerde(hbaseTableSchema, hBaseConfig.getNullStringLiteral()); + this.serde = new HBaseSerde(hbaseTableSchema, hBaseConfig); this.executorService = new ThreadPoolExecutor( DEFAULT_POOL_SIZE, diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseHelper.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseHelper.java index 21a7628b89..99fbceb87f 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseHelper.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseHelper.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; @@ -58,6 +59,7 @@ public class HBaseHelper { private static final String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; private static final String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; private static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; + private static final String KEY_HADOOP_USER_NAME = "hadoop.user.name"; public static Connection getHbaseConnection( HBaseConfig hBaseConfig, String jobId, String taskNumber) { @@ -75,6 +77,14 @@ public static Connection getHbaseConnection( try { Configuration hConfiguration = getConfig(hbaseConfigMap); + Object hadoopUser = hbaseConfigMap.get(KEY_HADOOP_USER_NAME); + // 如果配置的 hadoop 用户不为空,那么设置配置中的用户。 + if (hadoopUser != null && StringUtils.isNotEmpty(hadoopUser.toString())) { + UserGroupInformation userGroupInformation = + UserGroupInformation.createRemoteUser(hadoopUser.toString()); + return ConnectionFactory.createConnection( + hConfiguration, User.create(userGroupInformation)); + } return ConnectionFactory.createConnection(hConfiguration); } catch (IOException e) { log.error("Get connection fail with config:{}", hbaseConfigMap); diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchemaTest.java b/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchemaTest.java index 6f000694ea..dad59dbc99 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchemaTest.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchemaTest.java @@ -18,6 +18,8 @@ package com.dtstack.chunjun.connector.hbase; +import com.dtstack.chunjun.connector.hbase.config.HBaseConfig; + import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.types.DataType; @@ -89,7 +91,8 @@ public void testFromTableSchema() { .field("stu", DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT()))) .build(); - HBaseTableSchema fromTableSchema = HBaseTableSchema.fromTableSchema(tableSchema); + HBaseTableSchema fromTableSchema = + HBaseTableSchema.fromTableSchema(tableSchema, new HBaseConfig()); Assert.assertEquals("stu", fromTableSchema.getFamilyNames()[0]); Assert.assertEquals( DataTypes.INT(), fromTableSchema.getRowKeyDataType().orElse(DataTypes.NULL())); diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerdeTest.java b/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerdeTest.java index 0e2b1e19bd..583da3313c 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerdeTest.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/test/java/com/dtstack/chunjun/connector/hbase/converter/HBaseSerdeTest.java @@ -19,6 +19,7 @@ package com.dtstack.chunjun.connector.hbase.converter; import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; +import com.dtstack.chunjun.connector.hbase.config.HBaseConfig; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; @@ -42,11 +43,11 @@ public void setUp() { tableSchema = new HBaseTableSchema(); tableSchema.setRowKey("stu:id", Integer.class); tableSchema.addColumn("stu", "name", String.class); - serde = new HBaseSerde(tableSchema, "NULL"); + serde = new HBaseSerde(tableSchema, new HBaseConfig()); } @Test - public void testCreatePutMutation() { + public void testCreatePutMutation() throws Exception { GenericRowData data = new GenericRowData(1); data.setField(0, StringData.fromString("hbase_test")); GenericRowData rowData = new GenericRowData(2); diff --git a/chunjun-connectors/chunjun-connector-hbase2/pom.xml b/chunjun-connectors/chunjun-connector-hbase2/pom.xml index c6396abfa7..5a60e12b33 100644 --- a/chunjun-connectors/chunjun-connector-hbase2/pom.xml +++ b/chunjun-connectors/chunjun-connector-hbase2/pom.xml @@ -46,6 +46,10 @@ log4j-1.2-api org.apache.logging.log4j + + hbase-client + org.apache.hbase + diff --git a/chunjun-connectors/chunjun-connector-hbase2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-hbase2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..9ab3174da8 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-hbase2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.dtstack.chunjun.connector.hbase2.table.Hbase2DynamicTableFactory