Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Commit

Permalink
Issue #32, #30, #26
Browse files Browse the repository at this point in the history
  • Loading branch information
mattcasters committed Feb 24, 2019
1 parent 74a7b4c commit b8ba942
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 98 deletions.
26 changes: 26 additions & 0 deletions flink-notes-matt.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

# This is how I'm running on Flink
# Go to the place where you installed flink, say $HOME/flink-1.7.2/
# Place this script there.
# NOTE: Files like metastore.json, the .ktr or the fat jar can also be stored in hdfs://
# File name resolution is done using HDFS.
# Make sure to configure your local environment for HDFS in that case,
# set HADOOP_CONF_DIR for example
#
# Put the fat jar (for now generated with Spoon) in $HOME/software/
# Put your MetaStore export (for now exported with Spoon) in $HOME/metadata
#
# One argument: The transformation ktr
#

TRANS=$1

bin/flink run \
--class org.kettle.beam.pipeline.flink.MainFlink \
--parallelism 1 \
~/software/kettle-beam-fat.jar \
"${TRANS}" \
file:///home/kettle/metadata/metastore.json \
"Flink server" \
org.kettle.beam.steps.io.BeamInputMeta,org.kettle.beam.steps.bq.BeamBQOutputMeta,org.kettle.beam.steps.pubsub.BeamPublishMeta,org.kettle.beam.steps.pubsub.BeamSubscribeMeta,org.kettle.beam.steps.window.BeamTimestampMeta,org.kettle.beam.steps.io.BeamOutputMeta,org.kettle.beam.steps.window.BeamWindowMeta,org.kettle.beam.steps.bq.BeamBQInputMeta \
org.kettle.beam.xp.RunBeamTransExecutionPoint
74 changes: 74 additions & 0 deletions src/main/java/org/kettle/beam/perspective/BeamHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.jface.dialogs.ProgressMonitorDialog;
import org.eclipse.jface.operation.IRunnableWithProgress;
import org.eclipse.swt.SWT;
import org.eclipse.swt.custom.CTabItem;
import org.eclipse.swt.widgets.MessageBox;
import org.eclipse.swt.widgets.Shell;
import org.kettle.beam.core.metastore.SerializableMetaStore;
import org.kettle.beam.metastore.BeamJobConfig;
import org.kettle.beam.metastore.BeamJobConfigDialog;
import org.kettle.beam.metastore.FileDefinition;
Expand All @@ -52,26 +57,32 @@
import org.kettle.beam.metastore.RunnerType;
import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
import org.kettle.beam.pipeline.TransMetaPipelineConverter;
import org.kettle.beam.pipeline.fatjar.FatJarBuilder;
import org.kettle.beam.util.BeamConst;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.ProgressMonitorAdapter;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.plugins.KettleURLClassLoader;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.ui.core.dialog.EnterSelectionDialog;
import org.pentaho.di.ui.core.dialog.ErrorDialog;
import org.pentaho.di.ui.spoon.ISpoonMenuController;
import org.pentaho.di.ui.spoon.Spoon;
import org.pentaho.di.ui.spoon.trans.TransGraph;
import org.pentaho.metastore.api.IMetaStore;
import org.pentaho.metastore.persist.MetaStoreFactory;
import org.pentaho.metastore.util.PentahoDefaults;
import org.pentaho.ui.xul.dom.Document;
import org.pentaho.ui.xul.impl.AbstractXulEventHandler;

import java.io.File;
import java.io.FileOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -427,5 +438,68 @@ public void deleteBeamJobConfig() {
}
}

