DRILL-6130: Fix NPE during physical plan submission for various storage plugins

1. Fixed ser/de issues for the Hive, Kafka and HBase plugins (the shared pattern is sketched below).
2. Added a physical plan submission unit test for all storage plugins in the contrib module.
3. Refactoring.

closes apache#1108
arina-ielchiieva authored and vdiravka committed Feb 16, 2018
1 parent 58f3b10 commit 58e4cec
Showing 26 changed files with 326 additions and 268 deletions.
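
Every operator touched below converges on the same serialization pattern, visible in the HBaseSubScan and Hive scan hunks that follow: the operator keeps a reference to the resolved storage plugin, serializes only the plugin config through a @JsonProperty getter, and its @JsonCreator constructor resolves the plugin back from the injected StoragePluginRegistry before delegating to the ordinary constructor. A minimal sketch of that pattern, using hypothetical Foo* names rather than the real Drill classes:

// Sketch only: FooSubScan, FooStoragePlugin and FooStoragePluginConfig are hypothetical
// stand-ins for the HBase/Hive/Kafka operators changed in this commit.
import java.util.List;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.StoragePluginRegistry;

public class FooSubScan /* extends AbstractBase implements SubScan */ {

  private final String userName;
  private final FooStoragePlugin fooStoragePlugin;   // resolved plugin, never written to JSON
  private final List<SchemaPath> columns;

  @JsonCreator
  public FooSubScan(@JacksonInject StoragePluginRegistry registry,
                    @JsonProperty("userName") String userName,
                    @JsonProperty("fooStoragePluginConfig") FooStoragePluginConfig config,
                    @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
    // rebuild the plugin instance from its serialized config, then delegate
    this(userName, (FooStoragePlugin) registry.getPlugin(config), columns);
  }

  public FooSubScan(String userName, FooStoragePlugin fooStoragePlugin, List<SchemaPath> columns) {
    this.userName = userName;
    this.fooStoragePlugin = fooStoragePlugin;
    this.columns = columns;
  }

  // only the config goes into the JSON plan, under the same property name
  // ("fooStoragePluginConfig") that the @JsonCreator parameter expects back
  @JsonProperty
  public FooStoragePluginConfig getFooStoragePluginConfig() {
    return fooStoragePlugin.getConfig();
  }

  @JsonProperty
  public List<SchemaPath> getColumns() {
    return columns;
  }

  // PhysicalOperator plumbing (accept, getNewWithChildren, iterator, ...) omitted
}
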
HBaseGroupScan.java
@@ -160,7 +160,7 @@ private void init() {
statsCalculator = new TableStatsCalculator(conn, hbaseScanSpec, storagePlugin.getContext().getConfig(), storagePluginConfig);

boolean foundStartRegion = false;
regionsToScan = new TreeMap<HRegionInfo, ServerName>();
regionsToScan = new TreeMap<>();
for (HRegionLocation regionLocation : regionLocations) {
HRegionInfo regionInfo = regionLocation.getRegionInfo();
if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
@@ -338,8 +338,7 @@ public HBaseSubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig,
endpointFragmentMapping.get(minorFragmentId), columns);
return new HBaseSubScan(getUserName(), storagePlugin, endpointFragmentMapping.get(minorFragmentId), columns);
}

@Override
HBaseSubScan.java
@@ -24,7 +24,6 @@

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -49,44 +48,43 @@
public class HBaseSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);

@JsonProperty
public final HBaseStoragePluginConfig storage;
@JsonIgnore
private final HBaseStoragePlugin hbaseStoragePlugin;
private final List<HBaseSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;

@JsonCreator
public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("storage") StoragePluginConfig storage,
@JsonProperty("hbaseStoragePluginConfig") HBaseStoragePluginConfig hbaseStoragePluginConfig,
@JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
super(userName);
hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
this.regionScanSpecList = regionScanSpecList;
this.storage = (HBaseStoragePluginConfig) storage;
this.columns = columns;
this(userName,
(HBaseStoragePlugin) registry.getPlugin(hbaseStoragePluginConfig),
regionScanSpecList,
columns);
}

public HBaseSubScan(String userName, HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) {
public HBaseSubScan(String userName,
HBaseStoragePlugin hbaseStoragePlugin,
List<HBaseSubScanSpec> regionInfoList,
List<SchemaPath> columns) {
super(userName);
hbaseStoragePlugin = plugin;
storage = config;
this.hbaseStoragePlugin = hbaseStoragePlugin;
this.regionScanSpecList = regionInfoList;
this.columns = columns;
}

public List<HBaseSubScanSpec> getRegionScanSpecList() {
return regionScanSpecList;
@JsonProperty
public HBaseStoragePluginConfig getHbaseStoragePluginConfig() {
return hbaseStoragePlugin.getConfig();
}

@JsonIgnore
public HBaseStoragePluginConfig getStorageConfig() {
return storage;
@JsonProperty
public List<HBaseSubScanSpec> getRegionScanSpecList() {
return regionScanSpecList;
}

@JsonProperty
public List<SchemaPath> getColumns() {
return columns;
}
@@ -109,7 +107,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, regionScanSpecList, columns);
return new HBaseSubScan(getUserName(), hbaseStoragePlugin, regionScanSpecList, columns);
}

@Override
@@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.List;

import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -109,4 +110,9 @@ public void testSelectFromSchema() throws Exception {
runHBaseSQLVerifyCount("SELECT row_key\n"
+ " FROM hbase.TestTableNullStr t WHERE row_key='a1'", 1);
}

@Test
public void testPhysicalPlanSubmission() throws Exception {
PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hbase.TestTableNullStr");
}
}
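
The new test delegates to PlanTestBase.testPhysicalPlanExecutionBasedOnQuery. The diff does not show that helper, but the idea it relies on is a plan round trip: obtain the JSON physical plan for the query, then submit that plan back to Drill, which forces the sub-scan operators above through serialization and deserialization. A rough sketch with hypothetical helper names:

// Hypothetical helper names; the real logic lives in PlanTestBase.
String query = "select * from hbase.TestTableNullStr";
String jsonPlan = getJsonPhysicalPlan(query);  // e.g. via "EXPLAIN PLAN FOR <query>" with JSON output
runAsPhysicalPlan(jsonPlan);                   // deserializes HBaseSubScan & co. and executes the plan
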
HivePartitionDescriptor.java
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -64,7 +64,7 @@ public HivePartitionDescriptor(@SuppressWarnings("unused") final PlannerSettings
this.scanRel = scanRel;
this.managedBuffer = managedBuffer.reallocIfNeeded(256);
this.defaultPartitionValue = defaultPartitionValue;
for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) {
for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry().table.partitionKeys) {
partitionMap.put(wrapper.name, i);
i++;
}
@@ -88,7 +88,7 @@ public int getMaxHierarchyLevel() {

@Override
public String getBaseTableLocation() {
HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry();
return origEntry.table.getTable().getSd().getLocation();
}

@@ -97,7 +97,7 @@ public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocati
BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
int record = 0;
final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
final Map<String, String> partitionNameTypeMap = hiveScan.hiveReadEntry.table.getPartitionNameTypeMap();
final Map<String, String> partitionNameTypeMap = hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap();
for(PartitionLocation partitionLocation: partitions) {
for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
final String hiveType = partitionNameTypeMap.get(fieldNameMap.get(partitionColumnIndex));
@@ -126,7 +126,7 @@ int record = 0;
public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) {
HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
String partitionName = column.getAsNamePart().getName();
Map<String, String> partitionNameTypeMap = hiveScan.hiveReadEntry.table.getPartitionNameTypeMap();
Map<String, String> partitionNameTypeMap = hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap();
String hiveType = partitionNameTypeMap.get(partitionName);
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hiveType);

@@ -143,7 +143,7 @@ public Integer getIdIfValid(String name) {
@Override
protected void createPartitionSublists() {
List<PartitionLocation> locations = new LinkedList<>();
HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry();
for (Partition partition: origEntry.getPartitions()) {
locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation()));
}
@@ -165,7 +165,7 @@ public TableScan createTableScan(List<PartitionLocation> newPartitions, boolean

private GroupScan createNewGroupScan(List<PartitionLocation> newPartitionLocations) throws ExecutionSetupException {
HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
HiveReadEntry origReadEntry = hiveScan.getHiveReadEntry();
List<HiveTableWrapper.HivePartitionWrapper> oldPartitions = origReadEntry.partitions;
List<HiveTableWrapper.HivePartitionWrapper> newPartitions = Lists.newLinkedList();

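All of the replacements in this file (and in the planner rule below) swap direct access to the public hiveReadEntry and storagePlugin fields for getter calls, which suggests HiveScan now encapsulates those members. An assumed sketch of the accessors, not taken from this diff:

// Assumed accessor shape on HiveScan after the refactoring (sketch, not the actual class body).
@JsonProperty
public HiveReadEntry getHiveReadEntry() {
  return hiveReadEntry;            // formerly a public field read directly by callers
}

@JsonIgnore
public HiveStoragePlugin getStoragePlugin() {
  return storagePlugin;            // the plugin itself stays out of the serialized plan
}
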
ConvertHiveParquetScanToDrillParquetScan.java
@@ -97,16 +97,16 @@ public boolean matches(RelOptRuleCall call) {

final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
final HiveConf hiveConf = hiveScan.getHiveConf();
final HiveTableWithColumnCache hiveTable = hiveScan.hiveReadEntry.getTable();
final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable();

final Class<? extends InputFormat<?,?>> tableInputFormat =
getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd(),
getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(),
hiveConf);
if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) {
return false;
}

final List<HivePartitionWrapper> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers();
final List<HivePartitionWrapper> partitions = hiveScan.getHiveReadEntry().getHivePartitionWrappers();
if (partitions == null) {
return true;
}
@@ -116,7 +116,7 @@ public boolean matches(RelOptRuleCall call) {
for (HivePartitionWrapper partition : partitions) {
final StorageDescriptor partitionSD = partition.getPartition().getSd();
Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD(
HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.hiveReadEntry, partitionSD,
HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.getHiveReadEntry(), partitionSD,
hiveConf);
if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
return false;
@@ -172,7 +172,7 @@ public void onMatch(RelOptRuleCall call) {
final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
final String partitionColumnLabel = settings.getFsPartitionColumnLabel();

final Table hiveTable = hiveScan.hiveReadEntry.getTable();
final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
checkForUnsupportedDataTypes(hiveTable);

final Map<String, String> partitionColMapping =
@@ -245,8 +245,8 @@ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColM
final HiveDrillNativeParquetScan nativeHiveScan =
new HiveDrillNativeParquetScan(
hiveScan.getUserName(),
hiveScan.hiveReadEntry,
hiveScan.storagePlugin,
hiveScan.getHiveReadEntry(),
hiveScan.getStoragePlugin(),
nativeScanCols,
null);

HiveDrillNativeParquetScan.java
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -41,16 +41,16 @@ public class HiveDrillNativeParquetScan extends HiveScan {

@JsonCreator
public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
@JsonProperty("storage-plugin") String storagePluginName,
@JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry);
super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, pluginRegistry);
}

public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin,
public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin hiveStoragePlugin,
List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
super(userName, hiveReadEntry, storagePlugin, columns, metadataProvider);
super(userName, hiveReadEntry, hiveStoragePlugin, columns, metadataProvider);
}

public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
@@ -91,7 +91,7 @@ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) thro

@Override
public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider);
return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, getStoragePlugin(), getColumns(), getMetadataProvider());
}

@Override
@@ -103,12 +103,12 @@ public GroupScan clone(List<SchemaPath> columns) {

@Override
public String toString() {
final List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
final List<HivePartitionWrapper> partitions = getHiveReadEntry().getHivePartitionWrappers();
int numPartitions = partitions == null ? 0 : partitions.size();
return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper()
+ ", columns=" + columns
return "HiveDrillNativeParquetScan [table=" + getHiveReadEntry().getHiveTableWrapper()
+ ", columns=" + getColumns()
+ ", numPartitions=" + numPartitions
+ ", partitions= " + partitions
+ ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry) + "]";
+ ", inputDirectories=" + getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]";
}
}
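
The @JsonCreator parameters above are renamed from "hive-table" and "storage-plugin" to "hiveReadEntry" and "hiveStoragePluginConfig", matching the property names Jackson derives from the getters. When the creator expects names the getters never emit, a re-submitted plan either fails to bind or leaves those constructor arguments null, which is one plausible source of the NPE this commit fixes. A small self-contained Jackson round trip illustrating why the names must agree (hypothetical Scan class, not Drill code):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RoundTripDemo {

  static class Scan {
    private final String readEntry;

    @JsonCreator
    public Scan(@JsonProperty("readEntry") String readEntry) {  // with a mismatched name ("hive-table"),
      this.readEntry = readEntry;                               // this argument would come back null
    }

    @JsonProperty
    public String getReadEntry() {                              // serialized as "readEntry"
      return readEntry;
    }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new Scan("hive.db.table"));  // {"readEntry":"hive.db.table"}
    Scan back = mapper.readValue(json, Scan.class);                      // binds cleanly because names match
    System.out.println(back.getReadEntry());
  }
}
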
HiveDrillNativeParquetSubScan.java
@@ -42,9 +42,9 @@ public HiveDrillNativeParquetSubScan(@JacksonInject StoragePluginRegistry regist
@JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
@JsonProperty("splitClasses") List<String> splitClasses,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("storagePluginName") String pluginName)
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig)
throws IOException, ExecutionSetupException, ReflectiveOperationException {
super(registry, userName, splits, hiveReadEntry, splitClasses, columns, pluginName);
super(registry, userName, splits, hiveReadEntry, splitClasses, columns, hiveStoragePluginConfig);
}

public HiveDrillNativeParquetSubScan(final HiveSubScan subScan)