diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/ComponentFactory.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/ComponentFactory.scala new file mode 100644 index 00000000..1ec692d2 --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/ComponentFactory.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.api + +import org.apache.commons.configuration2.Configuration + +trait ComponentFactory[T] extends HasComponentAttributes { + def apply(config: Configuration): T +} diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/ComponentFactoryProvider.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/ComponentFactoryProvider.scala new file mode 100644 index 00000000..659802f7 --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/ComponentFactoryProvider.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.api + +/** + * Implementations of this trait are responsible for providing an instance of a [[ComponentFactory]] via the + * service provider interface (SPI) and need to be registered in a provider configuration file under META-INF/services + * Implementations must have a no-args constructor to be instantiable. + */ +trait ComponentFactoryProvider[T <: ComponentFactory[_]] { + def getComponentFactory: T +} diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/HasComponentAttributes.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/HasComponentAttributes.scala new file mode 100644 index 00000000..5b1cb409 --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/HasComponentAttributes.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.api + +case class PropertyMetadata( + /** + * A human readable label for this property. + */ + label: String, + + /** + * An optional text that gives a detailed description of this property. 
+ */ + hint: Option[String], + + /** + * true if the property is required, false otherwise + */ + required: Boolean) + +/** + * This trait should be implemented by all implementations of [[ComponentFactory]] to provide a self-description + * how the component should be used and configured. This information may be used by external applications + * that automatically discover components + */ +trait HasComponentAttributes { + /** + * @return a human readable name of the component. + */ + def getName: String + + /** + * @return a description for the component. + */ + def getDescription: String + + /** + * @return a map describing configuration properties for this component. The keys have to be unique to avoid + * name clashes with properties from other components. + */ + def getProperties: Map[String, PropertyMetadata] + + /** + * @return a prefix to be used for arbitrary extra configuration. Typically extra configuration is required + * to pass on configuration properties, e.g. to DataStreamWriter.options + */ + def getExtraConfigurationPrefix: Option[String] = None +} diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/decoder/StreamDecoderFactory.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/decoder/StreamDecoderFactory.scala index f0b00712..92f1500a 100644 --- a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/decoder/StreamDecoderFactory.scala +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/decoder/StreamDecoderFactory.scala @@ -15,8 +15,6 @@ package za.co.absa.hyperdrive.ingestor.api.decoder -import org.apache.commons.configuration2.Configuration +import za.co.absa.hyperdrive.ingestor.api.ComponentFactory -trait StreamDecoderFactory { - def apply(config: Configuration): StreamDecoder -} +trait StreamDecoderFactory extends ComponentFactory[StreamDecoder] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/decoder/StreamDecoderFactoryProvider.scala 
b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/decoder/StreamDecoderFactoryProvider.scala new file mode 100644 index 00000000..341c1563 --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/decoder/StreamDecoderFactoryProvider.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.api.decoder + +import za.co.absa.hyperdrive.ingestor.api.ComponentFactoryProvider + +trait StreamDecoderFactoryProvider extends ComponentFactoryProvider[StreamDecoderFactory] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/manager/StreamManagerFactory.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/manager/StreamManagerFactory.scala index 78ac5980..30628f40 100644 --- a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/manager/StreamManagerFactory.scala +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/manager/StreamManagerFactory.scala @@ -15,8 +15,6 @@ package za.co.absa.hyperdrive.ingestor.api.manager -import org.apache.commons.configuration2.Configuration +import za.co.absa.hyperdrive.ingestor.api.ComponentFactory -trait StreamManagerFactory { - def apply(config: Configuration): StreamManager -} +trait StreamManagerFactory extends ComponentFactory[StreamManager] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/manager/StreamManagerFactoryProvider.scala 
b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/manager/StreamManagerFactoryProvider.scala new file mode 100644 index 00000000..61d48ddc --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/manager/StreamManagerFactoryProvider.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.api.manager + +import za.co.absa.hyperdrive.ingestor.api.ComponentFactoryProvider + +trait StreamManagerFactoryProvider extends ComponentFactoryProvider[StreamManagerFactory] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/reader/StreamReaderFactory.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/reader/StreamReaderFactory.scala index fa00f0ad..23cddaff 100644 --- a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/reader/StreamReaderFactory.scala +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/reader/StreamReaderFactory.scala @@ -15,8 +15,6 @@ package za.co.absa.hyperdrive.ingestor.api.reader -import org.apache.commons.configuration2.Configuration +import za.co.absa.hyperdrive.ingestor.api.ComponentFactory -trait StreamReaderFactory { - def apply(conf: Configuration): StreamReader -} +trait StreamReaderFactory extends ComponentFactory[StreamReader] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/reader/StreamReaderFactoryProvider.scala 
b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/reader/StreamReaderFactoryProvider.scala new file mode 100644 index 00000000..53fb8085 --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/reader/StreamReaderFactoryProvider.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.api.reader + +import za.co.absa.hyperdrive.ingestor.api.ComponentFactoryProvider + +trait StreamReaderFactoryProvider extends ComponentFactoryProvider[StreamReaderFactory] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala index 4c589bd9..13e69944 100644 --- a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala @@ -15,8 +15,6 @@ package za.co.absa.hyperdrive.ingestor.api.transformer -import org.apache.commons.configuration2.Configuration +import za.co.absa.hyperdrive.ingestor.api.ComponentFactory -trait StreamTransformerFactory { - def apply(config: Configuration): StreamTransformer -} +trait StreamTransformerFactory extends ComponentFactory[StreamTransformer] diff --git 
a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactoryProvider.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactoryProvider.scala new file mode 100644 index 00000000..21d5727c --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactoryProvider.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.api.transformer + +import za.co.absa.hyperdrive.ingestor.api.ComponentFactoryProvider + +trait StreamTransformerFactoryProvider extends ComponentFactoryProvider[StreamTransformerFactory] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/writer/StreamWriterFactory.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/writer/StreamWriterFactory.scala index dd8cc91d..784f274b 100644 --- a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/writer/StreamWriterFactory.scala +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/writer/StreamWriterFactory.scala @@ -15,8 +15,6 @@ package za.co.absa.hyperdrive.ingestor.api.writer -import org.apache.commons.configuration2.Configuration +import za.co.absa.hyperdrive.ingestor.api.ComponentFactory -trait StreamWriterFactory { - def apply(config: Configuration): StreamWriter -} +trait StreamWriterFactory extends ComponentFactory[StreamWriter] diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/writer/StreamWriterFactoryProvider.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/writer/StreamWriterFactoryProvider.scala new file mode 100644 index 00000000..ac90ed06 --- /dev/null +++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/writer/StreamWriterFactoryProvider.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.api.writer + +import za.co.absa.hyperdrive.ingestor.api.ComponentFactoryProvider + +trait StreamWriterFactoryProvider extends ComponentFactoryProvider[StreamWriterFactory] diff --git a/component-archetype/README.md b/component-archetype/README.md index a3f35b57..8343d079 100644 --- a/component-archetype/README.md +++ b/component-archetype/README.md @@ -16,17 +16,16 @@ ~ --> -Hyperdrive Component Archetype -============================== +# Hyperdrive Component Archetype This is a Maven archetype for creating custom Hyperdrive components. -Creating a custom Hyperdrive components project ------------------------------------------------ +## Creating a custom Hyperdrive components project + Download the artifact to your local maven repository ``` -mvn dependency:get -Dartifact=za.co.absa.hyperdrive:component-archetype:2.0.0 +mvn dependency:get -Dartifact=za.co.absa.hyperdrive:component-archetype:3.0.0 ``` Update the local archetype catalog @@ -39,29 +38,38 @@ Generate a skeleton project by executing the following command mvn archetype:generate \ -DarchetypeGroupId=za.co.absa.hyperdrive \ -DarchetypeArtifactId=component-archetype \ - -DarchetypeVersion=2.0.0 \ + -DarchetypeVersion=3.0.0 \ -DgroupId= \ -DartifactId= \ -Dversion= ``` - `` is your group id, e.g. com.acme, - `` is the name for your artifact, e.g. mytransformer, -- `` is the version number of the artifact, e.g. 0.1.0-SNAPSHOT and +- `` is the version number of the artifact, e.g. 0.1.0-SNAPSHOT + +## Implementing a Hyperdrive component -Implementing a Hyperdrive component ------------------------------------ +There are five types of Hyperdrive components: Reader, Decoder, Transformer, Writer and Manager. -There are five types of Hyperdrive components: Decoder, Manager, Reader, Transformer and Writer. 
+### Component stubs This archetype provides stubs for all types, which are located under `/src/main/scala//` For example, if you need to implement a custom transformer, you should modify the stubs `/src/main/scala/{groupId}/transformer/mycomponent/MyStreamTransformerImpl` +For example, if you need to implement a custom transformer, you should modify the stubs in + `/src/main/scala/{groupId}/transformer/mycomponent/MyStreamTransformerImpl` + +If you don't need to implement a component type, you should delete the corresponding stubs. + E.g. if you don't need to implement the writer, delete the folder `/src/main/scala/{groupId}/writer` -If you don't need to implement a component type, you should delete the corresponding stubs. E.g. if you don't need to implement the writer, delete the folder `/src/main/scala/{groupId}/writer` +### Service provider configuration +Components need to be registered using the Java Service Provider Interface (SPI). There are configuration file templates + under `/src/main/resources/META-INF/services`. If you don't need a component type, + you should delete the corresponding configuration files. 
+ +A model test to verify the configuration is available under `/test/scala/ServiceProviderConfigurationTest.scala` -Building Hyperdrive components ------------------------------- +## Building Hyperdrive components ``` % cd / % mvn clean package diff --git a/component-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml b/component-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml index c65ee5ef..7bffefad 100644 --- a/component-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml +++ b/component-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -15,16 +15,16 @@ --> + xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + name="archetype"> - .gitignore + .gitignore @@ -32,8 +32,14 @@ **/*.scala - - + + + src/main/resources + + **/* + + + src/test/scala **/*.scala diff --git a/component-archetype/src/main/resources/archetype-resources/pom.xml b/component-archetype/src/main/resources/archetype-resources/pom.xml index 260cf4d8..1a82e732 100644 --- a/component-archetype/src/main/resources/archetype-resources/pom.xml +++ b/component-archetype/src/main/resources/archetype-resources/pom.xml @@ -30,11 +30,18 @@ UTF-8 UTF-8 - + + ${project.version} + + 3.1.2 ${scala.version} ${scala.maven.plugin.version} - ${project.version} + + + ${scala.compat.version} + ${scalatest.version} + ${scalatest.maven.version} @@ -45,6 +52,14 @@ \${hyperdrive.version} provided + + + + org.scalatest + scalatest_\${scala.compat.version} + \${scalatest.version} + test + @@ -90,11 +105,38 @@ - ${hyperdrive.version} + \${hyperdrive.version} + + + + maven-surefire-plugin + ${maven.surefire.plugin.version} + + true + + + + org.scalatest + scalatest-maven-plugin + \${scalatest.maven.version} + + ${project.build.directory}/surefire-reports + . 
+ TestSuite.txt + + + + unit-tests + + test + + + + diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider new file mode 100644 index 00000000..d9362105 --- /dev/null +++ b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider @@ -0,0 +1,18 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Replace the following line with the fully qualified name of your implementation. +# Multiple implementations can be registered, one per line. 
+${package}.decoder.mycomponent.MyStreamDecoderImplLoader diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider new file mode 100644 index 00000000..cc6fccec --- /dev/null +++ b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider @@ -0,0 +1,18 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Replace the following line with the fully qualified name of your implementation. +# Multiple implementations can be registered, one per line. 
+${package}.manager.mycomponent.MyStreamManagerImplLoader diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider new file mode 100644 index 00000000..2d5b823f --- /dev/null +++ b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider @@ -0,0 +1,18 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Replace the following line with the fully qualified name of your implementation. +# Multiple implementations can be registered, one per line. 
+${package}.reader.mycomponent.MyStreamReaderImplLoader diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider new file mode 100644 index 00000000..cc48733b --- /dev/null +++ b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider @@ -0,0 +1,18 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Replace the following line with the fully qualified name of your implementation. +# Multiple implementations can be registered, one per line. 
+${package}.transformer.mycomponent.MyStreamTransformerImplLoader diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider new file mode 100644 index 00000000..2109ffbb --- /dev/null +++ b/component-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider @@ -0,0 +1,18 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Replace the following line with the fully qualified name of your implementation. +# Multiple implementations can be registered, one per line. 
+${package}.writer.mycomponent.MyStreamWriterImplLoader diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/scala/decoder/mycomponent/MyStreamDecoderImpl.scala b/component-archetype/src/main/resources/archetype-resources/src/main/scala/decoder/mycomponent/MyStreamDecoderImpl.scala index 8b574f9d..120c4b84 100644 --- a/component-archetype/src/main/resources/archetype-resources/src/main/scala/decoder/mycomponent/MyStreamDecoderImpl.scala +++ b/component-archetype/src/main/resources/archetype-resources/src/main/scala/decoder/mycomponent/MyStreamDecoderImpl.scala @@ -21,7 +21,8 @@ import org.apache.commons.configuration2.Configuration import org.apache.logging.log4j.LogManager import org.apache.spark.sql.DataFrame import org.apache.spark.sql.streaming.DataStreamReader -import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory} +import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory, StreamDecoderFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} /** * This is a stub for a custom implementation of a StreamDecoder @@ -31,7 +32,7 @@ private[decoder] class MyStreamDecoderImpl extends StreamDecoder { override def decode(streamReader: DataStreamReader): DataFrame = ??? 
} -object MyStreamDecoderImpl extends StreamDecoderFactory { +object MyStreamDecoderImpl extends StreamDecoderFactory with MyStreamDecoderImplAttributes { private val logger = LogManager.getLogger override def apply(conf: Configuration): StreamDecoder = { @@ -39,3 +40,18 @@ object MyStreamDecoderImpl extends StreamDecoderFactory { new MyStreamDecoderImpl() } } + +trait MyStreamDecoderImplAttributes extends HasComponentAttributes { + + override def getName: String = "My Stream Decoder" + + override def getDescription: String = "This component is a stub" + + override def getProperties: Map[String, PropertyMetadata] = Map() + + override def getExtraConfigurationPrefix: Option[String] = None +} + +class MyStreamDecoderImplLoader extends StreamDecoderFactoryProvider { + override def getComponentFactory: StreamDecoderFactory = MyStreamDecoderImpl +} diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/scala/manager/mycomponent/MyStreamManagerImpl.scala b/component-archetype/src/main/resources/archetype-resources/src/main/scala/manager/mycomponent/MyStreamManagerImpl.scala index 8f3309b0..c82ccfb0 100644 --- a/component-archetype/src/main/resources/archetype-resources/src/main/scala/manager/mycomponent/MyStreamManagerImpl.scala +++ b/component-archetype/src/main/resources/archetype-resources/src/main/scala/manager/mycomponent/MyStreamManagerImpl.scala @@ -21,7 +21,8 @@ import org.apache.commons.configuration2.Configuration import org.apache.logging.log4j.LogManager import org.apache.spark.sql.Row import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter} -import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManager, StreamManagerFactory} +import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManager, StreamManagerFactory, StreamManagerFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} /** * This is a stub for a custom implementation of a StreamManager @@ -34,7 +35,7 @@ 
private[manager] class MyStreamManagerImpl extends StreamManager { override def configure(streamWriter: DataStreamWriter[Row], configuration: org.apache.hadoop.conf.Configuration): DataStreamWriter[Row] = ??? } -object MyStreamManagerImpl extends StreamManagerFactory { +object MyStreamManagerImpl extends StreamManagerFactory with MyStreamManagerImplAttributes { private val logger = LogManager.getLogger override def apply(conf: Configuration): StreamManager = { @@ -42,3 +43,19 @@ object MyStreamManagerImpl extends StreamManagerFactory { new MyStreamManagerImpl() } } + +trait MyStreamManagerImplAttributes extends HasComponentAttributes { + + override def getName: String = "My Stream Manager" + + override def getDescription: String = "This component is a stub" + + override def getProperties: Map[String, PropertyMetadata] = Map() + + override def getExtraConfigurationPrefix: Option[String] = None + +} + +class MyStreamManagerImplLoader extends StreamManagerFactoryProvider { + override def getComponentFactory: StreamManagerFactory = MyStreamManagerImpl +} diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/scala/reader/mycomponent/MyStreamReaderImpl.scala b/component-archetype/src/main/resources/archetype-resources/src/main/scala/reader/mycomponent/MyStreamReaderImpl.scala index 635024eb..8c0eadd9 100644 --- a/component-archetype/src/main/resources/archetype-resources/src/main/scala/reader/mycomponent/MyStreamReaderImpl.scala +++ b/component-archetype/src/main/resources/archetype-resources/src/main/scala/reader/mycomponent/MyStreamReaderImpl.scala @@ -21,20 +21,19 @@ import org.apache.commons.configuration2.Configuration import org.apache.logging.log4j.LogManager import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.DataStreamReader -import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory} +import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory, 
StreamReaderFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} + /** * This is a stub for a custom implementation of a StreamReader */ private[reader] class MyStreamReaderImpl() extends StreamReader { - override def read(spark: SparkSession): DataStreamReader = ??? - - override def getSourceName: String = ??? } -object MyStreamReaderImpl extends StreamReaderFactory { +object MyStreamReaderImpl extends StreamReaderFactory with MyStreamReaderImplAttributes { private val logger = LogManager.getLogger override def apply(conf: Configuration): StreamReader = { @@ -42,3 +41,18 @@ object MyStreamReaderImpl extends StreamReaderFactory { new MyStreamReaderImpl() } } + +trait MyStreamReaderImplAttributes extends HasComponentAttributes { + + override def getName: String = "My Stream Reader" + + override def getDescription: String = "This component is a stub" + + override def getProperties: Map[String, PropertyMetadata] = Map() + + override def getExtraConfigurationPrefix: Option[String] = None +} + +class MyStreamReaderImplLoader extends StreamReaderFactoryProvider { + override def getComponentFactory: StreamReaderFactory = MyStreamReaderImpl +} diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/scala/transformer/mycomponent/MyStreamTransformerImpl.scala b/component-archetype/src/main/resources/archetype-resources/src/main/scala/transformer/mycomponent/MyStreamTransformerImpl.scala index c6b93030..4bdfa7a8 100644 --- a/component-archetype/src/main/resources/archetype-resources/src/main/scala/transformer/mycomponent/MyStreamTransformerImpl.scala +++ b/component-archetype/src/main/resources/archetype-resources/src/main/scala/transformer/mycomponent/MyStreamTransformerImpl.scala @@ -20,7 +20,8 @@ package ${package}.transformer.mycomponent import org.apache.commons.configuration2.Configuration import org.apache.logging.log4j.LogManager import org.apache.spark.sql.DataFrame -import 
za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory} +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory, StreamTransformerFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} /** * This is a stub for a custom implementation of a StreamTransformer @@ -31,7 +32,7 @@ private[transformer] class MyStreamTransformerImpl() extends StreamTransformer { override def transform(streamData: DataFrame): DataFrame = ??? } -object MyStreamTransformerImpl extends StreamTransformerFactory { +object MyStreamTransformerImpl extends StreamTransformerFactory with MyStreamTransformerImplAttributes { private val logger = LogManager.getLogger override def apply(conf: Configuration): StreamTransformer = { @@ -39,3 +40,18 @@ object MyStreamTransformerImpl extends StreamTransformerFactory { new MyStreamTransformerImpl() } } + +trait MyStreamTransformerImplAttributes extends HasComponentAttributes { + + override def getName: String = "My Stream Transformer" + + override def getDescription: String = "This component is a stub" + + override def getProperties: Map[String, PropertyMetadata] = Map() + + override def getExtraConfigurationPrefix: Option[String] = None +} + +class MyStreamTransformerImplLoader extends StreamTransformerFactoryProvider { + override def getComponentFactory: StreamTransformerFactory = MyStreamTransformerImpl +} diff --git a/component-archetype/src/main/resources/archetype-resources/src/main/scala/writer/mycomponent/MyStreamWriterImpl.scala b/component-archetype/src/main/resources/archetype-resources/src/main/scala/writer/mycomponent/MyStreamWriterImpl.scala index 47d58f94..482eeba8 100644 --- a/component-archetype/src/main/resources/archetype-resources/src/main/scala/writer/mycomponent/MyStreamWriterImpl.scala +++ b/component-archetype/src/main/resources/archetype-resources/src/main/scala/writer/mycomponent/MyStreamWriterImpl.scala @@ 
-21,8 +21,9 @@ import org.apache.commons.configuration2.Configuration import org.apache.logging.log4j.LogManager import org.apache.spark.sql.DataFrame import org.apache.spark.sql.streaming.StreamingQuery -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory, StreamWriterFactoryProvider} import za.co.absa.hyperdrive.ingestor.api.manager.StreamManager +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} /** @@ -30,13 +31,10 @@ import za.co.absa.hyperdrive.ingestor.api.manager.StreamManager */ private[writer] class MyStreamWriterImpl(val destination: String) extends StreamWriter { - override def write(dataFrame: DataFrame, streamManager: StreamManager): StreamingQuery = ??? - - override def getDestination: String = destination } -object MyStreamWriterImpl extends StreamWriterFactory { +object MyStreamWriterImpl extends StreamWriterFactory with MyStreamWriterImplAttributes { private val logger = LogManager.getLogger override def apply(conf: Configuration): StreamWriter = { @@ -44,3 +42,18 @@ object MyStreamWriterImpl extends StreamWriterFactory { new MyStreamWriterImpl("destination") } } + +trait MyStreamWriterImplAttributes extends HasComponentAttributes { + + override def getName: String = "My Stream Writer" + + override def getDescription: String = "This component is a stub" + + override def getProperties: Map[String, PropertyMetadata] = Map() + + override def getExtraConfigurationPrefix: Option[String] = None +} + +class MyStreamWriterImplLoader extends StreamWriterFactoryProvider { + override def getComponentFactory: StreamWriterFactory = MyStreamWriterImpl +} diff --git a/component-archetype/src/main/resources/archetype-resources/src/test/scala/ServiceProviderConfigurationTest.scala b/component-archetype/src/main/resources/archetype-resources/src/test/scala/ServiceProviderConfigurationTest.scala new file mode 100644 
index 00000000..da4b793f --- /dev/null +++ b/component-archetype/src/main/resources/archetype-resources/src/test/scala/ServiceProviderConfigurationTest.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package} + +import java.util.ServiceLoader + +import org.scalatest.{FlatSpec, Matchers} +import za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider +import ${package}.decoder.mycomponent.MyStreamDecoderImpl +import ${package}.manager.mycomponent.MyStreamManagerImpl +import ${package}.reader.mycomponent.MyStreamReaderImpl +import ${package}.transformer.mycomponent.MyStreamTransformerImpl +import ${package}.writer.mycomponent.MyStreamWriterImpl + +class ServiceProviderConfigurationTest extends FlatSpec with Matchers { + behavior of "Service Provider Configuration (META-INF/services)" + + it should "load configured factories" in { + val classLoader = this.getClass.getClassLoader + import scala.collection.JavaConverters._ + + val readerFactory = ServiceLoader.load(classOf[StreamReaderFactoryProvider], classLoader).asScala + 
.map(_.getComponentFactory).toList + readerFactory should contain only MyStreamReaderImpl + + val decoderFactory = ServiceLoader.load(classOf[StreamDecoderFactoryProvider], classLoader).asScala + .map(_.getComponentFactory).toList + decoderFactory should contain only MyStreamDecoderImpl + + val transformerFactory = ServiceLoader.load(classOf[StreamTransformerFactoryProvider], classLoader).asScala + .map(_.getComponentFactory).toList + transformerFactory should contain only MyStreamTransformerImpl + + val writerFactory = ServiceLoader.load(classOf[StreamWriterFactoryProvider], classLoader).asScala + .map(_.getComponentFactory).toList + writerFactory should contain only MyStreamWriterImpl + + val managerFactory = ServiceLoader.load(classOf[StreamManagerFactoryProvider], classLoader).asScala + .map(_.getComponentFactory).toList + managerFactory should contain only MyStreamManagerImpl + } +} diff --git a/component-scanner/src/main/resources/log4j2.xml b/component-scanner/src/main/resources/log4j2.xml new file mode 100644 index 00000000..8414f9ae --- /dev/null +++ b/component-scanner/src/main/resources/log4j2.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + diff --git a/component-scanner/src/main/scala/za/co/absa/hyperdrive/scanner/ComponentScanner.scala b/component-scanner/src/main/scala/za/co/absa/hyperdrive/scanner/ComponentScanner.scala index 6d1e8efd..1f9d6983 100644 --- a/component-scanner/src/main/scala/za/co/absa/hyperdrive/scanner/ComponentScanner.scala +++ b/component-scanner/src/main/scala/za/co/absa/hyperdrive/scanner/ComponentScanner.scala @@ -15,41 +15,96 @@ package za.co.absa.hyperdrive.scanner -import java.io.File +import java.net.URLClassLoader +import java.nio.file.{Files, Path} +import java.util.ServiceLoader -import za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactory -import za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactory -import za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactory -import 
za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactory -import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactory +import org.apache.logging.log4j.LogManager +import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoderFactory, StreamDecoderFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManagerFactory, StreamManagerFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReaderFactory, StreamReaderFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.{ComponentFactory, ComponentFactoryProvider, HasComponentAttributes} -import scala.reflect.runtime.{universe => ru} -import scala.util.Try +import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} -case class ComponentInfo(fullyQualifiedName: String, - humanReadableName: String, - jarPath: String) +case class ComponentDescriptors(readers: Seq[ComponentDescriptor], + decoders: Seq[ComponentDescriptor], + transformers: Seq[ComponentDescriptor], + writers: Seq[ComponentDescriptor], + managers: Seq[ComponentDescriptor]) + +case class ComponentDescriptor(attributes: HasComponentAttributes, + fullyQualifiedName: String, + jarPath: Path) object ComponentScanner { + private val logger = LogManager.getLogger + private val jarSuffix = ".jar" - def getStreamReaderComponents(directory: File): Try[List[ComponentInfo]] = getComponentsInfo(directory, ru.symbolOf[StreamReaderFactory]) + def getComponents(baseDirectory: Path): Try[ComponentDescriptors] = getComponents(List(baseDirectory)) - def getStreamManagerComponents(directory: File): Try[List[ComponentInfo]] = getComponentsInfo(directory, ru.symbolOf[StreamManagerFactory]) + def getComponents(baseDirectories: List[Path]): Try[ComponentDescriptors] = { + Try( + 
flatten(baseDirectories + .flatMap(findJarsInDirectory) + .map(jar => findComponentsInJar(jar))) + ) + } - def getStreamDecoderComponents(directory: File): Try[List[ComponentInfo]] = getComponentsInfo(directory, ru.symbolOf[StreamDecoderFactory]) + private def flatten(componentDescriptors: Seq[ComponentDescriptors]) = { + ComponentDescriptors( + componentDescriptors.flatMap(_.readers), + componentDescriptors.flatMap(_.decoders), + componentDescriptors.flatMap(_.transformers), + componentDescriptors.flatMap(_.writers), + componentDescriptors.flatMap(_.managers) + ) + } - def getStreamTransformerComponents(directory: File): Try[List[ComponentInfo]] = getComponentsInfo(directory, ru.symbolOf[StreamTransformerFactory]) + private def findJarsInDirectory(directory: Path): List[Path] = { + if (!Files.exists(directory)) { + throw new IllegalArgumentException(s"Directory $directory does not exist") + } + if (!Files.isDirectory(directory)) { + throw new IllegalArgumentException(s"Argument $directory is not a directory") + } - def getStreamWriterComponents(directory: File): Try[List[ComponentInfo]] = getComponentsInfo(directory, ru.symbolOf[StreamWriterFactory]) + import scala.collection.JavaConverters._ + Files.list(directory) + .iterator() + .asScala + .filter(p => Files.isRegularFile(p)) + .filter(p => p.toString.endsWith(jarSuffix)) + .toList + } - def getComponentsInfo(directory: File, factoryType: ru.TypeSymbol): Try[List[ComponentInfo]] = { - for { - objectsInfo <- ObjectScanner.getObjectsInfo(directory, factoryType) - componentsInfo <- Try(objectsInfo.map{case (className, jarPath) => ComponentInfo(className, getHumanReadableName(className), jarPath)}) - } yield componentsInfo + private def findComponentsInJar(jar: Path): ComponentDescriptors = { + val classLoader = new URLClassLoader(Array(jar.toUri.toURL)) + ComponentDescriptors( + loadService[StreamReaderFactoryProvider, StreamReaderFactory](classLoader, jar), + loadService[StreamDecoderFactoryProvider, 
StreamDecoderFactory](classLoader, jar), + loadService[StreamTransformerFactoryProvider, StreamTransformerFactory](classLoader, jar), + loadService[StreamWriterFactoryProvider, StreamWriterFactory](classLoader, jar), + loadService[StreamManagerFactoryProvider, StreamManagerFactory](classLoader, jar) + ) } - private def getHumanReadableName(fullyQualifiedName: String) = { - if (fullyQualifiedName.takeRight(1) == "$") fullyQualifiedName.dropRight(1) else fullyQualifiedName + private def loadService[P <: ComponentFactoryProvider[F], F <: ComponentFactory[_]](classLoader: ClassLoader, jar: Path)(implicit classTag: ClassTag[P]): List[ComponentDescriptor] = { + import scala.collection.JavaConverters._ + Try(ServiceLoader.load(classTag.runtimeClass, classLoader) + .asScala + .map(untypedClass => untypedClass.asInstanceOf[P]) + .map(provider => provider.getComponentFactory) + .map(factory => ComponentDescriptor(factory, factory.getClass.getName, jar)) + .toList + ) match { + case Failure(exception) => + logger.warn(s"Could not load components from ${jar.toAbsolutePath}", exception) + List() + case Success(components) => components + } } } diff --git a/component-scanner/src/main/scala/za/co/absa/hyperdrive/scanner/ObjectScanner.scala b/component-scanner/src/main/scala/za/co/absa/hyperdrive/scanner/ObjectScanner.scala deleted file mode 100644 index bc506cab..00000000 --- a/component-scanner/src/main/scala/za/co/absa/hyperdrive/scanner/ObjectScanner.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.hyperdrive.scanner - -import java.io.File -import java.net.URLClassLoader -import java.util.zip.ZipFile -import scala.collection.JavaConverters._ -import scala.reflect.runtime.{universe => ru} -import scala.util.Try - -object ObjectScanner { - def getObjectsInfo(directory: File, baseClass: ru.TypeSymbol): Try[List[(String, String)]] = { - Try(findAllJarsInDirectory(directory) - .flatMap(findObjectsOfType(baseClass, _))) - } - - private def findAllJarsInDirectory(directory: File): List[File] = { - if (!directory.exists()) throw new IllegalArgumentException(s"Directory $directory does not exist") - if (!directory.isDirectory) throw new IllegalArgumentException(s"Argument $directory is not a directory") - findAllJarsInDirectoryRecursively(directory) - } - - private def findAllJarsInDirectoryRecursively(directory: File): List[File] = { - val jarsInSubdirectories = directory - .listFiles() - .filter(_.isDirectory) - .flatMap(findAllJarsInDirectoryRecursively) - - val jars = directory - .listFiles() - .filter(_.isFile) - .filter(_.getName.endsWith(".jar")) - .toList - - jars ++ jarsInSubdirectories - } - - private def findObjectsOfType(baseClass: ru.TypeSymbol, file: File): List[(String, String)] = { - val zipFile = new ZipFile(file.getPath) - val classLoader = new URLClassLoader(Array(file.toURI.toURL)) - val objects = zipFile - .stream - .iterator - .asScala - .map(_.getName) - .filter(_.endsWith(".class")) - .map(_.replace(".class", "").replace('/', '.')) - .map(loadObjectOfType(_, baseClass, classLoader)) - .filter(_.isDefined) - .map(_.get) - .map((_, file.getAbsolutePath)) - .toList - zipFile.close() - - objects - } - - private def loadObjectOfType(fullyQualifiedName: String, baseClass: ru.TypeSymbol, classLoader: ClassLoader): Option[String] = { - val mirror = ru.runtimeMirror(classLoader) - val module = 
mirror.staticClass(fullyQualifiedName) - val classSymbol = mirror.reflectClass(module).symbol - val baseClasses = classSymbol.baseClasses - if (baseClasses.contains(baseClass) && !classSymbol.isAbstract && !classSymbol.isTrait) Some(fullyQualifiedName) else None - } -} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/JarTestUtils.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/JarTestUtils.scala index 143d2f09..bb79ef8e 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/JarTestUtils.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/JarTestUtils.scala @@ -15,21 +15,24 @@ package za.co.absa.hyperdrive.scanner -import java.io._ +import java.io.{BufferedInputStream, IOException} +import java.nio.file.{Files, Path, Paths} import java.util.jar.{Attributes, JarEntry, JarOutputStream, Manifest} object JarTestUtils { val BUFFER_SIZE = 1024 - def createJar(baseDir: File, jarName: String, filenames: List[String]): File = { - val content = filenames.map(filename => new File(getClass.getClassLoader.getResource(filename).toURI) -> filename).toMap - JarTestUtils.createJar(baseDir, jarName, content) + private val serviceProviderPath = "META-INF/services" + + def createJar(baseDir: Path, jarName: String, filenames: List[String], serviceProviders: Map[String, List[String]] = Map()): Path = { + val content = filenames.map(filename => Paths.get(getClass.getClassLoader.getResource(filename).toURI) -> filename).toMap + createJar(baseDir, jarName, content, serviceProviders) } - def createJar(baseDir: File, jarName: String, content: Map[File, String]): File = { - val jarFile = new File(baseDir, jarName) - addEntries(jarFile, createManifest(), content) + private def createJar(baseDir: Path, jarName: String, content: Map[Path, String], serviceProviders: Map[String, List[String]]): Path = { + val jarFile = baseDir.resolve(jarName) + addEntries(jarFile, createManifest(), content, 
serviceProviders) jarFile } @@ -41,23 +44,24 @@ object JarTestUtils { manifest } - private def addEntries(destJarFile: File, manifest: Manifest, content: Map[File, String]): Unit = { - val outputJar = new JarOutputStream(new FileOutputStream(destJarFile.getAbsolutePath), manifest) + private def addEntries(destJarFile: Path, manifest: Manifest, content: Map[Path, String], serviceProviders: Map[String, List[String]]): Unit = { + val outputJar = new JarOutputStream(Files.newOutputStream(destJarFile.toAbsolutePath), manifest) content.foreach(entry => add(entry._1, entry._2, outputJar)) + serviceProviders.foreach(entry => addServiceProvider(entry._1, entry._2, outputJar)) outputJar.close() } @throws[IOException] - private def add(source: File, targetPath: String, outputJar: JarOutputStream): Unit = { - if (source.isDirectory) { + private def add(source: Path, targetPath: String, outputJar: JarOutputStream): Unit = { + if (Files.isDirectory(source)) { throw new UnsupportedOperationException("Adding directories to jars is not supported") } else { val entry = new JarEntry(targetPath.replace("\\", "/")) - entry.setTime(source.lastModified()) + entry.setTime(Files.getLastModifiedTime(source).toMillis) outputJar.putNextEntry(entry) - val in = new BufferedInputStream(new FileInputStream(source)) + val in = new BufferedInputStream(Files.newInputStream(source)) val buffer = new Array[Byte](BUFFER_SIZE) @@ -71,4 +75,10 @@ object JarTestUtils { in.close() } } + + private def addServiceProvider(interface: String, providerClass: List[String], outputJar: JarOutputStream): Unit = { + outputJar.putNextEntry(new JarEntry(s"$serviceProviderPath/$interface")) + val providerClasses = providerClass.mkString("\n") + outputJar.write(providerClasses.getBytes()) + } } diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/TestComponentScanner.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/TestComponentScanner.scala index b3c72b71..a10b62ce 100644 --- 
a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/TestComponentScanner.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/TestComponentScanner.scala @@ -15,55 +15,219 @@ package za.co.absa.hyperdrive.scanner -import java.io.File -import java.nio.file.Files +import java.nio.file.{Files, Paths} -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} +import za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider +import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider +import za.co.absa.hyperdrive.scanner.dummyjar._ -import scala.reflect.io.Directory - -class TestComponentScanner extends FlatSpec with Matchers { +class TestComponentScanner extends FlatSpec with Matchers with BeforeAndAfter { behavior of "ComponentScanner" - val DUMMYJARPATH = "za/co/absa/hyperdrive/scanner/dummyjar/" - val DUMMYPACKAGE = "za.co.absa.hyperdrive.scanner.dummyjar." 
+ private var baseDir = Paths.get(".") + + private val dummyJarPath = "za/co/absa/hyperdrive/scanner/dummyjar/" + private val dummyPackage = dummyJarPath.replace("/", ".") + + before { + baseDir = Files.createTempDirectory("TestComponentScanner") + } - it should "list API components in the same jar" in { + after { + scala.reflect.io.Path(baseDir.toFile).deleteRecursively() + } + + it should "list components in the same jar" in { // given - val baseDirPath = Files.createTempDirectory("listAllComponentsInSingleJar") - val baseDir = new File(baseDirPath.toUri) val filenames = List( - s"${DUMMYJARPATH}DummyStreamReaderOne.class", - s"${DUMMYJARPATH}DummyStreamReaderOne$$.class", - s"${DUMMYJARPATH}DummyStreamManager.class", - s"${DUMMYJARPATH}DummyStreamManager$$.class", - s"${DUMMYJARPATH}DummyStreamDecoder.class", - s"${DUMMYJARPATH}DummyStreamDecoder$$.class", - s"${DUMMYJARPATH}DummyStreamTransformer.class", - s"${DUMMYJARPATH}DummyStreamTransformer$$.class", - s"${DUMMYJARPATH}DummyStreamWriterOne.class", - s"${DUMMYJARPATH}DummyStreamWriterOne$$.class") - JarTestUtils.createJar(baseDir, "jar1.jar", filenames) + s"${dummyJarPath}DummyStreamReaderOne.class", + s"${dummyJarPath}DummyStreamReaderOne$$.class", + s"${dummyJarPath}DummyStreamReaderOneLoader.class", + s"${dummyJarPath}DummyStreamReaderTwo.class", + s"${dummyJarPath}DummyStreamReaderTwo$$.class", + s"${dummyJarPath}DummyStreamReaderTwoLoader.class", + s"${dummyJarPath}DummyStreamManager.class", + s"${dummyJarPath}DummyStreamManager$$.class", + s"${dummyJarPath}DummyStreamManagerLoader.class", + s"${dummyJarPath}DummyStreamDecoder.class", + s"${dummyJarPath}DummyStreamDecoder$$.class", + s"${dummyJarPath}DummyStreamDecoderLoader.class", + s"${dummyJarPath}DummyStreamTransformer.class", + s"${dummyJarPath}DummyStreamTransformer$$.class", + s"${dummyJarPath}DummyStreamTransformerLoader.class", + s"${dummyJarPath}DummyStreamWriterOne.class", + s"${dummyJarPath}DummyStreamWriterOne$$.class", + 
s"${dummyJarPath}DummyStreamWriterOneLoader.class" + ) + + val serviceProviders = Map( + classOf[StreamReaderFactoryProvider].getName -> List( + s"${dummyPackage}DummyStreamReaderOneLoader", + s"${dummyPackage}DummyStreamReaderTwoLoader"), + classOf[StreamManagerFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamManagerLoader"), + classOf[StreamDecoderFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamDecoderLoader"), + classOf[StreamTransformerFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamTransformerLoader"), + classOf[StreamWriterFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamWriterOneLoader")) + + JarTestUtils.createJar(baseDir, "jar1.jar", filenames, serviceProviders) + + // when + val components = ComponentScanner.getComponents(baseDir).get + + // then + val expectedJarPath = baseDir.resolve("jar1.jar").toAbsolutePath + components.readers should contain theSameElementsAs List( + ComponentDescriptor(DummyStreamReaderOne, s"${dummyPackage}DummyStreamReaderOne$$", expectedJarPath), + ComponentDescriptor(DummyStreamReaderTwo, s"${dummyPackage}DummyStreamReaderTwo$$", expectedJarPath)) + components.managers should contain only + ComponentDescriptor(DummyStreamManager, s"${dummyPackage}DummyStreamManager$$", expectedJarPath) + components.decoders should contain only + ComponentDescriptor(DummyStreamDecoder, s"${dummyPackage}DummyStreamDecoder$$", expectedJarPath) + components.transformers should contain only + ComponentDescriptor(DummyStreamTransformer, s"${dummyPackage}DummyStreamTransformer$$", expectedJarPath) + components.writers should contain only + ComponentDescriptor(DummyStreamWriterOne, s"${dummyPackage}DummyStreamWriterOne$$", expectedJarPath) + } + + it should "list components in multiple jars" in { + // given + val filesJar1 = List( + s"${dummyJarPath}DummyStreamReaderOne.class", + s"${dummyJarPath}DummyStreamReaderOne$$.class", + s"${dummyJarPath}DummyStreamWriterOne.class", + 
s"${dummyJarPath}DummyStreamWriterOne$$.class") + val serviceProviders1 = Map( + classOf[StreamReaderFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamReaderOneLoader"), + classOf[StreamWriterFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamWriterOneLoader") + ) + JarTestUtils.createJar(baseDir, "jar1.jar", filesJar1, serviceProviders1) + + val filesJar2 = List( + s"${dummyJarPath}DummyStreamReaderTwo.class", + s"${dummyJarPath}DummyStreamReaderTwo$$.class", + s"${dummyJarPath}DummyStreamWriterTwo.class", + s"${dummyJarPath}DummyStreamWriterTwo$$.class") + val serviceProviders2 = Map( + classOf[StreamReaderFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamReaderTwoLoader"), + classOf[StreamWriterFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamWriterTwoLoader") + ) + JarTestUtils.createJar(baseDir, "jar2.jar", filesJar2, serviceProviders2) + + // when + val components = ComponentScanner.getComponents(baseDir).get + + // then + val expectedJar1 = baseDir.resolve("jar1.jar").toAbsolutePath + val expectedJar2 = baseDir.resolve("jar2.jar").toAbsolutePath + components.readers should contain theSameElementsAs List( + ComponentDescriptor(DummyStreamReaderOne, s"${dummyPackage}DummyStreamReaderOne$$", expectedJar1), + ComponentDescriptor(DummyStreamReaderTwo, s"${dummyPackage}DummyStreamReaderTwo$$", expectedJar2) + ) + + components.writers should contain theSameElementsAs List( + ComponentDescriptor(DummyStreamWriterOne, s"${dummyPackage}DummyStreamWriterOne$$", expectedJar1), + ComponentDescriptor(DummyStreamWriterTwo, s"${dummyPackage}DummyStreamWriterTwo$$", expectedJar2) + ) + } + + it should "return an empty list if the given directory contains only jar files without class files" in { + // given + JarTestUtils.createJar(baseDir, "jar1.jar", List()) + + // when + val components = ComponentScanner.getComponents(baseDir).get + + // then + components.readers shouldBe empty + components.decoders shouldBe empty + 
components.transformers shouldBe empty + components.writers shouldBe empty + components.managers shouldBe empty + } + + it should "skip but not fail if a jar is not a zip file" in { + // given + Files.createTempFile(Paths.get(baseDir.toUri), "notAZipFile", ".jar") + + val filesJar = List( + s"${dummyJarPath}DummyStreamWriterTwo.class", + s"${dummyJarPath}DummyStreamWriterTwo$$.class") + val serviceProviders = Map( + classOf[StreamWriterFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamWriterTwoLoader") + ) + JarTestUtils.createJar(baseDir, "jar2.jar", filesJar, serviceProviders) + + // when + val components = ComponentScanner.getComponents(baseDir).get + + // then + val expectedJar = baseDir.resolve("jar2.jar").toAbsolutePath + components.writers should contain only + ComponentDescriptor(DummyStreamWriterTwo, s"${dummyPackage}DummyStreamWriterTwo$$", expectedJar) + } + + it should "skip but not fail if the SPI points to a non-existent implementation" in { + // given + val filesFakeJar = List( + s"${dummyJarPath}DummyStreamWriterOne.class", + s"${dummyJarPath}DummyStreamWriterOne$$.class") + val fakeServiceProviders = Map( + classOf[StreamWriterFactoryProvider].getName -> List(s"${dummyPackage}NonExistentLoader") + ) + JarTestUtils.createJar(baseDir, "jar1.jar", filesFakeJar, fakeServiceProviders) + + val filesJar = List( + s"${dummyJarPath}DummyStreamWriterTwo.class", + s"${dummyJarPath}DummyStreamWriterTwo$$.class") + val serviceProviders = Map( + classOf[StreamWriterFactoryProvider].getName -> List(s"${dummyPackage}DummyStreamWriterTwoLoader") + ) + JarTestUtils.createJar(baseDir, "jar2.jar", filesJar, serviceProviders) + + // when + val components = ComponentScanner.getComponents(baseDir).get + + // then + val expectedJar = baseDir.resolve("jar2.jar").toAbsolutePath + components.writers should contain only + ComponentDescriptor(DummyStreamWriterTwo, s"${dummyPackage}DummyStreamWriterTwo$$", expectedJar) + } + + + it should "return a failure if the 
given path does not exist" in { + // given + val baseDirPath = Files.createTempDirectory("directorynotexist") + Files.delete(baseDirPath) + + // when + val result = ComponentScanner.getComponents(baseDirPath) + + // then + result.isFailure shouldBe true + result.failed.get.getClass shouldBe classOf[IllegalArgumentException] + result.failed.get.getMessage should fullyMatch regex "Directory .*directorynotexist.* does not exist" + } + + it should "return a failure if the given path is not a directory" in { + // given + val anyFilePath = Files.createTempFile("anyFile", ".tmp") // when - val readers = ComponentScanner.getStreamReaderComponents(baseDir).get - val managers = ComponentScanner.getStreamManagerComponents(baseDir).get - val decoders = ComponentScanner.getStreamDecoderComponents(baseDir).get - val transformers = ComponentScanner.getStreamTransformerComponents(baseDir).get - val writers = ComponentScanner.getStreamWriterComponents(baseDir).get + val result = ComponentScanner.getComponents(anyFilePath) // then - val expectedJarPath = baseDir.getAbsolutePath + "/jar1.jar" - readers should contain only ComponentInfo(s"${DUMMYPACKAGE}DummyStreamReaderOne$$", s"${DUMMYPACKAGE}DummyStreamReaderOne", expectedJarPath) - managers should contain only ComponentInfo(s"${DUMMYPACKAGE}DummyStreamManager$$", s"${DUMMYPACKAGE}DummyStreamManager", expectedJarPath) - decoders should contain only ComponentInfo(s"${DUMMYPACKAGE}DummyStreamDecoder$$", s"${DUMMYPACKAGE}DummyStreamDecoder", expectedJarPath) - transformers should contain only ComponentInfo(s"${DUMMYPACKAGE}DummyStreamTransformer$$", s"${DUMMYPACKAGE}DummyStreamTransformer", expectedJarPath) - writers should contain only ComponentInfo(s"${DUMMYPACKAGE}DummyStreamWriterOne$$", s"${DUMMYPACKAGE}DummyStreamWriterOne", expectedJarPath) + result.isFailure shouldBe true + result.failed.get.getClass shouldBe classOf[IllegalArgumentException] + result.failed.get.getMessage should fullyMatch regex "Argument .*anyFile.*tmp is 
not a directory" // cleanup - new Directory(baseDir).deleteRecursively() + Files.delete(anyFilePath) } } diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/TestObjectScanner.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/TestObjectScanner.scala deleted file mode 100644 index 8477868b..00000000 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/TestObjectScanner.scala +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.hyperdrive.scanner - -import java.io.File -import java.nio.file.Files - -import org.scalatest.{FlatSpec, Matchers} -import za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactory -import za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactory -import za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactory -import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactory -import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactory - -import scala.reflect.io.Directory -import scala.reflect.runtime.{universe => ru} - - -class TestObjectScanner extends FlatSpec with Matchers { - - behavior of "ObjectScanner" - - val DUMMYJARPATH = "za/co/absa/hyperdrive/scanner/dummyjar/" - val DUMMYPACKAGE = "za.co.absa.hyperdrive.scanner.dummyjar." 
- - it should "list objects in the same jar" in { - // given - val baseDirPath = Files.createTempDirectory("listAllComponentsInSingleJar") - val baseDir = new File(baseDirPath.toUri) - val filenames = List( - s"${DUMMYJARPATH}DummyStreamReaderOne.class", - s"${DUMMYJARPATH}DummyStreamReaderOne$$.class", - s"${DUMMYJARPATH}DummyStreamManager.class", - s"${DUMMYJARPATH}DummyStreamManager$$.class", - s"${DUMMYJARPATH}DummyStreamDecoder.class", - s"${DUMMYJARPATH}DummyStreamDecoder$$.class", - s"${DUMMYJARPATH}DummyStreamTransformer.class", - s"${DUMMYJARPATH}DummyStreamTransformer$$.class", - s"${DUMMYJARPATH}DummyStreamWriterOne.class", - s"${DUMMYJARPATH}DummyStreamWriterOne$$.class") - JarTestUtils.createJar(baseDir, "jar1.jar", filenames) - - // when - val readers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]).get - val managers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamManagerFactory]).get - val decoders = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamDecoderFactory]).get - val transformers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamTransformerFactory]).get - val writers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamWriterFactory]).get - - // then - val expectedJarPath = baseDir.getAbsolutePath + "/jar1.jar" - readers should contain only ((s"${DUMMYPACKAGE}DummyStreamReaderOne$$", expectedJarPath)) - managers should contain only ((s"${DUMMYPACKAGE}DummyStreamManager$$", expectedJarPath)) - decoders should contain only ((s"${DUMMYPACKAGE}DummyStreamDecoder$$", expectedJarPath)) - transformers should contain only ((s"${DUMMYPACKAGE}DummyStreamTransformer$$", expectedJarPath)) - writers should contain only ((s"${DUMMYPACKAGE}DummyStreamWriterOne$$", expectedJarPath)) - - // cleanup - new Directory(baseDir).deleteRecursively() - } - - it should "list objects in multiple jars" in { - // given - val baseDirPath = Files.createTempDirectory("listMultipleComponentsInMultipleJars") - val 
baseDir = new File(baseDirPath.toUri) - val filesJar1 = List( - s"${DUMMYJARPATH}DummyStreamReaderOne.class", - s"${DUMMYJARPATH}DummyStreamReaderOne$$.class", - s"${DUMMYJARPATH}DummyStreamWriterOne.class", - s"${DUMMYJARPATH}DummyStreamWriterOne$$.class") - JarTestUtils.createJar(baseDir, "jar1.jar", filesJar1) - - val filesJar2 = List( - s"${DUMMYJARPATH}DummyStreamReaderTwo.class", - s"${DUMMYJARPATH}DummyStreamReaderTwo$$.class", - s"${DUMMYJARPATH}DummyStreamWriterTwo.class", - s"${DUMMYJARPATH}DummyStreamWriterTwo$$.class") - JarTestUtils.createJar(baseDir, "jar2.jar", filesJar2) - - // when - val readers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]).get - val writers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamWriterFactory]).get - - // then - readers should contain theSameElementsAs List( - (s"${DUMMYPACKAGE}DummyStreamReaderOne$$", baseDir.getAbsolutePath + "/jar1.jar"), - (s"${DUMMYPACKAGE}DummyStreamReaderTwo$$", baseDir.getAbsolutePath + "/jar2.jar")) - - writers should contain theSameElementsAs List( - (s"${DUMMYPACKAGE}DummyStreamWriterOne$$", baseDir.getAbsolutePath + "/jar1.jar"), - (s"${DUMMYPACKAGE}DummyStreamWriterTwo$$", baseDir.getAbsolutePath + "/jar2.jar")) - - // cleanup - new Directory(baseDir).deleteRecursively() - } - - it should "list objects of the same type in the same jar" in { - // given - val baseDirPath = Files.createTempDirectory("listMultipleComponentsInSingleJar") - val baseDir = new File(baseDirPath.toUri) - val files = List( - s"${DUMMYJARPATH}DummyStreamReaderOne.class", - s"${DUMMYJARPATH}DummyStreamReaderOne$$.class", - s"${DUMMYJARPATH}DummyStreamReaderTwo.class", - s"${DUMMYJARPATH}DummyStreamReaderTwo$$.class") - JarTestUtils.createJar(baseDir, "jar1.jar", files) - - // when - val readers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]).get - - // then - readers should contain theSameElementsAs List( - (s"${DUMMYPACKAGE}DummyStreamReaderOne$$", 
baseDir.getAbsolutePath + "/jar1.jar"), - (s"${DUMMYPACKAGE}DummyStreamReaderTwo$$", baseDir.getAbsolutePath + "/jar1.jar")) - - // cleanup - new Directory(baseDir).deleteRecursively() - } - - it should "not list any abstract classes or traits" in { - // given - val baseDirPath = Files.createTempDirectory("listNoAbstractClassesOrTraits") - val baseDir = new File(baseDirPath.toUri) - val files = List( - s"${DUMMYJARPATH}DummyStreamReaderOne.class", - s"${DUMMYJARPATH}DummyStreamReaderOne$$.class", - s"${DUMMYJARPATH}DummyStreamReaderTwo.class", - s"${DUMMYJARPATH}DummyStreamReaderTwo$$.class", - s"${DUMMYJARPATH}AbstractDummyStreamReaderFactory.class", - s"${DUMMYJARPATH}DummyTrait.class") - JarTestUtils.createJar(baseDir, "jar1.jar", files) - - // when - val readers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]).get - - // then - readers should contain theSameElementsAs List( - (s"${DUMMYPACKAGE}DummyStreamReaderOne$$", baseDir.getAbsolutePath + "/jar1.jar"), - (s"${DUMMYPACKAGE}DummyStreamReaderTwo$$", baseDir.getAbsolutePath + "/jar1.jar")) - - // cleanup - new Directory(baseDir).deleteRecursively() - } - - it should "return an empty list if the given directory contains only jar files without class files" in { - // given - val baseDirPath = Files.createTempDirectory("jarswithoutclassfiles") - val baseDir = new File(baseDirPath.toUri) - JarTestUtils.createJar(baseDir, "jar1.jar", List()) - - // when - val readers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]).get - - // then - readers shouldBe empty - - // cleanup - new Directory(baseDir).deleteRecursively() - } - - it should "list components of jars in subdirectories" in { - // given - val baseDirPath = Files.createTempDirectory("subdirectoriesjars") - val baseDir = new File(baseDirPath.toUri) - val subDirPath1 = Files.createDirectories(baseDirPath.resolve("subdir1").resolve("subdir2")) - val subDirPath2 = 
Files.createDirectories(baseDirPath.resolve("subdir1").resolve("subdir3")) - Files.createDirectories(baseDirPath.resolve("subdir4")) - - JarTestUtils.createJar(new File(subDirPath1.toUri), "jar1.jar", List(s"${DUMMYJARPATH}DummyStreamReaderOne$$.class")) - JarTestUtils.createJar(new File(subDirPath2.toUri), "jar2.jar", List(s"${DUMMYJARPATH}DummyStreamReaderTwo$$.class")) - - // when - val readers = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]).get - - // then - readers should contain theSameElementsAs List( - (s"${DUMMYPACKAGE}DummyStreamReaderOne$$", subDirPath1.toAbsolutePath.toString + "/jar1.jar"), - (s"${DUMMYPACKAGE}DummyStreamReaderTwo$$", subDirPath2.toAbsolutePath.toString + "/jar2.jar")) - - // cleanup - new Directory(baseDir).deleteRecursively() - } - - - it should "return a failure if the given directory does not exist" in { - // given - val baseDirPath = Files.createTempDirectory("directorynotexist") - val baseDir = new File(baseDirPath.toUri) - new Directory(baseDir).delete() - - // when - val result = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]) - - // then - result.isFailure shouldBe true - result.failed.get.getClass shouldBe classOf[IllegalArgumentException] - result.failed.get.getMessage should fullyMatch regex "Directory .*directorynotexist.* does not exist" - } - - it should "return a failure if the given directory is not a directory" in { - // given - val anyFilePath = Files.createTempFile("anyFile", ".tmp") - val anyFile = new File(anyFilePath.toUri) - - // when - val result = ObjectScanner.getObjectsInfo(anyFile, ru.symbolOf[StreamReaderFactory]) - - // then - result.isFailure shouldBe true - result.failed.get.getClass shouldBe classOf[IllegalArgumentException] - result.failed.get.getMessage should fullyMatch regex "Argument .*anyFile.*tmp is not a directory" - - // cleanup - anyFile.delete() - } - - it should "return a failure if a class file in any jar is invalid" in { - // given - val 
baseDirPath = Files.createTempDirectory("fakeClass") - val baseDir = new File(baseDirPath.toUri) - val fakeClass = new File(Files.createFile(baseDirPath.resolve("fakeClass.class")).toUri) - JarTestUtils.createJar(baseDir, "jar1.jar", Map(fakeClass -> "fakeClass.class")) - - // when - val result = ObjectScanner.getObjectsInfo(baseDir, ru.symbolOf[StreamReaderFactory]) - - // then - result.isFailure shouldBe true - result.failed.get.getClass shouldBe classOf[ScalaReflectionException] - result.failed.get.getMessage shouldBe "class fakeClass not found." - - // cleanup - new Directory(baseDir).deleteRecursively() - } -} - diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyAttributes.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyAttributes.scala new file mode 100644 index 00000000..a1b66adf --- /dev/null +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyAttributes.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.scanner.dummyjar + +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} + +trait DummyAttributes extends HasComponentAttributes { + /** + * @return a human readable name of the component. + */ + override def getName: String = "" + + /** + * @return a description for the component. 
+ */ + override def getDescription: String = "" + + /** + * @return a map describing configuration properties for this component. The keys have to be unique to avoid + * name clashes with properties from other components. + */ + override def getProperties: Map[String, PropertyMetadata] = Map() +} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamDecoder.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamDecoder.scala index 838b99b0..546dfc38 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamDecoder.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamDecoder.scala @@ -18,12 +18,16 @@ package za.co.absa.hyperdrive.scanner.dummyjar import org.apache.commons.configuration2.Configuration import org.apache.spark.sql.DataFrame import org.apache.spark.sql.streaming.DataStreamReader -import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory} +import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory, StreamDecoderFactoryProvider} class DummyStreamDecoder extends StreamDecoder { override def decode(streamReader: DataStreamReader): DataFrame = ??? } -object DummyStreamDecoder extends StreamDecoderFactory { +object DummyStreamDecoder extends StreamDecoderFactory with DummyAttributes { override def apply(config: Configuration): StreamDecoder = ??? 
} + +class DummyStreamDecoderLoader extends StreamDecoderFactoryProvider { + override def getComponentFactory: StreamDecoderFactory = DummyStreamDecoder +} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamManager.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamManager.scala index 38d7bc6f..13be2060 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamManager.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamManager.scala @@ -19,7 +19,7 @@ import org.apache.commons.configuration2 import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.Row import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter} -import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManager, StreamManagerFactory} +import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManager, StreamManagerFactory, StreamManagerFactoryProvider} class DummyStreamManager(topic: String) extends StreamManager { override def configure(streamReader: DataStreamReader, configuration: Configuration): DataStreamReader = ??? @@ -27,6 +27,10 @@ class DummyStreamManager(topic: String) extends StreamManager { override def configure(streamWriter: DataStreamWriter[Row], configuration: Configuration): DataStreamWriter[Row] = ??? } -object DummyStreamManager extends StreamManagerFactory { +object DummyStreamManager extends StreamManagerFactory with DummyAttributes { override def apply(config: configuration2.Configuration): StreamManager = ??? 
} + +class DummyStreamManagerLoader extends StreamManagerFactoryProvider { + override def getComponentFactory: StreamManagerFactory = DummyStreamManager +} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderOne.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderOne.scala index 8508c8ca..ad527f22 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderOne.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderOne.scala @@ -18,12 +18,16 @@ package za.co.absa.hyperdrive.scanner.dummyjar import org.apache.commons.configuration2.Configuration import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.DataStreamReader -import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader +import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory, StreamReaderFactoryProvider} class DummyStreamReaderOne extends DummyTrait { override def read(spark: SparkSession): DataStreamReader = ??? } -object DummyStreamReaderOne extends AbstractDummyStreamReaderFactory { +object DummyStreamReaderOne extends AbstractDummyStreamReaderFactory with DummyAttributes { override def apply(conf: Configuration): StreamReader = ??? 
} + +class DummyStreamReaderOneLoader extends StreamReaderFactoryProvider { + override def getComponentFactory: StreamReaderFactory = DummyStreamReaderOne +} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderTwo.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderTwo.scala index 318aedb0..5c26756e 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderTwo.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamReaderTwo.scala @@ -18,13 +18,16 @@ package za.co.absa.hyperdrive.scanner.dummyjar import org.apache.commons.configuration2.Configuration import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.DataStreamReader -import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader +import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory, StreamReaderFactoryProvider} class DummyStreamReaderTwo extends DummyTrait { override def read(spark: SparkSession): DataStreamReader = ??? } -object DummyStreamReaderTwo extends AbstractDummyStreamReaderFactory { +object DummyStreamReaderTwo extends AbstractDummyStreamReaderFactory with DummyAttributes { override def apply(conf: Configuration): StreamReader = ??? 
} +class DummyStreamReaderTwoLoader extends StreamReaderFactoryProvider { + override def getComponentFactory: StreamReaderFactory = DummyStreamReaderTwo +} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamTransformer.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamTransformer.scala index d0591815..e6536e5f 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamTransformer.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamTransformer.scala @@ -17,12 +17,16 @@ package za.co.absa.hyperdrive.scanner.dummyjar import org.apache.commons.configuration2.Configuration import org.apache.spark.sql.DataFrame -import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory} +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory, StreamTransformerFactoryProvider} class DummyStreamTransformer extends StreamTransformer { override def transform(streamData: DataFrame): DataFrame = ??? } -object DummyStreamTransformer extends StreamTransformerFactory { +object DummyStreamTransformer extends StreamTransformerFactory with DummyAttributes { override def apply(config: Configuration): StreamTransformer = ??? 
} + +class DummyStreamTransformerLoader extends StreamTransformerFactoryProvider { + override def getComponentFactory: StreamTransformerFactory = DummyStreamTransformer +} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterOne.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterOne.scala index 31cbb04c..1c4b6286 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterOne.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterOne.scala @@ -19,12 +19,16 @@ import org.apache.commons.configuration2.Configuration import org.apache.spark.sql.DataFrame import org.apache.spark.sql.streaming.StreamingQuery import za.co.absa.hyperdrive.ingestor.api.manager.StreamManager -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory, StreamWriterFactoryProvider} class DummyStreamWriterOne extends StreamWriter { override def write(dataFrame: DataFrame, streamManager: StreamManager): StreamingQuery = ??? } -object DummyStreamWriterOne extends StreamWriterFactory { +object DummyStreamWriterOne extends StreamWriterFactory with DummyAttributes { override def apply(config: Configuration): StreamWriter = ??? 
} + +class DummyStreamWriterOneLoader extends StreamWriterFactoryProvider { + override def getComponentFactory: StreamWriterFactory = DummyStreamWriterOne +} diff --git a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterTwo.scala b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterTwo.scala index 68d112de..2f5e4f45 100644 --- a/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterTwo.scala +++ b/component-scanner/src/test/scala/za/co/absa/hyperdrive/scanner/dummyjar/DummyStreamWriterTwo.scala @@ -19,12 +19,16 @@ import org.apache.commons.configuration2.Configuration import org.apache.spark.sql.DataFrame import org.apache.spark.sql.streaming.StreamingQuery import za.co.absa.hyperdrive.ingestor.api.manager.StreamManager -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory, StreamWriterFactoryProvider} class DummyStreamWriterTwo extends StreamWriter { override def write(dataFrame: DataFrame, streamManager: StreamManager): StreamingQuery = ??? } -object DummyStreamWriterTwo extends StreamWriterFactory { +object DummyStreamWriterTwo extends StreamWriterFactory with DummyAttributes { override def apply(config: Configuration): StreamWriter = ??? 
} + +class DummyStreamWriterTwoLoader extends StreamWriterFactoryProvider { + override def getComponentFactory: StreamWriterFactory = DummyStreamWriterTwo +} diff --git a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider new file mode 100644 index 00000000..4016ba81 --- /dev/null +++ b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoderFactoryProvider @@ -0,0 +1,15 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +za.co.absa.hyperdrive.ingestor.implementation.decoder.avro.confluent.ConfluentAvroKafkaStreamDecoderLoader \ No newline at end of file diff --git a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider new file mode 100644 index 00000000..dc4a0572 --- /dev/null +++ b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.manager.StreamManagerFactoryProvider @@ -0,0 +1,15 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +za.co.absa.hyperdrive.ingestor.implementation.manager.checkpoint.CheckpointOffsetManagerLoader diff --git a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider new file mode 100644 index 00000000..cf252d77 --- /dev/null +++ b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.reader.StreamReaderFactoryProvider @@ -0,0 +1,15 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReaderLoader diff --git a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider new file mode 100644 index 00000000..5c195ff0 --- /dev/null +++ b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider @@ -0,0 +1,15 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformerLoader diff --git a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider new file mode 100644 index 00000000..52d6139e --- /dev/null +++ b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider @@ -0,0 +1,16 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriterLoader +za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetPartitioningStreamWriterLoader diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala index 0d4116e8..60dc056c 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.streaming.DataStreamReader import za.co.absa.abris.avro.read.confluent.SchemaManager import za.co.absa.abris.avro.read.confluent.SchemaManager.{PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY, PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY, PARAM_VALUE_SCHEMA_NAMING_STRATEGY, SchemaStorageNamingStrategies} import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory} -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys.{KEY_SCHEMA_REGISTRY_URL, KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY, KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME, KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE, KEY_SCHEMA_REGISTRY_VALUE_SCHEMA_ID, KEY_TOPIC} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._ import za.co.absa.hyperdrive.shared.utils.ConfigUtils.getOrThrow private[decoder] class ConfluentAvroKafkaStreamDecoder(val topic: String, val schemaRegistrySettings: Map[String,String]) extends StreamDecoder { @@ -42,8 +42,8 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder(val topic: String, val sc val schemaRegistryFullSettings = schemaRegistrySettings + 
(SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topic) logger.info(s"SchemaRegistry settings: $schemaRegistryFullSettings") - import za.co.absa.abris.avro.functions.from_confluent_avro import org.apache.spark.sql.functions.col + import za.co.absa.abris.avro.functions.from_confluent_avro streamReader .load() .select(from_confluent_avro(col("value"), schemaRegistryFullSettings) as 'data) @@ -51,7 +51,7 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder(val topic: String, val sc } } -object ConfluentAvroKafkaStreamDecoder extends StreamDecoderFactory { +object ConfluentAvroKafkaStreamDecoder extends StreamDecoderFactory with ConfluentAvroKafkaStreamDecoderAttributes { override def apply(config: Configuration): StreamDecoder = { val topic = getTopic(config) diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoderAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoderAttributes.scala new file mode 100644 index 00000000..6ab346f0 --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoderAttributes.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.decoder.avro.confluent + +import za.co.absa.abris.avro.read.confluent.SchemaManager.{PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY, PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY, PARAM_SCHEMA_REGISTRY_URL, PARAM_VALUE_SCHEMA_ID, PARAM_VALUE_SCHEMA_NAMING_STRATEGY} +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} + +trait ConfluentAvroKafkaStreamDecoderAttributes extends HasComponentAttributes { + + override def getName: String = "Confluent Avro Stream Decoder" + + override def getDescription: String = "Decoder for Kafka messages in Avro format. The decoder connects to a Schema Registry instance to retrieve the schema information." + + override def getProperties: Map[String, PropertyMetadata] = Map( + PARAM_SCHEMA_REGISTRY_URL -> PropertyMetadata("Schema Registry URL", None, required = true), + PARAM_VALUE_SCHEMA_ID -> PropertyMetadata("Schema Id", Some("Specific Id of schema or \"latest\""), required = true), + PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> PropertyMetadata("Schema naming strategy", + Some("Subject name strategy of Schema Registry. 
Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = true), + PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY -> PropertyMetadata("Record name", Some("Record name for naming strategies record.name or topic.record.name"), required = false), + PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> PropertyMetadata("Record namespace", Some("Record namespace for naming strategies record.name or topic.record.name"), required = false) + ) + +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoderLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoderLoader.scala new file mode 100644 index 00000000..ec040d1d --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoderLoader.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.decoder.avro.confluent + +import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoderFactory, StreamDecoderFactoryProvider} + +class ConfluentAvroKafkaStreamDecoderLoader extends StreamDecoderFactoryProvider { + override def getComponentFactory: StreamDecoderFactory = ConfluentAvroKafkaStreamDecoder +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManager.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManager.scala index 9b504699..2824252e 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManager.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManager.scala @@ -21,7 +21,7 @@ import org.apache.logging.log4j.LogManager import org.apache.spark.sql.Row import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter} import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManager, StreamManagerFactory} -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.CheckpointOffsetManagerKeys.{KEY_CHECKPOINT_BASE_LOCATION, KEY_TOPIC} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.CheckpointOffsetManagerKeys.KEY_CHECKPOINT_BASE_LOCATION import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.KafkaStreamReaderKeys.{KEY_STARTING_OFFSETS, WORD_STARTING_OFFSETS} import za.co.absa.hyperdrive.shared.utils.ConfigUtils.{getOrNone, getOrThrow} import za.co.absa.hyperdrive.shared.utils.FileUtils @@ -70,7 +70,7 @@ private[manager] class CheckpointOffsetManager(val checkpointLocation: String, } } -object CheckpointOffsetManager extends StreamManagerFactory { +object CheckpointOffsetManager extends StreamManagerFactory with CheckpointOffsetManagerAttributes { 
override def apply(config: Configuration): StreamManager = { val checkpointLocation = getCheckpointLocation(config) val startingOffsets = getStartingOffsets(config) diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManagerAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManagerAttributes.scala new file mode 100644 index 00000000..9ba8a56e --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManagerAttributes.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.implementation.manager.checkpoint + +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.CheckpointOffsetManagerKeys.KEY_CHECKPOINT_BASE_LOCATION +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.KafkaStreamReaderKeys.KEY_STARTING_OFFSETS + +trait CheckpointOffsetManagerAttributes extends HasComponentAttributes { + + override def getName: String = "Checkpoint Offset Manager" + + override def getDescription: String = "Configures the checkpoint location for both reader and writer." 
+ + override def getProperties: Map[String, PropertyMetadata] = { + val checkpointDescription = "Path to the checkpoint location. The checkpoint location has to be unique for each workflow" + val offsetDescription = "The starting offset is only considered if the checkpoint location does not already exist," + + " i.e. the ingestion has not been started yet." + Map( + KEY_CHECKPOINT_BASE_LOCATION -> PropertyMetadata("Checkpoint Location", Some(checkpointDescription), required = true), + KEY_STARTING_OFFSETS -> PropertyMetadata("Starting offset", Some(offsetDescription), required = false) + ) + } +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManagerLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManagerLoader.scala new file mode 100644 index 00000000..891d4a4d --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/manager/checkpoint/CheckpointOffsetManagerLoader.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.manager.checkpoint + +import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManagerFactory, StreamManagerFactoryProvider} + +class CheckpointOffsetManagerLoader extends StreamManagerFactoryProvider { + override def getComponentFactory: StreamManagerFactory = CheckpointOffsetManager +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReader.scala index 0c01e5d7..5a244377 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReader.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReader.scala @@ -20,7 +20,8 @@ import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.LogManager import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.DataStreamReader -import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory} +import za.co.absa.hyperdrive.ingestor.api.PropertyMetadata +import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory, StreamReaderFactoryProvider} import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.KafkaStreamReaderKeys.{KEY_BROKERS, KEY_TOPIC, rootFactoryOptionalConfKey} import za.co.absa.hyperdrive.shared.utils.ConfigUtils import za.co.absa.hyperdrive.shared.utils.ConfigUtils.{getOrThrow, getSeqOrThrow} @@ -76,7 +77,7 @@ private[reader] class KafkaStreamReader(val topic: String, val brokers: String, } } -object KafkaStreamReader extends StreamReaderFactory { +object KafkaStreamReader extends StreamReaderFactory with KafkaStreamReaderAttributes { private val logger = LogManager.getLogger override def apply(conf: Configuration): StreamReader = { @@ -100,5 +101,5 @@ object 
KafkaStreamReader extends StreamReaderFactory { private def getExtraOptions(configuration: Configuration): Map[String, String] = ConfigUtils.getPropertySubset(configuration, rootFactoryOptionalConfKey) - private def filterKeysContaining(map: Map[String, String], exclusionToken: String) = map.filterKeys(!_.contains(exclusionToken)) + private def filterKeysContaining(map: Map[String, String], exclusionToken: String) = map.filterKeys(!_.contains(exclusionToken)) } diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReaderAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReaderAttributes.scala new file mode 100644 index 00000000..beeeda00 --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReaderAttributes.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.reader.kafka + +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.KafkaStreamReaderKeys.{KEY_BROKERS, KEY_TOPIC, rootFactoryOptionalConfKey} + +trait KafkaStreamReaderAttributes extends HasComponentAttributes { + + override def getName: String = "Kafka Reader" + + override def getDescription: String = "This component ingests data from a kafka topic." + + override def getProperties: Map[String, PropertyMetadata] = Map( + KEY_TOPIC -> PropertyMetadata("Topic name", Some("Name of the kafka topic"), required = true), + KEY_BROKERS -> PropertyMetadata("Brokers", Some("Comma-separated list of kafka broker urls"), required = true)) + + override def getExtraConfigurationPrefix: Option[String] = Some(rootFactoryOptionalConfKey) +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReaderLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReaderLoader.scala new file mode 100644 index 00000000..bf8b93f3 --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReaderLoader.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.reader.kafka + +import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReaderFactory, StreamReaderFactoryProvider} + +class KafkaStreamReaderLoader extends StreamReaderFactoryProvider { + override def getComponentFactory: StreamReaderFactory = KafkaStreamReader +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformer.scala index 63f7f0e8..2c0f7d3c 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformer.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformer.scala @@ -17,7 +17,8 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selecti import org.apache.commons.configuration2.Configuration import org.apache.logging.log4j.LogManager import org.apache.spark.sql.DataFrame -import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory} +import za.co.absa.hyperdrive.ingestor.api.PropertyMetadata +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory, StreamTransformerFactoryProvider} import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ColumnSelectorStreamTransformerKeys.KEY_COLUMNS_TO_SELECT private[transformer] class ColumnSelectorStreamTransformer(val columns: Seq[String]) extends StreamTransformer { @@ -31,7 +32,7 @@ private[transformer] class ColumnSelectorStreamTransformer(val columns: Seq[Stri } } -object ColumnSelectorStreamTransformer extends StreamTransformerFactory { +object ColumnSelectorStreamTransformer extends StreamTransformerFactory with 
ColumnSelectorStreamTransformerAttributes { override def apply(config: Configuration): StreamTransformer = { val columns = getColumnsAsSequence(config) LogManager.getLogger.info(s"Going to create ColumnSelectorStreamTransformer using: columns='$columns'") diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformerAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformerAttributes.scala new file mode 100644 index 00000000..09bacf5d --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformerAttributes.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection + +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ColumnSelectorStreamTransformerKeys.KEY_COLUMNS_TO_SELECT + +trait ColumnSelectorStreamTransformerAttributes extends HasComponentAttributes { + + override def getName: String = "Column Selector Transformer" + + override def getDescription: String = "This transformer selects only the given columns. 
Column expressions are not possible" + + override def getProperties: Map[String, PropertyMetadata] = Map( + KEY_COLUMNS_TO_SELECT -> PropertyMetadata("Columns to select", Some("Comma separated list of columns that should be selected. If empty, all columns are selected."), required = false) + ) +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformerLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformerLoader.scala new file mode 100644 index 00000000..f901bcd7 --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/selection/ColumnSelectorStreamTransformerLoader.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection + +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider} + +class ColumnSelectorStreamTransformerLoader extends StreamTransformerFactoryProvider { + override def getComponentFactory: StreamTransformerFactory = ColumnSelectorStreamTransformer +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriter.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriter.scala index afc3be1e..2c489a2c 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriter.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriter.scala @@ -19,16 +19,17 @@ import java.time.LocalDate import java.time.format.DateTimeFormatter import org.apache.commons.configuration2.Configuration - import org.apache.hadoop.fs.Path import org.apache.logging.log4j.LogManager import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} import org.apache.spark.sql.functions.{lit, to_date} import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger} import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} +import za.co.absa.hyperdrive.ingestor.api.PropertyMetadata +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory, StreamWriterFactoryProvider} import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.AbstractParquetStreamWriter._ import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetPartitioningStreamWriterKeys._ +import 
za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.{KEY_DESTINATION_DIRECTORY, KEY_EXTRA_CONFS_ROOT} private[writer] class ParquetPartitioningStreamWriter(destination: String, reportDate: String, extraConfOptions: Map[String, String]) extends AbstractParquetStreamWriter(destination, extraConfOptions) { @@ -73,7 +74,7 @@ private[writer] class ParquetPartitioningStreamWriter(destination: String, repor } } -object ParquetPartitioningStreamWriter extends StreamWriterFactory { +object ParquetPartitioningStreamWriter extends StreamWriterFactory with ParquetPartitioningStreamWriterAttributes { val reportDateFormat: String = "yyyy-MM-dd" val reportDateFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern(reportDateFormat) @@ -93,4 +94,6 @@ object ParquetPartitioningStreamWriter extends StreamWriterFactory { case _ => reportDateFormatter.format(LocalDate.now()) } } + + override def getExtraConfigurationPrefix: Option[String] = Some(KEY_EXTRA_CONFS_ROOT) } diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterAttributes.scala new file mode 100644 index 00000000..ab07997f --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterAttributes.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet + +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY + +trait ParquetPartitioningStreamWriterAttributes extends HasComponentAttributes { + + override def getName: String = "Parquet Partitioning Stream Writer" + + override def getDescription: String = "This writer saves the ingested data in parquet format, partitioned by ingestion date and version. " + + "The version is incremented automatically for each ingestion on the same day" + + override def getProperties: Map[String, PropertyMetadata] = Map( + KEY_DESTINATION_DIRECTORY -> PropertyMetadata("Destination directory", Some("A path to a directory"), required = true) + ) +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterLoader.scala new file mode 100644 index 00000000..1e051b9e --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterLoader.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet + +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider} + +class ParquetPartitioningStreamWriterLoader extends StreamWriterFactoryProvider { + override def getComponentFactory: StreamWriterFactory = ParquetPartitioningStreamWriter +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala index 9840c06e..1b7989eb 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala @@ -19,10 +19,11 @@ import org.apache.commons.configuration2.Configuration import org.apache.logging.log4j.LogManager import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.AbstractParquetStreamWriter._ +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.KEY_EXTRA_CONFS_ROOT private[writer] class ParquetStreamWriter(destination: String, extraConfOptions: Map[String, String]) extends AbstractParquetStreamWriter(destination, extraConfOptions) -object ParquetStreamWriter extends StreamWriterFactory { +object ParquetStreamWriter extends 
StreamWriterFactory with ParquetStreamWriterAttributes { def apply(config: Configuration): StreamWriter = { val destinationDirectory = getDestinationDirectory(config) @@ -32,4 +33,6 @@ object ParquetStreamWriter extends StreamWriterFactory { new ParquetStreamWriter(destinationDirectory, extraOptions) } + + override def getExtraConfigurationPrefix: Option[String] = Some(KEY_EXTRA_CONFS_ROOT) } diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriterAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriterAttributes.scala new file mode 100644 index 00000000..8c406504 --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriterAttributes.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet + +import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY + +trait ParquetStreamWriterAttributes extends HasComponentAttributes { + + override def getName: String = "Parquet Stream Writer" + + override def getDescription: String = "This writer saves ingested data in Parquet format on a filesystem (e.g. HDFS)" + + override def getProperties: Map[String, PropertyMetadata] = Map( + KEY_DESTINATION_DIRECTORY -> PropertyMetadata("Destination directory", Some("A path to a directory"), required = true) + ) +} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriterLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriterLoader.scala new file mode 100644 index 00000000..1ff0d8b2 --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriterLoader.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet + +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider} + +class ParquetStreamWriterLoader extends StreamWriterFactoryProvider { + override def getComponentFactory: StreamWriterFactory = ParquetStreamWriter +} diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala new file mode 100644 index 00000000..733f8981 --- /dev/null +++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala @@ -0,0 +1,73 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation + +import java.util.ServiceLoader + +import org.scalatest.{FlatSpec, Matchers} +import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoderFactory, StreamDecoderFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.manager.{StreamManagerFactory, StreamManagerFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReaderFactory, StreamReaderFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.{ComponentFactory, ComponentFactoryProvider} +import za.co.absa.hyperdrive.ingestor.implementation.decoder.avro.confluent.ConfluentAvroKafkaStreamDecoder +import za.co.absa.hyperdrive.ingestor.implementation.manager.checkpoint.CheckpointOffsetManager +import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader +import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer +import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.{ParquetPartitioningStreamWriter, ParquetStreamWriter} + +import scala.reflect.ClassTag + +class TestServiceProviderConfiguration extends FlatSpec with Matchers { + + behavior of "Service Provider Interface (META-INF/services)" + + it should "load ConfluentAvroKafkaStreamDecoder" in { + val factoryProviders = loadServices[StreamDecoderFactoryProvider, StreamDecoderFactory]() + factoryProviders should contain only ConfluentAvroKafkaStreamDecoder + } + + it should "load CheckpointOffsetManager" in { + val factoryProviders = loadServices[StreamManagerFactoryProvider, StreamManagerFactory]() + factoryProviders should contain only CheckpointOffsetManager + } + + it should "load KafkaStreamReader" in { + val factoryProviders = 
loadServices[StreamReaderFactoryProvider, StreamReaderFactory]() + factoryProviders should contain only KafkaStreamReader + } + + it should "load ColumnSelectorStreamTransformer" in { + val factoryProviders = loadServices[StreamTransformerFactoryProvider, StreamTransformerFactory]() + factoryProviders should contain only ColumnSelectorStreamTransformer + } + + it should "load StreamWriters" in { + val factoryProviders = loadServices[StreamWriterFactoryProvider, StreamWriterFactory]() + factoryProviders should contain theSameElementsAs Seq(ParquetPartitioningStreamWriter, ParquetStreamWriter) + } + + private def loadServices[P <: ComponentFactoryProvider[F], F <: ComponentFactory[_]]()(implicit classTag: ClassTag[P]): Iterable[F] = { + val classLoader = this.getClass.getClassLoader + import scala.collection.JavaConverters._ + ServiceLoader.load(classTag.runtimeClass, classLoader) + .asScala + .map(_.asInstanceOf[P]) + .map(_.getComponentFactory) + .toList + } +}