diff --git a/contrib/format-daffodil/README.md b/contrib/format-daffodil/README.md new file mode 100644 index 00000000000..784e7a7dcf5 --- /dev/null +++ b/contrib/format-daffodil/README.md @@ -0,0 +1,27 @@ +# Daffodil 'Format' Reader +This plugin enables Drill to read DFDL-described data from files by way of the Apache Daffodil DFDL implementation. + +## Validation + +Data read by Daffodil is always validated using Daffodil's Limited Validation mode. + +TBD: do we need an option to control escalating validation errors to fatal? Currently this is not provided. + +## Limitations: TBD + +At the moment, the DFDL schema is found on the local file system, which won't support Drill's distributed architecture. + +There are restrictions on the DFDL schemas that this can handle. + +In particular, all element children must have distinct element names, including across choice branches. +(This rules out a number of large DFDL schemas.) + +TBD: Auto renaming as part of the Daffodil-to-Drill metadata mapping? + +The data is parsed fully from its native form into a Drill data structure held in memory. +No attempt is made to avoid access to parts of the DFDL-described data that are not needed to answer the query. + +If the data is not well-formed, an error occurs and the query fails. + +If the data is invalid, and validity checking by Daffodil is enabled, then an error occurs and the query fails. + diff --git a/contrib/format-daffodil/pom.xml b/contrib/format-daffodil/pom.xml new file mode 100644 index 00000000000..0cae44a5257 --- /dev/null +++ b/contrib/format-daffodil/pom.xml @@ -0,0 +1,94 @@ + + + + 4.0.0 + + + drill-contrib-parent + org.apache.drill.contrib + 1.22.0-SNAPSHOT + + + drill-format-daffodil + Drill : Contrib : Format : Daffodil + + + + org.apache.drill.exec + drill-java-exec + ${project.version} + + + org.apache.daffodil + daffodil-japi_2.12 + 3.8.0 + + + org.apache.daffodil + daffodil-runtime1_2.12 + 3.8.0 + + + + org.apache.drill.exec + drill-java-exec + tests + ${project.version} + test + + + + org.apache.drill + drill-common + tests + ${project.version} + test + + + + + + + maven-resources-plugin + + + copy-java-sources + process-sources + + copy-resources + + + ${basedir}/target/classes/org/apache/drill/exec/store/daffodil + + + + src/main/java/org/apache/drill/exec/store/daffodil + true + + + + + + + + + diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java new file mode 100644 index 00000000000..976122c7fd3 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil; + +import org.apache.daffodil.japi.DataProcessor; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.easy.EasySubScan; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Objects; + +import static org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory.CompileFailure; +import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema; + +public class DaffodilBatchReader implements ManagedReader { + + private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class); + private final RowSetLoader rowSetLoader; + private final CustomErrorContext errorContext; + private final DaffodilMessageParser dafParser; + private final InputStream dataInputStream; + + public DaffodilBatchReader(DaffodilReaderConfig readerConfig, EasySubScan scan, + FileSchemaNegotiator negotiator) { + + errorContext = negotiator.parentErrorContext(); + DaffodilFormatConfig dafConfig = readerConfig.plugin.getConfig(); + + String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd"; + String rootName = dafConfig.getRootName(); + String rootNamespace = dafConfig.getRootNamespace(); + boolean validationMode = dafConfig.getValidationMode(); + + URI dfdlSchemaURI; + try { + dfdlSchemaURI = new URI(schemaURIString); + } catch (URISyntaxException e) { + throw UserException.validationError(e).build(logger); + } + + FileDescrip file = negotiator.file(); + DrillFileSystem fs = file.fileSystem(); + URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI); + + DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory(); + DataProcessor dp; + try { + dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace); + } catch (CompileFailure e) { + throw UserException.dataReadError(e) + .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI)) + .addContext(errorContext).addContext(e.getMessage()).build(logger); + } + // Create the corresponding Drill schema. + // Note: this could be a very large schema. Think of a large complex RDBMS schema, + // all of it, hundreds of tables, but all part of the same metadata tree. + TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp); + // Inform Drill about the schema + negotiator.tableSchema(drillSchema, true); + + // + // DATA TIME: Next we construct the runtime objects, and open files. + // + // We get the DaffodilMessageParser, which is a stateful driver for daffodil that + // actually does the parsing. + rowSetLoader = negotiator.build().writer(); + + // We construct the Daffodil InfosetOutputter which the daffodil parser uses to + // convert infoset event calls to fill in a Drill row via a rowSetLoader. + DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader); + + // Now we can set up the dafParser with the outputter it will drive with + // the parser-produced infoset. + dafParser = new DaffodilMessageParser(dp); // needs further initialization after this. + dafParser.setInfosetOutputter(outputter); + + Path dataPath = file.split().getPath(); + // Lastly, we open the data stream + try { + dataInputStream = fs.openPossiblyCompressedStream(dataPath); + } catch (IOException e) { + throw UserException.dataReadError(e) + .message(String.format("Failed to open input file: %s", dataPath.toString())) + .addContext(errorContext).addContext(e.getMessage()).build(logger); + } + // And lastly,... tell daffodil the input data stream. + dafParser.setInputStream(dataInputStream); + } + + /** + * This is the core of actual processing - data movement from Daffodil to Drill. + *

+ * If there is space in the batch, and there is data available to parse then this calls the + * daffodil parser, which parses data, delivering it to the rowWriter by way of the infoset + * outputter. + *

+ * Repeats until the rowWriter is full (a batch is full), or there is no more data, or a parse + * error ends execution with a throw. + *

