diff --git a/contrib/format-daffodil/.gitignore b/contrib/format-daffodil/.gitignore
new file mode 100644
index 00000000000..9341ff44dc5
--- /dev/null
+++ b/contrib/format-daffodil/.gitignore
@@ -0,0 +1,2 @@
+# Local logback test configuration; not to be committed.
+/src/test/resources/logback-test.xml
diff --git a/contrib/format-daffodil/README.md b/contrib/format-daffodil/README.md
new file mode 100644
index 00000000000..12c564ff530
--- /dev/null
+++ b/contrib/format-daffodil/README.md
@@ -0,0 +1,41 @@
+# Daffodil 'Format' Reader
+This plugin enables Drill to read DFDL-described data by way of the Apache Daffodil DFDL implementation.
+
+## Configuration
+ * TBD: src/main/resources/bootstrap-format-plugins.json
+ * TBD: src/main/resources/drill-module.conf
+
+TBD: how to enable/disable validation by Daffodil?
+Note that this should be settable per SQL query that reads data.
+
+Note that schemas should be loadable once, via the config file, and then referenced from queries.
+
+The query should specify whether to validate or not; Daffodil's 'limited' validation is the only supported mode.
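+
+As a sketch, a `dfs` format configuration entry might look like the following (property names
+come from DaffodilFormatConfig; the schemaURI, rootName, and rootNamespace values here are
+placeholders, not tested values):
+
+```json
+"daffodil": {
+  "type": "daffodil",
+  "extensions": ["dat"],
+  "schemaURI": "schema/example.dfdl.xsd",
+  "rootName": "record",
+  "rootNamespace": "http://example.com",
+  "validationMode": false
+}
+```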
+
+## Data Types
+
+TBD
+
+## Provided Schema
+The Daffodil Format Reader supports both DFDL schema source files (.xsd) and pre-compiled DFDL schemas (.bin).
+
+TBD: example query, data, and schema.
+```sql
+SELECT * FROM table(dfs.`/tmp/data.dat` (type => 'daffodil',
+  schema => 'schema_identifier_from_config'))
+```
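+
+For reference, the unit tests currently pass the schema location directly as table-function
+parameters (see TestDaffodilReader); the `file:///path/to/...` URI below is a placeholder:
+```sql
+SELECT * FROM table(dfs.`data/simple.dat.gz` (type => 'daffodil',
+  validationMode => 'true',
+  schemaURI => 'file:///path/to/schema/simple.dfdl.xsd',
+  rootName => '',
+  rootNamespace => ''))
+```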
+
+## Limitations: TBD
+
+This version of the Daffodil interface supports only reading data; there is no ability to write output (unparse, in DFDL terminology).
+
+The data is parsed fully from its native form into a Drill data structure held in memory.
+No attempt is made to avoid access to parts of the DFDL-described data that are not needed to answer the query.
+
+If the data is not well-formed, an error occurs and the query fails.
+
+If the data is invalid, and validity checking by Daffodil is enabled, then an error occurs and the query fails.
+
+## Future Functionality
+
+* TBD
diff --git a/contrib/format-daffodil/pom.xml b/contrib/format-daffodil/pom.xml
new file mode 100644
index 00000000000..01955c746dc
--- /dev/null
+++ b/contrib/format-daffodil/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.22.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-format-daffodil</artifactId>
+  <name>Drill : Contrib : Format : Daffodil</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.daffodil</groupId>
+      <artifactId>daffodil-japi_2.12</artifactId>
+      <version>3.7.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.daffodil</groupId>
+      <artifactId>daffodil-runtime1_2.12</artifactId>
+      <version>3.7.0-SNAPSHOT</version>
+    </dependency>
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/daffodil</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/daffodil</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
new file mode 100644
index 00000000000..e44c1f078c9
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+ private final DaffodilFormatConfig dafConfig;
+ private final RowSetLoader rowSetLoader;
+ private final CustomErrorContext errorContext;
+ private final DaffodilMessageParser dafParser;
+ private final InputStream dataInputStream;
+
+ static class DaffodilReaderConfig {
+ final DaffodilFormatPlugin plugin;
+ DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+ }
+
+ public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+ errorContext = negotiator.parentErrorContext();
+ this.dafConfig = readerConfig.plugin.getConfig();
+
+ String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+ String rootName = dafConfig.getRootName();
+ String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+ URI dfdlSchemaURI;
+ try {
+ dfdlSchemaURI = new URI(schemaURIString);
+ } catch (URISyntaxException e) {
+ throw UserException.validationError(e)
+ .build(logger);
+ }
+
+ FileDescrip file = negotiator.file();
+ DrillFileSystem fs = file.fileSystem();
+ URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ DataProcessor dp;
+ try {
+ dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Create the corresponding Drill schema.
+ // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+ // all of it, hundreds of tables, but all part of the same metadata tree.
+ TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+ // Inform Drill about the schema
+ negotiator.tableSchema(drillSchema, true);
+
+ //
+ // DATA TIME: Next we construct the runtime objects, and open files.
+ //
+ // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+ // actually does the parsing.
+ rowSetLoader = negotiator.build().writer();
+
+ // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+ // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+ DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+ // Now we can setup the dafParser with the outputter it will drive with
+ // the parser-produced infoset.
+ dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+ dafParser.setInfosetOutputter(outputter);
+
+ Path dataPath = file.split().getPath();
+ // Lastly, we open the data stream
+ try {
+ dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to open input file: %s", dataPath.toString()))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // And lastly,... tell daffodil the input data stream.
+ dafParser.setInputStream(dataInputStream);
+ }
+
+
+ /**
+ * This is the core of actual processing - data movement from Daffodil to Drill.
+ *
+   * If there is space in the batch and there is data available to parse,
+   * then this calls the Daffodil parser, which parses data, delivering it to the rowWriter
+   * by way of the infoset outputter.
+ *
+ * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+ * a parse error ends execution with a throw.
+ *
+ * Validation errors and other warnings are not errors and are logged but do not cause
+ * parsing to fail/throw.
+ * @return true if there are rows retrieved, false if no rows were retrieved, which means
+ * no more will ever be retrieved (end of data).
+ * @throws RuntimeException on parse errors.
+ */
+ @Override
+ public boolean next() {
+ // Check assumed invariants
+ // We don't know if there is data or not. This could be called on an empty data file.
+ // We DO know that this won't be called if there is no space in the batch for even 1
+ // row.
+ if (dafParser.isEOF()) {
+ return false; // return without even checking for more rows or trying to parse.
+ }
+ while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+ // the predicate is always true once.
+ try {
+ dafParser.parse();
+ if (dafParser.isProcessingError()) {
+ throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+ .addContext(errorContext).build(logger);
+ }
+ if (dafParser.isValidationError()) {
+ logger.warn(dafParser.getDiagnosticsAsString());
+ // Note that even if daffodil is set to not validate, validation errors may still occur
+ // from DFDL's "recoverableError" assertions.
+ }
+ } catch (Exception e) {
+ throw UserException.dataReadError(e).message("Error parsing file: " + e.getMessage())
+ .addContext(errorContext).build(logger);
+ }
+ // success! We have parsed data and populated a row.
+ rowSetLoader.save();
+ }
+ int nRows = rowSetLoader.rowCount();
+ assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out of here.
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (dataInputStream != null)
+ AutoCloseables.closeSilently(dataInputStream);
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
new file mode 100644
index 00000000000..e2d3ddb8e7a
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.lib.xml.NamedQName;
+import org.apache.daffodil.runtime1.api.BlobMethodsImpl;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.runtime1.api.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.Status;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Enumeration;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+ extends BlobMethodsImpl implements InfosetOutputter {
+
+ private RowSetLoader writer;
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // disallow use of the no-arg constructor
+
+ public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void startDocument() {
+ }
+
+ @Override
+ public void endDocument() {
+ }
+
+ @Override
+ public void startSimple(InfosetSimpleElement diSimple) {
+ ElementMetadata erd = diSimple.metadata();
+ boolean isNilled = diSimple.isNilled();
+ NamedQName nqn = erd.namedQName();
+ String colName;
+ if (nqn.prefix().isDefined()) {
+ colName = nqn.prefix().get() + "_" + nqn.local();
+ } else {
+ colName = nqn.local();
+ }
+ // Keep in mind some Daffodil types must be converted to get the proper type for Drill
+ // but simple types like INT will work fine.
+ ObjectWriter objWriter = writer.column(colName);
+ if (isNilled) {
+ objWriter.setNull();
+ } else {
+ //
+ // FIXME: only INT is implemented right now.
+ //
+ ScalarWriter scalarWriter = objWriter.scalar();
+      //
+      // getInt() is called directly because the Scala dataValue() accessors are declared
+      // @inline; these value nodes and runtime data (metadata) nodes still need a JAPI layer.
+      //
+      int value = diSimple.getInt(); // fails if the element is not actually an INT.
+ scalarWriter.setInt(value);
+ }
+ }
+
+ @Override
+ public void endSimple(InfosetSimpleElement diSimple) {
+ /// nothing to do.
+ }
+
+ @Override
+ public void startComplex(InfosetComplexElement diComplex) {
+ // FIXME: set a map as the value of the column
+ // push a new map writer for it
+ }
+
+ @Override
+ public void endComplex(InfosetComplexElement diComplex) {
+ // FIXME: pop stack of map writers
+ }
+
+ @Override
+ public void startArray(InfosetArray diArray) {
+ // FIXME: set an array as the value of the column
+ // push a new writer for it
+ // The contents of the column could be a scalar value
+ // or a map.
+ // It cannot be directly another vector, as DFDL requires the
+ // inner vector to be named, so there will be a map with that name.
+ }
+
+ @Override
+ public void endArray(InfosetArray diArray) {
+ // FIXME: pop stack of array writer.
+ }
+
+ @Override
+ public Enumeration.Value getStatus() {
+ return Status.READY();
+ }
+}
+
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java
new file mode 100644
index 00000000000..80e2ba5110d
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.store.daffodil.DaffodilBatchReader.DaffodilReaderConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName(DaffodilFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class DaffodilFormatConfig implements FormatPluginConfig {
+
+  public final List<String> extensions;
+ public final String schemaURI;
+ public final boolean validationMode;
+ public final String rootName;
+ public final String rootNamespace;
+ /**
+   * In the constructor for a format config, do not use boxed versions of primitive
+   * types: they default to null, which cannot be unboxed, and that breaks defaulting.
+ */
+ @JsonCreator
+  public DaffodilFormatConfig(@JsonProperty("extensions") List<String> extensions,
+ @JsonProperty("schemaURI") String schemaURI,
+ @JsonProperty("rootName") String rootName,
+ @JsonProperty("rootNamespace") String rootNamespace,
+ @JsonProperty("validationMode") boolean validationMode) {
+
+ this.extensions = extensions == null ? Collections.singletonList("dat") : ImmutableList.copyOf(extensions);
+ this.rootName = rootName;
+ this.rootNamespace = rootNamespace;
+ this.schemaURI = schemaURI;
+ // no default. Users must pick.
+ this.validationMode = validationMode;
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+ return extensions;
+ }
+
+ public String getSchemaURI() {
+ return schemaURI;
+ }
+
+ public String getRootName() {
+ return rootName;
+ }
+
+ public String getRootNamespace() {
+ return rootNamespace;
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public boolean getValidationMode() {
+ return validationMode;
+ }
+
+ public DaffodilReaderConfig getReaderConfig(DaffodilFormatPlugin plugin) {
+ DaffodilReaderConfig readerConfig = new DaffodilReaderConfig(plugin);
+ return readerConfig;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaURI, validationMode, rootName, rootNamespace);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ DaffodilFormatConfig other = (DaffodilFormatConfig) obj;
+ return Objects.equals(schemaURI, other.schemaURI)
+ && Objects.equals(rootName, other.rootName)
+ && Objects.equals(rootNamespace, other.rootNamespace)
+ && Objects.equals(validationMode, other.validationMode);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("schemaURI", schemaURI)
+ .field("rootName", rootName)
+ .field("rootNamespace", rootNamespace)
+ .field("validationMode", validationMode)
+ .toString();
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java
new file mode 100644
index 00000000000..95b5e82aaac
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+public class DaffodilFormatPlugin extends EasyFormatPlugin<DaffodilFormatConfig> {
+
+ public static final String DEFAULT_NAME = "daffodil";
+ public static final String OPERATOR_TYPE = "DAFFODIL_SUB_SCAN";
+
+ public static class DaffodilReaderFactory extends FileReaderFactory {
+ private final DaffodilBatchReader.DaffodilReaderConfig readerConfig;
+
+ private final EasySubScan scan;
+
+ public DaffodilReaderFactory(DaffodilBatchReader.DaffodilReaderConfig config, EasySubScan scan) {
+ this.readerConfig = config;
+ this.scan = scan;
+ }
+
+ @Override
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+ return new DaffodilBatchReader(readerConfig, scan, negotiator);
+ }
+ }
+
+ public DaffodilFormatPlugin(String name, DrillbitContext context,
+ Configuration fsConf, StoragePluginConfig storageConfig,
+ DaffodilFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+ }
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf, DaffodilFormatConfig pluginConfig) {
+ return EasyFormatConfig.builder()
+ .readable(true)
+ .writable(false)
+ .blockSplittable(false)
+ .compressible(true)
+ .extensions(pluginConfig.getExtensions())
+ .fsConf(fsConf)
+ .readerOperatorType(OPERATOR_TYPE)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
+ .supportsLimitPushdown(true)
+ .supportsProjectPushdown(true)
+ .defaultName(DaffodilFormatPlugin.DEFAULT_NAME)
+ .build();
+ }
+
+ @Override
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ builder.readerFactory(new DaffodilReaderFactory(formatConfig.getReaderConfig(this), scan));
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java
new file mode 100644
index 00000000000..8972b73608d
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+
+import org.apache.daffodil.japi.Compiler;
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.ParseResult;
+import org.apache.daffodil.runtime1.api.InfosetOutputter;
+import org.apache.daffodil.japi.io.InputSourceDataInputStream;
+import org.jdom2.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * DFDL Daffodil Streaming message parser
+ *
+ * You construct this providing a DataProcessor obtained from the
+ * DaffodilDataProcessorFactory.
+ * The DataProcessor contains the compiled DFDL schema, ready to use, as
+ * well as whether validation while parsing has been requested.
+ *
+ * The DataProcessor object may be shared/reused by multiple threads each of which
+ * has its own copy of this class.
+ * This object is, however, stateful, and must not be shared by multiple threads.
+ *
+ * You must call setInputStream, and setInfosetOutputter before
+ * you call parse().
+ * The input stream and the InfosetOutputter objects are also private to one thread and are stateful
+ * and owned by this object.
+ * Once you have called setInputStream, you should view the input stream as the private property of
+ * this object.
+ * The parse() call will invoke the InfosetOutputter's methods to deliver
+ * parsed data, and it may optionally create diagnostics (obtained via getDiagnostics()),
+ * whose kind is indicated by isProcessingError() and isValidationError().
+ *
+ * Note that the InfosetOutputter may be called many times before a processing error is detected,
+ * as Daffodil delivers result data incrementally.
+ *
+ * Validation errors do not affect the InfosetOutputter output calls, but indicate that data was
+ * detected that is invalid.
+ *
+ * When parse() returns, the parse has ended and one can check for errors/diagnostics.
+ * One can call parse() again if there is still data to consume, which is checked via the
+ * isEOF() method.
+ *
+ * There are no guarantees about where the input stream is positioned between parse() calls.
+ * In particular, it may not be positioned at the start of the next message, as Daffodil may
+ * have pre-fetched additional bytes from the input stream which it found are not part of the
+ * current infoset, but the next one.
+ * The positioning of the input stream may in fact be somewhere in the middle of a byte,
+ * as Daffodil does not require messages to be of lengths that are in whole byte units.
+ * Hence, once you give the input stream to this object via setInputStream, that input stream is
+ * owned privately by this class for ever after.
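+ *
+ * A usage sketch (illustrative only; the DataProcessor dp, InfosetOutputter outputter, and
+ * InputStream inputStream are assumed to be provided by the caller):
+ * <pre>{@code
+ * DaffodilMessageParser parser = new DaffodilMessageParser(dp);
+ * parser.setInfosetOutputter(outputter);
+ * parser.setInputStream(inputStream);
+ * while (!parser.isEOF()) {
+ *   parser.parse();
+ *   if (parser.isProcessingError()) {
+ *     throw new RuntimeException(parser.getDiagnosticsAsString());
+ *   }
+ * }
+ * }</pre>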
+ */
+public class DaffodilMessageParser {
+
+ /**
+ * Constructs the parser using a DataProcessor obtained from
+ * a DaffodilDataProcessorFactory.
+ * @param dp
+ */
+ DaffodilMessageParser(DataProcessor dp) {
+ this.dp = dp;
+ }
+
+ /**
+ * Provide the input stream from which data is to be parsed.
+ *
+ * This input stream is then owned by this object and becomes part of its state.
+ *
+   * It is, however, the responsibility of the caller to close this
+ * input stream after the completion of all parse calls.
+ * In particular, if a parse error is considered fatal, then
+ * the caller should close the input stream.
+ * There are advanced error-recovery techniques that may attempt to find
+ * data that can be parsed later in the data stream.
+ * In those cases the input stream would not be closed after a processing error,
+ * but such usage is beyond the scope of this javadoc.
+ * @param inputStream
+ */
+ public void setInputStream(InputStream inputStream) {
+ dis = new InputSourceDataInputStream(inputStream);
+ }
+
+ /**
+ * Provides the InfosetOutputter which will be called to deliver
+ * the Infoset via calls to its methods.
+ * @param outputter
+ */
+ public void setInfosetOutputter(InfosetOutputter outputter) {
+ this.outputter = outputter;
+ }
+
+ /**
+ * Called to pull messages from the data stream.
+ * The message 'Infoset' is delivered by way of calls to the InfosetOutputter's methods.
+ *
+   * After calling this, one may call isProcessingError(), isValidationError(), isEOF(), and
+   * getDiagnostics().
+ */
+ public void parse() {
+ if (dis == null)
+ throw new IllegalStateException("Input stream must be provided by setInputStream() call.");
+ if (outputter == null)
+ throw new IllegalStateException("InfosetOutputter must be provided by setInfosetOutputter() call.");
+
+ reset();
+ ParseResult res = dp.parse(dis, outputter);
+ isProcessingError = res.isProcessingError();
+ isValidationError = res.isValidationError();
+    diagnostics = res.getDiagnostics();
+ }
+
+ /**
+   * True if the input stream is at EOF: no more data can be obtained, and it is an error
+   * to call parse() again.
+   * If the input stream is a true stream, not a file, then temporary unavailability of data
+   * may cause this call to block until the stream is closed from the other end, or data becomes
+   * available.
+   *
+   * False if more data is available to be parsed.
+   * @return true at end of data, false otherwise.
+ */
+ public boolean isEOF() {
+ return !dis.hasData();
+ }
+
+ /**
+ * True if the parse() call failed with a processing error.
+ * This indicates that the data was not well-formed and could not be
+ * parsed successfully.
+ *
+ * It is possible for isProcessingError and isValidationError to both be true.
+ * @return
+ */
+ public boolean isProcessingError() { return isProcessingError; }
+
+ /**
+ * True if a validation error occurred during parsing.
+   * Parsing may still succeed or fail after a validation error has been detected.
+ * @return
+ */
+ public boolean isValidationError() { return isValidationError; }
+
+ /**
+ * After a parse() call this returns null or a list of 1 or more diagnostics.
+ *
+ * If isProcessingError or isValidationError are true, then this will contain at least 1
+ * diagnostic.
+ * If both are true this will contain at least 2 diagnostics.
+ * @return
+ */
+  public List<Diagnostic> getDiagnostics() { return diagnostics; }
+ public String getDiagnosticsAsString() {
+ String result = diagnostics.stream()
+ .map(Diagnostic::getMessage)
+ .collect(Collectors.joining("\n"));
+ return result;
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class);
+
+  private List<Diagnostic> diagnostics;
+ private boolean isProcessingError;
+ private boolean isValidationError;
+
+ private InputSourceDataInputStream dis;
+ private InfosetOutputter outputter;
+ private DataProcessor dp;
+
+ private void reset() {
+ outputter.reset();
+ isProcessingError = false;
+ isValidationError = false;
+ diagnostics = null;
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java
new file mode 100644
index 00000000000..ab5e35f4692
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil.schema;
+
+
+import org.apache.daffodil.japi.Compiler;
+import org.apache.daffodil.japi.Daffodil;
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.InvalidParserException;
+import org.apache.daffodil.japi.InvalidUsageException;
+import org.apache.daffodil.japi.ProcessorFactory;
+import org.apache.daffodil.japi.ValidationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URI;
+import java.nio.channels.Channels;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Compiles a DFDL schema (mostly for tests) or loads a pre-compiled DFDL schema so
+ * that one can obtain a DataProcessor for use with DaffodilMessageParser.
+ *
+ * TODO: Needs to use a cache to avoid reloading/recompiling every time.
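+ *
+ * A usage sketch (illustrative; schemaFileURI, rootName, and rootNS are assumed to be
+ * defined by the caller):
+ * <pre>{@code
+ * DaffodilDataProcessorFactory factory = new DaffodilDataProcessorFactory();
+ * DataProcessor dp = factory.getDataProcessor(schemaFileURI, true, rootName, rootNS);
+ * }</pre>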
+ */
+public class DaffodilDataProcessorFactory {
+ // Default constructor is used.
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDataProcessorFactory.class);
+
+ private DataProcessor dp;
+
+ /**
+ * Thrown if schema compilation fails.
+ *
+ * Contains diagnostic objects which give the cause(s) of the failure.
+ */
+ public static class CompileFailure extends Exception {
+    List<Diagnostic> diags;
+
+    CompileFailure(List<Diagnostic> diagnostics) {
+ super("DFDL Schema Compile Failure");
+ diags = diagnostics;
+ }
+ }
+
+ /**
+ * Gets a Daffodil DataProcessor given the necessary arguments to compile or reload it.
+ * @param schemaFileURI pre-compiled dfdl schema (.bin extension) or DFDL schema source (.xsd extension)
+ * @param validationMode Use true to request Daffodil built-in 'limited' validation.
+ * Use false for no validation.
+ * @param rootName Local name of root element of the message. Can be null to use the first element
+ * declaration of the primary schema file. Ignored if reloading a pre-compiled schema.
+ * @param rootNS Namespace URI as a string. Can be null to use the target namespace
+ * of the primary schema file or if it is unambiguous what element is the rootName.
+ * Ignored if reloading a pre-compiled schema.
+ * @return the DataProcessor
+ * @throws CompileFailure - if schema compilation fails
+ * @throws IOException - if the schemaFileURI cannot be opened or is not found.
+ * @throws URISyntaxException - if the schemaFileURI is not legal syntax.
+ * @throws InvalidParserException - if the reloading of the parser from pre-compiled binary fails.
+ */
+ public DataProcessor getDataProcessor(
+ URI schemaFileURI,
+ boolean validationMode,
+ String rootName,
+ String rootNS)
+ throws CompileFailure, IOException, URISyntaxException, InvalidParserException {
+
+ DaffodilDataProcessorFactory dmp = new DaffodilDataProcessorFactory();
+ boolean isPrecompiled = schemaFileURI.toString().endsWith(".bin");
+ if (isPrecompiled) {
+ if (Objects.nonNull(rootName) && !rootName.isEmpty()) {
+ // A usage error. You shouldn't supply the name and optionally namespace if loading
+ // precompiled schema because those are built into it. Should be null or "".
+ logger.warn("Root element name '{}' is ignored when used with precompiled DFDL schema.", rootName);
+ }
+ dmp.loadSchema(schemaFileURI);
+ dmp.setupDP(validationMode, null);
+ } else {
+      List<Diagnostic> pfDiags = dmp.compileSchema(schemaFileURI, rootName, rootNS);
+ dmp.setupDP(validationMode, pfDiags);
+ }
+ return dmp.dp;
+ }
+
+ private void loadSchema(URI schemaFileURI)
+ throws URISyntaxException, IOException, InvalidParserException {
+ Compiler c = Daffodil.compiler();
+ dp = c.reload(Channels.newChannel(schemaFileURI.toURL().openStream()));
+ }
+
+  private List<Diagnostic> compileSchema(URI schemaFileURI, String rootName, String rootNS)
+ throws URISyntaxException, IOException, CompileFailure {
+ Compiler c = Daffodil.compiler();
+ ProcessorFactory pf = c.compileSource(schemaFileURI)
+ .withDistinguishedRootNode(rootName, rootNS);
+
+    List<Diagnostic> pfDiags = pf.getDiagnostics();
+ if (pf.isError()) {
+ throw new CompileFailure(pfDiags);
+ }
+ dp = pf.onPath("/");
+ return pfDiags; // must be just warnings. If it was errors we would have thrown.
+ }
+
+ /**
+   * Common setup steps used whether we reloaded or compiled the DFDL schema.
+   */
+  private void setupDP(boolean validationMode, List<Diagnostic> pfDiags) throws CompileFailure {
+ Objects.requireNonNull(dp); // true because failure to produce a dp throws CompileFailure.
+ if (validationMode) {
+ try {
+ dp = dp.withValidationMode(ValidationMode.Limited);
+ } catch (InvalidUsageException e) {
+ // impossible
+ throw new Error(e);
+ }
+ }
+    List<Diagnostic> dpDiags = dp.getDiagnostics();
+ if (dp.isError()) {
+ throw new CompileFailure(dpDiags);
+ }
+    // This part applies only if we compiled the schema and the pfDiags argument was non-null.
+    List<Diagnostic> compilationWarnings;
+ if (pfDiags != null && !pfDiags.isEmpty()) {
+ compilationWarnings = pfDiags;
+ compilationWarnings.addAll(dpDiags); // dpDiags might be empty. That's ok.
+ } else {
+ compilationWarnings = dpDiags; // dpDiags might be empty. That's ok.
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java
new file mode 100644
index 00000000000..26def784c32
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.daffodil.japi.InvalidParserException;
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+
+public class DrillDaffodilSchemaUtils {
+ private static final MinorType DEFAULT_TYPE = MinorType.VARCHAR;
+ private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaUtils.class);
+
+ /**
+ * This map maps the data types defined by the DFDL definition to Drill data types.
+ */
+  public static final ImmutableMap<String, MinorType> DFDL_TYPE_MAPPINGS =
+      ImmutableMap.<String, MinorType>builder()
+ .put("LONG", MinorType.BIGINT)
+ .put("INT", MinorType.INT)
+ .put("SHORT", MinorType.SMALLINT)
+ .put("BYTE", MinorType.TINYINT)
+ .put("UNSIGNEDLONG", MinorType.UINT8)
+ .put("UNSIGNEDINT", MinorType.UINT4)
+ .put("UNSIGNEDSHORT", MinorType.UINT2)
+ .put("UNSIGNEDBYTE", MinorType.UINT1)
+ .put("INTEGER", MinorType.BIGINT)
+ .put("NONNEGATIVEINTEGER", MinorType.BIGINT)
+ .put("BOOLEAN", MinorType.BIT)
+ .put("DATE", MinorType.DATE) // requires conversion
+ .put("DATETIME", MinorType.TIMESTAMP) // requires conversion
+ .put("DECIMAL", MinorType.VARDECIMAL) // requires conversion (maybe)
+ .put("DOUBLE", MinorType.FLOAT8)
+ .put("FLOAT", MinorType.FLOAT4)
+ .put("HEXBINARY", MinorType.VARBINARY)
+ .put("STRING", MinorType.VARCHAR)
+ .put("TIME", MinorType.TIME) // requires conversion
+ .build();
+
+
+ @VisibleForTesting
+ public static TupleMetadata processSchema(URI dfdlSchemaURI)
+ throws IOException, DaffodilDataProcessorFactory.CompileFailure,
+ URISyntaxException, InvalidParserException {
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ DataProcessor dp = dpf.getDataProcessor(dfdlSchemaURI, true, null, null);
+ return daffodilDataProcessorToDrillSchema(dp);
+ }
+
+ public static TupleMetadata daffodilDataProcessorToDrillSchema(DataProcessor dp) {
+ DrillDaffodilSchemaVisitor schemaVisitor = new DrillDaffodilSchemaVisitor();
+ dp.walkMetadata(schemaVisitor);
+ TupleMetadata drillSchema = schemaVisitor.getDrillSchema();
+ return drillSchema;
+ }
+
+ /**
+   * Returns a {@link MinorType} corresponding to the given DFDL data type. Defaults to VARCHAR if unknown.
+   * @param dfdlType A String of the DFDL data type (local name only, i.e., no "xs:" prefix).
+ * @return A {@link MinorType} of the Drill data type.
+ */
+ public static MinorType getDrillDataType(String dfdlType) {
+ try {
+ MinorType type = DrillDaffodilSchemaUtils.DFDL_TYPE_MAPPINGS.get(dfdlType.toUpperCase());
+ if (type == null) {
+ return DEFAULT_TYPE;
+ } else {
+ return type;
+ }
+ } catch (NullPointerException e) {
+ logger.warn("Unknown data type found in XSD reader: {}. Returning VARCHAR.", dfdlType);
+ return DEFAULT_TYPE;
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java
new file mode 100644
index 00000000000..c4ce3c03757
--- /dev/null
+++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.schema;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.MetadataHandler;
+import org.apache.daffodil.runtime1.api.SimpleElementMetadata;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * This class transforms a DFDL/Daffodil schema into a Drill Schema.
+ */
+public class DrillDaffodilSchemaVisitor extends MetadataHandler {
+ private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaVisitor.class);
+ private final SchemaBuilder builder = new SchemaBuilder();
+
+  private final Stack<MapBuilder> mapBuilderStack = new Stack<>();
+
+ private MapBuilder mapBuilder() {
+ return mapBuilderStack.peek();
+ }
+
+ /**
+ * Returns a {@link TupleMetadata} representation of the DFDL schema.
+ * Should only be called after the walk of the DFDL schema with this visitor has been called.
+ * @return A {@link TupleMetadata} representation of the DFDL schema.
+ */
+ public TupleMetadata getDrillSchema() {
+ return builder.build();
+ }
+
+ @Override
+ public void simpleElementMetadata(SimpleElementMetadata md) {
+ String colName = md.namedQName().toQNameString().replace(":", "_");
+ MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(md.primType().toString());
+ if (mapBuilderStack.isEmpty()) {
+ // special case if the root element is of simple type.
+ // This occurs in very small test cases. It is technically not allowed in XSD
+ // and a DFDL schema is an XSD; however, DFDL may loosen this restriction
+ if (md.isArray()) builder.addArray(colName, drillType);
+ else if (md.isOptional()) builder.addNullable(colName, drillType);
+ else builder.add(colName, drillType);
+ } else {
+ // the more common case is that a simple element appears within
+ // a map.
+ if (md.isArray()) mapBuilder().addArray(colName, drillType);
+ else if (md.isOptional()) mapBuilder().addNullable(colName, drillType);
+ else mapBuilder().add(colName, drillType);
+ }
+ }
+
+ @Override
+ public void startComplexElementMetadata(ComplexElementMetadata md) {
+ String colName = md.namedQName().toQNameString().replace(":", "_");
+ if (mapBuilderStack.isEmpty()) {
+ if (md.isArray()) mapBuilderStack.push(builder.addMapArray(colName));
+ else mapBuilderStack.push(builder.addMap(colName)); // also handles optional complex elements
+ } else {
+ if (md.isArray()) mapBuilderStack.push(mapBuilder().addMapArray(colName));
+ else mapBuilderStack.push(mapBuilder().addMap(colName)); // also handles optional complex elements
+ }
+ }
+
+ @Override
+ public void endComplexElementMetadata(ComplexElementMetadata md) {
+ if (mapBuilderStack.size() == 1) {
+ mapBuilder().resumeSchema();
+ } else {
+ mapBuilder().resumeMap();
+ }
+ mapBuilderStack.pop();
+ }
+
+}
diff --git a/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 00000000000..966d9ba1b5b
--- /dev/null
+++ b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,37 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ },
+ "cp": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "daffodil": {
+ "type": "daffodil",
+ "extensions": [
+ "dat"
+ ]
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-daffodil/src/main/resources/drill-module.conf b/contrib/format-daffodil/src/main/resources/drill-module.conf
new file mode 100644
index 00000000000..52a902572e3
--- /dev/null
+++ b/contrib/format-daffodil/src/main/resources/drill-module.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.daffodil"
+}
diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java
new file mode 100644
index 00000000000..1f68d3f8548
--- /dev/null
+++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.drill.categories.RowSetTest;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTest.class)
+public class TestDaffodilReader extends ClusterTest {
+
+ String schemaURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/";
+ String dataURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/";
+ @BeforeClass
+ public static void setup() throws Exception {
+ // boilerplate call to start test rig
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ DaffodilFormatConfig formatConfig =
+ new DaffodilFormatConfig(null,
+ "",
+ "",
+ "",
+ false);
+
+ cluster.defineFormat("dfs", "daffodil", formatConfig);
+
+ // Needed to test against compressed files.
+ // Copies data from src/test/resources to the dfs root.
+ java.io.File newDataDir = dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+ java.io.File newSchemaDir = dirTestWatcher.copyResourceToRoot(Paths.get("schema/"));
+ }
+
+ /**
+ * This unit test tests a simple data file
+ *
+ * @throws Exception Throw exception if anything goes wrong
+ */
+ @Test
+ public void testSimple1() throws Exception {
+
+ String sql = "SELECT * FROM table(dfs.`data/simple.dat.gz` " +
+ " (type => 'daffodil'," +
+ " validationMode => 'true', " +
+ " schemaURI => '" + schemaURIRoot + "schema/simple.dfdl.xsd'," +
+ " rootName => ''," +
+ " rootNamespace => '' " +
+ "))";
+
+ // String sql2 = "show files in dfs.`data`";
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(sql);
+ RowSet results = query.rowSet();
+ results.print();
+ assertEquals(1, results.rowCount());
+
+ // create the expected metadata and data for this test
+ // metadata first
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("ex_r", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(0x00000101) // aka 257
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+}
diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilUtils.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilUtils.java
new file mode 100644
index 00000000000..285234f057f
--- /dev/null
+++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestDaffodilUtils {
+
+// @Test
+// public void testRemoveField() {
+// String test1 = "field1_field2_field3";
+// assertEquals(DaffodilUtils.removeField(test1, "field3"), "field1_field2");
+//
+// // Test with underscores
+// String test2 = "field_1_field_2_field_3";
+// assertEquals(DaffodilUtils.removeField(test2, "field_3"), "field_1_field_2");
+//
+// // Test with missing field
+// String test3 = "field_1_field_2_field_3";
+// assertEquals(DaffodilUtils.removeField(test3, "field_4"), "");
+//
+// // Test with empty string
+// String test4 = "";
+// assertEquals(DaffodilUtils.removeField(test4, "field_4"), "");
+// }
+}
diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/xsd/TestDaffodilSchema.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/xsd/TestDaffodilSchema.java
new file mode 100644
index 00000000000..9273f41fdcb
--- /dev/null
+++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/xsd/TestDaffodilSchema.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil.xsd;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestDaffodilSchema {
+
+ @Test
+ public void testSimple() throws Exception {
+ URI schemaURI = getClass().getResource("/schema/simple.dfdl.xsd").toURI();
+ TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("ex_r", MinorType.INT)
+ .buildSchema();
+ assertTrue(expectedSchema.isEquivalent(schema));
+ }
+
+ @Test
+ public void testComplex1() throws Exception {
+ URI schemaURI = getClass().getResource("/schema/complex1.dfdl.xsd").toURI();
+ TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addMap("ex_r")
+ .add("a1", MinorType.INT)
+ .add("a2", MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ assertTrue(expectedSchema.isEquivalent(schema));
+ }
+
+ @Test
+ public void testComplexArray1() throws Exception {
+ URI schemaURI = getClass().getResource("/schema/complexArray1.dfdl.xsd").toURI();
+ TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addMap("ex_r")
+ .addMapArray("record")
+ .add("a1", MinorType.INT)
+ .add("a2", MinorType.INT)
+ .resumeMap()
+ .resumeSchema()
+ .buildSchema();
+ assertTrue(expectedSchema.isEquivalent(schema));
+ }
+
+}
diff --git a/contrib/format-daffodil/src/test/resources/data/simple.dat b/contrib/format-daffodil/src/test/resources/data/simple.dat
new file mode 100644
index 00000000000..dee9c4c8ada
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/simple.dat differ
diff --git a/contrib/format-daffodil/src/test/resources/data/simple.dat.gz b/contrib/format-daffodil/src/test/resources/data/simple.dat.gz
new file mode 100644
index 00000000000..5e4b3b37acf
Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/simple.dat.gz differ
diff --git a/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd
new file mode 100644
index 00000000000..2a82b96b10a
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd
@@ -0,0 +1,46 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd
new file mode 100644
index 00000000000..919544ea7e5
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd
@@ -0,0 +1,52 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd
new file mode 100644
index 00000000000..ee617483337
--- /dev/null
+++ b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd
@@ -0,0 +1,39 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/pom.xml b/contrib/pom.xml
index b4ee5326b35..4b84892e2ee 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -59,6 +59,7 @@
     <module>format-pcapng</module>
     <module>format-iceberg</module>
     <module>format-deltalake</module>
+    <module>format-daffodil</module>
     <module>storage-drill</module>
     <module>storage-phoenix</module>
     <module>storage-googlesheets</module>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index dfb9d28484a..a66376cd1ca 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -479,6 +479,11 @@
       <artifactId>drill-format-log</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.drill.contrib</groupId>
+      <artifactId>drill-format-daffodil</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.drill.contrib</groupId>
       <artifactId>drill-druid-storage</artifactId>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 7cf3066e51d..35f0f0beadf 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -57,6 +57,7 @@
       <include>org.apache.drill.contrib:drill-format-msaccess:jar</include>
       <include>org.apache.drill.contrib:drill-format-spss:jar</include>
       <include>org.apache.drill.contrib:drill-format-sas:jar</include>
+      <include>org.apache.drill.contrib:drill-format-daffodil:jar</include>
       <include>org.apache.drill.contrib:drill-jdbc-storage:jar</include>
       <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
       <include>org.apache.drill.contrib:drill-storage-phoenix:jar</include>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index 0a112b59360..001378707d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -78,7 +78,7 @@
* Some readers can determine the source schema at the start of a scan.
* For example, a CSV file has headers, a Parquet file has footers, both
* of which define a schema. This case is called "early schema." The
- * reader fefines the schema by calling
+ * reader defines the schema by calling
* {@link #tableSchema(TupleMetadata)} to provide the known schema.
*
* Late Dynamic Schema
diff --git a/pom.xml b/pom.xml
index 8baa6f7022d..86e1cb51f9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
4.3.3
2.1.12
+    <daffodil.version>2.6.0-SNAPSHOT</daffodil.version>
@@ -330,6 +331,11 @@
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
      </dependency>
+      <dependency>
+        <groupId>org.apache.daffodil</groupId>
+        <artifactId>daffodil-japi_2.12</artifactId>
+        <version>${daffodil.version}</version>
+      </dependency>