From eb418bf85adaaa798604efb236d8ca0dce4a4a54 Mon Sep 17 00:00:00 2001
From: Michael Beckerle
Date: Wed, 11 Oct 2023 08:21:17 -0400
Subject: [PATCH] Checkpoint on adding Daffodil to Drill

Depends on a mbeckerle-specific 3.7.0-SNAPSHOT fork of Daffodil.

New format-daffodil module created.

A basic unit test of a DFDL schema with a single int field works, in that Drill metadata is created by traversing the schema, and data is then populated using Drill's row-set writer, driven by a Daffodil InfosetOutputter during a Daffodil parse.

Still uses absolute paths for the schemaFileURI (which is cheating; it wouldn't work in a true distributed Drill environment). We have yet to work out how to enable Drill to provide access to DFDL schemas in XML form with include/import resolved.

The input data stream is, however, being accessed in the proper Drill manner. Gunzip happened automatically. Nice.

Note: Fix boxed Boolean vs. boolean problem. Don't use boxed primitives in Format config objects.
https://github.com/apache/drill/issues/2835
---
 contrib/format-daffodil/.gitignore | 2 +
 contrib/format-daffodil/README.md | 41 ++++
 contrib/format-daffodil/pom.xml | 94 ++++++++
 .../store/daffodil/DaffodilBatchReader.java | 184 ++++++++++++++++
 .../DaffodilDrillInfosetOutputter.java | 131 +++++++++++
 .../store/daffodil/DaffodilFormatConfig.java | 119 ++++++++++
 .../store/daffodil/DaffodilFormatPlugin.java | 83 +++++++
 .../store/daffodil/DaffodilMessageParser.java | 205 ++++++++++++++++++
 .../schema/DaffodilDataProcessorFactory.java | 153 +++++++++++++
 .../schema/DrillDaffodilSchemaUtils.java | 101 +++++++++
 .../schema/DrillDaffodilSchemaVisitor.java | 100 +++++++++
 .../resources/bootstrap-format-plugins.json | 37 ++++
 .../src/main/resources/drill-module.conf | 25 +++
 .../store/daffodil/TestDaffodilReader.java | 107 +++++++++
 .../store/daffodil/TestDaffodilUtils.java | 44 ++++
 .../daffodil/xsd/TestDaffodilSchema.java | 82 +++++++
 .../src/test/resources/data/simple.dat | Bin 0 -> 4 bytes
 .../src/test/resources/data/simple.dat.gz | Bin 0 -> 35 bytes
 .../test/resources/schema/complex1.dfdl.xsd | 46 ++++
 .../resources/schema/complexArray1.dfdl.xsd | 52 +++++
 .../src/test/resources/schema/simple.dfdl.xsd | 39 ++++
 contrib/pom.xml | 1 +
 distribution/pom.xml | 5 +
 distribution/src/assemble/component.xml | 1 +
 .../impl/scan/framework/SchemaNegotiator.java | 2 +-
 pom.xml | 6 +
 26 files changed, 1659 insertions(+), 1 deletion(-)
 create mode 100644 contrib/format-daffodil/.gitignore
 create mode 100644 contrib/format-daffodil/README.md
 create mode 100644 contrib/format-daffodil/pom.xml
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java
 create mode 100644 contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java
 create mode 100644 contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json
 create mode 100644 contrib/format-daffodil/src/main/resources/drill-module.conf
 create mode 100644 contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java
 create mode 100644 contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilUtils.java
 create mode 100644 contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/xsd/TestDaffodilSchema.java
 create mode 100644 contrib/format-daffodil/src/test/resources/data/simple.dat
 create mode 100644 contrib/format-daffodil/src/test/resources/data/simple.dat.gz
 create mode 100644 contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd
 create mode 100644 contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd
 create mode 100644 contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd

diff --git a/contrib/format-daffodil/.gitignore b/contrib/format-daffodil/.gitignore
new file mode 100644
index 00000000000..9341ff44dc5
--- /dev/null
+++ b/contrib/format-daffodil/.gitignore
@@ -0,0 +1,2 @@
+# Ignore the locally generated logback test configuration
+/src/test/resources/logback-test.xml
diff --git a/contrib/format-daffodil/README.md b/contrib/format-daffodil/README.md
new file mode 100644
index 00000000000..12c564ff530
--- /dev/null
+++ b/contrib/format-daffodil/README.md
@@ -0,0 +1,41 @@
+# Daffodil 'Format' Reader
+This plugin enables Drill to read DFDL-described data by way of the Apache Daffodil DFDL implementation.
+
+## Configuration
+ * TBD: src/main/resources/bootstrap-format-plugins.json
+ * TBD: src/main/resources/drill-module.conf
+
+TBD: how to enable/disable validation by Daffodil.
+Note that this should be settable per SQL query that reads data.
+
+Note that schemas should be loadable once in the config file and then referenced from queries.
+
+The query should specify whether or not to validate (Daffodil 'limited' validation is the only option).
+
+## Data Types
+
+TBD
+
+## Provided Schema
+The Daffodil Format Reader supports pre-compiled DFDL schemas.
+
+TBD: example query, data, and schema
+```sql
+SELECT * FROM dfs.`/tmp/data.dat`(type => 'daffodil',
+    schema => "schema_identifier_from_config");
+```
+
+## Limitations: TBD
+
+This version of the Daffodil interface only supports data querying; there is no ability to output data (aka unparse, in DFDL terminology).
+
+The data is parsed fully from its native form into a Drill data structure held in memory.
+No attempt is made to avoid access to parts of the DFDL-described data that are not needed to answer the query.
+
+If the data is not well-formed, an error occurs and the query fails.
+
+If the data is invalid, and validity checking by Daffodil is enabled, then an error occurs and the query fails.
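+
+Referencing a schema identifier from the plugin configuration (as sketched above) is still TBD. For now, a query passes the DFDL schema URI and root element directly as table-function parameters. The example below is adapted from this module's unit tests; the absolute schemaURI path reflects the test environment only and is a placeholder, not a plugin requirement:
+
+```sql
+SELECT * FROM table(dfs.`data/simple.dat.gz`
+    (type => 'daffodil',
+     validationMode => 'true',
+     schemaURI => 'file:///opt/drill/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd',
+     rootName => '',
+     rootNamespace => ''))
+```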
+ +## Future Functionality + +* TBD diff --git a/contrib/format-daffodil/pom.xml b/contrib/format-daffodil/pom.xml new file mode 100644 index 00000000000..01955c746dc --- /dev/null +++ b/contrib/format-daffodil/pom.xml @@ -0,0 +1,94 @@ + + + + 4.0.0 + + + drill-contrib-parent + org.apache.drill.contrib + 1.22.0-SNAPSHOT + + + drill-format-daffodil + Drill : Contrib : Format : Daffodil + + + + org.apache.drill.exec + drill-java-exec + ${project.version} + + + org.apache.daffodil + daffodil-japi_2.12 + 3.7.0-SNAPSHOT + + + org.apache.daffodil + daffodil-runtime1_2.12 + 3.7.0-SNAPSHOT + + + + org.apache.drill.exec + drill-java-exec + tests + ${project.version} + test + + + + org.apache.drill + drill-common + tests + ${project.version} + test + + + + + + + maven-resources-plugin + + + copy-java-sources + process-sources + + copy-resources + + + ${basedir}/target/classes/org/apache/drill/exec/store/daffodil + + + + src/main/java/org/apache/drill/exec/store/daffodil + true + + + + + + + + + diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java new file mode 100644 index 00000000000..e44c1f078c9 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.daffodil; + +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.daffodil.japi.DataProcessor; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.easy.EasySubScan; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema; + + +public class DaffodilBatchReader implements ManagedReader { + + private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class); + private final DaffodilFormatConfig dafConfig; + private final RowSetLoader rowSetLoader; + private final CustomErrorContext errorContext; + private final DaffodilMessageParser dafParser; + private final InputStream dataInputStream; + + static class DaffodilReaderConfig { + final DaffodilFormatPlugin plugin; + DaffodilReaderConfig(DaffodilFormatPlugin plugin) { + this.plugin = plugin; + } + } + + public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) { + + errorContext = negotiator.parentErrorContext(); + this.dafConfig = readerConfig.plugin.getConfig(); + + String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd"; + String rootName = dafConfig.getRootName(); + String rootNamespace = dafConfig.getRootNamespace(); + Boolean validationMode = dafConfig.getValidationMode(); + + URI dfdlSchemaURI; + try { + dfdlSchemaURI = new URI(schemaURIString); + } catch (URISyntaxException e) { + throw UserException.validationError(e) + .build(logger); + } + + FileDescrip file = negotiator.file(); + DrillFileSystem fs = file.fileSystem(); + URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI); + + + DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory(); + DataProcessor dp; + try { + dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace); + } catch (Exception e) { + throw UserException.dataReadError(e) + .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI)) + .addContext(errorContext).addContext(e.getMessage()).build(logger); + } + // Create the corresponding Drill schema. + // Note: this could be a very large schema. Think of a large complex RDBMS schema, + // all of it, hundreds of tables, but all part of the same metadata tree. + TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp); + // Inform Drill about the schema + negotiator.tableSchema(drillSchema, true); + + // + // DATA TIME: Next we construct the runtime objects, and open files. + // + // We get the DaffodilMessageParser, which is a stateful driver for daffodil that + // actually does the parsing. 
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter, which the Daffodil parser uses to
+    // convert infoset event calls to fill in Drill rows via the rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can set up the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Next, we open the data stream.
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Lastly, tell Daffodil which input data stream to parse.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+ * This is the core of actual processing - data movement from Daffodil to Drill.
+ *
+ * If there is space in the batch and there is data available to parse,
+ * then this calls the Daffodil parser, which parses data, delivering it to the rowWriter
+ * by way of the infoset outputter.
+ *

+ * Repeats until the rowWriter is full (a batch is full), or there is no more data, or + * a parse error ends execution with a throw. + *

+ * Validation errors and other warnings are not errors and are logged but do not cause + * parsing to fail/throw. + * @return true if there are rows retrieved, false if no rows were retrieved, which means + * no more will ever be retrieved (end of data). + * @throws RuntimeException on parse errors. + */ + @Override + public boolean next() { + // Check assumed invariants + // We don't know if there is data or not. This could be called on an empty data file. + // We DO know that this won't be called if there is no space in the batch for even 1 + // row. + if (dafParser.isEOF()) { + return false; // return without even checking for more rows or trying to parse. + } + while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop. + // the predicate is always true once. + try { + dafParser.parse(); + if (dafParser.isProcessingError()) { + throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString()) + .addContext(errorContext).build(logger); + } + if (dafParser.isValidationError()) { + logger.warn(dafParser.getDiagnosticsAsString()); + // Note that even if daffodil is set to not validate, validation errors may still occur + // from DFDL's "recoverableError" assertions. + } + } catch (Exception e) { + throw UserException.dataReadError(e).message("Error parsing file: " + e.getMessage()) + .addContext(errorContext).build(logger); + } + // success! We have parsed data and populated a row. + rowSetLoader.save(); + } + int nRows = rowSetLoader.rowCount(); + assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out of here. + return true; + } + + @Override + public void close() { + if (dataInputStream != null) + AutoCloseables.closeSilently(dataInputStream); + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java new file mode 100644 index 00000000000..e2d3ddb8e7a --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.daffodil; + +import org.apache.daffodil.lib.xml.NamedQName; +import org.apache.daffodil.runtime1.api.BlobMethodsImpl; +import org.apache.daffodil.runtime1.api.ElementMetadata; +import org.apache.daffodil.runtime1.api.InfosetArray; +import org.apache.daffodil.runtime1.api.InfosetComplexElement; +import org.apache.daffodil.runtime1.api.InfosetOutputter; +import org.apache.daffodil.runtime1.api.InfosetSimpleElement; +import org.apache.daffodil.runtime1.api.Status; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.vector.accessor.ObjectWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Enumeration; + +/** + * Adapts Daffodil parser infoset event calls to Drill writer calls + * to fill in Drill data rows. + */ +public class DaffodilDrillInfosetOutputter + extends BlobMethodsImpl implements InfosetOutputter { + + private RowSetLoader writer; + + private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class); + + private DaffodilDrillInfosetOutputter() {} // no default constructor + + public DaffodilDrillInfosetOutputter(RowSetLoader writer) { + this.writer = writer; + } + + @Override + public void reset() { + } + + @Override + public void startDocument() { + } + + @Override + public void endDocument() { + } + + @Override + public void startSimple(InfosetSimpleElement diSimple) { + ElementMetadata erd = diSimple.metadata(); + boolean isNilled = diSimple.isNilled(); + NamedQName nqn = erd.namedQName(); + String colName; + if (nqn.prefix().isDefined()) { + colName = nqn.prefix().get() + "_" + nqn.local(); + } else { + colName = nqn.local(); + } + // Keep in mind some Daffodil types must be converted to get the proper type for Drill + // but simple types like INT will work fine. + ObjectWriter objWriter = writer.column(colName); + if (isNilled) { + objWriter.setNull(); + } else { + // + // FIXME: only INT is implemented right now. + // + ScalarWriter scalarWriter = objWriter.scalar(); + // + // This downcast is needed because dataValue().getInt() the getInt() is declared @inline + // in the scala code. These value nodes and runtime data (metadata) nodes need a JAPI layer. + // + int value = diSimple.getInt(); // will fail on downcast if not an INT. + scalarWriter.setInt(value); + } + } + + @Override + public void endSimple(InfosetSimpleElement diSimple) { + /// nothing to do. + } + + @Override + public void startComplex(InfosetComplexElement diComplex) { + // FIXME: set a map as the value of the column + // push a new map writer for it + } + + @Override + public void endComplex(InfosetComplexElement diComplex) { + // FIXME: pop stack of map writers + } + + @Override + public void startArray(InfosetArray diArray) { + // FIXME: set an array as the value of the column + // push a new writer for it + // The contents of the column could be a scalar value + // or a map. + // It cannot be directly another vector, as DFDL requires the + // inner vector to be named, so there will be a map with that name. + } + + @Override + public void endArray(InfosetArray diArray) { + // FIXME: pop stack of array writer. 
+ } + + @Override + public Enumeration.Value getStatus() { + return Status.READY(); + } +} + diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java new file mode 100644 index 00000000000..80e2ba5110d --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.ImmutableList; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.exec.store.daffodil.DaffodilBatchReader.DaffodilReaderConfig; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +@JsonTypeName(DaffodilFormatPlugin.DEFAULT_NAME) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +public class DaffodilFormatConfig implements FormatPluginConfig { + + public final List extensions; + public final String schemaURI; + public final boolean validationMode; + public final String rootName; + public final String rootNamespace; + /** + * In the constructor for a format config, you should not use + * boxed versions of primitive types. It creates problems with + * defaulting them (they default to null) which cannot be unboxed. + */ + @JsonCreator + public DaffodilFormatConfig(@JsonProperty("extensions") List extensions, + @JsonProperty("schemaURI") String schemaURI, + @JsonProperty("rootName") String rootName, + @JsonProperty("rootNamespace") String rootNamespace, + @JsonProperty("validationMode") boolean validationMode) { + + this.extensions = extensions == null ? Collections.singletonList("dat") : ImmutableList.copyOf(extensions); + this.rootName = rootName; + this.rootNamespace = rootNamespace; + this.schemaURI = schemaURI; + // no default. Users must pick. 
+ this.validationMode = validationMode; + } + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public List getExtensions() { + return extensions; + } + + public String getSchemaURI() { + return schemaURI; + } + + public String getRootName() { + return rootName; + } + + public String getRootNamespace() { + return rootNamespace; + } + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public boolean getValidationMode() { + return validationMode; + } + + public DaffodilReaderConfig getReaderConfig(DaffodilFormatPlugin plugin) { + DaffodilReaderConfig readerConfig = new DaffodilReaderConfig(plugin); + return readerConfig; + } + + @Override + public int hashCode() { + return Objects.hash(schemaURI, validationMode, rootName, rootNamespace); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + DaffodilFormatConfig other = (DaffodilFormatConfig) obj; + return Objects.equals(schemaURI, other.schemaURI) + && Objects.equals(rootName, other.rootName) + && Objects.equals(rootNamespace, other.rootNamespace) + && Objects.equals(validationMode, other.validationMode); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("schemaURI", schemaURI) + .field("rootName", rootName) + .field("rootNamespace", rootNamespace) + .field("validationMode", validationMode) + .toString(); + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java new file mode 100644 index 00000000000..95b5e82aaac --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.daffodil; + + +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; +import org.apache.drill.exec.store.dfs.easy.EasySubScan; +import org.apache.hadoop.conf.Configuration; + + +public class DaffodilFormatPlugin extends EasyFormatPlugin { + + public static final String DEFAULT_NAME = "daffodil"; + public static final String OPERATOR_TYPE = "DAFFODIL_SUB_SCAN"; + + public static class DaffodilReaderFactory extends FileReaderFactory { + private final DaffodilBatchReader.DaffodilReaderConfig readerConfig; + + private final EasySubScan scan; + + public DaffodilReaderFactory(DaffodilBatchReader.DaffodilReaderConfig config, EasySubScan scan) { + this.readerConfig = config; + this.scan = scan; + } + + @Override + public ManagedReader newReader(FileSchemaNegotiator negotiator) { + return new DaffodilBatchReader(readerConfig, scan, negotiator); + } + } + + public DaffodilFormatPlugin(String name, DrillbitContext context, + Configuration fsConf, StoragePluginConfig storageConfig, + DaffodilFormatConfig formatConfig) { + super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig); + } + + private static EasyFormatConfig easyConfig(Configuration fsConf, DaffodilFormatConfig pluginConfig) { + return EasyFormatConfig.builder() + .readable(true) + .writable(false) + .blockSplittable(false) + .compressible(true) + .extensions(pluginConfig.getExtensions()) + .fsConf(fsConf) + .readerOperatorType(OPERATOR_TYPE) + .scanVersion(ScanFrameworkVersion.EVF_V2) + .supportsLimitPushdown(true) + .supportsProjectPushdown(true) + .defaultName(DaffodilFormatPlugin.DEFAULT_NAME) + .build(); + } + + @Override + protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) { + builder.nullType(Types.optional(MinorType.VARCHAR)); + builder.readerFactory(new DaffodilReaderFactory(formatConfig.getReaderConfig(this), scan)); + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java new file mode 100644 index 00000000000..8972b73608d --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.daffodil; + + +import org.apache.daffodil.japi.Compiler; +import org.apache.daffodil.japi.DataProcessor; +import org.apache.daffodil.japi.Diagnostic; +import org.apache.daffodil.japi.ParseResult; +import org.apache.daffodil.runtime1.api.InfosetOutputter; +import org.apache.daffodil.japi.io.InputSourceDataInputStream; +import org.jdom2.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.channels.Channels; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * DFDL Daffodil Streaming message parser + *
+ * You construct this providing a DataProcessor obtained from the + * DaffodilDataProcessorFactory. + * The DataProcessor contains the compiled DFDL schema, ready to use, as + * well as whether validation while parsing has been requested. + *
+ * The DataProcessor object may be shared/reused by multiple threads each of which + * has its own copy of this class. + * This object is, however, stateful, and must not be shared by multiple threads. + *
+ * You must call setInputStream and setInfosetOutputter before
+ * you call parse().
+ * The input stream and the InfosetOutputter objects are also private to one thread and are stateful
+ * and owned by this object.
+ * Once you have called setInputStream, you should view the input stream as the private property of
+ * this object.
+ * The parse() call will invoke the InfosetOutputter's methods to deliver
+ * parsed data, and it may create diagnostics (obtained via getDiagnostics);
+ * isProcessingError and isValidationError indicate which kind of diagnostics were created.
+ *
+ * Note that the InfosetOutputter may be called many times before a processing error is detected, + * as Daffodil delivers result data incrementally. + *
+ * Validation errors do not affect the InfosetOutputter output calls, but indicate that data was + * detected that is invalid. + *
+ * When parse() returns, the parse has ended and one can check for errors/diagnostics. + * One can call parse() again if there is still data to consume, which is checked via the + * isEOF() method. + *
+ * There are no guarantees about where the input stream is positioned between parse() calls. + * In particular, it may not be positioned at the start of the next message, as Daffodil may + * have pre-fetched additional bytes from the input stream which it found are not part of the + * current infoset, but the next one. + * The positioning of the input stream may in fact be somewhere in the middle of a byte, + * as Daffodil does not require messages to be of lengths that are in whole byte units. + * Hence, once you give the input stream to this object via setInputStream, that input stream is + * owned privately by this class for ever after. + */ +public class DaffodilMessageParser { + + /** + * Constructs the parser using a DataProcessor obtained from + * a DaffodilDataProcessorFactory. + * @param dp + */ + DaffodilMessageParser(DataProcessor dp) { + this.dp = dp; + } + + /** + * Provide the input stream from which data is to be parsed. + *
+ * This input stream is then owned by this object and becomes part of its state. + *
+ * It is, however, the responsibility of the caller to close this
+ * input stream after the completion of all parse calls.
+ * In particular, if a parse error is considered fatal, then
+ * the caller should close the input stream.
+ * There are advanced error-recovery techniques that may attempt to find
+ * data that can be parsed later in the data stream.
+ * In those cases the input stream would not be closed after a processing error,
+ * but such usage is beyond the scope of this javadoc.
+ * @param inputStream
+ */
+  public void setInputStream(InputStream inputStream) {
+    dis = new InputSourceDataInputStream(inputStream);
+  }
+
+  /**
+ * Provides the InfosetOutputter which will be called to deliver
+ * the Infoset via calls to its methods.
+ * @param outputter
+ */
+  public void setInfosetOutputter(InfosetOutputter outputter) {
+    this.outputter = outputter;
+  }
+
+  /**
+ * Called to pull messages from the data stream.
+ * The message 'Infoset' is delivered by way of calls to the InfosetOutputter's methods.
+ *
+ * After calling this, one may call isProcessingError, isValidationError, isEOF, and
+ * getDiagnostics.
+ */
+  public void parse() {
+    if (dis == null)
+      throw new IllegalStateException("Input stream must be provided by setInputStream() call.");
+    if (outputter == null)
+      throw new IllegalStateException("InfosetOutputter must be provided by setInfosetOutputter() call.");
+
+    reset();
+    ParseResult res = dp.parse(dis, outputter);
+    isProcessingError = res.isProcessingError();
+    isValidationError = res.isValidationError();
+    // Capture the diagnostics on the field so that getDiagnostics() and
+    // getDiagnosticsAsString() report the results of this parse call.
+    diagnostics = res.getDiagnostics();
+  }
+
+  /**
+ * True if the input stream is known to contain no more data.
+ * If the input stream is a true stream, not a file, then temporary unavailability of data
+ * may cause this call to block until the stream is closed from the other end, or data becomes
+ * available.
+ *
+ * False if the input stream still has data that can be parsed.
+ * It is an error to call parse() after isEOF has returned true.
+ * @return
+ */
+  public boolean isEOF() {
+    return !dis.hasData();
+  }
+
+  /**
+ * True if the parse() call failed with a processing error.
+ * This indicates that the data was not well-formed and could not be
+ * parsed successfully.
+ *
+ * It is possible for isProcessingError and isValidationError to both be true.
+ * @return
+ */
+  public boolean isProcessingError() { return isProcessingError; }
+
+  /**
+ * True if a validation error occurred during parsing.
+ * After a validation error has been detected, parsing may still succeed or fail.
+ * @return
+ */
+  public boolean isValidationError() { return isValidationError; }
+
+  /**
+ * After a parse() call this returns null or a list of 1 or more diagnostics.
+ *
+ * If isProcessingError or isValidationError are true, then this will contain at least 1 + * diagnostic. + * If both are true this will contain at least 2 diagnostics. + * @return + */ + public List getDiagnostics() { return diagnostics; } + public String getDiagnosticsAsString() { + String result = diagnostics.stream() + .map(Diagnostic::getMessage) + .collect(Collectors.joining("\n")); + return result; + } + + + private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class); + + private List diagnostics; // diagnostics. + private boolean isProcessingError; + private boolean isValidationError; + + private InputSourceDataInputStream dis; + private InfosetOutputter outputter; + private DataProcessor dp; + + private void reset() { + outputter.reset(); + isProcessingError = false; + isValidationError = false; + diagnostics = null; + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java new file mode 100644 index 00000000000..ab5e35f4692 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.daffodil.schema; + + +import org.apache.daffodil.japi.Compiler; +import org.apache.daffodil.japi.Compiler; +import org.apache.daffodil.japi.Daffodil; +import org.apache.daffodil.japi.DataProcessor; +import org.apache.daffodil.japi.Diagnostic; +import org.apache.daffodil.japi.InvalidParserException; +import org.apache.daffodil.japi.InvalidUsageException; +import org.apache.daffodil.japi.ProcessorFactory; +import org.apache.daffodil.japi.ValidationMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URI; +import java.nio.channels.Channels; +import java.util.List; +import java.util.Objects; + +/** + * Compiles a DFDL schema (mostly for tests) or loads a pre-compiled DFDL schema so + * that one can obtain a DataProcessor for use with DaffodilMessageParser. + *
+ * TODO: Needs to use a cache to avoid reloading/recompiling every time. + */ +public class DaffodilDataProcessorFactory { + // Default constructor is used. + + private static final Logger logger = LoggerFactory.getLogger(DaffodilDataProcessorFactory.class); + + private DataProcessor dp; + + /** + * Thrown if schema compilation fails. + *
+ * Contains diagnostic objects which give the cause(s) of the failure. + */ + public static class CompileFailure extends Exception { + List diags; + + CompileFailure(List diagnostics) { + super("DFDL Schema Compile Failure"); + diags = diagnostics; + } + } + + /** + * Gets a Daffodil DataProcessor given the necessary arguments to compile or reload it. + * @param schemaFileURI pre-compiled dfdl schema (.bin extension) or DFDL schema source (.xsd extension) + * @param validationMode Use true to request Daffodil built-in 'limited' validation. + * Use false for no validation. + * @param rootName Local name of root element of the message. Can be null to use the first element + * declaration of the primary schema file. Ignored if reloading a pre-compiled schema. + * @param rootNS Namespace URI as a string. Can be null to use the target namespace + * of the primary schema file or if it is unambiguous what element is the rootName. + * Ignored if reloading a pre-compiled schema. + * @return the DataProcessor + * @throws CompileFailure - if schema compilation fails + * @throws IOException - if the schemaFileURI cannot be opened or is not found. + * @throws URISyntaxException - if the schemaFileURI is not legal syntax. + * @throws InvalidParserException - if the reloading of the parser from pre-compiled binary fails. + */ + public DataProcessor getDataProcessor( + URI schemaFileURI, + boolean validationMode, + String rootName, + String rootNS) + throws CompileFailure, IOException, URISyntaxException, InvalidParserException { + + DaffodilDataProcessorFactory dmp = new DaffodilDataProcessorFactory(); + boolean isPrecompiled = schemaFileURI.toString().endsWith(".bin"); + if (isPrecompiled) { + if (Objects.nonNull(rootName) && !rootName.isEmpty()) { + // A usage error. You shouldn't supply the name and optionally namespace if loading + // precompiled schema because those are built into it. Should be null or "". + logger.warn("Root element name '{}' is ignored when used with precompiled DFDL schema.", rootName); + } + dmp.loadSchema(schemaFileURI); + dmp.setupDP(validationMode, null); + } else { + List pfDiags = dmp.compileSchema(schemaFileURI, rootName, rootNS); + dmp.setupDP(validationMode, pfDiags); + } + return dmp.dp; + } + + private void loadSchema(URI schemaFileURI) + throws URISyntaxException, IOException, InvalidParserException { + Compiler c = Daffodil.compiler(); + dp = c.reload(Channels.newChannel(schemaFileURI.toURL().openStream())); + } + + private List compileSchema(URI schemaFileURI, String rootName, String rootNS) + throws URISyntaxException, IOException, CompileFailure { + Compiler c = Daffodil.compiler(); + ProcessorFactory pf = c.compileSource(schemaFileURI) + .withDistinguishedRootNode(rootName, rootNS); + + List pfDiags = pf.getDiagnostics(); + if (pf.isError()) { + throw new CompileFailure(pfDiags); + } + dp = pf.onPath("/"); + return pfDiags; // must be just warnings. If it was errors we would have thrown. + } + + /** + * Common setup steps used whether or not we reloaded or compiled a DFDL schema. + */ + private void setupDP(boolean validationMode, List pfDiags) throws CompileFailure { + Objects.requireNonNull(dp); // true because failure to produce a dp throws CompileFailure. 
+ if (validationMode) { + try { + dp = dp.withValidationMode(ValidationMode.Limited); + } catch (InvalidUsageException e) { + // impossible + throw new Error(e); + } + } + List dpDiags = dp.getDiagnostics(); + if (dp.isError()) { + throw new CompileFailure(dpDiags); + } + // well this part is only if we compiled, and provided the pfDiags arg as non-null. + List compilationWarnings; + if (pfDiags != null && !pfDiags.isEmpty()) { + compilationWarnings = pfDiags; + compilationWarnings.addAll(dpDiags); // dpDiags might be empty. That's ok. + } else { + compilationWarnings = dpDiags; // dpDiags might be empty. That's ok. + } + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java new file mode 100644 index 00000000000..26def784c32 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil.schema; + +import org.apache.daffodil.japi.InvalidParserException; +import org.apache.daffodil.japi.DataProcessor; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + + +public class DrillDaffodilSchemaUtils { + private static final MinorType DEFAULT_TYPE = MinorType.VARCHAR; + private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaUtils.class); + + /** + * This map maps the data types defined by the DFDL definition to Drill data types. 
+ */ + public static final ImmutableMap DFDL_TYPE_MAPPINGS = + ImmutableMap.builder() + .put("LONG", MinorType.BIGINT) + .put("INT", MinorType.INT) + .put("SHORT", MinorType.SMALLINT) + .put("BYTE", MinorType.TINYINT) + .put("UNSIGNEDLONG", MinorType.UINT8) + .put("UNSIGNEDINT", MinorType.UINT4) + .put("UNSIGNEDSHORT", MinorType.UINT2) + .put("UNSIGNEDBYTE", MinorType.UINT1) + .put("INTEGER", MinorType.BIGINT) + .put("NONNEGATIVEINTEGER", MinorType.BIGINT) + .put("BOOLEAN", MinorType.BIT) + .put("DATE", MinorType.DATE) // requires conversion + .put("DATETIME", MinorType.TIMESTAMP) // requires conversion + .put("DECIMAL", MinorType.VARDECIMAL) // requires conversion (maybe) + .put("DOUBLE", MinorType.FLOAT8) + .put("FLOAT", MinorType.FLOAT4) + .put("HEXBINARY", MinorType.VARBINARY) + .put("STRING", MinorType.VARCHAR) + .put("TIME", MinorType.TIME) // requires conversion + .build(); + + + @VisibleForTesting + public static TupleMetadata processSchema(URI dfdlSchemaURI) + throws IOException, DaffodilDataProcessorFactory.CompileFailure, + URISyntaxException, InvalidParserException { + DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory(); + DataProcessor dp = dpf.getDataProcessor(dfdlSchemaURI, true, null, null); + return daffodilDataProcessorToDrillSchema(dp); + } + + public static TupleMetadata daffodilDataProcessorToDrillSchema(DataProcessor dp) { + DrillDaffodilSchemaVisitor schemaVisitor = new DrillDaffodilSchemaVisitor(); + dp.walkMetadata(schemaVisitor); + TupleMetadata drillSchema = schemaVisitor.getDrillSchema(); + return drillSchema; + } + + /** + * Returns a {@link MinorType} of the corresponding DFDL Data Type. Defaults to VARCHAR if unknown + * @param dfdlType A String of the DFDL Data Type (local name only, i.e., no "xs:" prefix. + * @return A {@link MinorType} of the Drill data type. + */ + public static MinorType getDrillDataType(String dfdlType) { + try { + MinorType type = DrillDaffodilSchemaUtils.DFDL_TYPE_MAPPINGS.get(dfdlType.toUpperCase()); + if (type == null) { + return DEFAULT_TYPE; + } else { + return type; + } + } catch (NullPointerException e) { + logger.warn("Unknown data type found in XSD reader: {}. Returning VARCHAR.", dfdlType); + return DEFAULT_TYPE; + } + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java new file mode 100644 index 00000000000..c4ce3c03757 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.daffodil.schema; + +import org.apache.daffodil.runtime1.api.ComplexElementMetadata; +import org.apache.daffodil.runtime1.api.MetadataHandler; +import org.apache.daffodil.runtime1.api.SimpleElementMetadata; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.MapBuilder; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Stack; + +/** + * This class transforms a DFDL/Daffodil schema into a Drill Schema. + */ +public class DrillDaffodilSchemaVisitor + extends MetadataHandler +{ + private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaVisitor.class); + private final SchemaBuilder builder = new SchemaBuilder(); + + private final Stack mapBuilderStack = new Stack(); + + private MapBuilder mapBuilder() { + return mapBuilderStack.peek(); + } + + /** + * Returns a {@link TupleMetadata} representation of the DFDL schema. + * Should only be called after the walk of the DFDL schema with this visitor has been called. + * @return A {@link TupleMetadata} representation of the DFDL schema. + */ + public TupleMetadata getDrillSchema() { + return builder.build(); + } + + @Override + public void simpleElementMetadata(SimpleElementMetadata md) { + String colName = md.namedQName().toQNameString().replace(":", "_"); + MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(md.primType().toString()); + if (mapBuilderStack.isEmpty()) { + // special case if the root element is of simple type. + // This occurs in very small test cases. It is technically not allowed in XSD + // and a DFDL schema is an XSD; however, DFDL may loosen this restriction + if (md.isArray()) builder.addArray(colName, drillType); + else if (md.isOptional()) builder.addNullable(colName, drillType); + else builder.add(colName, drillType); + } else { + // the more common case is that a simple element appears within + // a map. 
+ if (md.isArray()) mapBuilder().addArray(colName, drillType); + else if (md.isOptional()) mapBuilder().addNullable(colName, drillType); + else mapBuilder().add(colName, drillType); + } + } + + @Override + public void startComplexElementMetadata(ComplexElementMetadata md) { + String colName = md.namedQName().toQNameString().replace(":", "_"); + if (mapBuilderStack.isEmpty()) { + if (md.isArray()) mapBuilderStack.push(builder.addMapArray(colName)); + else mapBuilderStack.push(builder.addMap(colName)); // also handles optional complex elements + } else { + if (md.isArray()) mapBuilderStack.push(mapBuilder().addMapArray(colName)); + else mapBuilderStack.push(mapBuilder().addMap(colName)); // also handles optional complex elements + } + } + + @Override + public void endComplexElementMetadata(ComplexElementMetadata md) { + if (mapBuilderStack.size() == 1) { + mapBuilder().resumeSchema(); + } else { + mapBuilder().resumeMap(); + } + mapBuilderStack.pop(); + } + +} diff --git a/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json new file mode 100644 index 00000000000..966d9ba1b5b --- /dev/null +++ b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json @@ -0,0 +1,37 @@ +{ + "storage":{ + "dfs": { + "type": "file", + "formats": { + "daffodil": { + "type": "daffodil", + "extensions": [ + "dat" + ] + } + } + }, + "cp": { + "type": "file", + "formats": { + "daffodil": { + "type": "daffodil", + "extensions": [ + "dat" + ] + } + } + }, + "s3": { + "type": "file", + "formats": { + "daffodil": { + "type": "daffodil", + "extensions": [ + "dat" + ] + } + } + } + } +} diff --git a/contrib/format-daffodil/src/main/resources/drill-module.conf b/contrib/format-daffodil/src/main/resources/drill-module.conf new file mode 100644 index 00000000000..52a902572e3 --- /dev/null +++ b/contrib/format-daffodil/src/main/resources/drill-module.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file tells Drill to consider this module when class path scanning. +# This file can also include any supplementary configuration information. +# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. 
+ +drill.classpath.scanning: { + packages += "org.apache.drill.exec.store.daffodil" +} diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java new file mode 100644 index 00000000000..1f68d3f8548 --- /dev/null +++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil; + +import org.apache.drill.categories.RowSetTest; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.QueryBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.nio.file.Paths; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; + +import static org.apache.drill.test.QueryTestUtil.generateCompressedFile; +import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray; +import static org.apache.drill.test.rowSet.RowSetUtilities.objArray; +import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; +import static org.junit.Assert.assertEquals; + +@Category(RowSetTest.class) +public class TestDaffodilReader extends ClusterTest { + + String schemaURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/"; + String dataURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/"; + @BeforeClass + public static void setup() throws Exception { + // boilerplate call to start test rig + ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher)); + + DaffodilFormatConfig formatConfig = + new DaffodilFormatConfig(null, + "", + "", + "", + false); + + cluster.defineFormat("dfs", "daffodil", formatConfig); + + // Needed to test against compressed files. + // Copies data from src/test/resources to the dfs root. 
+ java.io.File newDataDir = dirTestWatcher.copyResourceToRoot(Paths.get("data/")); + java.io.File newSchemaDir = dirTestWatcher.copyResourceToRoot(Paths.get("schema/")); + } + + /** + * This unit test tests a simple data file + * + * @throws Exception Throw exception if anything goes wrong + */ + @Test + public void testSimple1() throws Exception { + + String sql = "SELECT * FROM table(dfs.`data/simple.dat.gz` " + + " (type => 'daffodil'," + + " validationMode => 'true', " + + " schemaURI => '" + schemaURIRoot + "schema/simple.dfdl.xsd'," + + " rootName => ''," + + " rootNamespace => '' " + + "))"; + + // String sql2 = "show files in dfs.`data`"; + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(sql); + RowSet results = query.rowSet(); + results.print(); + assertEquals(1, results.rowCount()); + + // create the expected metadata and data for this test + // metadata first + TupleMetadata expectedSchema = new SchemaBuilder() + .add("ex_r", MinorType.INT) + .buildSchema(); + + RowSet expected = client.rowSetBuilder(expectedSchema) + .addRow(0x00000101) // aka 257 + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + +} diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilUtils.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilUtils.java new file mode 100644 index 00000000000..285234f057f --- /dev/null +++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.daffodil; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestDaffodilUtils { + +// @Test +// public void testRemoveField() { +// String test1 = "field1_field2_field3"; +// assertEquals(DaffodilUtils.removeField(test1, "field3"), "field1_field2"); +// +// // Test with underscores +// String test2 = "field_1_field_2_field_3"; +// assertEquals(DaffodilUtils.removeField(test2, "field_3"), "field_1_field_2"); +// +// // Test with missing field +// String test3 = "field_1_field_2_field_3"; +// assertEquals(DaffodilUtils.removeField(test3, "field_4"), ""); +// +// // Test with empty string +// String test4 = ""; +// assertEquals(DaffodilUtils.removeField(test4, "field_4"), ""); +// } +} diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/xsd/TestDaffodilSchema.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/xsd/TestDaffodilSchema.java new file mode 100644 index 00000000000..9273f41fdcb --- /dev/null +++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/xsd/TestDaffodilSchema.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.daffodil.xsd; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.util.DrillFileUtils; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils; +import org.junit.Test; + +import java.io.File; +import java.net.URI; +import java.net.URL; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Objects; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestDaffodilSchema { + + @Test + public void testSimple() throws Exception { + URI schemaURI = getClass().getResource("/schema/simple.dfdl.xsd").toURI(); + TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("ex_r", MinorType.INT) + .buildSchema(); + assertTrue(expectedSchema.isEquivalent(schema)); + } + + @Test + public void testComplex1() throws Exception { + URI schemaURI = getClass().getResource("/schema/complex1.dfdl.xsd").toURI(); + TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addMap("ex_r") + .add("a1", MinorType.INT) + .add("a2", MinorType.INT) + .resumeSchema() + .buildSchema(); + assertTrue(expectedSchema.isEquivalent(schema)); + } + + @Test + public void testComplexArray1() throws Exception { + URI schemaURI = getClass().getResource("/schema/complexArray1.dfdl.xsd").toURI(); + TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addMap("ex_r") + .addMapArray("record") + .add("a1", MinorType.INT) + .add("a2", MinorType.INT) + .resumeMap() + .resumeSchema() + .buildSchema(); + assertTrue(expectedSchema.isEquivalent(schema)); + } + +} diff --git a/contrib/format-daffodil/src/test/resources/data/simple.dat b/contrib/format-daffodil/src/test/resources/data/simple.dat new file mode 100644 index 0000000000000000000000000000000000000000..dee9c4c8adafc2e6af2ee0379ef082b7b43cd95f GIT binary patch literal 4 LcmZQzU}OXU00#gA literal 0 HcmV?d00001 diff --git a/contrib/format-daffodil/src/test/resources/data/simple.dat.gz b/contrib/format-daffodil/src/test/resources/data/simple.dat.gz new file mode 100644 index 0000000000000000000000000000000000000000..5e4b3b37acf4947e8f87c4a6e483941bbec8cbd2 GIT binary patch literal 35 qcmb2|=HQqVVw1|iT%4I(kdvyHl32o!oRE;h!f^Utgg*-d0|NlNJ_;28 literal 0 HcmV?d00001 diff --git a/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd new file mode 100644 index 00000000000..2a82b96b10a --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd new file mode 100644 index 00000000000..919544ea7e5 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd 
b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd new file mode 100644 index 00000000000..ee617483337 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + diff --git a/contrib/pom.xml b/contrib/pom.xml index b4ee5326b35..4b84892e2ee 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -59,6 +59,7 @@ format-pcapng format-iceberg format-deltalake + format-daffodil storage-drill storage-phoenix storage-googlesheets diff --git a/distribution/pom.xml b/distribution/pom.xml index dfb9d28484a..a66376cd1ca 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -479,6 +479,11 @@ drill-format-log ${project.version} + + org.apache.drill.contrib + drill-format-daffodil + ${project.version} + org.apache.drill.contrib drill-druid-storage diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml index 7cf3066e51d..35f0f0beadf 100644 --- a/distribution/src/assemble/component.xml +++ b/distribution/src/assemble/component.xml @@ -57,6 +57,7 @@ org.apache.drill.contrib:drill-format-msaccess:jar org.apache.drill.contrib:drill-format-spss:jar org.apache.drill.contrib:drill-format-sas:jar + org.apache.drill.contrib:drill-format-daffodil:jar org.apache.drill.contrib:drill-jdbc-storage:jar org.apache.drill.contrib:drill-kudu-storage:jar org.apache.drill.contrib:drill-storage-phoenix:jar diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java index 0a112b59360..001378707d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java @@ -78,7 +78,7 @@ * Some readers can determine the source schema at the start of a scan. * For example, a CSV file has headers, a Parquet file has footers, both * of which define a schema. This case is called "early schema." The - * reader fefines the schema by calling + * reader defines the schema by calling * {@link #tableSchema(TupleMetadata)} to provide the known schema. * *

 * Late Dynamic Schema

diff --git a/pom.xml b/pom.xml index 8baa6f7022d..86e1cb51f9c 100644 --- a/pom.xml +++ b/pom.xml
@@ -134,6 +134,7 @@
     4.3.3
     2.1.12
+    <daffodil.version>2.6.0-SNAPSHOT</daffodil.version>
@@ -330,6 +331,11 @@
       <artifactId>logback-core</artifactId>
       <version>${logback.version}</version>
+    <dependency>
+      <groupId>org.apache.daffodil</groupId>
+      <artifactId>daffodil-japi_2.12</artifactId>
+      <version>${daffodil.version}</version>
+    </dependency>
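
Note on the .dfdl.xsd test resources above: their XML bodies did not survive reformatting of this patch, so only the diff headers remain. As a rough, hypothetical sketch only (not the author's actual file), a minimal DFDL schema consistent with testSimple would declare a single 4-byte binary int element named r, which the schema visitor surfaces in Drill as column ex_r. The target namespace URI, the ex prefix binding, and the GeneralFormat include path are assumptions here, not taken from the patch.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:dfdl="http://www.ogf.org/dfdl/dfdl-1.0/"
           xmlns:ex="http://example.com"
           targetNamespace="http://example.com"
           elementFormDefault="qualified">

  <!-- Assumed: pull in Daffodil's bundled GeneralFormat property defaults. -->
  <xs:include schemaLocation="org.apache.daffodil.xsd/DFDLGeneralFormat.dfdl.xsd"/>

  <xs:annotation>
    <xs:appinfo source="http://www.ogf.org/dfdl/">
      <!-- Binary representation; GeneralFormat defaults to big-endian byte order. -->
      <dfdl:format ref="ex:GeneralFormat" representation="binary"/>
    </xs:appinfo>
  </xs:annotation>

  <!-- A 4-byte binary xs:int. The 4-byte test file parses as 257 (0x00000101),
       and the element appears in Drill as column "ex_r". -->
  <xs:element name="r" type="xs:int"/>
</xs:schema>
```

The complex1 and complexArray1 schemas would follow the same pattern, declaring r as a complex type with int children a1 and a2, with complexArray1 additionally repeating a record element inside r, matching the expected TupleMetadata built in TestDaffodilSchema.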