diff --git a/contrib/format-daffodil/README.md b/contrib/format-daffodil/README.md
new file mode 100644
index 00000000000..784e7a7dcf5
--- /dev/null
+++ b/contrib/format-daffodil/README.md
@@ -0,0 +1,27 @@
+# Daffodil 'Format' Reader
+This plugin enables Drill to read DFDL-described data from files by way of the Apache Daffodil DFDL implementation.
+
+## Validation
+
+Data read by Daffodil is always validated using Daffodil's Limited Validation mode.
+
+TBD: do we need an option to control escalating validation errors to fatal? Currently this is not provided.
+
+## Limitations: TBD
+
+At the moment, the DFDL schema is found on the local file system, which won't support Drill's distributed architecture.
+
+There are restrictions on the DFDL schemas that this can handle.
+
+In particular, all element children must have distinct element names, including across choice branches.
+(This rules out a number of large DFDL schemas.)
+
+TBD: Auto renaming as part of the Daffodil-to-Drill metadata mapping?
+
+The data is parsed fully from its native form into a Drill data structure held in memory.
+No attempt is made to avoid access to parts of the DFDL-described data that are not needed to answer the query.
+
+If the data is not well-formed, an error occurs and the query fails.
+
+If the data is invalid, and validity checking by Daffodil is enabled, then an error occurs and the query fails.
+
diff --git a/contrib/format-daffodil/pom.xml b/contrib/format-daffodil/pom.xml
new file mode 100644
index 00000000000..0cae44a5257
--- /dev/null
+++ b/contrib/format-daffodil/pom.xml
@@ -0,0 +1,94 @@
+
+
+
+ 4.0.0
+
+
+ drill-contrib-parent
+ org.apache.drill.contrib
+ 1.22.0-SNAPSHOT
+
+
+ drill-format-daffodil
+ Drill : Contrib : Format : Daffodil
+
+
+
+ org.apache.drill.exec
+ drill-java-exec
+ ${project.version}
+
+
+ org.apache.daffodil
+ daffodil-japi_2.12
+ 3.8.0
+
+
+ org.apache.daffodil
+ daffodil-runtime1_2.12
+ 3.8.0
+
+
+
+ org.apache.drill.exec
+ drill-java-exec
+ tests
+ ${project.version}
+ test
+
+
+
+ org.apache.drill
+ drill-common
+ tests
+ ${project.version}
+ test
+
+
+
+
+
+
+ maven-resources-plugin
+
+
+ copy-java-sources
+ process-sources
+
+ copy-resources
+
+
+ ${basedir}/target/classes/org/apache/drill/exec/store/daffodil
+
+
+
+ src/main/java/org/apache/drill/exec/store/daffodil
+ true
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
new file mode 100644
index 00000000000..976122c7fd3
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import static org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory.CompileFailure;
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+public class DaffodilBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+ private final RowSetLoader rowSetLoader;
+ private final CustomErrorContext errorContext;
+ private final DaffodilMessageParser dafParser;
+ private final InputStream dataInputStream;
+
+ public DaffodilBatchReader(DaffodilReaderConfig readerConfig, EasySubScan scan,
+ FileSchemaNegotiator negotiator) {
+
+ errorContext = negotiator.parentErrorContext();
+ DaffodilFormatConfig dafConfig = readerConfig.plugin.getConfig();
+
+ String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+ String rootName = dafConfig.getRootName();
+ String rootNamespace = dafConfig.getRootNamespace();
+ boolean validationMode = dafConfig.getValidationMode();
+
+ URI dfdlSchemaURI;
+ try {
+ dfdlSchemaURI = new URI(schemaURIString);
+ } catch (URISyntaxException e) {
+ throw UserException.validationError(e).build(logger);
+ }
+
+ FileDescrip file = negotiator.file();
+ DrillFileSystem fs = file.fileSystem();
+ URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ DataProcessor dp;
+ try {
+ dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+ } catch (CompileFailure e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Create the corresponding Drill schema.
+ // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+ // all of it, hundreds of tables, but all part of the same metadata tree.
+ TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+ // Inform Drill about the schema
+ negotiator.tableSchema(drillSchema, true);
+
+ //
+ // DATA TIME: Next we construct the runtime objects, and open files.
+ //
+ // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+ // actually does the parsing.
+ rowSetLoader = negotiator.build().writer();
+
+ // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+ // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+ DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+ // Now we can set up the dafParser with the outputter it will drive with
+ // the parser-produced infoset.
+ dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+ dafParser.setInfosetOutputter(outputter);
+
+ Path dataPath = file.split().getPath();
+ // Lastly, we open the data stream
+ try {
+ dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to open input file: %s", dataPath.toString()))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // And lastly,... tell daffodil the input data stream.
+ dafParser.setInputStream(dataInputStream);
+ }
+
+ /**
+ * This is the core of actual processing - data movement from Daffodil to Drill.
+ *
+ * If there is space in the batch, and there is data available to parse then this calls the
+ * daffodil parser, which parses data, delivering it to the rowWriter by way of the infoset
+ * outputter.
+ *
+ * Repeats until the rowWriter is full (a batch is full), or there is no more data, or a parse
+ * error ends execution with a throw.
+ *
+ * Validation errors and other warnings are not errors and are logged but do not cause parsing to
+ * fail/throw.
+ *
+ * @return true if there are rows retrieved, false if no rows were retrieved, which means no more
+ * will ever be retrieved (end of data).
+ * @throws RuntimeException
+ * on parse errors.
+ */
+ @Override
+ public boolean next() {
+ // Check assumed invariants
+ // We don't know if there is data or not. This could be called on an empty data file.
+ // We DO know that this won't be called if there is no space in the batch for even 1
+ // row.
+ if (dafParser.isEOF()) {
+ return false; // return without even checking for more rows or trying to parse.
+ }
+ while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+ // the predicate is always true once.
+ dafParser.parse();
+ if (dafParser.isProcessingError()) {
+ assert (Objects.nonNull(dafParser.getDiagnostics()));
+ throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+ .addContext(errorContext).build(logger);
+ }
+ if (dafParser.isValidationError()) {
+ logger.warn(dafParser.getDiagnosticsAsString());
+ // Note that even if daffodil is set to not validate, validation errors may still occur
+ // from DFDL's "recoverableError" assertions.
+ }
+ rowSetLoader.save();
+ }
+ int nRows = rowSetLoader.rowCount();
+ assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out
+ // of here.
+ return true;
+ }
+
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(dataInputStream);
+ }
+}
+
+class DaffodilReaderConfig {
+ final DaffodilFormatPlugin plugin;
+
+ DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
new file mode 100644
index 00000000000..189d2e0c18b
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import com.ibm.icu.util.Calendar;
+import com.ibm.icu.util.TimeZone;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.DFDLPrimType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter extends InfosetOutputter {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+ /**
+ * Stack that is used only if we have sub-structures that are not simple-type fields of the row.
+ */
+ private final Stack tupleWriterStack = new Stack<>();
+ private final Stack arrayWriterStack = new Stack<>();
+ /**
+ * True if the next startComplex call will be for the DFDL infoset root element whose children are
+ * the columns of the row set.
+ */
+ private boolean isRootElement = true;
+ private RowSetLoader rowSetWriter;
+
+ private DaffodilDrillInfosetOutputter() {
+ } // no default constructor
+
+ public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+ this.rowSetWriter = writer;
+ this.tupleWriterStack.push(writer);
+ }
+
+ private static void nyi() {
+ throw new IllegalStateException("not yet implemented.");
+ }
+
+ private static void fatalError(String s) {
+ throw new IllegalStateException(s);
+ }
+
+ private boolean isOriginalRoot() {
+ boolean result = currentTupleWriter() == rowSetWriter;
+ if (result) {
+ assert (tupleWriterStack.size() == 1);
+ }
+ return result;
+ }
+
+ private TupleWriter currentTupleWriter() {
+ return tupleWriterStack.peek();
+ }
+
+ private ArrayWriter currentArrayWriter() {
+ return arrayWriterStack.peek();
+ }
+
+ @Override
+ public void reset() {
+ tupleWriterStack.clear();
+ tupleWriterStack.push(rowSetWriter);
+ arrayWriterStack.clear();
+ this.isRootElement = true;
+ checkCleanState();
+ }
+
+ private void checkCleanState() {
+ assert (isOriginalRoot());
+ assert (arrayWriterStack.isEmpty());
+ assert (isRootElement);
+ }
+
+ @Override
+ public void startDocument() {
+ checkCleanState();
+ }
+
+ @Override
+ public void endDocument() {
+ checkCleanState();
+ }
+
+ private String colName(ElementMetadata md) {
+ return DrillDaffodilSchemaVisitor.makeColumnName(md);
+ }
+
+ @Override
+ public void startSimple(InfosetSimpleElement ise) {
+ assert (!isRootElement);
+ ElementMetadata md = ise.metadata();
+ String colName = colName(md);
+ ScalarWriter cw;
+ if (md.isArray()) {
+ // A simple type array
+ assert (!arrayWriterStack.isEmpty());
+ cw = currentArrayWriter().scalar();
+ } else {
+ // A simple element within a map
+ // Note the map itself might be an array,
+ // but we don't care about that here.
+ cw = currentTupleWriter().scalar(colName);
+ }
+ ColumnMetadata cm = cw.schema();
+ assert (cm.isScalar());
+ if (md.isNillable() && ise.isNilled()) {
+ assert cm.isNullable();
+ cw.setNull();
+ } else {
+ convertDaffodilValueToDrillValue(ise, cm, cw);
+ }
+ }
+
+ @Override
+ public void endSimple(InfosetSimpleElement diSimple) {
+ assert (!isRootElement);
+ // do nothing
+ }
+
+ @Override
+ public void startComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ String colName = colName(ce.metadata());
+ if (isRootElement) {
+ assert (isOriginalRoot());
+ // This complex element's corresponds to the root element of the
+ // DFDL schema. We don't treat this as a column of the row set.
+ // Rather, it's children are the columns of the row set.
+ //
+ // If we do nothing at all here, then we'll start getting
+ // event calls for the children.
+ isRootElement = false;
+ return;
+ }
+ if (md.isArray()) {
+ assert (!arrayWriterStack.isEmpty());
+ tupleWriterStack.push(currentArrayWriter().tuple());
+ } else {
+ tupleWriterStack.push(currentTupleWriter().tuple(colName));
+ }
+ }
+
+ @Override
+ public void endComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ if (isOriginalRoot()) {
+ isRootElement = true;
+ // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+ } else {
+ // it's a map.
+ // We seem to not need to do anything to end the map. No action taken here works.
+ if (md.isArray()) {
+ assert (!arrayWriterStack.isEmpty());
+ currentArrayWriter().save(); // required for map array entries.
+ }
+ tupleWriterStack.pop();
+ }
+ }
+
+ @Override
+ public void startArray(InfosetArray diArray) {
+ ElementMetadata md = diArray.metadata();
+ assert (md.isArray());
+ // DFDL has no notion of an array directly within another array. A named field (map) is
+ // necessary before you can have another array.
+ assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top
+ // level row.
+ String colName = colName(md);
+ TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+ ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+ arrayWriterStack.push(aw);
+ }
+
+ @Override
+ public void endArray(InfosetArray ia) {
+ ElementMetadata md = ia.metadata();
+ assert (md.isArray());
+ assert (!arrayWriterStack.empty());
+ // FIXME: How do we end/close-out an array?
+ // note that each array instance, when the instance is a map, must have
+ // save called after it is written to the array but that happens
+ // in endComplex events since it must be called not once per array, but
+ // once per array item.
+ arrayWriterStack.pop();
+ }
+
+ private void invariantFailed(String dafTypeName, ColumnMetadata cm) {
+ String msg = String.format(
+ "Daffodil to Drill Conversion Invariant Failed: dafType %s, drill type %s.", dafTypeName,
+ cm.typeString());
+ logger.error(msg);
+ fatalError(msg);
+ }
+
+ private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm,
+ ScalarWriter cw) {
+ DFDLPrimType dafType = ise.metadata().dfdlType();
+ String dafTypeName = dafType.name();
+ TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+ assert (drillType == cm.type());
+ switch (drillType) {
+ case BIGINT: { // BIGINT type is not a Java BigInteger, BIGINT is a signed 8-byte long in Drill.
+ switch (dafType) {
+ case UnsignedInt: {
+ cw.setLong(ise.getUnsignedInt());
+ break;
+ }
+ case Long: {
+ cw.setLong(ise.getLong());
+ break;
+ }
+ default:
+ invariantFailed(dafTypeName, cm);
+ }
+ break;
+ }
+ case INT: {
+ cw.setInt(ise.getInt());
+ break;
+ }
+ case SMALLINT: {
+ cw.setInt(ise.getShort()); // there is no setShort
+ break;
+ }
+ case TINYINT: {
+ cw.setInt(ise.getByte()); // there is no setByte
+ break;
+ }
+ case UINT4: {
+ // daffodil represents unsigned int as long.
+ // drill represents unsigned int as int.
+ cw.setInt(ise.getUnsignedInt().intValue());
+ break;
+ }
+ case UINT2: {
+ cw.setInt(ise.getUnsignedShort());
+ break;
+ }
+ case UINT1: {
+ cw.setInt(ise.getUnsignedByte());
+ break;
+ }
+ case VARDECIMAL: {
+ switch (dafType) {
+ case UnsignedLong: {
+ cw.setDecimal(new BigDecimal(ise.getUnsignedLong()));
+ break;
+ }
+ case Integer: {
+ cw.setDecimal(new BigDecimal(ise.getInteger()));
+ break;
+ }
+ case NonNegativeInteger: {
+ cw.setDecimal(new BigDecimal(ise.getNonNegativeInteger()));
+ break;
+ }
+ default:
+ invariantFailed(dafTypeName, cm);
+ }
+ break;
+ }
+ case BIT: {
+ cw.setBoolean(ise.getBoolean());
+ break;
+ }
+ case FLOAT8: {
+ switch (dafType) {
+ case Double: {
+ cw.setDouble(ise.getDouble());
+ break;
+ }
+ case Float: {
+ // converting a float to a double by doubleValue() fails here
+ // Float.MaxValue converted to a double via doubleValue()
+ // then placed in a FLOAT8 column displays as
+ // 3.4028234663852886E38 not 3.4028235E38.
+ // But converting to string first, then to double works properly.
+ cw.setDouble(Double.parseDouble(ise.getFloat().toString()));
+ break;
+ }
+ default:
+ invariantFailed(dafTypeName, cm);
+ }
+ break;
+ }
+ case FLOAT4: {
+ // we don't use float4, we always use float8.
+ invariantFailed(dafTypeName, cm);
+ // cw.setFloat(ise.getFloat());
+ break;
+ }
+ case VARBINARY: {
+ byte[] hexBinary = ise.getHexBinary();
+ cw.setBytes(hexBinary, hexBinary.length);
+ break;
+ }
+ case VARCHAR: {
+ switch (dafType) {
+ case Decimal: {
+ BigDecimal decimal = ise.getDecimal();
+ cw.setString(decimal.toString());
+ break;
+ }
+ case String: {
+ String s = ise.getString();
+ cw.setString(s);
+ break;
+ }
+ default:
+ invariantFailed(dafTypeName, cm);
+ }
+ break;
+ }
+ case TIME: {
+ Calendar icuCal = ise.getTime();
+ Instant instant = Instant.ofEpochMilli(icuCal.getTimeInMillis());
+ TimeZone icuZone = icuCal.getTimeZone();
+ String zoneString = icuZone.getID();
+ ZoneId zoneId = ZoneId.of(zoneString);
+ LocalTime localTime = instant.atZone(zoneId).toLocalTime();
+ cw.setTime(localTime);
+ break;
+ }
+ case DATE: {
+ Calendar icuCalendar = ise.getDate();
+ // Extract year, month, and day from ICU Calendar
+ int year = icuCalendar.get(Calendar.YEAR);
+ // Note: ICU Calendar months are zero-based, similar to java.util.Calendar
+ int month = icuCalendar.get(Calendar.MONTH) + 1;
+ int day = icuCalendar.get(Calendar.DAY_OF_MONTH);
+ // Create a LocalDate
+ LocalDate localDate = LocalDate.of(year, month, day);
+ cw.setDate(localDate);
+ break;
+ }
+ case TIMESTAMP: {
+ Calendar icuCalendar = ise.getDateTime();
+ // Get time in milliseconds from the epoch
+ long millis = icuCalendar.getTimeInMillis();
+ // Create an Instant from milliseconds
+ Instant instant = Instant.ofEpochMilli(millis);
+ cw.setTimestamp(instant);
+ break;
+ }
+ default:
+ invariantFailed(dafTypeName, cm);
+ }
+ }
+
+ private void DFDLParseError(String s) {
+ throw new RuntimeException(s);
+ }
+}
+
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java
new file mode 100644
index 00000000000..1d197bbd8e7
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName(DaffodilFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class DaffodilFormatConfig implements FormatPluginConfig {
+
+ public final List extensions;
+ public final String schemaURI;
+ public final boolean validationMode;
+ public final String rootName;
+ public final String rootNamespace;
+
+ /**
+ * In the constructor for a format config, you should not use boxed versions of primitive types.
+ * It creates problems with defaulting them (they default to null) which cannot be unboxed.
+ */
+ @JsonCreator
+ public DaffodilFormatConfig(@JsonProperty("extensions") List extensions,
+ @JsonProperty("schemaURI") String schemaURI, @JsonProperty("rootName") String rootName,
+ @JsonProperty("rootNamespace") String rootNamespace,
+ @JsonProperty("validationMode") boolean validationMode) {
+
+ this.extensions = extensions == null
+ ? Collections.singletonList("dat")
+ : ImmutableList.copyOf(extensions);
+ this.rootName = rootName;
+ this.rootNamespace = rootNamespace;
+ this.schemaURI = schemaURI;
+ // no default. Users must pick.
+ this.validationMode = validationMode;
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public List getExtensions() {
+ return extensions;
+ }
+
+ public String getSchemaURI() {
+ return schemaURI;
+ }
+
+ public String getRootName() {
+ return rootName;
+ }
+
+ public String getRootNamespace() {
+ return rootNamespace;
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public boolean getValidationMode() {
+ return validationMode;
+ }
+
+ public DaffodilReaderConfig getReaderConfig(DaffodilFormatPlugin plugin) {
+ return new DaffodilReaderConfig(plugin);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaURI, validationMode, rootName, rootNamespace);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ DaffodilFormatConfig other = (DaffodilFormatConfig) obj;
+ return Objects.equals(schemaURI, other.schemaURI) && Objects.equals(rootName,
+ other.rootName) && Objects.equals(rootNamespace, other.rootNamespace) && Objects.equals(
+ validationMode, other.validationMode);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this).field("schemaURI", schemaURI).field("rootName", rootName)
+ .field("rootNamespace", rootNamespace).field("validationMode", validationMode).toString();
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java
new file mode 100644
index 00000000000..b6a74467286
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+public class DaffodilFormatPlugin extends EasyFormatPlugin {
+
+ public static final String DEFAULT_NAME = "daffodil";
+ public static final String OPERATOR_TYPE = "DAFFODIL_SUB_SCAN";
+
+ public DaffodilFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, DaffodilFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+ }
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf,
+ DaffodilFormatConfig pluginConfig) {
+ return EasyFormatConfig.builder().readable(true).writable(false).blockSplittable(false)
+ .compressible(true).extensions(pluginConfig.getExtensions()).fsConf(fsConf)
+ .readerOperatorType(OPERATOR_TYPE).scanVersion(ScanFrameworkVersion.EVF_V2)
+ .supportsLimitPushdown(true).supportsProjectPushdown(true)
+ .defaultName(DaffodilFormatPlugin.DEFAULT_NAME).build();
+ }
+
+ @Override
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ builder.readerFactory(new DaffodilReaderFactory(formatConfig.getReaderConfig(this), scan));
+ }
+
+ public static class DaffodilReaderFactory extends FileReaderFactory {
+ private final DaffodilReaderConfig readerConfig;
+
+ private final EasySubScan scan;
+
+ public DaffodilReaderFactory(DaffodilReaderConfig config,
+ EasySubScan scan) {
+ this.readerConfig = config;
+ this.scan = scan;
+ }
+
+ @Override
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+ return new DaffodilBatchReader(readerConfig, scan, negotiator);
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java
new file mode 100644
index 00000000000..97f06d7d35e
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.ParseResult;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.japi.io.InputSourceDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * DFDL Daffodil Streaming message parser
+ *
+ * You construct this providing a DataProcessor obtained from the DaffodilDataProcessorFactory. The
+ * DataProcessor contains the compiled DFDL schema, ready to use, as well as whether validation
+ * while parsing has been requested.
+ *
+ * The DataProcessor object may be shared/reused by multiple threads each of which has its own copy
+ * of this class. This object is, however, stateful, and must not be shared by multiple threads.
+ *
+ * You must call setInputStream, and setInfosetOutputter before you call parse(). The input stream
+ * and the InfosetOutputter objects are also private to one thread and are stateful and owned by
+ * this object. Once you have called setInputStream, you should view the input stream as the private
+ * property of this object. The parse() will invoke the InfosetOutputter's methods to deliver parsed
+ * data, and it may optionally create diagnostics (obtained via getDiagnostics) indicating which
+ * kind they are via the getIsProcessingError, getIsValidationError.
+ *
+ * Note that the InfosetOutputter may be called many times before a processing error is detected, as
+ * Daffodil delivers result data incrementally.
+ *
+ * Validation errors do not affect the InfosetOutputter output calls, but indicate that data was
+ * detected that is invalid.
+ *
+ * When parse() returns, the parse has ended and one can check for errors/diagnostics. One can call
+ * parse() again if there is still data to consume, which is checked via the isEOF() method.
+ *
+ * There are no guarantees about where the input stream is positioned between parse() calls. In
+ * particular, it may not be positioned at the start of the next message, as Daffodil may have
+ * pre-fetched additional bytes from the input stream which it found are not part of the current
+ * infoset, but the next one. The positioning of the input stream may in fact be somewhere in the
+ * middle of a byte, as Daffodil does not require messages to be of lengths that are in whole byte
+ * units. Hence, once you give the input stream to this object via setInputStream, that input stream
+ * is owned privately by this class for ever after.
+ */
+public class DaffodilMessageParser {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class);
+ private List diagnostics; // diagnostics.
+ private boolean isProcessingError;
+ private boolean isValidationError;
+ private InputSourceDataInputStream dis;
+ private InfosetOutputter outputter;
+ private DataProcessor dp;
+
+ /**
+ * Constructs the parser using a DataProcessor obtained from a DaffodilDataProcessorFactory.
+ *
+ * @param dp
+ */
+ DaffodilMessageParser(DataProcessor dp) {
+ this.dp = dp;
+ }
+
+ /**
+ * Provide the input stream from which data is to be parsed.
+ *
+ * This input stream is then owned by this object and becomes part of its state.
+ *
+ * It is; however, the responsibility of the caller to close this input stream after the
+ * completion of all parse calls. In particular, if a parse error is considered fatal, then the
+ * caller should close the input stream. There are advanced error-recovery techniques that may
+ * attempt to find data that can be parsed later in the data stream. In those cases the input
+ * stream would not be closed after a processing error, but such usage is beyond the scope of this
+ * javadoc.
+ *
+ * @param inputStream
+ */
+ public void setInputStream(InputStream inputStream) {
+ dis = new InputSourceDataInputStream(inputStream);
+ }
+
+ /**
+ * Provides the InfosetOutputter which will be called to deliver the Infoset via calls to its
+ * methods.
+ *
+ * @param outputter
+ */
+ public void setInfosetOutputter(InfosetOutputter outputter) {
+ this.outputter = outputter;
+ }
+
+ /**
+ * Called to pull messages from the data stream. The message 'Infoset' is delivered by way of
+ * calls to the InfosetOutputter's methods.
+ *
+ * After calling this, one may call getIsProcessingError, getIsValiationError, isEOF, and
+ * getDiagnostics.
+ */
+ public void parse() {
+ if (dis == null) {
+ throw new IllegalStateException("Input stream must be provided by setInputStream() call.");
+ }
+ if (outputter == null) {
+ throw new IllegalStateException(
+ "InfosetOutputter must be provided by setInfosetOutputter() call.");
+ }
+
+ reset();
+ ParseResult res = dp.parse(dis, outputter);
+ isProcessingError = res.isProcessingError();
+ isValidationError = res.isValidationError();
+ diagnostics = res.getDiagnostics();
+ }
+
+ /**
+ * True if the input stream is known to contain no more data. If the input stream is a true
+ * stream, not a file, then temporary unavailability of data may cause this call to block until
+ * the stream is closed from the other end, or data becomes available.
+ *
+ * False if the input stream is at EOF, and no more data can be obtained. It is an error to call
+ * parse() after isEOF has returned true.
+ *
+ * @return
+ */
+ public boolean isEOF() {
+ return !dis.hasData();
+ }
+
+ /**
+ * True if the parse() call failed with a processing error. This indicates that the data was not
+ * well-formed and could not be parsed successfully.
+ *
+ * It is possible for isProcessingError and isValidationError to both be true.
+ *
+ * @return
+ */
+ public boolean isProcessingError() {
+ return isProcessingError;
+ }
+
+ /**
+ * True if a validation error occurred during parsing. Subsequently to a validation error
+ * occurring, parsing may succeed or fail. after the validation error was detected.
+ *
+ * @return
+ */
+ public boolean isValidationError() {
+ return isValidationError;
+ }
+
+ /**
+ * After a parse() call this returns null or a list of 1 or more diagnostics.
+ *
+ * If isProcessingError or isValidationError are true, then this will contain at least 1
+ * diagnostic. If both are true this will contain at least 2 diagnostics.
+ *
+ * @return
+ */
+ public List getDiagnostics() {
+ return diagnostics;
+ }
+
+ public String getDiagnosticsAsString() {
+ String result = diagnostics.stream().map(Diagnostic::getMessage)
+ .collect(Collectors.joining("\n"));
+ return result;
+ }
+
+ private void reset() {
+ outputter.reset();
+ isProcessingError = false;
+ isValidationError = false;
+ diagnostics = null;
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java
new file mode 100644
index 00000000000..e338f266b7e
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.daffodil.japi.Compiler;
+import org.apache.daffodil.japi.Daffodil;
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.InvalidParserException;
+import org.apache.daffodil.japi.InvalidUsageException;
+import org.apache.daffodil.japi.ProcessorFactory;
+import org.apache.daffodil.japi.ValidationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.Channels;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Compiles a DFDL schema (mostly for tests) or loads a pre-compiled DFDL schema so that one can
+ * obtain a DataProcessor for use with DaffodilMessageParser.
+ *
+ * TODO: Needs to use a cache to avoid reloading/recompiling every time.
+ */
+public class DaffodilDataProcessorFactory {
+ // Default constructor is used.
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDataProcessorFactory.class);
+
+ private DataProcessor dp;
+
+ /**
+ * Gets a Daffodil DataProcessor given the necessary arguments to compile or reload it.
+ *
+ * @param schemaFileURI
+ * pre-compiled dfdl schema (.bin extension) or DFDL schema source (.xsd extension)
+ * @param validationMode
+ * Use true to request Daffodil built-in 'limited' validation. Use false for no validation.
+ * @param rootName
+ * Local name of root element of the message. Can be null to use the first element declaration
+ * of the primary schema file. Ignored if reloading a pre-compiled schema.
+ * @param rootNS
+ * Namespace URI as a string. Can be null to use the target namespace of the primary schema
+ * file or if it is unambiguous what element is the rootName. Ignored if reloading a
+ * pre-compiled schema.
+ * @return the DataProcessor
+ * @throws CompileFailure
+ * - if schema compilation fails
+ */
+ public DataProcessor getDataProcessor(URI schemaFileURI, boolean validationMode, String rootName,
+ String rootNS)
+ throws CompileFailure {
+
+ DaffodilDataProcessorFactory dmp = new DaffodilDataProcessorFactory();
+ boolean isPrecompiled = schemaFileURI.toString().endsWith(".bin");
+ if (isPrecompiled) {
+ if (Objects.nonNull(rootName) && !rootName.isEmpty()) {
+ // A usage error. You shouldn't supply the name and optionally namespace if loading
+ // precompiled schema because those are built into it. Should be null or "".
+ logger.warn("Root element name '{}' is ignored when used with precompiled DFDL schema.",
+ rootName);
+ }
+ try {
+ dmp.loadSchema(schemaFileURI);
+ } catch (IOException | InvalidParserException e) {
+ throw new CompileFailure(e);
+ }
+ dmp.setupDP(validationMode, null);
+ } else {
+ List pfDiags;
+ try {
+ pfDiags = dmp.compileSchema(schemaFileURI, rootName, rootNS);
+ } catch (URISyntaxException | IOException e) {
+ throw new CompileFailure(e);
+ }
+ dmp.setupDP(validationMode, pfDiags);
+ }
+ return dmp.dp;
+ }
+
+ private void loadSchema(URI schemaFileURI) throws IOException, InvalidParserException {
+ Compiler c = Daffodil.compiler();
+ dp = c.reload(Channels.newChannel(schemaFileURI.toURL().openStream()));
+ }
+
+ private List compileSchema(URI schemaFileURI, String rootName, String rootNS)
+ throws URISyntaxException, IOException, CompileFailure {
+ Compiler c = Daffodil.compiler();
+ ProcessorFactory pf = c.compileSource(schemaFileURI, rootName, rootNS);
+ List pfDiags = pf.getDiagnostics();
+ if (pf.isError()) {
+ pfDiags.forEach(diag -> logger.error(diag.getSomeMessage()));
+ throw new CompileFailure(pfDiags);
+ }
+ dp = pf.onPath("/");
+ return pfDiags; // must be just warnings. If it was errors we would have thrown.
+ }
+
+ /**
+ * Common setup steps used whether or not we reloaded or compiled a DFDL schema.
+ */
+ private void setupDP(boolean validationMode, List pfDiags) throws CompileFailure {
+ Objects.requireNonNull(dp); // true because failure to produce a dp throws CompileFailure.
+ if (validationMode) {
+ try {
+ // We don't have the DFDL schema text, and we're not creating XML as an intermediate form,
+ // we're taking data direct from Daffodil into Drill rows, so using any Xerces-based
+ // XML Validator is not possible.
+ dp = dp.withValidationMode(ValidationMode.Limited);
+ } catch (InvalidUsageException e) {
+ // impossible
+ throw new Error(e);
+ }
+ }
+ List dpDiags = dp.getDiagnostics();
+ if (dp.isError()) {
+ throw new CompileFailure(dpDiags);
+ }
+ // well this part is only if we compiled, and provided the pfDiags arg as non-null.
+ List compilationWarnings;
+ if (pfDiags != null && !pfDiags.isEmpty()) {
+ compilationWarnings = pfDiags;
+ compilationWarnings.addAll(dpDiags); // dpDiags might be empty. That's ok.
+ } else {
+ compilationWarnings = dpDiags; // dpDiags might be empty. That's ok.
+ }
+ }
+
+ /**
+ * Thrown if schema compilation fails.
+ *
+ * Contains diagnostic objects which give the cause(s) of the failure, or
+ * contains a cause Throwable providing the reason.
+ */
+ public static class CompileFailure extends Exception {
+ List diags;
+
+ CompileFailure(List diagnostics) {
+ super("DFDL Schema Compile Failure");
+ diags = diagnostics;
+ }
+ CompileFailure(Throwable cause) {
+ super(cause);
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java
new file mode 100644
index 00000000000..5481f1da7ad
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.daffodil.japi.InvalidParserException;
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.runtime1.api.DFDLPrimType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+
+public class DrillDaffodilSchemaUtils {
+ private static final MinorType DEFAULT_TYPE = MinorType.VARCHAR;
+ private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaUtils.class);
+
+ /**
+ * This map maps the data types defined by the DFDL definition to Drill data types.
+ */
+ public static final ImmutableMap DFDL_TYPE_MAPPINGS =
+ ImmutableMap.builder()
+ .put(DFDLPrimType.Long, MinorType.BIGINT)
+ .put(DFDLPrimType.Int, MinorType.INT)
+ .put(DFDLPrimType.Short, MinorType.SMALLINT)
+ .put(DFDLPrimType.Byte, MinorType.TINYINT)
+ // daffodil unsigned longs are modeled as DECIMAL(38, 0) which is the default for VARDECIMAL
+ .put(DFDLPrimType.UnsignedLong, MinorType.VARDECIMAL)
+ .put(DFDLPrimType.UnsignedInt, MinorType.BIGINT)
+ .put(DFDLPrimType.UnsignedShort, MinorType.UINT2)
+ .put(DFDLPrimType.UnsignedByte, MinorType.UINT1)
+ // daffodil integer, nonNegativeInteger, are modeled as DECIMAL(38, 0) which is the default for VARDECIMAL
+ .put(DFDLPrimType.Integer, MinorType.VARDECIMAL)
+ .put(DFDLPrimType.NonNegativeInteger, MinorType.VARDECIMAL)
+ // decimal has to be modeled as string since we really have no idea what to set the
+ // scale to.
+ .put(DFDLPrimType.Decimal, MinorType.VARCHAR)
+ .put(DFDLPrimType.Boolean, MinorType.BIT)
+ .put(DFDLPrimType.Date, MinorType.DATE) // requires conversion
+ .put(DFDLPrimType.DateTime, MinorType.TIMESTAMP) // requires conversion
+ .put(DFDLPrimType.Double, MinorType.FLOAT8)
+ //
+ // daffodil float type is mapped to double aka Float8 in drill because there
+ // seems to be bugs in FLOAT4. Float.MaxValue in a Float4 column displays as
+ // 3.4028234663852886E38 not 3.4028235E38.
+ //
+ // We don't really care about single float precision, so we just use double precision.
+ //
+ .put(DFDLPrimType.Float, MinorType.FLOAT8)
+ .put(DFDLPrimType.HexBinary, MinorType.VARBINARY)
+ .put(DFDLPrimType.String, MinorType.VARCHAR)
+ .put(DFDLPrimType.Time, MinorType.TIME) // requires conversion
+ .build();
+
+
+ @VisibleForTesting
+ public static TupleMetadata processSchema(URI dfdlSchemaURI, String rootName, String namespace)
+ throws IOException, DaffodilDataProcessorFactory.CompileFailure,
+ URISyntaxException, InvalidParserException {
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ boolean validationMode = true; // use Daffodil's limited validation always
+ DataProcessor dp = dpf.getDataProcessor(dfdlSchemaURI, validationMode, rootName, namespace);
+ return daffodilDataProcessorToDrillSchema(dp);
+ }
+
+ public static TupleMetadata daffodilDataProcessorToDrillSchema(DataProcessor dp) {
+ DrillDaffodilSchemaVisitor schemaVisitor = new DrillDaffodilSchemaVisitor();
+ dp.walkMetadata(schemaVisitor);
+ TupleMetadata drillSchema = schemaVisitor.getDrillSchema();
+ return drillSchema;
+ }
+
+ /**
+ * Returns a {@link MinorType} of the corresponding DFDL Data Type. Defaults to VARCHAR if unknown
+ * @param dfdlType The type as provided by Daffodil.
+ * @return A {@link MinorType} of the Drill data type.
+ */
+ public static MinorType getDrillDataType(DFDLPrimType dfdlType) {
+ try {
+ MinorType type = DrillDaffodilSchemaUtils.DFDL_TYPE_MAPPINGS.get(dfdlType);
+ if (type == null) {
+ return DEFAULT_TYPE;
+ } else {
+ return type;
+ }
+ } catch (NullPointerException e) {
+ logger.warn("Unknown data type found in XSD reader: {}. Returning VARCHAR.", dfdlType);
+ return DEFAULT_TYPE;
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java
new file mode 100644
index 00000000000..0bd8992eee7
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.daffodil.runtime1.api.ChoiceMetadata;
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.MetadataHandler;
+import org.apache.daffodil.runtime1.api.SequenceMetadata;
+import org.apache.daffodil.runtime1.api.SimpleElementMetadata;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.MapBuilderLike;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * This class transforms a DFDL/Daffodil schema into a Drill Schema.
+ */
+public class DrillDaffodilSchemaVisitor extends MetadataHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaVisitor.class);
+
+ /**
+ * SchemaBuilder and MapBuilder share a polymorphic interface MapBuilderLike
+ */
+ private final SchemaBuilder builder = new SchemaBuilder();
+ private final Stack mapBuilderStack = new Stack<>();
+
+ private MapBuilderLike mapBuilder() {
+ return mapBuilderStack.peek();
+ }
+
+ /**
+ * Converts Daffodil names into appropriate Drill column names.
+ * @param md Daffodil element metadata, which contains an element name.
+ * @return a string usable as a Drill column name
+ */
+ public static String makeColumnName(ElementMetadata md) {
+ return md.toQName().replace(":", "_");
+ }
+
+ /**
+ * Returns a {@link TupleMetadata} representation of the DFDL schema. Should only be called after
+ * the walk of the DFDL schema with this visitor has been called.
+ *
+ * @return A {@link TupleMetadata} representation of the DFDL schema.
+ */
+ public TupleMetadata getDrillSchema() {
+ return builder.build();
+ }
+
+ @Override
+ public void simpleElementMetadata(SimpleElementMetadata md) {
+ assert (!mapBuilderStack.isEmpty());
+ String colName = makeColumnName(md);
+ MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(md.dfdlType());
+ if (md.isArray()) {
+ mapBuilder().addArray(colName, drillType);
+ } else if (md.isOptional() || md.isNillable()) {
+ mapBuilder().addNullable(colName, drillType);
+ } else {
+ mapBuilder().add(colName, drillType);
+ }
+ }
+
+ @Override
+ public void startComplexElementMetadata(ComplexElementMetadata md) {
+ if (mapBuilderStack.isEmpty()) {
+ // root element case. The SchemaBuilder top level row is the container of the root element's children
+ mapBuilderStack.push(builder);
+ } else {
+ // enclosed complex element case. Create a map field.
+ String colName = makeColumnName(md);
+ if (md.isArray()) {
+ mapBuilderStack.push(mapBuilder().addMapArray(colName));
+ } else {
+ mapBuilderStack.push(mapBuilder().addMap(colName)); // also handles optional complex elements
+ }
+ }
+ }
+
+ @Override
+ public void endComplexElementMetadata(ComplexElementMetadata md) {
+ assert (!mapBuilderStack.isEmpty());
+ mapBuilder().resume();
+ mapBuilderStack.pop();
+ }
+
+ @Override
+ public void startSequenceMetadata(SequenceMetadata m) {
+ }
+
+ @Override
+ public void endSequenceMetadata(SequenceMetadata m) {
+ }
+
+ @Override
+ public void startChoiceMetadata(ChoiceMetadata m) {
+ }
+
+ @Override
+ public void endChoiceMetadata(ChoiceMetadata m) {
+ }
+
+ private void DFDLSchemaError(String s) {
+ throw new RuntimeException(s);
+ }
+}
diff --git a/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 00000000000..966d9ba1b5b
--- /dev/null
+++ b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,37 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ },
+ "cp": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/resources/drill-module.conf b/contrib/format-daffodil/src/main/resources/drill-module.conf
new file mode 100644
index 00000000000..52a902572e3
--- /dev/null
+++ b/contrib/format-daffodil/src/main/resources/drill-module.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.daffodil"
+}
diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java
new file mode 100644
index 00000000000..d5aa3a23058
--- /dev/null
+++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.drill.categories.RowSetTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@Category(RowSetTest.class)
+public class TestDaffodilReader extends ClusterTest {
+
+ String schemaURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/";
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // boilerplate call to start test rig
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ DaffodilFormatConfig formatConfig = new DaffodilFormatConfig(null, "", "", "", false);
+
+ cluster.defineFormat("dfs", "daffodil", formatConfig);
+
+ // Needed to test against compressed files.
+ // Copies data from src/test/resources to the dfs root.
+ dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("schema/"));
+ }
+
+ private String selectRow(String schema, String file) {
+ return "SELECT * FROM table(dfs.`data/" + file + "` " + " (type => 'daffodil'," + " " +
+ "validationMode => 'true', " + " schemaURI => '" + schemaURIRoot + "schema/" + schema +
+ ".dfdl.xsd'," + " rootName => 'row'," + " rootNamespace => null " + "))";
+ }
+
+ /**
+ * This unit test tests a simple data file
+ *
+ * @throws Exception
+ * Throw exception if anything goes wrong
+ */
+ @Test
+ public void testSimpleQuery1() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("simple", "data01Int.dat.gz"));
+ RowSet results = query.rowSet();
+ assertEquals(1, results.rowCount());
+
+ // create the expected metadata and data for this test
+ // metadata first
+ TupleMetadata expectedSchema = new SchemaBuilder().add("col", MinorType.INT).buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema).addRow(0x00000101) // aka 257
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testSimpleQuery2() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("simple", "data06Int.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(6, results.rowCount());
+
+ // create the expected metadata and data for this test
+ // metadata first
+ TupleMetadata expectedSchema = new SchemaBuilder().add("col", MinorType.INT).buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema).addRow(0x00000101).addRow(0x00000102)
+ .addRow(0x00000103).addRow(0x00000104).addRow(0x00000105).addRow(0x00000106).build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testComplexQuery1() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("complex1", "data02Int.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(1, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String col = rdr.getAsString();
+ assertEquals("{257, 258}", col);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+
+ @Test
+ public void testComplexQuery2() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("complex1", "data06Int.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(3, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String map = rdr.getAsString();
+ assertEquals("{257, 258}", map);
+ rdr.next();
+ map = rdr.getAsString();
+ assertEquals("{259, 260}", map);
+ rdr.next();
+ map = rdr.getAsString();
+ assertEquals("{261, 262}", map);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+
+ /**
+ * Tests data which is rows of two ints and an array containing a map containing two ints. Each
+ * row can be visualized like this: "{257, 258, [{259, 260},...]}"
+ */
+ @Test
+ public void testComplexArrayQuery1() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("complexArray1", "data12Int.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(1, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String map = rdr.getAsString();
+ assertEquals("{257, 258, [{259, 260}, {261, 262}, {257, 258}, {259, 260}, {261, 262}]}", map);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+
+ /**
+ * Tests data which is an array of ints in one column of the row set
+ */
+ @Test
+ public void testSimpleArrayQuery1() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("simpleArrayField1", "data12Int.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(1, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String map = rdr.getAsString();
+ assertEquals("{[257, 258, 259, 260, 261, 262, 257, 258, 259, 260, 261, 262]}", map);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+
+ /**
+ * Tests data which is rows of two ints and an array containing a map containing an int and a
+ * vector of ints. Each row can be visualized like this: "{257, 258, [{259, [260, 261,
+ * 262]},...]}"
+ */
+ @Test
+ public void testComplexArrayQuery2() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("complexArray2", "data12Int.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(1, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String map = rdr.getAsString();
+ assertEquals("{257, 258, [{259, [260, 261, 262]}, {257, [258, 259, 260]}, {261, [262]}]}", map);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+
+ @Test
+ public void testMoreTypes1() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("moreTypes1", "moreTypes1.txt.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(2, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String map = rdr.getAsString();
+ assertEquals(
+ "{2147483647, 9223372036854775807, 32767, 127, true, " + "1.7976931348623157E308, 3" +
+ ".4028235E38, [31, 32, 33, 34, 35, 36, 37, 38], \"daffodil\"}",
+ map);
+ rdr.next();
+ map = rdr.getAsString();
+ assertEquals(
+ "{-2147483648, -9223372036854775808, -32768, -128, false, " + "-1.7976931348623157E308, " +
+ "-3.4028235E38, [38, 37, 36, 35, 34, 33, 32, 31], \"drill\"}",
+ map);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+
+ @Test
+ public void testMoreTypes2() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("moreTypes2", "moreTypes2.txt.dat"));
+ RowSet results = query.rowSet();
+ assertEquals(1, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String map = rdr.getAsString();
+ assertEquals(
+ "{4294967295, 18446744073709551615, 65535, 255, " + "-18446744073709551616, " +
+ "18446744073709551616, " + "\"0.18446744073709551616\", " + // xs:decimal is modeled
+ // as VARCHAR i.e., a string. So needs quotation marks.
+ "1970-01-01, 00:00, 1970-01-01T00:00:00Z}", map);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+}
diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java
new file mode 100644
index 00000000000..8076fe5e8cf
--- /dev/null
+++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestDaffodilToDrillMetadataConversion {
+
+ @Test
+ public void testSimple() throws Exception {
+ URI schemaURI = getClass().getResource("/schema/simple.dfdl.xsd").toURI();
+ TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI, "row", null);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("col", MinorType.INT)
+ .buildSchema();
+ assertTrue(expectedSchema.isEquivalent(schema));
+ }
+
+ @Test
+ public void testComplex1() throws Exception {
+ URI schemaURI = getClass().getResource("/schema/complex1.dfdl.xsd").toURI();
+ TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI, "row", null);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a1", MinorType.INT)
+ .add("a2", MinorType.INT)
+ .buildSchema();
+ assertTrue(expectedSchema.isEquivalent(schema));
+ }
+
+ @Test
+ public void testComplex2() throws Exception {
+ URI schemaURI = getClass().getResource("/schema/complex2.dfdl.xsd").toURI();
+ TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI, "row", null);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a1", MinorType.INT)
+ .add("a2", MinorType.INT)
+ .addMap("b")
+ .add("b1", MinorType.INT)
+ .add("b2", MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ assertTrue(expectedSchema.isEquivalent(schema));
+ }
+
+}
diff --git a/contrib/format-daffodil/src/test/resources/data/data01Int.dat b/contrib/format-daffodil/src/test/resources/data/data01Int.dat
new file mode 100644
index 00000000000..dee9c4c8ada
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data01Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz b/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz
new file mode 100644
index 00000000000..5e4b3b37acf
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data02Int.dat b/contrib/format-daffodil/src/test/resources/data/data02Int.dat
new file mode 100644
index 00000000000..8577a259181
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data02Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data06Int.dat b/contrib/format-daffodil/src/test/resources/data/data06Int.dat
new file mode 100644
index 00000000000..8c29db2ec51
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data06Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/data12Int.dat b/contrib/format-daffodil/src/test/resources/data/data12Int.dat
new file mode 100644
index 00000000000..39fa5271b4f
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data12Int.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat b/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat
new file mode 100644
index 00000000000..cd0b5c1591a
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat
@@ -0,0 +1,2 @@
+2147483647 9223372036854775807 32767 127 T 1.7976931348623157E308 3.4028235E38 12345678 'daffodil'
+-2147483648 -9223372036854775808 -32768 -128 F -1.7976931348623157E308 -3.4028235E38 87654321 'drill'
diff --git a/contrib/format-daffodil/src/test/resources/data/moreTypes2.txt.dat b/contrib/format-daffodil/src/test/resources/data/moreTypes2.txt.dat
new file mode 100644
index 00000000000..fca536e8543
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/data/moreTypes2.txt.dat
@@ -0,0 +1 @@
+4294967295 18446744073709551615 65535 255 -18446744073709551616 18446744073709551616 0.18446744073709551616 1970-01-01 00:00:00+0000 1970-01-01T00:00:00
diff --git a/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd
new file mode 100644
index 00000000000..1f4ab8954ef
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd
@@ -0,0 +1,54 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd
new file mode 100644
index 00000000000..d4d74aefe1f
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd
new file mode 100644
index 00000000000..59d285bd31e
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd
new file mode 100644
index 00000000000..89d74fa2de9
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd
new file mode 100644
index 00000000000..a294e29e548
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/moreTypes2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/moreTypes2.dfdl.xsd
new file mode 100644
index 00000000000..00bd3d7a155
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/moreTypes2.dfdl.xsd
@@ -0,0 +1,64 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd
new file mode 100644
index 00000000000..eea582b9a56
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd
@@ -0,0 +1,71 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd
new file mode 100644
index 00000000000..6c72c375159
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd
@@ -0,0 +1,71 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 59747eba87f..7379fd37325 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -42,6 +42,7 @@
data
format-access
+ format-daffodil
format-deltalake
format-esri
format-excel
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 80ba9650a8b..90232ee0cd2 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -497,6 +497,11 @@
drill-format-log
${project.version}
+
+ org.apache.drill.contrib
+ drill-format-daffodil
+ ${project.version}
+
org.apache.drill.contrib
drill-druid-storage
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 66792fd43f6..76ba62978fe 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -29,6 +29,7 @@
org.apache.drill.contrib.data:tpch-sample-data:jar
org.apache.drill.contrib:drill-deltalake-format:jar
org.apache.drill.contrib:drill-druid-storage:jar
+ org.apache.drill.contrib:drill-format-daffodil:jar
org.apache.drill.contrib:drill-format-esri:jar
org.apache.drill.contrib:drill-format-excel:jar
org.apache.drill.contrib:drill-format-hdf5:jar
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
index f333cb9cd2b..37809e64826 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
@@ -22,6 +22,8 @@
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.MaterializedField;
+import java.util.Objects;
+
/**
* Internal structure for building a map. A map is just a schema,
* but one that is part of a parent column.
@@ -33,7 +35,7 @@
* All resumeXXX methods do not produce any action and return null.
* To access built column {@link #buildColumn()} should be used.
*/
-public class MapBuilder implements SchemaContainer {
+public class MapBuilder implements MapBuilderLike, SchemaContainer {
private final SchemaContainer parent;
private final TupleBuilder tupleBuilder = new TupleBuilder();
private final String memberName;
@@ -68,6 +70,7 @@ public MapBuilder add(String name, MinorType type, DataMode mode) {
return this;
}
+ @Override
public MapBuilder add(String name, MinorType type) {
tupleBuilder.add(name, type);
return this;
@@ -82,6 +85,7 @@ public MapBuilder add(String name, MinorType type, int precision, int scale) {
return addDecimal(name, type, DataMode.REQUIRED, precision, scale);
}
+ @Override
public MapBuilder addNullable(String name, MinorType type) {
tupleBuilder.addNullable(name, type);
return this;
@@ -96,6 +100,7 @@ public MapBuilder addNullable(String name, MinorType type, int precision, int sc
return addDecimal(name, type, DataMode.OPTIONAL, precision, scale);
}
+ @Override
public MapBuilder addArray(String name, MinorType type) {
tupleBuilder.addArray(name, type);
return this;
@@ -129,10 +134,12 @@ public MapBuilder addDecimal(String name, MinorType type,
* @param name the name of the map column
* @return a builder for the map
*/
+ @Override
public MapBuilder addMap(String name) {
return tupleBuilder.addMap(this, name);
}
+ @Override
public MapBuilder addMapArray(String name) {
return tupleBuilder.addMapArray(this, name);
}
@@ -185,6 +192,27 @@ public MapBuilder resumeMap() {
return (MapBuilder) parent;
}
+ /**
+ * Depending on whether the parent is a schema builder or map builder
+ * we resume appropriately.
+ */
+ @Override
+ public void resume() {
+ if (Objects.isNull(parent)) {
+ throw new IllegalStateException("Call to resume() on MapBuilder with no parent.");
+ }
+ if (parent instanceof MapBuilder) {
+ resumeMap();
+ } else {
+ assert(parent instanceof SchemaBuilder);
+ //
+ // This would be extended for other kinds of possible containers of a Map.
+ // First version only needed SchemaBuilder parents
+ //
+ resumeSchema();
+ }
+ }
+
public RepeatedListBuilder resumeList() {
build();
return (RepeatedListBuilder) parent;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilderLike.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilderLike.java
new file mode 100644
index 00000000000..086c6a5b5e7
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilderLike.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata;
+
+import org.apache.drill.common.types.TypeProtos;
+
+/**
+ * A common interface shared by SchemaBuilder and MapBuilder allowing one to do most
+ * operations for constructing metadata for hierarchical data
+ * without having to keep track of whether you are dealing with the top-level schema/row
+ * level or a map within it.
+ */
+public interface MapBuilderLike {
+
+ MapBuilderLike addArray(String colName, TypeProtos.MinorType drillType);
+
+ MapBuilderLike addNullable(String colName, TypeProtos.MinorType drillType);
+
+ MapBuilderLike add(String colName, TypeProtos.MinorType drillType);
+
+ MapBuilderLike addMapArray(String colName);
+
+ MapBuilderLike addMap(String colName);
+
+ void resume();
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
index c1b97609357..f2aee1d2190 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
@@ -61,7 +61,7 @@
*
*/
-public class SchemaBuilder implements SchemaContainer {
+public class SchemaBuilder implements MapBuilderLike, SchemaContainer {
/**
* Actual tuple schema builder. The odd layered structure is needed
@@ -114,6 +114,7 @@ public SchemaBuilder add(String name, MinorType type, DataMode mode) {
return this;
}
+ @Override
public SchemaBuilder add(String name, MinorType type) {
tupleBuilder.add(name, type);
return this;
@@ -128,6 +129,7 @@ public SchemaBuilder add(String name, MinorType type, int precision, int scale)
return addDecimal(name, type, DataMode.REQUIRED, precision, scale);
}
+ @Override
public SchemaBuilder addNullable(String name, MinorType type) {
tupleBuilder.addNullable(name, type);
return this;
@@ -142,6 +144,7 @@ public SchemaBuilder addNullable(String name, MinorType type, int precision, int
return addDecimal(name, type, DataMode.OPTIONAL, precision, scale);
}
+ @Override
public SchemaBuilder addArray(String name, MinorType type) {
tupleBuilder.addArray(name, type);
return this;
@@ -157,7 +160,7 @@ public SchemaBuilder addDecimal(String name, MinorType type, DataMode mode, int
}
/**
- * Add a multi-dimensional array, implemented as a repeated vector
+ * Add a multidimensional array, implemented as a repeated vector
* along with 0 or more repeated list vectors.
*
* @param name column name
@@ -191,10 +194,12 @@ public SchemaBuilder addAll(TupleMetadata from) {
* @param name the name of the map column
* @return a builder for the map
*/
+ @Override
public MapBuilder addMap(String name) {
return tupleBuilder.addMap(this, name);
}
+ @Override
public MapBuilder addMapArray(String name) {
return tupleBuilder.addMapArray(this, name);
}
@@ -237,4 +242,9 @@ public TupleMetadata buildSchema() {
public TupleMetadata build() {
return tupleBuilder.schema();
}
+
+ @Override
+ public void resume() {
+ // do nothing. There is nothing else to resume.
+ }
}
diff --git a/pom.xml b/pom.xml
index 957679eff50..6d444612244 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
1.10.0
1.7
5.5.0
+ 3.8.0
10.14.2.0
3072
apache/drill