DRILL-5730: Mock testing improvements and interface improvements
ilooner authored and arina-ielchiieva committed Jan 26, 2018
1 parent 9926eda commit 186536d
Showing 224 changed files with 2,732 additions and 2,941 deletions.
DrillConfig.java
@@ -45,13 +45,11 @@ public class DrillConfig extends NestedConfig {

private final ImmutableList<String> startupArguments;

public static final boolean ON_OSX = System.getProperty("os.name").contains("OS X");

@SuppressWarnings("restriction")
private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();

@VisibleForTesting
public DrillConfig(Config config, boolean enableServerConfigs) {
public DrillConfig(Config config) {
super(config);
logger.debug("Setting up DrillConfig object.");
logger.trace("Given Config object is:\n{}",
@@ -138,7 +136,7 @@ public static DrillConfig forClient() {
* @param overrideFileResourcePathname
* the classpath resource pathname of the file to use for
* configuration override purposes; {@code null} specifies to use the
* default pathname ({@link CommonConstants.CONFIG_OVERRIDE}) (does
* default pathname ({@link CommonConstants#CONFIG_OVERRIDE_RESOURCE_PATHNAME}) (does
* <strong>not</strong> specify to suppress trying to load an
* overrides file)
* @return A merged Config object.
@@ -169,7 +167,7 @@ public static DrillConfig create(String overrideFileResourcePathname, boolean en
* @return {@link DrillConfig} instance
*/
public static DrillConfig create(Config config) {
return new DrillConfig(config.resolve(), true);
return new DrillConfig(config.resolve());
}

/**
@@ -252,7 +250,7 @@ private static DrillConfig create(String overrideFileResourcePathname,
logger.info("Configuration and plugin file(s) identified in {}ms.\n{}",
watch.elapsed(TimeUnit.MILLISECONDS),
logString);
return new DrillConfig(effectiveConfig.resolve(), enableServerConfigs);
return new DrillConfig(effectiveConfig.resolve());
}

public <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException {
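
The net effect in DrillConfig: the enableServerConfigs flag is gone, DrillConfig.create(Config) resolves the Config and hands it to a single one-argument constructor, and that constructor is now @VisibleForTesting. A minimal sketch of both entry points follows; the option key shown is illustrative only, not something this commit touches.

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.drill.common.config.DrillConfig;

    public class DrillConfigSketch {
      public static void main(String[] args) {
        // Production path: the factory resolves substitutions, then delegates
        // to the one-argument constructor shown in the diff above.
        DrillConfig fromFactory = DrillConfig.create(ConfigFactory.load());

        // Test path: with @VisibleForTesting, a test can wrap a hand-built,
        // already-resolved Config directly; there is no enableServerConfigs flag.
        Config handBuilt = ConfigFactory.parseString("drill.exec.http.enabled: false")
            .withFallback(ConfigFactory.load())
            .resolve();
        DrillConfig forTest = new DrillConfig(handBuilt);
      }
    }
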
NestedConfig.java
@@ -30,8 +30,6 @@
import com.typesafe.config.ConfigValue;

abstract class NestedConfig implements Config {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class);

private final Config c;

NestedConfig(Config c) {
MapRDBScanBatchCreator.java
@@ -20,7 +20,7 @@
import java.util.List;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -33,11 +33,9 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);

public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
@Override
public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
@@ -46,8 +44,7 @@ public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<R
readers.add(new HBaseRecordReader(
subScan.getFormatPlugin().getConnection(),
getHBaseSubScanSpec(scanSpec),
subScan.getColumns(),
context));
subScan.getColumns()));
} else {
readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
}
MaprDBJsonRecordReader.java
@@ -72,7 +72,6 @@
public class MaprDBJsonRecordReader extends AbstractRecordReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class);

public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY);
private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;

private Table table;
@@ -117,7 +116,7 @@ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,

disableCountOptimization = formatPluginConfig.disableCountOptimization();
setColumns(projectedColumns);
unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
unionEnabled = context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
allTextMode = formatPluginConfig.isAllTextMode();
ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange();
@@ -518,5 +517,4 @@ public void close() {
table.close();
}
}

}
HBaseRecordReader.java
@@ -33,7 +33,6 @@
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -84,8 +83,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas

private final Connection connection;

public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context) {
public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
this.connection = connection;
hbaseTableName = TableName.valueOf(
Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName());
HBaseScanBatchCreator.java
@@ -21,7 +21,7 @@

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -31,11 +31,9 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseScanBatchCreator.class);

public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
@Override
public ScanBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
@@ -45,7 +43,7 @@ public ScanBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<Re
if ((columns = subScan.getColumns())==null) {
columns = GroupScan.ALL_COLUMNS;
}
readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns, context));
readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns));
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
}
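
The same two-part change repeats across the scan-batch creators in this commit (MapR-DB and HBase above; Hive, JDBC, and Kafka below): getBatch now receives ExecutorFragmentContext, the executor-side slice of the old FragmentContext, and the readers it constructs take fewer context arguments. A sketch of the resulting shape; ExampleSubScan and its createReader() helper are hypothetical stand-ins for the plugin-specific types.

    import java.util.Collections;
    import java.util.List;

    import org.apache.drill.common.exceptions.ExecutionSetupException;
    import org.apache.drill.exec.ops.ExecutorFragmentContext;
    import org.apache.drill.exec.physical.impl.BatchCreator;
    import org.apache.drill.exec.physical.impl.ScanBatch;
    import org.apache.drill.exec.record.RecordBatch;
    import org.apache.drill.exec.store.RecordReader;

    public class ExampleScanBatchCreator implements BatchCreator<ExampleSubScan> {
      @Override
      public ScanBatch getBatch(ExecutorFragmentContext context, ExampleSubScan subScan,
                                List<RecordBatch> children) throws ExecutionSetupException {
        // Readers are built without the fragment context; that is what lets
        // unit tests construct them in isolation.
        List<RecordReader> readers = Collections.singletonList(subScan.createReader());
        return new ScanBatch(subScan, context, readers);
      }
    }
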
@@ -40,9 +40,9 @@
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.store.hive.HivePartition;
import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.io.Writable;
HiveDrillNativeScanBatchCreator.java
@@ -28,7 +28,7 @@
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -57,9 +57,10 @@

@SuppressWarnings("unused")
public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeScanBatchCreator.class);

@Override
public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
public ScanBatch getBatch(ExecutorFragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
final HiveTableWithColumnCache table = config.getTable();
final List<List<InputSplit>> splits = config.getInputSplits();
@@ -176,7 +177,7 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
}

return new ScanBatch(config, context, oContext, readers, implicitColumns);
return new ScanBatch(context, oContext, readers, implicitColumns);
}

/**
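
A related detail above: the ScanBatch overload taking an OperatorContext loses its leading physical-operator config argument. Sketched below, with the element type of implicitColumns assumed to be Map<String, String> (the hunk does not show it).

    import java.util.List;
    import java.util.Map;

    import org.apache.drill.exec.ops.ExecutorFragmentContext;
    import org.apache.drill.exec.ops.OperatorContext;
    import org.apache.drill.exec.physical.impl.ScanBatch;
    import org.apache.drill.exec.store.RecordReader;

    class ScanBatchCallSketch {
      static ScanBatch build(ExecutorFragmentContext context, OperatorContext oContext,
                             List<RecordReader> readers, List<Map<String, String>> implicitColumns) {
        // Before: new ScanBatch(config, context, oContext, readers, implicitColumns)
        return new ScanBatch(context, oContext, readers, implicitColumns);
      }
    }
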
HiveScanBatchCreator.java
@@ -20,7 +20,7 @@
import java.util.List;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -29,12 +29,12 @@

@SuppressWarnings("unused")
public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScanBatchCreator.class);

@Override
public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
public ScanBatch getBatch(ExecutorFragmentContext context, HiveSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
AbstractReadersInitializer readersInitializer = ReadersInitializer.getInitializer(context, config);
return new ScanBatch(config, context, readersInitializer.init());
}

}
@@ -32,6 +32,7 @@
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
HiveUtilities.java
@@ -32,7 +32,7 @@
import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal9Holder;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableBitVector;
@@ -288,7 +288,7 @@ public static void populateVector(final ValueVector vector, final DrillBuf manag
}
}

public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionManager options) {
public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
switch (typeInfo.getCategory()) {
case PRIMITIVE: {
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
@@ -325,7 +325,7 @@ public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, fi
}

public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
OptionManager options) {
OptionSet options) {
switch(primitiveTypeInfo.getPrimitiveCategory()) {
case BINARY:
return TypeProtos.MinorType.VARBINARY;
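
Loosening these signatures from OptionManager to the read-only OptionSet interface is the mock-testing payoff: a test stubs only the accessors the helper actually reads instead of standing up the full option machinery. A sketch, assuming Mockito on the test classpath and the typed getters this commit uses elsewhere.

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.exec.server.options.OptionSet;

    public class OptionSetStubSketch {
      static OptionSet optionsForTest() {
        OptionSet options = mock(OptionSet.class);
        // Stub just the option under test; no server, no OptionManager.
        when(options.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY)).thenReturn(true);
        return options;
      }
    }
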
TestHiveStorage.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.hive;

import mockit.integration.junit4.JMockit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.drill.categories.HiveStorageTest;
@@ -33,7 +32,6 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.math.BigDecimal;
import java.sql.Date;
@@ -48,7 +46,6 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

@RunWith(JMockit.class)
@Category({SlowTest.class, HiveStorageTest.class})
public class TestHiveStorage extends HiveTestBase {
@BeforeClass
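
With the JMockit fakes removed, @RunWith(JMockit.class) goes too, and the class runs under JUnit 4's default BlockJUnit4ClassRunner; only the category annotations remain. The resulting shape, sketched with a hypothetical test class:

    import org.apache.drill.categories.HiveStorageTest;
    import org.apache.drill.categories.SlowTest;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    // No @RunWith: absent an explicit runner, JUnit 4 uses its default runner.
    @Category({SlowTest.class, HiveStorageTest.class})
    public class ExampleStorageTest {
      @Test
      public void queriesStillPass() {
        // test body elided in this sketch
      }
    }
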
JdbcBatchCreator.java
@@ -21,22 +21,21 @@
import java.util.List;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
//import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;

import com.google.common.base.Preconditions;

public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
@Override
public ScanBatch getBatch(FragmentContext context, JdbcSubScan config,
public ScanBatch getBatch(ExecutorFragmentContext context, JdbcSubScan config,
List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
JdbcStoragePlugin plugin = config.getPlugin();
RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
RecordReader reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName());
return new ScanBatch(config, context, Collections.singletonList(reader));
}
}
JdbcRecordReader.java
@@ -40,7 +40,6 @@
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
@@ -70,17 +69,13 @@ class JdbcRecordReader extends AbstractRecordReader {
private final DataSource source;
private ResultSet resultSet;
private final String storagePluginName;
private FragmentContext fragmentContext;
private Connection connection;
private Statement statement;
private final String sql;
private ImmutableList<ValueVector> vectors;
private ImmutableList<Copier<?>> copiers;

private OperatorContext operatorContext;

public JdbcRecordReader(FragmentContext fragmentContext, DataSource source, String sql, String storagePluginName) {
this.fragmentContext = fragmentContext;
public JdbcRecordReader(DataSource source, String sql, String storagePluginName) {
this.source = source;
this.sql = sql;
this.storagePluginName = storagePluginName;
@@ -170,8 +165,6 @@ private Copier<?> getCopier(int jdbcType, int offset, ResultSet result, ValueVec
@Override
public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
try {

this.operatorContext = operatorContext;
connection = source.getConnection();
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
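
Dropping the FragmentContext constructor parameter and the unused operatorContext field leaves JdbcRecordReader constructible from a bare DataSource, which is the point of the cleanup. A sketch, assuming H2 on the classpath purely for illustration; since the reader class is package-private, this would live in the same package.

    package org.apache.drill.exec.store.jdbc;

    import org.h2.jdbcx.JdbcDataSource;

    class JdbcRecordReaderSketch {
      static JdbcRecordReader newReaderForTest() {
        // Any javax.sql.DataSource works; H2's in-memory database is an
        // assumption made for this sketch, not something the commit adds.
        JdbcDataSource source = new JdbcDataSource();
        source.setURL("jdbc:h2:mem:example");
        return new JdbcRecordReader(source, "SELECT 1", "h2-example");
      }
    }
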
KafkaRecordReader.java
@@ -66,13 +66,12 @@ public class KafkaRecordReader extends AbstractRecordReader {
public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
FragmentContext context, KafkaStoragePlugin plugin) {
setColumns(projectedColumns);
this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
this.readNumbersAsDouble = context.getOptions()
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
OptionManager options = context.getOptions();
this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
final OptionManager optionManager = context.getOptions();
this.enableAllTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE);
this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
this.unionEnabled = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
this.kafkaMsgReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER);
this.kafkaPollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
this.plugin = plugin;
this.subScanSpec = subScanSpec;
}
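
The Kafka hunk shows the option-access cleanup in one place: each read is a single typed getter on the OptionManager rather than an OptionValue whose bool_val, num_val, or string_val field must be unwrapped. Condensed from the lines above:

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.exec.server.options.OptionManager;

    class KafkaOptionReads {
      // One typed call per option; the old pattern was
      // options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val and friends.
      static long pollTimeout(OptionManager options) {
        return options.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
      }

      static String messageReader(OptionManager options) {
        return options.getString(ExecConstants.KAFKA_RECORD_READER);
      }
    }
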
KafkaScanBatchCreator.java
@@ -21,7 +21,7 @@

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -38,7 +38,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);

@Override
public CloseableRecordBatch getBatch(FragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
KuduRecordReader.java
@@ -29,7 +29,6 @@
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
@@ -87,8 +86,7 @@ private static class ProjectedColumnInfo {

private ImmutableList<ProjectedColumnInfo> projectedCols;

public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context) {
public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
setColumns(projectedColumns);
this.client = client;
scanSpec = subScanSpec;