diff --git a/flink-notes-matt.txt b/flink-notes-matt.txt
new file mode 100644
index 0000000..655ef58
--- /dev/null
+++ b/flink-notes-matt.txt
@@ -0,0 +1,26 @@
+
+# This is how I'm running on Flink
+# Go to the place where you installed flink, say $HOME/flink-1.7.2/
+# Place this script there.
+# NOTE: Files like metastore.json, the .ktr or the fat jar can also be stored in hdfs://
+# File name resolution is done using HDFS.
+# In that case, make sure your local environment is configured for HDFS:
+# set HADOOP_CONF_DIR, for example.
+#
+# Put the fat jar (for now generated with Spoon) in $HOME/software/
+# Put your MetaStore export (for now exported with Spoon) in $HOME/metadata
+#
+# One argument: the transformation ktr
+#
+
+TRANS=$1
+
+bin/flink run \
+  --class org.kettle.beam.pipeline.flink.MainFlink \
+  --parallelism 1 \
+  ~/software/kettle-beam-fat.jar \
+  "${TRANS}" \
+  file:///home/kettle/metadata/metastore.json \
+  "Flink server" \
+  org.kettle.beam.steps.io.BeamInputMeta,org.kettle.beam.steps.bq.BeamBQOutputMeta,org.kettle.beam.steps.pubsub.BeamPublishMeta,org.kettle.beam.steps.pubsub.BeamSubscribeMeta,org.kettle.beam.steps.window.BeamTimestampMeta,org.kettle.beam.steps.io.BeamOutputMeta,org.kettle.beam.steps.window.BeamWindowMeta,org.kettle.beam.steps.bq.BeamBQInputMeta \
+  org.kettle.beam.xp.RunBeamTransExecutionPoint
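For orientation: the positional arguments after the fat jar map one-to-one onto the values MainBeam prints at startup (see MainBeam.java below). args[0] is the transformation .ktr, args[1] the exported metastore JSON, args[2] the name of the Beam Job Config to load ("Flink server" here), args[3] the comma-separated list of step plugin classes to register, and args[4] the extension point plugin classes.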
diff --git a/src/main/java/org/kettle/beam/perspective/BeamHelper.java b/src/main/java/org/kettle/beam/perspective/BeamHelper.java
index 02c3383..fea401d 100755
--- a/src/main/java/org/kettle/beam/perspective/BeamHelper.java
+++ b/src/main/java/org/kettle/beam/perspective/BeamHelper.java
@@ -41,9 +41,14 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.jface.dialogs.ProgressMonitorDialog;
+import org.eclipse.jface.operation.IRunnableWithProgress;
 import org.eclipse.swt.SWT;
 import org.eclipse.swt.custom.CTabItem;
 import org.eclipse.swt.widgets.MessageBox;
+import org.eclipse.swt.widgets.Shell;
+import org.kettle.beam.core.metastore.SerializableMetaStore;
 import org.kettle.beam.metastore.BeamJobConfig;
 import org.kettle.beam.metastore.BeamJobConfigDialog;
 import org.kettle.beam.metastore.FileDefinition;
@@ -52,13 +57,16 @@
 import org.kettle.beam.metastore.RunnerType;
 import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
 import org.kettle.beam.pipeline.TransMetaPipelineConverter;
+import org.kettle.beam.pipeline.fatjar.FatJarBuilder;
 import org.kettle.beam.util.BeamConst;
 import org.pentaho.di.core.Const;
+import org.pentaho.di.core.ProgressMonitorAdapter;
 import org.pentaho.di.core.exception.KettleException;
 import org.pentaho.di.core.logging.LogChannelInterface;
 import org.pentaho.di.core.parameters.UnknownParamException;
 import org.pentaho.di.core.plugins.KettleURLClassLoader;
 import org.pentaho.di.core.variables.VariableSpace;
+import org.pentaho.di.core.variables.Variables;
 import org.pentaho.di.i18n.BaseMessages;
 import org.pentaho.di.trans.TransMeta;
 import org.pentaho.di.ui.core.dialog.EnterSelectionDialog;
@@ -66,12 +74,15 @@
 import org.pentaho.di.ui.spoon.ISpoonMenuController;
 import org.pentaho.di.ui.spoon.Spoon;
 import org.pentaho.di.ui.spoon.trans.TransGraph;
+import org.pentaho.metastore.api.IMetaStore;
 import org.pentaho.metastore.persist.MetaStoreFactory;
 import org.pentaho.metastore.util.PentahoDefaults;
 import org.pentaho.ui.xul.dom.Document;
 import org.pentaho.ui.xul.impl.AbstractXulEventHandler;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.text.SimpleDateFormat;
@@ -427,5 +438,68 @@ public void deleteBeamJobConfig() {
     }
   }
 
+  public void generateFatJar() {
+
+    final Shell shell = Spoon.getInstance().getShell();
+
+    // TODO: Ask the user about these 2 next lines...
+    //
+    final String filename = "/tmp/kettle-beam-fat.jar";
+    final String pluginFolders = "kettle-beam,kettle-json-plugin,Neo4JOutput";
+
+    try {
+      IRunnableWithProgress op = new IRunnableWithProgress() {
+        public void run( IProgressMonitor monitor ) throws InvocationTargetException, InterruptedException {
+          try {
+
+            VariableSpace space = Variables.getADefaultVariableSpace();
+            List<String> files = BeamConst.findLibraryFilesToStage( null, pluginFolders, true, true );
+            files.removeIf( s -> s.contains( "commons-logging" ) || s.contains( "log4j" ) || s.contains( "xml-apis" ) );
+
+            FatJarBuilder fatJarBuilder = new FatJarBuilder( filename, files );
+            fatJarBuilder.buildTargetJar();
+
+          } catch ( Exception e ) {
+            throw new InvocationTargetException( e, "Error building fat jar: " + e.getMessage() );
+          }
+        }
+      };
+
+      ProgressMonitorDialog pmd = new ProgressMonitorDialog( shell );
+      pmd.run( true, true, op );
+
+      MessageBox box = new MessageBox( shell, SWT.CLOSE | SWT.ICON_INFORMATION );
+      box.setText( "Fat jar created" );
+      box.setMessage( "A fat jar was successfully created: " + filename + Const.CR + "Included plugin folders: " + pluginFolders );
+      box.open();
+
+    } catch ( Exception e ) {
+      new ErrorDialog( shell, "Error", "Error creating fat jar", e );
+    }
+
+  }
+
+  public void exportMetaStore() {
+    final Shell shell = Spoon.getInstance().getShell();
+    final IMetaStore metaStore = Spoon.getInstance().getMetaStore();
+    final String filename = "/tmp/metastore.json";
+
+    try {
+      SerializableMetaStore sms = new SerializableMetaStore( metaStore );
+      FileOutputStream fos = new FileOutputStream( filename );
+      fos.write( sms.toJson().getBytes( "UTF-8" ) );
+      fos.flush();
+      fos.close();
+
+      MessageBox box = new MessageBox( shell, SWT.CLOSE | SWT.ICON_INFORMATION );
+      box.setText( "Metastore exported" );
+      box.setMessage( "All current metastore entries were exported to " + filename );
+      box.open();
+
+    } catch ( Exception e ) {
+      new ErrorDialog( shell, "Error", "Error exporting metastore json", e );
+    }
+
+  }
 }
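To make the export/inflate pairing explicit: exportMetaStore() above writes the JSON that MainBeam (below) turns back into a live IMetaStore on the cluster. A minimal sketch of that round trip, assuming only the SerializableMetaStore constructors and toJson() method already used in this diff; the file path is illustrative:

    import org.kettle.beam.core.metastore.SerializableMetaStore;
    import org.pentaho.metastore.api.IMetaStore;

    import java.io.FileOutputStream;

    public class MetaStoreRoundTrip {

      // Export: wrap any IMetaStore and serialize it to JSON, as exportMetaStore() does.
      public static String exportToJson( IMetaStore metaStore, String filename ) throws Exception {
        SerializableMetaStore sms = new SerializableMetaStore( metaStore );
        String json = sms.toJson();
        try ( FileOutputStream fos = new FileOutputStream( filename ) ) {
          fos.write( json.getBytes( "UTF-8" ) ); // same encoding MainBeam uses when reading it back
        }
        return json;
      }

      // Inflate: the cluster side reconstructs an in-memory metastore from the JSON string.
      public static IMetaStore inflateFromJson( String json ) throws Exception {
        return new SerializableMetaStore( json );
      }
    }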
diff --git a/src/main/java/org/kettle/beam/pipeline/flink/MainFlink.java b/src/main/java/org/kettle/beam/pipeline/flink/MainFlink.java
new file mode 100644
index 0000000..585922f
--- /dev/null
+++ b/src/main/java/org/kettle/beam/pipeline/flink/MainFlink.java
@@ -0,0 +1,35 @@
+package org.kettle.beam.pipeline.flink;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.kettle.beam.core.BeamKettle;
+import org.kettle.beam.core.metastore.SerializableMetaStore;
+import org.kettle.beam.metastore.BeamJobConfig;
+import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
+import org.kettle.beam.pipeline.main.MainBeam;
+import org.kettle.beam.util.BeamConst;
+import org.pentaho.di.core.Const;
+import org.pentaho.di.core.exception.KettleException;
+import org.pentaho.di.core.logging.LogChannel;
+import org.pentaho.di.core.plugins.PluginInterface;
+import org.pentaho.di.core.plugins.PluginRegistry;
+import org.pentaho.di.core.plugins.StepPluginType;
+import org.pentaho.di.core.xml.XMLHandler;
+import org.pentaho.di.trans.TransMeta;
+import org.pentaho.metastore.api.IMetaStore;
+import org.pentaho.metastore.persist.MetaStoreFactory;
+import org.pentaho.metastore.util.PentahoDefaults;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MainFlink {
+  public static void main( String[] args ) {
+    MainBeam.mainMethod( args, "Apache Flink" );
+  }
+}
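MainFlink is deliberately a two-line shim now that the shared logic lives in MainBeam. Supporting another environment only requires a parallel entry point; a hypothetical example following the same pattern (the class and environment label are illustrative, not part of this diff):

    package org.kettle.beam.pipeline.dataflow;

    import org.kettle.beam.pipeline.main.MainBeam;

    // Hypothetical: a Dataflow entry point would be just as thin as MainFlink above.
    public class MainDataFlow {
      public static void main( String[] args ) {
        MainBeam.mainMethod( args, "Google Cloud Dataflow" );
      }
    }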
diff --git a/src/main/java/org/kettle/beam/pipeline/main/MainBeam.java b/src/main/java/org/kettle/beam/pipeline/main/MainBeam.java
new file mode 100644
index 0000000..36d801a
--- /dev/null
+++ b/src/main/java/org/kettle/beam/pipeline/main/MainBeam.java
@@ -0,0 +1,116 @@
+package org.kettle.beam.pipeline.main;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.kettle.beam.core.BeamKettle;
+import org.kettle.beam.core.metastore.SerializableMetaStore;
+import org.kettle.beam.metastore.BeamJobConfig;
+import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
+import org.kettle.beam.util.BeamConst;
+import org.pentaho.di.core.exception.KettleException;
+import org.pentaho.di.core.logging.LogChannel;
+import org.pentaho.di.core.plugins.PluginInterface;
+import org.pentaho.di.core.plugins.PluginRegistry;
+import org.pentaho.di.core.plugins.StepPluginType;
+import org.pentaho.di.core.xml.XMLHandler;
+import org.pentaho.di.trans.TransMeta;
+import org.pentaho.metastore.api.IMetaStore;
+import org.pentaho.metastore.persist.MetaStoreFactory;
+import org.pentaho.metastore.util.PentahoDefaults;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MainBeam {
+
+  public static final int mainMethod( final String[] args, final String environment ) {
+
+    try {
+      System.out.println( "Starting clustered transformation execution on environment: '" + environment + "'" );
+
+      System.out.println( "Transformation ktr / args[0] : " + args[ 0 ] );
+      System.out.println( "MetaStore JSON / args[1] : " + args[ 1 ] );
+      System.out.println( "Beam Job Config / args[2] : " + args[ 2 ] );
+      System.out.println( "Step plugins / args[3] : " + args[ 3 ] );
+      System.out.println( "XP plugins / args[4] : " + args[ 4 ] );
+
+      // Read the transformation XML and MetaStore from Hadoop FS
+      //
+      Configuration hadoopConfiguration = new Configuration();
+      String transMetaXml = readFileIntoString( args[ 0 ], hadoopConfiguration, "UTF-8" );
+      String metaStoreJson = readFileIntoString( args[ 1 ], hadoopConfiguration, "UTF-8" );
+
+      // Third argument: the beam job config
+      //
+      String jobConfigName = args[ 2 ];
+
+      // Extra plugins to load from the fat jar file...
+      //
+      String stepPlugins = args[ 3 ];
+      String xpPlugins = args[ 4 ];
+
+      // Inflate the metaStore...
+      //
+      IMetaStore metaStore = new SerializableMetaStore( metaStoreJson );
+
+      List<String> stepPluginsList = new ArrayList<>( Arrays.asList( stepPlugins.split( "," ) ) );
+      List<String> xpPluginsList = new ArrayList<>( Arrays.asList( xpPlugins.split( "," ) ) );
+
+      System.out.println( ">>>>>> Initializing Kettle runtime (" + stepPluginsList.size() + " step classes, " + xpPluginsList.size() + " XP classes)" );
+
+      BeamKettle.init( stepPluginsList, xpPluginsList );
+
+      System.out.println( ">>>>>> Loading transformation metadata" );
+      TransMeta transMeta = new TransMeta( XMLHandler.loadXMLString( transMetaXml, TransMeta.XML_TAG ), null );
+      transMeta.setMetaStore( metaStore );
+
+      System.out.println( ">>>>>> Loading Kettle Beam Job Config '" + jobConfigName + "'" );
+      MetaStoreFactory<BeamJobConfig> configFactory = new MetaStoreFactory<>( BeamJobConfig.class, metaStore, PentahoDefaults.NAMESPACE );
+      BeamJobConfig jobConfig = configFactory.loadElement( jobConfigName );
+
+      String hadoopConfDir = System.getenv( "HADOOP_CONF_DIR" );
+      System.out.println( ">>>>>> HADOOP_CONF_DIR='" + hadoopConfDir + "'" );
+
+      System.out.println( ">>>>>> Building Apache Beam Kettle Pipeline..." );
+      PluginRegistry registry = PluginRegistry.getInstance();
+      PluginInterface beamInputPlugin = registry.getPlugin( StepPluginType.class, BeamConst.STRING_BEAM_INPUT_PLUGIN_ID );
+      if ( beamInputPlugin != null ) {
+        System.out.println( ">>>>>> Found Beam Input step plugin" );
+      } else {
+        throw new KettleException( "Unable to find Beam Input step plugin, bailing out!" );
+      }
+      ClassLoader pluginClassLoader = PluginRegistry.getInstance().getClassLoader( beamInputPlugin );
+      if ( pluginClassLoader != null ) {
+        System.out.println( ">>>>>> Found Beam Input step plugin class loader" );
+      } else {
+        System.out.println( ">>>>>> Beam Input step plugin class loader not found, using the system class loader" );
+        pluginClassLoader = ClassLoader.getSystemClassLoader();
+      }
+      KettleBeamPipelineExecutor executor = new KettleBeamPipelineExecutor( LogChannel.GENERAL, transMeta, jobConfig, metaStore, pluginClassLoader, stepPluginsList, xpPluginsList );
+
+      System.out.println( ">>>>>> Pipeline execution starting..." );
+      executor.setLoggingMetrics( true );
+      executor.execute( true );
+      System.out.println( ">>>>>> Execution finished..." );
+      return 0;
+    } catch ( Exception e ) {
+      System.err.println( "Error running Beam pipeline on '" + environment + "': " + e.getMessage() );
+      e.printStackTrace();
+      return 1;
+    }
+
+  }
+
+  private static String readFileIntoString( String filename, Configuration hadoopConfiguration, String encoding ) throws IOException {
+    Path path = new Path( filename );
+    FileSystem fileSystem = FileSystem.get( path.toUri(), hadoopConfiguration );
+    try ( FSDataInputStream inputStream = fileSystem.open( path ) ) {
+      return IOUtils.toString( inputStream, encoding );
+    }
+  }
+}
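readFileIntoString() is also why the notes at the top say the .ktr, metastore.json and fat jar can live in hdfs://. FileSystem.get() selects the filesystem implementation from the URI scheme, falling back to the Hadoop configuration (HADOOP_CONF_DIR) for bare paths. A standalone sketch of that resolution, with illustrative paths:

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadAnyScheme {
      public static void main( String[] args ) throws Exception {
        Configuration conf = new Configuration(); // picks up core-site.xml etc. from HADOOP_CONF_DIR
        String[] examples = {
          "file:///home/kettle/metadata/metastore.json", // local filesystem
          "hdfs:///user/kettle/metadata/metastore.json"  // HDFS; needs HADOOP_CONF_DIR set
        };
        for ( String name : examples ) {
          Path path = new Path( name );
          // The URI scheme decides which FileSystem implementation is returned.
          FileSystem fs = FileSystem.get( path.toUri(), conf );
          try ( FSDataInputStream in = fs.open( path ) ) {
            System.out.println( name + " : " + IOUtils.toString( in, "UTF-8" ).length() + " characters" );
          }
        }
      }
    }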
diff --git a/src/main/java/org/kettle/beam/pipeline/spark/MainSpark.java b/src/main/java/org/kettle/beam/pipeline/spark/MainSpark.java
index 1a00fbd..ab909a1 100644
--- a/src/main/java/org/kettle/beam/pipeline/spark/MainSpark.java
+++ b/src/main/java/org/kettle/beam/pipeline/spark/MainSpark.java
@@ -5,11 +5,11 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.kettle.beam.core.BeamDefaults;
 import org.kettle.beam.core.BeamKettle;
 import org.kettle.beam.core.metastore.SerializableMetaStore;
 import org.kettle.beam.metastore.BeamJobConfig;
 import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
+import org.kettle.beam.pipeline.main.MainBeam;
 import org.kettle.beam.util.BeamConst;
 import org.pentaho.di.core.exception.KettleException;
 import org.pentaho.di.core.logging.LogChannel;
@@ -28,88 +28,7 @@ import java.util.List;
 
 public class MainSpark {
 
   public static void main( String[] args ) {
-
-    try {
-
-      System.out.println( "Transformation ktr / args[0] : " + args[ 0 ] );
-      System.out.println( "MetaStore JSON / args[1] : " + args[ 1 ] );
-      System.out.println( "Beam Job Config / args[2] : " + args[ 2 ] );
-      System.out.println( "Step plugins / args[3] : " + args[ 3 ] );
-      System.out.println( "XP plugins / args[4] : " + args[ 4 ] );
-
-      // Read the transformation XML and MetaStore from Hadoop FS
-      //
-      Configuration hadoopConfiguration = new Configuration();
-      String transMetaXml = readFileIntoString( args[ 0 ], hadoopConfiguration, "UTF-8" );
-      String metaStoreJson = readFileIntoString( args[ 1 ], hadoopConfiguration, "UTF-8" );
-
-      // Third argument: the beam job config
-      //
-      String jobConfigName = args[ 2 ];
-
-      // Extra plugins to load from the fat jar file...
-      //
-      String stepPlugins = args[ 3 ];
-      String xpPlugins = args[ 4 ];
-
-      // Inflate the metaStore...
-      //
-      IMetaStore metaStore = new SerializableMetaStore( metaStoreJson );
-
-      List<String> stepPluginsList = new ArrayList<>( Arrays.asList( stepPlugins.split( "," ) ) );
-      List<String> xpPluginsList = new ArrayList<>( Arrays.asList( xpPlugins.split( "," ) ) );
-
-      System.out.println( ">>>>>> Initializing Kettle runtime (" + stepPluginsList.size() + " step classes, " + xpPluginsList.size() + " XP classes)" );
-
-      BeamKettle.init( stepPluginsList, xpPluginsList );
-
-      System.out.println( ">>>>>> Loading transformation metadata" );
-      TransMeta transMeta = new TransMeta( XMLHandler.loadXMLString( transMetaXml, TransMeta.XML_TAG ), null );
-      transMeta.setMetaStore( metaStore );
-
-      System.out.println( ">>>>>> Loading Kettle Beam Job Config '" + jobConfigName + "'" );
-      MetaStoreFactory<BeamJobConfig> configFactory = new MetaStoreFactory<>( BeamJobConfig.class, metaStore, PentahoDefaults.NAMESPACE );
-      BeamJobConfig jobConfig = configFactory.loadElement( jobConfigName );
-
-      String hadoopConfDir = System.getenv( "HADOOP_CONF_DIR" );
-      System.out.println( ">>>>>> HADOOP_CONF_DIR='" + hadoopConfDir + "'" );
-
-      System.out.println( ">>>>>> Building Apache Beam Kettle Pipeline..." );
-      PluginRegistry registry = PluginRegistry.getInstance();
-      PluginInterface beamInputPlugin = registry.getPlugin( StepPluginType.class, BeamConst.STRING_BEAM_INPUT_PLUGIN_ID );
-      if ( beamInputPlugin != null ) {
-        System.out.println( ">>>>>> Found Beam Input step plugin class loader" );
-      } else {
-        throw new KettleException( "Unable to find Beam Input step plugin, bailing out!" );
-      }
-      ClassLoader pluginClassLoader = PluginRegistry.getInstance().getClassLoader( beamInputPlugin );
-      if ( pluginClassLoader != null ) {
-        System.out.println( ">>>>>> Found Beam Input step plugin class loader" );
-      } else {
-        System.out.println( ">>>>>> NOT found Beam Input step plugin class loader, using system classloader" );
-        pluginClassLoader = ClassLoader.getSystemClassLoader();
-      }
-      KettleBeamPipelineExecutor executor = new KettleBeamPipelineExecutor( LogChannel.GENERAL, transMeta, jobConfig, metaStore, pluginClassLoader, stepPluginsList, xpPluginsList );
-
-      System.out.println( ">>>>>> Pipeline executing starting..." );
-      executor.setLoggingMetrics( true );
-      executor.execute( true );
-      System.out.println( ">>>>>> Execution finished..." );
-    } catch ( Exception e ) {
-      System.err.println( "Error running Beam pipeline on the Spark master: " + e.getMessage() );
-      e.printStackTrace();
-      System.exit( 1 );
-    }
-
-  }
-
-  private static String readFileIntoString( String filename, Configuration hadoopConfiguration, String encoding ) throws IOException {
-    Path path = new Path( filename );
-    FileSystem fileSystem = FileSystem.get( path.toUri(), hadoopConfiguration );
-    FSDataInputStream inputStream = fileSystem.open( path );
-    String fileContent = IOUtils.toString( inputStream, encoding );
-    return fileContent;
+    MainBeam.mainMethod( args, "Apache Spark" );
   }
 }
diff --git a/src/main/java/org/kettle/beam/util/BeamConst.java b/src/main/java/org/kettle/beam/util/BeamConst.java
index ce8f177..57fcac4 100644
--- a/src/main/java/org/kettle/beam/util/BeamConst.java
+++ b/src/main/java/org/kettle/beam/util/BeamConst.java
@@ -154,15 +154,19 @@ public static final List findLibraryFilesToStage( String baseFolder, Str
     //
     for ( String pluginFolder : pluginFoldersSet ) {
       File pluginsFolder = new File( base.toString() + "/plugins/" + pluginFolder );
-      Collection<File> pluginFiles = FileUtils.listFiles( pluginsFolder, new String[] { "jar" }, true );
-      if ( pluginFiles != null ) {
-        for ( File file : pluginFiles ) {
-          String shortName = file.getName();
-          if ( !uniqueNames.contains( shortName ) ) {
-            uniqueNames.add( shortName );
-            libraries.add( file.getCanonicalPath() );
+      if ( pluginsFolder.exists() ) {
+        Collection<File> pluginFiles = FileUtils.listFiles( pluginsFolder, new String[] { "jar" }, true );
+        if ( pluginFiles != null ) {
+          for ( File file : pluginFiles ) {
+            String shortName = file.getName();
+            if ( !uniqueNames.contains( shortName ) ) {
+              uniqueNames.add( shortName );
+              libraries.add( file.getCanonicalPath() );
+            }
           }
         }
+      } else {
+        System.out.println( "Warning: couldn't find plugins folder: " + pluginsFolder );
       }
     }
@@ -172,16 +176,22 @@
 
     File libFolder = new File( base.toString() + "/lib" );
-    Collection<File> files = FileUtils.listFiles( libFolder, new String[] { "jar" }, true );
-    if ( files != null ) {
-      for ( File file : files ) {
-        String shortName = file.getName();
-        if ( !uniqueNames.contains( shortName ) ) {
-          uniqueNames.add( shortName );
-          libraries.add( file.getCanonicalPath() );
-          // System.out.println( "Adding library : " + file.getAbsolutePath() );
+    if ( libFolder.exists() ) {
+
+      Collection<File> files = FileUtils.listFiles( libFolder, new String[] { "jar" }, true );
+      if ( files != null ) {
+        for ( File file : files ) {
+          String shortName = file.getName();
+          if ( !uniqueNames.contains( shortName ) ) {
+            uniqueNames.add( shortName );
+            libraries.add( file.getCanonicalPath() );
+            // System.out.println( "Adding library : " + file.getAbsolutePath() );
+          }
         }
       }
+    } else {
+      System.out.println( "Warning: couldn't find kettle lib folder: " + libFolder );
+    }
 }
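With the existence checks in place, a missing plugin or lib folder degrades to a warning instead of a failure inside FileUtils.listFiles(). For reference, the way generateFatJar() above drives this method; the meaning of the two boolean flags is not spelled out in this diff, so treat that part as an assumption:

    import org.kettle.beam.pipeline.fatjar.FatJarBuilder;
    import org.kettle.beam.util.BeamConst;

    import java.util.List;

    public class BuildFatJar {
      public static void main( String[] args ) throws Exception {
        // Collect every jar from the Kettle lib folder plus the listed plugin folders.
        List<String> files = BeamConst.findLibraryFilesToStage( null, "kettle-beam,kettle-json-plugin,Neo4JOutput", true, true );

        // Drop jars known to clash with classes already present on the cluster.
        files.removeIf( s -> s.contains( "commons-logging" ) || s.contains( "log4j" ) || s.contains( "xml-apis" ) );

        // Package the remainder into the single jar the run scripts reference.
        FatJarBuilder fatJarBuilder = new FatJarBuilder( "/tmp/kettle-beam-fat.jar", files );
        fatJarBuilder.buildTargetJar();
      }
    }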
diff --git a/src/main/resources/beam_spoon_overlays.xul b/src/main/resources/beam_spoon_overlays.xul
index 8735261..ec6e8e2 100644
--- a/src/main/resources/beam_spoon_overlays.xul
+++ b/src/main/resources/beam_spoon_overlays.xul
@@ -21,6 +21,9 @@
+
+
+
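The three overlay lines added above arrive without their XML content. Given the two new BeamHelper methods, they presumably register the matching Spoon menu entries; a hedged sketch of what such overlay entries typically look like (the ids and labels are guesses, only the two onclick targets are grounded in this diff):

    <menuitem id="beam-menu-generate-fat-jar" label="Generate a Kettle Beam fat jar..." onclick="beamHelper.generateFatJar()"/>
    <menuitem id="beam-menu-export-metastore" label="Export the metastore to JSON..." onclick="beamHelper.exportMetaStore()"/>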