public void generateFatJar() {

final Shell shell = Spoon.getInstance().getShell();

// TODO: Ask the user about these 2 next lines...
//
final String filename = "/tmp/kettle-beam-fat.jar";
final String pluginFolders = "kettle-beam,kettle-json-plugin,kettle-json-plugin,Neo4JOutput";

try {
IRunnableWithProgress op = new IRunnableWithProgress() {
public void run( IProgressMonitor monitor ) throws InvocationTargetException, InterruptedException {
try {

VariableSpace space = Variables.getADefaultVariableSpace();
List<String> files = BeamConst.findLibraryFilesToStage( null, pluginFolders, true, true );
files.removeIf( s -> s.contains( "commons-logging" ) || s.contains( "log4j" ) || s.contains("xml-apis") );

FatJarBuilder fatJarBuilder = new FatJarBuilder( filename, files );
fatJarBuilder.buildTargetJar();

} catch ( Exception e ) {
throw new InvocationTargetException( e, "Error building fat jar: "+e.getMessage());
}
}
};

ProgressMonitorDialog pmd = new ProgressMonitorDialog( shell );
pmd.run( true, true, op );

MessageBox box = new MessageBox( shell, SWT.CLOSE | SWT.ICON_INFORMATION );
box.setText( "Fat jar created" );
box.setMessage( "A fat jar was successfully created : "+filename+Const.CR+"Included plugin folders: "+pluginFolders );
box.open();

} catch(Exception e) {
new ErrorDialog( shell, "Error", "Error creating fat jar", e );
}

}