+ * Validation errors and other warnings are not errors and are logged but do not cause parsing to + * fail/throw. + * + * @return true if there are rows retrieved, false if no rows were retrieved, which means no more + * will ever be retrieved (end of data). + * @throws RuntimeException + * on parse errors. + */ + @Override + public boolean next() { + // Check assumed invariants + // We don't know if there is data or not. This could be called on an empty data file. + // We DO know that this won't be called if there is no space in the batch for even 1 + // row. + if (dafParser.isEOF()) { + return false; // return without even checking for more rows or trying to parse. + } + while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop. + // the predicate is always true once. + dafParser.parse(); + if (dafParser.isProcessingError()) { + assert (Objects.nonNull(dafParser.getDiagnostics())); + throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString()) + .addContext(errorContext).build(logger); + } + if (dafParser.isValidationError()) { + logger.warn(dafParser.getDiagnosticsAsString()); + // Note that even if daffodil is set to not validate, validation errors may still occur + // from DFDL's "recoverableError" assertions. + } + rowSetLoader.save(); + } + int nRows = rowSetLoader.rowCount(); + assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out + // of here. + return true; + } + + @Override + public void close() { + AutoCloseables.closeSilently(dataInputStream); + } +} + +class DaffodilReaderConfig { + final DaffodilFormatPlugin plugin; + + DaffodilReaderConfig(DaffodilFormatPlugin plugin) { + this.plugin = plugin; + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java new file mode 100644 index 00000000000..189d2e0c18b --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.daffodil; + +import com.ibm.icu.util.Calendar; +import com.ibm.icu.util.TimeZone; +import org.apache.daffodil.japi.infoset.InfosetOutputter; +import org.apache.daffodil.runtime1.api.ComplexElementMetadata; +import org.apache.daffodil.runtime1.api.ElementMetadata; +import org.apache.daffodil.runtime1.api.InfosetArray; +import org.apache.daffodil.runtime1.api.InfosetComplexElement; +import org.apache.daffodil.runtime1.api.InfosetSimpleElement; +import org.apache.daffodil.runtime1.api.DFDLPrimType; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils; +import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor; +import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ObjectType; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.Stack; + +/** + * Adapts Daffodil parser infoset event calls to Drill writer calls to fill in Drill data rows. + */ +public class DaffodilDrillInfosetOutputter extends InfosetOutputter { + + private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class); + /** + * Stack that is used only if we have sub-structures that are not simple-type fields of the row. + */ + private final Stack tupleWriterStack = new Stack<>(); + private final Stack arrayWriterStack = new Stack<>(); + /** + * True if the next startComplex call will be for the DFDL infoset root element whose children are + * the columns of the row set. + */ + private boolean isRootElement = true; + private RowSetLoader rowSetWriter; + + private DaffodilDrillInfosetOutputter() { + } // no default constructor + + public DaffodilDrillInfosetOutputter(RowSetLoader writer) { + this.rowSetWriter = writer; + this.tupleWriterStack.push(writer); + } + + private static void nyi() { + throw new IllegalStateException("not yet implemented."); + } + + private static void fatalError(String s) { + throw new IllegalStateException(s); + } + + private boolean isOriginalRoot() { + boolean result = currentTupleWriter() == rowSetWriter; + if (result) { + assert (tupleWriterStack.size() == 1); + } + return result; + } + + private TupleWriter currentTupleWriter() { + return tupleWriterStack.peek(); + } + + private ArrayWriter currentArrayWriter() { + return arrayWriterStack.peek(); + } + + @Override + public void reset() { + tupleWriterStack.clear(); + tupleWriterStack.push(rowSetWriter); + arrayWriterStack.clear(); + this.isRootElement = true; + checkCleanState(); + } + + private void checkCleanState() { + assert (isOriginalRoot()); + assert (arrayWriterStack.isEmpty()); + assert (isRootElement); + } + + @Override + public void startDocument() { + checkCleanState(); + } + + @Override + public void endDocument() { + checkCleanState(); + } + + private String colName(ElementMetadata md) { + return DrillDaffodilSchemaVisitor.makeColumnName(md); + } + + @Override + public void startSimple(InfosetSimpleElement ise) { + assert (!isRootElement); + ElementMetadata md = ise.metadata(); + String colName = colName(md); + ScalarWriter cw; + if (md.isArray()) { + // A simple type array + assert (!arrayWriterStack.isEmpty()); + cw = currentArrayWriter().scalar(); + } else { + // A simple element within a map + // Note the map itself might be an array, + // but we don't care about that here. + cw = currentTupleWriter().scalar(colName); + } + ColumnMetadata cm = cw.schema(); + assert (cm.isScalar()); + if (md.isNillable() && ise.isNilled()) { + assert cm.isNullable(); + cw.setNull(); + } else { + convertDaffodilValueToDrillValue(ise, cm, cw); + } + } + + @Override + public void endSimple(InfosetSimpleElement diSimple) { + assert (!isRootElement); + // do nothing + } + + @Override + public void startComplex(InfosetComplexElement ce) { + ComplexElementMetadata md = ce.metadata(); + String colName = colName(ce.metadata()); + if (isRootElement) { + assert (isOriginalRoot()); + // This complex element's corresponds to the root element of the + // DFDL schema. We don't treat this as a column of the row set. + // Rather, it's children are the columns of the row set. + // + // If we do nothing at all here, then we'll start getting + // event calls for the children. + isRootElement = false; + return; + } + if (md.isArray()) { + assert (!arrayWriterStack.isEmpty()); + tupleWriterStack.push(currentArrayWriter().tuple()); + } else { + tupleWriterStack.push(currentTupleWriter().tuple(colName)); + } + } + + @Override + public void endComplex(InfosetComplexElement ce) { + ComplexElementMetadata md = ce.metadata(); + if (isOriginalRoot()) { + isRootElement = true; + // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method. + } else { + // it's a map. + // We seem to not need to do anything to end the map. No action taken here works. + if (md.isArray()) { + assert (!arrayWriterStack.isEmpty()); + currentArrayWriter().save(); // required for map array entries. + } + tupleWriterStack.pop(); + } + } + + @Override + public void startArray(InfosetArray diArray) { + ElementMetadata md = diArray.metadata(); + assert (md.isArray()); + // DFDL has no notion of an array directly within another array. A named field (map) is + // necessary before you can have another array. + assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top + // level row. + String colName = colName(md); + TupleWriter enclosingParentTupleWriter = currentTupleWriter(); + ArrayWriter aw = enclosingParentTupleWriter.array(colName); + arrayWriterStack.push(aw); + } + + @Override + public void endArray(InfosetArray ia) { + ElementMetadata md = ia.metadata(); + assert (md.isArray()); + assert (!arrayWriterStack.empty()); + // FIXME: How do we end/close-out an array? + // note that each array instance, when the instance is a map, must have + // save called after it is written to the array but that happens + // in endComplex events since it must be called not once per array, but + // once per array item. + arrayWriterStack.pop(); + } + + private void invariantFailed(String dafTypeName, ColumnMetadata cm) { + String msg = String.format( + "Daffodil to Drill Conversion Invariant Failed: dafType %s, drill type %s.", dafTypeName, + cm.typeString()); + logger.error(msg); + fatalError(msg); + } + + private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, + ScalarWriter cw) { + DFDLPrimType dafType = ise.metadata().dfdlType(); + String dafTypeName = dafType.name(); + TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType); + assert (drillType == cm.type()); + switch (drillType) { + case BIGINT: { // BIGINT type is not a Java BigInteger, BIGINT is a signed 8-byte long in Drill. + switch (dafType) { + case UnsignedInt: { + cw.setLong(ise.getUnsignedInt()); + break; + } + case Long: { + cw.setLong(ise.getLong()); + break; + } + default: + invariantFailed(dafTypeName, cm); + } + break; + } + case INT: { + cw.setInt(ise.getInt()); + break; + } + case SMALLINT: { + cw.setInt(ise.getShort()); // there is no setShort + break; + } + case TINYINT: { + cw.setInt(ise.getByte()); // there is no setByte + break; + } + case UINT4: { + // daffodil represents unsigned int as long. + // drill represents unsigned int as int. + cw.setInt(ise.getUnsignedInt().intValue()); + break; + } + case UINT2: { + cw.setInt(ise.getUnsignedShort()); + break; + } + case UINT1: { + cw.setInt(ise.getUnsignedByte()); + break; + } + case VARDECIMAL: { + switch (dafType) { + case UnsignedLong: { + cw.setDecimal(new BigDecimal(ise.getUnsignedLong())); + break; + } + case Integer: { + cw.setDecimal(new BigDecimal(ise.getInteger())); + break; + } + case NonNegativeInteger: { + cw.setDecimal(new BigDecimal(ise.getNonNegativeInteger())); + break; + } + default: + invariantFailed(dafTypeName, cm); + } + break; + } + case BIT: { + cw.setBoolean(ise.getBoolean()); + break; + } + case FLOAT8: { + switch (dafType) { + case Double: { + cw.setDouble(ise.getDouble()); + break; + } + case Float: { + // converting a float to a double by doubleValue() fails here + // Float.MaxValue converted to a double via doubleValue() + // then placed in a FLOAT8 column displays as + // 3.4028234663852886E38 not 3.4028235E38. + // But converting to string first, then to double works properly. + cw.setDouble(Double.parseDouble(ise.getFloat().toString())); + break; + } + default: + invariantFailed(dafTypeName, cm); + } + break; + } + case FLOAT4: { + // we don't use float4, we always use float8. + invariantFailed(dafTypeName, cm); + // cw.setFloat(ise.getFloat()); + break; + } + case VARBINARY: { + byte[] hexBinary = ise.getHexBinary(); + cw.setBytes(hexBinary, hexBinary.length); + break; + } + case VARCHAR: { + switch (dafType) { + case Decimal: { + BigDecimal decimal = ise.getDecimal(); + cw.setString(decimal.toString()); + break; + } + case String: { + String s = ise.getString(); + cw.setString(s); + break; + } + default: + invariantFailed(dafTypeName, cm); + } + break; + } + case TIME: { + Calendar icuCal = ise.getTime(); + Instant instant = Instant.ofEpochMilli(icuCal.getTimeInMillis()); + TimeZone icuZone = icuCal.getTimeZone(); + String zoneString = icuZone.getID(); + ZoneId zoneId = ZoneId.of(zoneString); + LocalTime localTime = instant.atZone(zoneId).toLocalTime(); + cw.setTime(localTime); + break; + } + case DATE: { + Calendar icuCalendar = ise.getDate(); + // Extract year, month, and day from ICU Calendar + int year = icuCalendar.get(Calendar.YEAR); + // Note: ICU Calendar months are zero-based, similar to java.util.Calendar + int month = icuCalendar.get(Calendar.MONTH) + 1; + int day = icuCalendar.get(Calendar.DAY_OF_MONTH); + // Create a LocalDate + LocalDate localDate = LocalDate.of(year, month, day); + cw.setDate(localDate); + break; + } + case TIMESTAMP: { + Calendar icuCalendar = ise.getDateTime(); + // Get time in milliseconds from the epoch + long millis = icuCalendar.getTimeInMillis(); + // Create an Instant from milliseconds + Instant instant = Instant.ofEpochMilli(millis); + cw.setTimestamp(instant); + break; + } + default: + invariantFailed(dafTypeName, cm); + } + } + + private void DFDLParseError(String s) { + throw new RuntimeException(s); + } +} + diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java new file mode 100644 index 00000000000..1d197bbd8e7 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.ImmutableList; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.logical.FormatPluginConfig; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +@JsonTypeName(DaffodilFormatPlugin.DEFAULT_NAME) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +public class DaffodilFormatConfig implements FormatPluginConfig { + + public final List extensions; + public final String schemaURI; + public final boolean validationMode; + public final String rootName; + public final String rootNamespace; + + /** + * In the constructor for a format config, you should not use boxed versions of primitive types. + * It creates problems with defaulting them (they default to null) which cannot be unboxed. + */ + @JsonCreator + public DaffodilFormatConfig(@JsonProperty("extensions") List extensions, + @JsonProperty("schemaURI") String schemaURI, @JsonProperty("rootName") String rootName, + @JsonProperty("rootNamespace") String rootNamespace, + @JsonProperty("validationMode") boolean validationMode) { + + this.extensions = extensions == null + ? Collections.singletonList("dat") + : ImmutableList.copyOf(extensions); + this.rootName = rootName; + this.rootNamespace = rootNamespace; + this.schemaURI = schemaURI; + // no default. Users must pick. + this.validationMode = validationMode; + } + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public List getExtensions() { + return extensions; + } + + public String getSchemaURI() { + return schemaURI; + } + + public String getRootName() { + return rootName; + } + + public String getRootNamespace() { + return rootNamespace; + } + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public boolean getValidationMode() { + return validationMode; + } + + public DaffodilReaderConfig getReaderConfig(DaffodilFormatPlugin plugin) { + return new DaffodilReaderConfig(plugin); + } + + @Override + public int hashCode() { + return Objects.hash(schemaURI, validationMode, rootName, rootNamespace); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + DaffodilFormatConfig other = (DaffodilFormatConfig) obj; + return Objects.equals(schemaURI, other.schemaURI) && Objects.equals(rootName, + other.rootName) && Objects.equals(rootNamespace, other.rootNamespace) && Objects.equals( + validationMode, other.validationMode); + } + + @Override + public String toString() { + return new PlanStringBuilder(this).field("schemaURI", schemaURI).field("rootName", rootName) + .field("rootNamespace", rootNamespace).field("validationMode", validationMode).toString(); + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java new file mode 100644 index 00000000000..b6a74467286 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatPlugin.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil; + +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; +import org.apache.drill.exec.store.dfs.easy.EasySubScan; +import org.apache.hadoop.conf.Configuration; + +public class DaffodilFormatPlugin extends EasyFormatPlugin { + + public static final String DEFAULT_NAME = "daffodil"; + public static final String OPERATOR_TYPE = "DAFFODIL_SUB_SCAN"; + + public DaffodilFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storageConfig, DaffodilFormatConfig formatConfig) { + super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig); + } + + private static EasyFormatConfig easyConfig(Configuration fsConf, + DaffodilFormatConfig pluginConfig) { + return EasyFormatConfig.builder().readable(true).writable(false).blockSplittable(false) + .compressible(true).extensions(pluginConfig.getExtensions()).fsConf(fsConf) + .readerOperatorType(OPERATOR_TYPE).scanVersion(ScanFrameworkVersion.EVF_V2) + .supportsLimitPushdown(true).supportsProjectPushdown(true) + .defaultName(DaffodilFormatPlugin.DEFAULT_NAME).build(); + } + + @Override + protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) { + builder.nullType(Types.optional(MinorType.VARCHAR)); + builder.readerFactory(new DaffodilReaderFactory(formatConfig.getReaderConfig(this), scan)); + } + + public static class DaffodilReaderFactory extends FileReaderFactory { + private final DaffodilReaderConfig readerConfig; + + private final EasySubScan scan; + + public DaffodilReaderFactory(DaffodilReaderConfig config, + EasySubScan scan) { + this.readerConfig = config; + this.scan = scan; + } + + @Override + public ManagedReader newReader(FileSchemaNegotiator negotiator) { + return new DaffodilBatchReader(readerConfig, scan, negotiator); + } + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java new file mode 100644 index 00000000000..97f06d7d35e --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.daffodil; + +import org.apache.daffodil.japi.DataProcessor; +import org.apache.daffodil.japi.Diagnostic; +import org.apache.daffodil.japi.ParseResult; +import org.apache.daffodil.japi.infoset.InfosetOutputter; +import org.apache.daffodil.japi.io.InputSourceDataInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.List; +import java.util.stream.Collectors; + +/** + * DFDL Daffodil Streaming message parser + *

+ * You construct this providing a DataProcessor obtained from the DaffodilDataProcessorFactory. The + * DataProcessor contains the compiled DFDL schema, ready to use, as well as whether validation + * while parsing has been requested. + *

+ * The DataProcessor object may be shared/reused by multiple threads each of which has its own copy + * of this class. This object is, however, stateful, and must not be shared by multiple threads. + *

+ * You must call setInputStream, and setInfosetOutputter before you call parse(). The input stream + * and the InfosetOutputter objects are also private to one thread and are stateful and owned by + * this object. Once you have called setInputStream, you should view the input stream as the private + * property of this object. The parse() will invoke the InfosetOutputter's methods to deliver parsed + * data, and it may optionally create diagnostics (obtained via getDiagnostics) indicating which + * kind they are via the getIsProcessingError, getIsValidationError. + *

+ * Note that the InfosetOutputter may be called many times before a processing error is detected, as + * Daffodil delivers result data incrementally. + *

+ * Validation errors do not affect the InfosetOutputter output calls, but indicate that data was + * detected that is invalid. + *

+ * When parse() returns, the parse has ended and one can check for errors/diagnostics. One can call + * parse() again if there is still data to consume, which is checked via the isEOF() method. + *

+ * There are no guarantees about where the input stream is positioned between parse() calls. In + * particular, it may not be positioned at the start of the next message, as Daffodil may have + * pre-fetched additional bytes from the input stream which it found are not part of the current + * infoset, but the next one. The positioning of the input stream may in fact be somewhere in the + * middle of a byte, as Daffodil does not require messages to be of lengths that are in whole byte + * units. Hence, once you give the input stream to this object via setInputStream, that input stream + * is owned privately by this class for ever after. + */ +public class DaffodilMessageParser { + + private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class); + private List diagnostics; // diagnostics. + private boolean isProcessingError; + private boolean isValidationError; + private InputSourceDataInputStream dis; + private InfosetOutputter outputter; + private DataProcessor dp; + + /** + * Constructs the parser using a DataProcessor obtained from a DaffodilDataProcessorFactory. + * + * @param dp + */ + DaffodilMessageParser(DataProcessor dp) { + this.dp = dp; + } + + /** + * Provide the input stream from which data is to be parsed. + *

+ * This input stream is then owned by this object and becomes part of its state. + *

+ * It is; however, the responsibility of the caller to close this input stream after the + * completion of all parse calls. In particular, if a parse error is considered fatal, then the + * caller should close the input stream. There are advanced error-recovery techniques that may + * attempt to find data that can be parsed later in the data stream. In those cases the input + * stream would not be closed after a processing error, but such usage is beyond the scope of this + * javadoc. + * + * @param inputStream + */ + public void setInputStream(InputStream inputStream) { + dis = new InputSourceDataInputStream(inputStream); + } + + /** + * Provides the InfosetOutputter which will be called to deliver the Infoset via calls to its + * methods. + * + * @param outputter + */ + public void setInfosetOutputter(InfosetOutputter outputter) { + this.outputter = outputter; + } + + /** + * Called to pull messages from the data stream. The message 'Infoset' is delivered by way of + * calls to the InfosetOutputter's methods. + *

+ * After calling this, one may call getIsProcessingError, getIsValiationError, isEOF, and + * getDiagnostics. + */ + public void parse() { + if (dis == null) { + throw new IllegalStateException("Input stream must be provided by setInputStream() call."); + } + if (outputter == null) { + throw new IllegalStateException( + "InfosetOutputter must be provided by setInfosetOutputter() call."); + } + + reset(); + ParseResult res = dp.parse(dis, outputter); + isProcessingError = res.isProcessingError(); + isValidationError = res.isValidationError(); + diagnostics = res.getDiagnostics(); + } + + /** + * True if the input stream is known to contain no more data. If the input stream is a true + * stream, not a file, then temporary unavailability of data may cause this call to block until + * the stream is closed from the other end, or data becomes available. + *

+ * False if the input stream is at EOF, and no more data can be obtained. It is an error to call + * parse() after isEOF has returned true. + * + * @return + */ + public boolean isEOF() { + return !dis.hasData(); + } + + /** + * True if the parse() call failed with a processing error. This indicates that the data was not + * well-formed and could not be parsed successfully. + *

+ * It is possible for isProcessingError and isValidationError to both be true. + * + * @return + */ + public boolean isProcessingError() { + return isProcessingError; + } + + /** + * True if a validation error occurred during parsing. Subsequently to a validation error + * occurring, parsing may succeed or fail. after the validation error was detected. + * + * @return + */ + public boolean isValidationError() { + return isValidationError; + } + + /** + * After a parse() call this returns null or a list of 1 or more diagnostics. + *

+ * If isProcessingError or isValidationError are true, then this will contain at least 1 + * diagnostic. If both are true this will contain at least 2 diagnostics. + * + * @return + */ + public List getDiagnostics() { + return diagnostics; + } + + public String getDiagnosticsAsString() { + String result = diagnostics.stream().map(Diagnostic::getMessage) + .collect(Collectors.joining("\n")); + return result; + } + + private void reset() { + outputter.reset(); + isProcessingError = false; + isValidationError = false; + diagnostics = null; + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java new file mode 100644 index 00000000000..e338f266b7e --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DaffodilDataProcessorFactory.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.daffodil.schema; + +import org.apache.daffodil.japi.Compiler; +import org.apache.daffodil.japi.Daffodil; +import org.apache.daffodil.japi.DataProcessor; +import org.apache.daffodil.japi.Diagnostic; +import org.apache.daffodil.japi.InvalidParserException; +import org.apache.daffodil.japi.InvalidUsageException; +import org.apache.daffodil.japi.ProcessorFactory; +import org.apache.daffodil.japi.ValidationMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.Channels; +import java.util.List; +import java.util.Objects; + +/** + * Compiles a DFDL schema (mostly for tests) or loads a pre-compiled DFDL schema so that one can + * obtain a DataProcessor for use with DaffodilMessageParser. + *

+ * TODO: Needs to use a cache to avoid reloading/recompiling every time. + */ +public class DaffodilDataProcessorFactory { + // Default constructor is used. + + private static final Logger logger = LoggerFactory.getLogger(DaffodilDataProcessorFactory.class); + + private DataProcessor dp; + + /** + * Gets a Daffodil DataProcessor given the necessary arguments to compile or reload it. + * + * @param schemaFileURI + * pre-compiled dfdl schema (.bin extension) or DFDL schema source (.xsd extension) + * @param validationMode + * Use true to request Daffodil built-in 'limited' validation. Use false for no validation. + * @param rootName + * Local name of root element of the message. Can be null to use the first element declaration + * of the primary schema file. Ignored if reloading a pre-compiled schema. + * @param rootNS + * Namespace URI as a string. Can be null to use the target namespace of the primary schema + * file or if it is unambiguous what element is the rootName. Ignored if reloading a + * pre-compiled schema. + * @return the DataProcessor + * @throws CompileFailure + * - if schema compilation fails + */ + public DataProcessor getDataProcessor(URI schemaFileURI, boolean validationMode, String rootName, + String rootNS) + throws CompileFailure { + + DaffodilDataProcessorFactory dmp = new DaffodilDataProcessorFactory(); + boolean isPrecompiled = schemaFileURI.toString().endsWith(".bin"); + if (isPrecompiled) { + if (Objects.nonNull(rootName) && !rootName.isEmpty()) { + // A usage error. You shouldn't supply the name and optionally namespace if loading + // precompiled schema because those are built into it. Should be null or "". + logger.warn("Root element name '{}' is ignored when used with precompiled DFDL schema.", + rootName); + } + try { + dmp.loadSchema(schemaFileURI); + } catch (IOException | InvalidParserException e) { + throw new CompileFailure(e); + } + dmp.setupDP(validationMode, null); + } else { + List pfDiags; + try { + pfDiags = dmp.compileSchema(schemaFileURI, rootName, rootNS); + } catch (URISyntaxException | IOException e) { + throw new CompileFailure(e); + } + dmp.setupDP(validationMode, pfDiags); + } + return dmp.dp; + } + + private void loadSchema(URI schemaFileURI) throws IOException, InvalidParserException { + Compiler c = Daffodil.compiler(); + dp = c.reload(Channels.newChannel(schemaFileURI.toURL().openStream())); + } + + private List compileSchema(URI schemaFileURI, String rootName, String rootNS) + throws URISyntaxException, IOException, CompileFailure { + Compiler c = Daffodil.compiler(); + ProcessorFactory pf = c.compileSource(schemaFileURI, rootName, rootNS); + List pfDiags = pf.getDiagnostics(); + if (pf.isError()) { + pfDiags.forEach(diag -> logger.error(diag.getSomeMessage())); + throw new CompileFailure(pfDiags); + } + dp = pf.onPath("/"); + return pfDiags; // must be just warnings. If it was errors we would have thrown. + } + + /** + * Common setup steps used whether or not we reloaded or compiled a DFDL schema. + */ + private void setupDP(boolean validationMode, List pfDiags) throws CompileFailure { + Objects.requireNonNull(dp); // true because failure to produce a dp throws CompileFailure. + if (validationMode) { + try { + // We don't have the DFDL schema text, and we're not creating XML as an intermediate form, + // we're taking data direct from Daffodil into Drill rows, so using any Xerces-based + // XML Validator is not possible. + dp = dp.withValidationMode(ValidationMode.Limited); + } catch (InvalidUsageException e) { + // impossible + throw new Error(e); + } + } + List dpDiags = dp.getDiagnostics(); + if (dp.isError()) { + throw new CompileFailure(dpDiags); + } + // well this part is only if we compiled, and provided the pfDiags arg as non-null. + List compilationWarnings; + if (pfDiags != null && !pfDiags.isEmpty()) { + compilationWarnings = pfDiags; + compilationWarnings.addAll(dpDiags); // dpDiags might be empty. That's ok. + } else { + compilationWarnings = dpDiags; // dpDiags might be empty. That's ok. + } + } + + /** + * Thrown if schema compilation fails. + *

+ * Contains diagnostic objects which give the cause(s) of the failure, or + * contains a cause Throwable providing the reason. + */ + public static class CompileFailure extends Exception { + List diags; + + CompileFailure(List diagnostics) { + super("DFDL Schema Compile Failure"); + diags = diagnostics; + } + CompileFailure(Throwable cause) { + super(cause); + } + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java new file mode 100644 index 00000000000..5481f1da7ad --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaUtils.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil.schema; + +import org.apache.daffodil.japi.InvalidParserException; +import org.apache.daffodil.japi.DataProcessor; +import org.apache.daffodil.runtime1.api.DFDLPrimType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + + +public class DrillDaffodilSchemaUtils { + private static final MinorType DEFAULT_TYPE = MinorType.VARCHAR; + private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaUtils.class); + + /** + * This map maps the data types defined by the DFDL definition to Drill data types. + */ + public static final ImmutableMap DFDL_TYPE_MAPPINGS = + ImmutableMap.builder() + .put(DFDLPrimType.Long, MinorType.BIGINT) + .put(DFDLPrimType.Int, MinorType.INT) + .put(DFDLPrimType.Short, MinorType.SMALLINT) + .put(DFDLPrimType.Byte, MinorType.TINYINT) + // daffodil unsigned longs are modeled as DECIMAL(38, 0) which is the default for VARDECIMAL + .put(DFDLPrimType.UnsignedLong, MinorType.VARDECIMAL) + .put(DFDLPrimType.UnsignedInt, MinorType.BIGINT) + .put(DFDLPrimType.UnsignedShort, MinorType.UINT2) + .put(DFDLPrimType.UnsignedByte, MinorType.UINT1) + // daffodil integer, nonNegativeInteger, are modeled as DECIMAL(38, 0) which is the default for VARDECIMAL + .put(DFDLPrimType.Integer, MinorType.VARDECIMAL) + .put(DFDLPrimType.NonNegativeInteger, MinorType.VARDECIMAL) + // decimal has to be modeled as string since we really have no idea what to set the + // scale to. + .put(DFDLPrimType.Decimal, MinorType.VARCHAR) + .put(DFDLPrimType.Boolean, MinorType.BIT) + .put(DFDLPrimType.Date, MinorType.DATE) // requires conversion + .put(DFDLPrimType.DateTime, MinorType.TIMESTAMP) // requires conversion + .put(DFDLPrimType.Double, MinorType.FLOAT8) + // + // daffodil float type is mapped to double aka Float8 in drill because there + // seems to be bugs in FLOAT4. Float.MaxValue in a Float4 column displays as + // 3.4028234663852886E38 not 3.4028235E38. + // + // We don't really care about single float precision, so we just use double precision. + // + .put(DFDLPrimType.Float, MinorType.FLOAT8) + .put(DFDLPrimType.HexBinary, MinorType.VARBINARY) + .put(DFDLPrimType.String, MinorType.VARCHAR) + .put(DFDLPrimType.Time, MinorType.TIME) // requires conversion + .build(); + + + @VisibleForTesting + public static TupleMetadata processSchema(URI dfdlSchemaURI, String rootName, String namespace) + throws IOException, DaffodilDataProcessorFactory.CompileFailure, + URISyntaxException, InvalidParserException { + DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory(); + boolean validationMode = true; // use Daffodil's limited validation always + DataProcessor dp = dpf.getDataProcessor(dfdlSchemaURI, validationMode, rootName, namespace); + return daffodilDataProcessorToDrillSchema(dp); + } + + public static TupleMetadata daffodilDataProcessorToDrillSchema(DataProcessor dp) { + DrillDaffodilSchemaVisitor schemaVisitor = new DrillDaffodilSchemaVisitor(); + dp.walkMetadata(schemaVisitor); + TupleMetadata drillSchema = schemaVisitor.getDrillSchema(); + return drillSchema; + } + + /** + * Returns a {@link MinorType} of the corresponding DFDL Data Type. Defaults to VARCHAR if unknown + * @param dfdlType The type as provided by Daffodil. + * @return A {@link MinorType} of the Drill data type. + */ + public static MinorType getDrillDataType(DFDLPrimType dfdlType) { + try { + MinorType type = DrillDaffodilSchemaUtils.DFDL_TYPE_MAPPINGS.get(dfdlType); + if (type == null) { + return DEFAULT_TYPE; + } else { + return type; + } + } catch (NullPointerException e) { + logger.warn("Unknown data type found in XSD reader: {}. Returning VARCHAR.", dfdlType); + return DEFAULT_TYPE; + } + } +} diff --git a/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java new file mode 100644 index 00000000000..0bd8992eee7 --- /dev/null +++ b/contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/schema/DrillDaffodilSchemaVisitor.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil.schema; + +import org.apache.daffodil.runtime1.api.ChoiceMetadata; +import org.apache.daffodil.runtime1.api.ComplexElementMetadata; +import org.apache.daffodil.runtime1.api.ElementMetadata; +import org.apache.daffodil.runtime1.api.MetadataHandler; +import org.apache.daffodil.runtime1.api.SequenceMetadata; +import org.apache.daffodil.runtime1.api.SimpleElementMetadata; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.MapBuilderLike; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Stack; + +/** + * This class transforms a DFDL/Daffodil schema into a Drill Schema. + */ +public class DrillDaffodilSchemaVisitor extends MetadataHandler { + + private static final Logger logger = LoggerFactory.getLogger(DrillDaffodilSchemaVisitor.class); + + /** + * SchemaBuilder and MapBuilder share a polymorphic interface MapBuilderLike + */ + private final SchemaBuilder builder = new SchemaBuilder(); + private final Stack mapBuilderStack = new Stack<>(); + + private MapBuilderLike mapBuilder() { + return mapBuilderStack.peek(); + } + + /** + * Converts Daffodil names into appropriate Drill column names. + * @param md Daffodil element metadata, which contains an element name. + * @return a string usable as a Drill column name + */ + public static String makeColumnName(ElementMetadata md) { + return md.toQName().replace(":", "_"); + } + + /** + * Returns a {@link TupleMetadata} representation of the DFDL schema. Should only be called after + * the walk of the DFDL schema with this visitor has been called. + * + * @return A {@link TupleMetadata} representation of the DFDL schema. + */ + public TupleMetadata getDrillSchema() { + return builder.build(); + } + + @Override + public void simpleElementMetadata(SimpleElementMetadata md) { + assert (!mapBuilderStack.isEmpty()); + String colName = makeColumnName(md); + MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(md.dfdlType()); + if (md.isArray()) { + mapBuilder().addArray(colName, drillType); + } else if (md.isOptional() || md.isNillable()) { + mapBuilder().addNullable(colName, drillType); + } else { + mapBuilder().add(colName, drillType); + } + } + + @Override + public void startComplexElementMetadata(ComplexElementMetadata md) { + if (mapBuilderStack.isEmpty()) { + // root element case. The SchemaBuilder top level row is the container of the root element's children + mapBuilderStack.push(builder); + } else { + // enclosed complex element case. Create a map field. + String colName = makeColumnName(md); + if (md.isArray()) { + mapBuilderStack.push(mapBuilder().addMapArray(colName)); + } else { + mapBuilderStack.push(mapBuilder().addMap(colName)); // also handles optional complex elements + } + } + } + + @Override + public void endComplexElementMetadata(ComplexElementMetadata md) { + assert (!mapBuilderStack.isEmpty()); + mapBuilder().resume(); + mapBuilderStack.pop(); + } + + @Override + public void startSequenceMetadata(SequenceMetadata m) { + } + + @Override + public void endSequenceMetadata(SequenceMetadata m) { + } + + @Override + public void startChoiceMetadata(ChoiceMetadata m) { + } + + @Override + public void endChoiceMetadata(ChoiceMetadata m) { + } + + private void DFDLSchemaError(String s) { + throw new RuntimeException(s); + } +} diff --git a/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json new file mode 100644 index 00000000000..966d9ba1b5b --- /dev/null +++ b/contrib/format-daffodil/src/main/resources/bootstrap-format-plugins.json @@ -0,0 +1,37 @@ +{ + "storage":{ + "dfs": { + "type": "file", + "formats": { + "daffodil": { + "type": "daffodil", + "extensions": [ + "dat" + ] + } + } + }, + "cp": { + "type": "file", + "formats": { + "daffodil": { + "type": "daffodil", + "extensions": [ + "dat" + ] + } + } + }, + "s3": { + "type": "file", + "formats": { + "daffodil": { + "type": "daffodil", + "extensions": [ + "dat" + ] + } + } + } + } +} diff --git a/contrib/format-daffodil/src/main/resources/drill-module.conf b/contrib/format-daffodil/src/main/resources/drill-module.conf new file mode 100644 index 00000000000..52a902572e3 --- /dev/null +++ b/contrib/format-daffodil/src/main/resources/drill-module.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file tells Drill to consider this module when class path scanning. +# This file can also include any supplementary configuration information. +# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. + +drill.classpath.scanning: { + packages += "org.apache.drill.exec.store.daffodil" +} diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java new file mode 100644 index 00000000000..d5aa3a23058 --- /dev/null +++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil; + +import org.apache.drill.categories.RowSetTest; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetReader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.QueryBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.nio.file.Paths; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +@Category(RowSetTest.class) +public class TestDaffodilReader extends ClusterTest { + + String schemaURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/"; + + @BeforeClass + public static void setup() throws Exception { + // boilerplate call to start test rig + ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher)); + + DaffodilFormatConfig formatConfig = new DaffodilFormatConfig(null, "", "", "", false); + + cluster.defineFormat("dfs", "daffodil", formatConfig); + + // Needed to test against compressed files. + // Copies data from src/test/resources to the dfs root. + dirTestWatcher.copyResourceToRoot(Paths.get("data/")); + dirTestWatcher.copyResourceToRoot(Paths.get("schema/")); + } + + private String selectRow(String schema, String file) { + return "SELECT * FROM table(dfs.`data/" + file + "` " + " (type => 'daffodil'," + " " + + "validationMode => 'true', " + " schemaURI => '" + schemaURIRoot + "schema/" + schema + + ".dfdl.xsd'," + " rootName => 'row'," + " rootNamespace => null " + "))"; + } + + /** + * This unit test tests a simple data file + * + * @throws Exception + * Throw exception if anything goes wrong + */ + @Test + public void testSimpleQuery1() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("simple", "data01Int.dat.gz")); + RowSet results = query.rowSet(); + assertEquals(1, results.rowCount()); + + // create the expected metadata and data for this test + // metadata first + TupleMetadata expectedSchema = new SchemaBuilder().add("col", MinorType.INT).buildSchema(); + + RowSet expected = client.rowSetBuilder(expectedSchema).addRow(0x00000101) // aka 257 + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSimpleQuery2() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("simple", "data06Int.dat")); + RowSet results = query.rowSet(); + assertEquals(6, results.rowCount()); + + // create the expected metadata and data for this test + // metadata first + TupleMetadata expectedSchema = new SchemaBuilder().add("col", MinorType.INT).buildSchema(); + + RowSet expected = client.rowSetBuilder(expectedSchema).addRow(0x00000101).addRow(0x00000102) + .addRow(0x00000103).addRow(0x00000104).addRow(0x00000105).addRow(0x00000106).build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testComplexQuery1() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("complex1", "data02Int.dat")); + RowSet results = query.rowSet(); + assertEquals(1, results.rowCount()); + + RowSetReader rdr = results.reader(); + rdr.next(); + String col = rdr.getAsString(); + assertEquals("{257, 258}", col); + assertFalse(rdr.next()); + results.clear(); + } + + @Test + public void testComplexQuery2() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("complex1", "data06Int.dat")); + RowSet results = query.rowSet(); + assertEquals(3, results.rowCount()); + + RowSetReader rdr = results.reader(); + rdr.next(); + String map = rdr.getAsString(); + assertEquals("{257, 258}", map); + rdr.next(); + map = rdr.getAsString(); + assertEquals("{259, 260}", map); + rdr.next(); + map = rdr.getAsString(); + assertEquals("{261, 262}", map); + assertFalse(rdr.next()); + results.clear(); + } + + /** + * Tests data which is rows of two ints and an array containing a map containing two ints. Each + * row can be visualized like this: "{257, 258, [{259, 260},...]}" + */ + @Test + public void testComplexArrayQuery1() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("complexArray1", "data12Int.dat")); + RowSet results = query.rowSet(); + assertEquals(1, results.rowCount()); + + RowSetReader rdr = results.reader(); + rdr.next(); + String map = rdr.getAsString(); + assertEquals("{257, 258, [{259, 260}, {261, 262}, {257, 258}, {259, 260}, {261, 262}]}", map); + assertFalse(rdr.next()); + results.clear(); + } + + /** + * Tests data which is an array of ints in one column of the row set + */ + @Test + public void testSimpleArrayQuery1() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("simpleArrayField1", "data12Int.dat")); + RowSet results = query.rowSet(); + assertEquals(1, results.rowCount()); + + RowSetReader rdr = results.reader(); + rdr.next(); + String map = rdr.getAsString(); + assertEquals("{[257, 258, 259, 260, 261, 262, 257, 258, 259, 260, 261, 262]}", map); + assertFalse(rdr.next()); + results.clear(); + } + + /** + * Tests data which is rows of two ints and an array containing a map containing an int and a + * vector of ints. Each row can be visualized like this: "{257, 258, [{259, [260, 261, + * 262]},...]}" + */ + @Test + public void testComplexArrayQuery2() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("complexArray2", "data12Int.dat")); + RowSet results = query.rowSet(); + assertEquals(1, results.rowCount()); + + RowSetReader rdr = results.reader(); + rdr.next(); + String map = rdr.getAsString(); + assertEquals("{257, 258, [{259, [260, 261, 262]}, {257, [258, 259, 260]}, {261, [262]}]}", map); + assertFalse(rdr.next()); + results.clear(); + } + + @Test + public void testMoreTypes1() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("moreTypes1", "moreTypes1.txt.dat")); + RowSet results = query.rowSet(); + assertEquals(2, results.rowCount()); + + RowSetReader rdr = results.reader(); + rdr.next(); + String map = rdr.getAsString(); + assertEquals( + "{2147483647, 9223372036854775807, 32767, 127, true, " + "1.7976931348623157E308, 3" + + ".4028235E38, [31, 32, 33, 34, 35, 36, 37, 38], \"daffodil\"}", + map); + rdr.next(); + map = rdr.getAsString(); + assertEquals( + "{-2147483648, -9223372036854775808, -32768, -128, false, " + "-1.7976931348623157E308, " + + "-3.4028235E38, [38, 37, 36, 35, 34, 33, 32, 31], \"drill\"}", + map); + assertFalse(rdr.next()); + results.clear(); + } + + @Test + public void testMoreTypes2() throws Exception { + + QueryBuilder qb = client.queryBuilder(); + QueryBuilder query = qb.sql(selectRow("moreTypes2", "moreTypes2.txt.dat")); + RowSet results = query.rowSet(); + assertEquals(1, results.rowCount()); + + RowSetReader rdr = results.reader(); + rdr.next(); + String map = rdr.getAsString(); + assertEquals( + "{4294967295, 18446744073709551615, 65535, 255, " + "-18446744073709551616, " + + "18446744073709551616, " + "\"0.18446744073709551616\", " + // xs:decimal is modeled + // as VARCHAR i.e., a string. So needs quotation marks. + "1970-01-01, 00:00, 1970-01-01T00:00:00Z}", map); + assertFalse(rdr.next()); + results.clear(); + } +} diff --git a/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java new file mode 100644 index 00000000000..8076fe5e8cf --- /dev/null +++ b/contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/schema/TestDaffodilToDrillMetadataConversion.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.daffodil.schema; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.junit.Test; + +import java.net.URI; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestDaffodilToDrillMetadataConversion { + + @Test + public void testSimple() throws Exception { + URI schemaURI = getClass().getResource("/schema/simple.dfdl.xsd").toURI(); + TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI, "row", null); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("col", MinorType.INT) + .buildSchema(); + assertTrue(expectedSchema.isEquivalent(schema)); + } + + @Test + public void testComplex1() throws Exception { + URI schemaURI = getClass().getResource("/schema/complex1.dfdl.xsd").toURI(); + TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI, "row", null); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a1", MinorType.INT) + .add("a2", MinorType.INT) + .buildSchema(); + assertTrue(expectedSchema.isEquivalent(schema)); + } + + @Test + public void testComplex2() throws Exception { + URI schemaURI = getClass().getResource("/schema/complex2.dfdl.xsd").toURI(); + TupleMetadata schema = DrillDaffodilSchemaUtils.processSchema(schemaURI, "row", null); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a1", MinorType.INT) + .add("a2", MinorType.INT) + .addMap("b") + .add("b1", MinorType.INT) + .add("b2", MinorType.INT) + .resumeSchema() + .buildSchema(); + assertTrue(expectedSchema.isEquivalent(schema)); + } + +} diff --git a/contrib/format-daffodil/src/test/resources/data/data01Int.dat b/contrib/format-daffodil/src/test/resources/data/data01Int.dat new file mode 100644 index 00000000000..dee9c4c8ada Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data01Int.dat differ diff --git a/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz b/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz new file mode 100644 index 00000000000..5e4b3b37acf Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data01Int.dat.gz differ diff --git a/contrib/format-daffodil/src/test/resources/data/data02Int.dat b/contrib/format-daffodil/src/test/resources/data/data02Int.dat new file mode 100644 index 00000000000..8577a259181 Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data02Int.dat differ diff --git a/contrib/format-daffodil/src/test/resources/data/data06Int.dat b/contrib/format-daffodil/src/test/resources/data/data06Int.dat new file mode 100644 index 00000000000..8c29db2ec51 Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data06Int.dat differ diff --git a/contrib/format-daffodil/src/test/resources/data/data12Int.dat b/contrib/format-daffodil/src/test/resources/data/data12Int.dat new file mode 100644 index 00000000000..39fa5271b4f Binary files /dev/null and b/contrib/format-daffodil/src/test/resources/data/data12Int.dat differ diff --git a/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat b/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat new file mode 100644 index 00000000000..cd0b5c1591a --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/data/moreTypes1.txt.dat @@ -0,0 +1,2 @@ +2147483647 9223372036854775807 32767 127 T 1.7976931348623157E308 3.4028235E38 12345678 'daffodil' +-2147483648 -9223372036854775808 -32768 -128 F -1.7976931348623157E308 -3.4028235E38 87654321 'drill' diff --git a/contrib/format-daffodil/src/test/resources/data/moreTypes2.txt.dat b/contrib/format-daffodil/src/test/resources/data/moreTypes2.txt.dat new file mode 100644 index 00000000000..fca536e8543 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/data/moreTypes2.txt.dat @@ -0,0 +1 @@ +4294967295 18446744073709551615 65535 255 -18446744073709551616 18446744073709551616 0.18446744073709551616 1970-01-01 00:00:00+0000 1970-01-01T00:00:00 diff --git a/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd new file mode 100644 index 00000000000..1f4ab8954ef --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/complex1.dfdl.xsd @@ -0,0 +1,54 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd new file mode 100644 index 00000000000..d4d74aefe1f --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/complex2.dfdl.xsd @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd new file mode 100644 index 00000000000..59d285bd31e --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/complexArray1.dfdl.xsd @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd new file mode 100644 index 00000000000..89d74fa2de9 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/complexArray2.dfdl.xsd @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd new file mode 100644 index 00000000000..a294e29e548 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/moreTypes1.dfdl.xsd @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/moreTypes2.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/moreTypes2.dfdl.xsd new file mode 100644 index 00000000000..00bd3d7a155 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/moreTypes2.dfdl.xsd @@ -0,0 +1,64 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd new file mode 100644 index 00000000000..eea582b9a56 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/simple.dfdl.xsd @@ -0,0 +1,71 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd b/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd new file mode 100644 index 00000000000..6c72c375159 --- /dev/null +++ b/contrib/format-daffodil/src/test/resources/schema/simpleArrayField1.dfdl.xsd @@ -0,0 +1,71 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/contrib/pom.xml b/contrib/pom.xml index 59747eba87f..7379fd37325 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -42,6 +42,7 @@ data format-access + format-daffodil format-deltalake format-esri format-excel diff --git a/distribution/pom.xml b/distribution/pom.xml index 80ba9650a8b..90232ee0cd2 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -497,6 +497,11 @@ drill-format-log ${project.version} + + org.apache.drill.contrib + drill-format-daffodil + ${project.version} + org.apache.drill.contrib drill-druid-storage diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml index 66792fd43f6..76ba62978fe 100644 --- a/distribution/src/assemble/component.xml +++ b/distribution/src/assemble/component.xml @@ -29,6 +29,7 @@ org.apache.drill.contrib.data:tpch-sample-data:jar org.apache.drill.contrib:drill-deltalake-format:jar org.apache.drill.contrib:drill-druid-storage:jar + org.apache.drill.contrib:drill-format-daffodil:jar org.apache.drill.contrib:drill-format-esri:jar org.apache.drill.contrib:drill-format-excel:jar org.apache.drill.contrib:drill-format-hdf5:jar diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java index f333cb9cd2b..37809e64826 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java @@ -22,6 +22,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.MaterializedField; +import java.util.Objects; + /** * Internal structure for building a map. A map is just a schema, * but one that is part of a parent column. @@ -33,7 +35,7 @@ * All resumeXXX methods do not produce any action and return null. * To access built column {@link #buildColumn()} should be used. */ -public class MapBuilder implements SchemaContainer { +public class MapBuilder implements MapBuilderLike, SchemaContainer { private final SchemaContainer parent; private final TupleBuilder tupleBuilder = new TupleBuilder(); private final String memberName; @@ -68,6 +70,7 @@ public MapBuilder add(String name, MinorType type, DataMode mode) { return this; } + @Override public MapBuilder add(String name, MinorType type) { tupleBuilder.add(name, type); return this; @@ -82,6 +85,7 @@ public MapBuilder add(String name, MinorType type, int precision, int scale) { return addDecimal(name, type, DataMode.REQUIRED, precision, scale); } + @Override public MapBuilder addNullable(String name, MinorType type) { tupleBuilder.addNullable(name, type); return this; @@ -96,6 +100,7 @@ public MapBuilder addNullable(String name, MinorType type, int precision, int sc return addDecimal(name, type, DataMode.OPTIONAL, precision, scale); } + @Override public MapBuilder addArray(String name, MinorType type) { tupleBuilder.addArray(name, type); return this; @@ -129,10 +134,12 @@ public MapBuilder addDecimal(String name, MinorType type, * @param name the name of the map column * @return a builder for the map */ + @Override public MapBuilder addMap(String name) { return tupleBuilder.addMap(this, name); } + @Override public MapBuilder addMapArray(String name) { return tupleBuilder.addMapArray(this, name); } @@ -185,6 +192,27 @@ public MapBuilder resumeMap() { return (MapBuilder) parent; } + /** + * Depending on whether the parent is a schema builder or map builder + * we resume appropriately. + */ + @Override + public void resume() { + if (Objects.isNull(parent)) { + throw new IllegalStateException("Call to resume() on MapBuilder with no parent."); + } + if (parent instanceof MapBuilder) { + resumeMap(); + } else { + assert(parent instanceof SchemaBuilder); + // + // This would be extended for other kinds of possible containers of a Map. + // First version only needed SchemaBuilder parents + // + resumeSchema(); + } + } + public RepeatedListBuilder resumeList() { build(); return (RepeatedListBuilder) parent; diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilderLike.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilderLike.java new file mode 100644 index 00000000000..086c6a5b5e7 --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilderLike.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record.metadata; + +import org.apache.drill.common.types.TypeProtos; + +/** + * A common interface shared by SchemaBuilder and MapBuilder allowing one to do most + * operations for constructing metadata for hierarchical data + * without having to keep track of whether you are dealing with the top-level schema/row + * level or a map within it. + */ +public interface MapBuilderLike { + + MapBuilderLike addArray(String colName, TypeProtos.MinorType drillType); + + MapBuilderLike addNullable(String colName, TypeProtos.MinorType drillType); + + MapBuilderLike add(String colName, TypeProtos.MinorType drillType); + + MapBuilderLike addMapArray(String colName); + + MapBuilderLike addMap(String colName); + + void resume(); +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java index c1b97609357..f2aee1d2190 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java @@ -61,7 +61,7 @@ * */ -public class SchemaBuilder implements SchemaContainer { +public class SchemaBuilder implements MapBuilderLike, SchemaContainer { /** * Actual tuple schema builder. The odd layered structure is needed @@ -114,6 +114,7 @@ public SchemaBuilder add(String name, MinorType type, DataMode mode) { return this; } + @Override public SchemaBuilder add(String name, MinorType type) { tupleBuilder.add(name, type); return this; @@ -128,6 +129,7 @@ public SchemaBuilder add(String name, MinorType type, int precision, int scale) return addDecimal(name, type, DataMode.REQUIRED, precision, scale); } + @Override public SchemaBuilder addNullable(String name, MinorType type) { tupleBuilder.addNullable(name, type); return this; @@ -142,6 +144,7 @@ public SchemaBuilder addNullable(String name, MinorType type, int precision, int return addDecimal(name, type, DataMode.OPTIONAL, precision, scale); } + @Override public SchemaBuilder addArray(String name, MinorType type) { tupleBuilder.addArray(name, type); return this; @@ -157,7 +160,7 @@ public SchemaBuilder addDecimal(String name, MinorType type, DataMode mode, int } /** - * Add a multi-dimensional array, implemented as a repeated vector + * Add a multidimensional array, implemented as a repeated vector * along with 0 or more repeated list vectors. * * @param name column name @@ -191,10 +194,12 @@ public SchemaBuilder addAll(TupleMetadata from) { * @param name the name of the map column * @return a builder for the map */ + @Override public MapBuilder addMap(String name) { return tupleBuilder.addMap(this, name); } + @Override public MapBuilder addMapArray(String name) { return tupleBuilder.addMapArray(this, name); } @@ -237,4 +242,9 @@ public TupleMetadata buildSchema() { public TupleMetadata build() { return tupleBuilder.schema(); } + + @Override + public void resume() { + // do nothing. There is nothing else to resume. + } } diff --git a/pom.xml b/pom.xml index 957679eff50..6d444612244 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ 1.10.0 1.7 5.5.0 + 3.8.0 10.14.2.0 3072 apache/drill