public void exportMetaStore() {
final Shell shell = Spoon.getInstance().getShell();
final IMetaStore metaStore = Spoon.getInstance().getMetaStore();
final String filename = "/tmp/metastore.json";

try {
SerializableMetaStore sms = new SerializableMetaStore( metaStore );
FileOutputStream fos = new FileOutputStream( filename );
fos.write( sms.toJson().getBytes( "UTF-8" ));
fos.flush();
fos.close();

MessageBox box = new MessageBox( shell, SWT.CLOSE | SWT.ICON_INFORMATION );
box.setText( "Metastore exported" );
box.setMessage( "All current metastore entries were exported to "+filename);
box.open();

} catch(Exception e) {
new ErrorDialog( shell, "Error", "Error exporting metastore json", e );
}

}

}
35 changes: 35 additions & 0 deletions src/main/java/org/kettle/beam/pipeline/flink/MainFlink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.kettle.beam.pipeline.flink;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.kettle.beam.core.BeamKettle;
import org.kettle.beam.core.metastore.SerializableMetaStore;
import org.kettle.beam.metastore.BeamJobConfig;
import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
import org.kettle.beam.pipeline.main.MainBeam;
import org.kettle.beam.util.BeamConst;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.metastore.api.IMetaStore;
import org.pentaho.metastore.persist.MetaStoreFactory;
import org.pentaho.metastore.util.PentahoDefaults;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MainFlink {
public static void main( String[] args ) {
MainBeam.mainMethod(args, "Apache Flink");
}
}
116 changes: 116 additions & 0 deletions src/main/java/org/kettle/beam/pipeline/main/MainBeam.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package org.kettle.beam.pipeline.main;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.kettle.beam.core.BeamKettle;
import org.kettle.beam.core.metastore.SerializableMetaStore;
import org.kettle.beam.metastore.BeamJobConfig;
import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
import org.kettle.beam.util.BeamConst;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.metastore.api.IMetaStore;
import org.pentaho.metastore.persist.MetaStoreFactory;
import org.pentaho.metastore.util.PentahoDefaults;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MainBeam {

public static final int mainMethod( final String[] args, final String environment ) {

try {
System.out.println( "Starting clustered transformation execution on environment: '"+environment+"'" );

System.out.println( "Transformation ktr / args[0] : " + args[ 0 ] );
System.out.println( "MetaStore JSON / args[1] : " + args[ 1 ] );
System.out.println( "Beam Job Config / args[2] : " + args[ 2 ] );
System.out.println( "Step plugins / args[3] : " + args[ 3 ] );
System.out.println( "XP plugins / args[4] : " + args[ 4 ] );

// Read the transformation XML and MetaStore from Hadoop FS
//
Configuration hadoopConfiguration = new Configuration();
String transMetaXml = readFileIntoString( args[ 0 ], hadoopConfiguration, "UTF-8" );
String metaStoreJson = readFileIntoString( args[ 1 ], hadoopConfiguration, "UTF-8" );

// Third argument: the beam job config
//
String jobConfigName = args[ 2 ];

// Extra plugins to load from the fat jar file...
//
String stepPlugins = args[ 3 ];
String xpPlugins = args[ 4 ];

// Inflate the metaStore...
//
IMetaStore metaStore = new SerializableMetaStore( metaStoreJson );

List<String> stepPluginsList = new ArrayList<>( Arrays.asList( stepPlugins.split( "," ) ) );
List<String> xpPluginsList = new ArrayList<>( Arrays.asList( xpPlugins.split( "," ) ) );

System.out.println( ">>>>>> Initializing Kettle runtime (" + stepPluginsList.size() + " step classes, " + xpPluginsList.size() + " XP classes)" );

BeamKettle.init( stepPluginsList, xpPluginsList );

System.out.println( ">>>>>> Loading transformation metadata" );
TransMeta transMeta = new TransMeta( XMLHandler.loadXMLString( transMetaXml, TransMeta.XML_TAG ), null );
transMeta.setMetaStore( metaStore );

System.out.println( ">>>>>> Loading Kettle Beam Job Config '" + jobConfigName + "'" );
MetaStoreFactory<BeamJobConfig> configFactory = new MetaStoreFactory<>( BeamJobConfig.class, metaStore, PentahoDefaults.NAMESPACE );
BeamJobConfig jobConfig = configFactory.loadElement( jobConfigName );

String hadoopConfDir = System.getenv( "HADOOP_CONF_DIR" );
System.out.println( ">>>>>> HADOOP_CONF_DIR='" + hadoopConfDir + "'" );

System.out.println( ">>>>>> Building Apache Beam Kettle Pipeline..." );
PluginRegistry registry = PluginRegistry.getInstance();
PluginInterface beamInputPlugin = registry.getPlugin( StepPluginType.class, BeamConst.STRING_BEAM_INPUT_PLUGIN_ID );
if ( beamInputPlugin != null ) {
System.out.println( ">>>>>> Found Beam Input step plugin class loader" );
} else {
throw new KettleException( "Unable to find Beam Input step plugin, bailing out!" );
}
ClassLoader pluginClassLoader = PluginRegistry.getInstance().getClassLoader( beamInputPlugin );
if ( pluginClassLoader != null ) {
System.out.println( ">>>>>> Found Beam Input step plugin class loader" );
} else {
System.out.println( ">>>>>> NOT found Beam Input step plugin class loader, using system classloader" );
pluginClassLoader = ClassLoader.getSystemClassLoader();
}
KettleBeamPipelineExecutor executor = new KettleBeamPipelineExecutor( LogChannel.GENERAL, transMeta, jobConfig, metaStore, pluginClassLoader, stepPluginsList, xpPluginsList );

System.out.println( ">>>>>> Pipeline executing starting..." );
executor.setLoggingMetrics( true );
executor.execute( true );
System.out.println( ">>>>>> Execution finished..." );
return 0;
} catch ( Exception e ) {
System.err.println( "Error running Beam pipeline on '"+environment+"': " + e.getMessage() );
e.printStackTrace();
return 1;
}

}

private static String readFileIntoString( String filename, Configuration hadoopConfiguration, String encoding ) throws IOException {
Path path = new Path( filename );
FileSystem fileSystem = FileSystem.get( path.toUri(), hadoopConfiguration );
FSDataInputStream inputStream = fileSystem.open( path );
String fileContent = IOUtils.toString( inputStream, encoding );
return fileContent;
}
}
85 changes: 2 additions & 83 deletions src/main/java/org/kettle/beam/pipeline/spark/MainSpark.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.kettle.beam.core.BeamDefaults;
import org.kettle.beam.core.BeamKettle;
import org.kettle.beam.core.metastore.SerializableMetaStore;
import org.kettle.beam.metastore.BeamJobConfig;
import org.kettle.beam.pipeline.KettleBeamPipelineExecutor;
import org.kettle.beam.pipeline.main.MainBeam;
import org.kettle.beam.util.BeamConst;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannel;
Expand All @@ -28,88 +28,7 @@
import java.util.List;

public class MainSpark {

public static void main( String[] args ) {

try {

System.out.println( "Transformation ktr / args[0] : " + args[ 0 ] );
System.out.println( "MetaStore JSON / args[1] : " + args[ 1 ] );
System.out.println( "Beam Job Config / args[2] : " + args[ 2 ] );
System.out.println( "Step plugins / args[3] : " + args[ 3 ] );
System.out.println( "XP plugins / args[4] : " + args[ 4 ] );

// Read the transformation XML and MetaStore from Hadoop FS
//
Configuration hadoopConfiguration = new Configuration();
String transMetaXml = readFileIntoString( args[ 0 ], hadoopConfiguration, "UTF-8" );
String metaStoreJson = readFileIntoString( args[ 1 ], hadoopConfiguration, "UTF-8" );

// Third argument: the beam job config
//
String jobConfigName = args[ 2 ];

// Extra plugins to load from the fat jar file...
//
String stepPlugins = args[ 3 ];
String xpPlugins = args[ 4 ];

// Inflate the metaStore...
//
IMetaStore metaStore = new SerializableMetaStore( metaStoreJson );

List<String> stepPluginsList = new ArrayList<>( Arrays.asList( stepPlugins.split( "," ) ) );
List<String> xpPluginsList = new ArrayList<>( Arrays.asList( xpPlugins.split( "," ) ) );

System.out.println( ">>>>>> Initializing Kettle runtime (" + stepPluginsList.size() + " step classes, " + xpPluginsList.size() + " XP classes)" );

BeamKettle.init( stepPluginsList, xpPluginsList );

System.out.println( ">>>>>> Loading transformation metadata" );
TransMeta transMeta = new TransMeta( XMLHandler.loadXMLString( transMetaXml, TransMeta.XML_TAG ), null );
transMeta.setMetaStore( metaStore );

System.out.println( ">>>>>> Loading Kettle Beam Job Config '" + jobConfigName + "'" );
MetaStoreFactory<BeamJobConfig> configFactory = new MetaStoreFactory<>( BeamJobConfig.class, metaStore, PentahoDefaults.NAMESPACE );
BeamJobConfig jobConfig = configFactory.loadElement( jobConfigName );

String hadoopConfDir = System.getenv( "HADOOP_CONF_DIR" );
System.out.println( ">>>>>> HADOOP_CONF_DIR='" + hadoopConfDir + "'" );

System.out.println( ">>>>>> Building Apache Beam Kettle Pipeline..." );
PluginRegistry registry = PluginRegistry.getInstance();
PluginInterface beamInputPlugin = registry.getPlugin( StepPluginType.class, BeamConst.STRING_BEAM_INPUT_PLUGIN_ID );
if ( beamInputPlugin != null ) {
System.out.println( ">>>>>> Found Beam Input step plugin class loader" );
} else {
throw new KettleException( "Unable to find Beam Input step plugin, bailing out!" );
}
ClassLoader pluginClassLoader = PluginRegistry.getInstance().getClassLoader( beamInputPlugin );
if ( pluginClassLoader != null ) {
System.out.println( ">>>>>> Found Beam Input step plugin class loader" );
} else {
System.out.println( ">>>>>> NOT found Beam Input step plugin class loader, using system classloader" );
pluginClassLoader = ClassLoader.getSystemClassLoader();
}
KettleBeamPipelineExecutor executor = new KettleBeamPipelineExecutor( LogChannel.GENERAL, transMeta, jobConfig, metaStore, pluginClassLoader, stepPluginsList, xpPluginsList );

System.out.println( ">>>>>> Pipeline executing starting..." );
executor.setLoggingMetrics( true );
executor.execute( true );
System.out.println( ">>>>>> Execution finished..." );
} catch ( Exception e ) {
System.err.println( "Error running Beam pipeline on the Spark master: " + e.getMessage() );
e.printStackTrace();
System.exit( 1 );
}

}

private static String readFileIntoString( String filename, Configuration hadoopConfiguration, String encoding ) throws IOException {
Path path = new Path( filename );
FileSystem fileSystem = FileSystem.get( path.toUri(), hadoopConfiguration );
FSDataInputStream inputStream = fileSystem.open( path );
String fileContent = IOUtils.toString( inputStream, encoding );
return fileContent;
MainBeam.mainMethod(args, "Apache Spark");
}
}
Loading

0 comments on commit b8ba942

Please sign in to comment.