diff --git a/flink-notes-matt.txt b/flink-notes-matt.txt index 655ef58..3635f52 100644 --- a/flink-notes-matt.txt +++ b/flink-notes-matt.txt @@ -1,26 +1,25 @@ -# This is how I'm running on Flink -# Go to the place where you installed flink, say $HOME/flink-1.7.2/ -# Place this script there. -# NOTE: Files like metastore.json, the .ktr or the fat jar can be also stored in hdfs:// -# File name resolution is done using HDFS. -# Make sure to configure your local environment for HDFS in that case, -# set HADOOP_CONF_DIR for example -# -# Put the fat jar (for now generated with Spoon) in $HOME/software/ -# Put your MetaStore export (for now exported with Spoon) in $HOME/metadata -# -# One argument: The transformation ktr -# +This is how I'm running on Flink +Go to the place where you installed flink, say $HOME/flink-1.8.0/ +Place the script below --- there. +NOTES: +- Files like metastore.json, the .ktr or the fat jar can be also stored in hdfs:// +- File name resolution is done using HDFS. Make sure to configure your local environment for HDFS in that case, set HADOOP_CONF_DIR for example +- Put the fat jar (for now generated with Spoon) in $HOME/software/ +- Put your MetaStore export (for now exported with Spoon) in $HOME/metadata + +Pass one argument to the script: The transformation file (.ktr) + +--- +#!/bin/bash TRANS=$1 bin/flink run \ --class org.kettle.beam.pipeline.flink.MainFlink \ - --parallelism 1 \ - ~/software/kettle-beam-fat.jar \ + --parallelism 6 \ + ~/software/kettle-8.2-beam-2.13-fat.jar \ "${TRANS}" \ file:///home/kettle/metadata/metastore.json \ - "Flink server" \ - org.kettle.beam.steps.io.BeamInputMeta,org.kettle.beam.steps.bq.BeamBQOutputMeta,org.kettle.beam.steps.pubsub.BeamPublishMeta,org.kettle.beam.steps.pubsub.BeamSubscribeMeta,org.kettle.beam.steps.window.BeamTimestampMeta,org.kettle.beam.steps.io.BeamOutputMeta,org.kettle.beam.steps.window.BeamWindowMeta,org.kettle.beam.steps.bq.BeamBQInputMeta \ - 
org.kettle.beam.xp.RunBeamTransExecutionPoint + "Flink server" + diff --git a/src/main/java/org/kettle/beam/metastore/BeamJobConfigDialog.java b/src/main/java/org/kettle/beam/metastore/BeamJobConfigDialog.java index 02e85a4..732b340 100644 --- a/src/main/java/org/kettle/beam/metastore/BeamJobConfigDialog.java +++ b/src/main/java/org/kettle/beam/metastore/BeamJobConfigDialog.java @@ -46,7 +46,6 @@ import org.pentaho.di.ui.core.widget.TextVar; import org.pentaho.di.ui.trans.step.BaseStepDialog; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -129,7 +128,7 @@ public class BeamJobConfigDialog { private ComboVar wSparkStorageLevel; // Flink settings - + private Button wFlinkLocal; private TextVar wFlinkMaster; private TextVar wFlinkParallelism; @@ -150,10 +149,10 @@ public class BeamJobConfigDialog { private TextVar wFlinkLatencyTrackingInterval; private TextVar wFlinkAutoWatermarkInterval; private ComboVar wFlinkExecutionModeForBatch; - + private Button wOK; private Button wCancel; - + private VariableSpace space; private PropsUI props; @@ -341,7 +340,7 @@ private void addFormWidgets() { props.setLook( wlRunner ); wlRunner.setText( BaseMessages.getString( PKG, "BeamJobConfigDialog.Runner.Label" ) ); FormData fdlRunner = new FormData(); - fdlRunner.top = new FormAttachment( lastControl, margin); + fdlRunner.top = new FormAttachment( lastControl, margin ); fdlRunner.left = new FormAttachment( 0, -margin ); // First one in the left top corner fdlRunner.right = new FormAttachment( middle, -margin ); wlRunner.setLayoutData( fdlRunner ); @@ -362,10 +361,10 @@ private void addFormWidgets() { FormData fdTabFolder = new FormData(); fdTabFolder.left = new FormAttachment( 0, 0 ); fdTabFolder.right = new FormAttachment( 100, 0 ); - fdTabFolder.top = new FormAttachment( lastControl, margin*2 ); - fdTabFolder.bottom = new FormAttachment( wOK, -margin*2 ); + fdTabFolder.top = new 
FormAttachment( lastControl, margin * 2 ); + fdTabFolder.bottom = new FormAttachment( wOK, -margin * 2 ); wTabFolder.setLayoutData( fdTabFolder ); - + addGeneralTab(); addParametersTab(); addDataflowTab(); @@ -381,7 +380,7 @@ private void addGeneralTab() { int middle = Const.MIDDLE_PCT; wGeneralTab = new CTabItem( wTabFolder, SWT.NONE ); - wGeneralTab .setText( " General " ); + wGeneralTab.setText( " General " ); wGeneralSComp = new ScrolledComposite( wTabFolder, SWT.V_SCROLL | SWT.H_SCROLL ); wGeneralSComp.setLayout( new FillLayout() ); @@ -470,7 +469,7 @@ private void addGeneralTab() { fdStepPluginClasses.left = new FormAttachment( middle, 0 ); // To the right of the label fdStepPluginClasses.right = new FormAttachment( 95, 0 ); wStepPluginClasses.setLayoutData( fdStepPluginClasses ); - wbStepPluginClasses.setText(BaseMessages.getString( PKG, "BeamJobConfigDialog.StepPluginClasses.Button") ); + wbStepPluginClasses.setText( BaseMessages.getString( PKG, "BeamJobConfigDialog.StepPluginClasses.Button" ) ); FormData fdbStepPluginClasses = new FormData(); fdbStepPluginClasses.top = new FormAttachment( lastControl, margin ); fdbStepPluginClasses.left = new FormAttachment( wStepPluginClasses, margin ); @@ -498,7 +497,7 @@ private void addGeneralTab() { fdXpPluginClasses.left = new FormAttachment( middle, 0 ); // To the right of the label fdXpPluginClasses.right = new FormAttachment( 95, 0 ); wXpPluginClasses.setLayoutData( fdXpPluginClasses ); - wbXpPluginClasses.setText(BaseMessages.getString( PKG, "BeamJobConfigDialog.XpPluginClasses.Button") ); + wbXpPluginClasses.setText( BaseMessages.getString( PKG, "BeamJobConfigDialog.XpPluginClasses.Button" ) ); FormData fdbXpPluginClasses = new FormData(); fdbXpPluginClasses.top = new FormAttachment( lastControl, margin ); fdbXpPluginClasses.left = new FormAttachment( wXpPluginClasses, margin ); @@ -525,7 +524,7 @@ private void addGeneralTab() { fdFatJar.right = new FormAttachment( 95, 0 ); wFatJar.setLayoutData( fdFatJar ); 
Button wbFatJar = new Button( wGeneralComp, SWT.PUSH ); - wbFatJar.setText(BaseMessages.getString( PKG, "BeamJobConfigDialog.FatJar.Button") ); + wbFatJar.setText( BaseMessages.getString( PKG, "BeamJobConfigDialog.FatJar.Button" ) ); FormData fdbFatJar = new FormData(); fdbFatJar.top = new FormAttachment( lastControl, margin ); fdbFatJar.left = new FormAttachment( wFatJar, margin ); @@ -552,7 +551,7 @@ private void addGeneralTab() { fdStreamingKettleStepsFlushInterval.right = new FormAttachment( 95, 0 ); wStreamingKettleStepsFlushInterval.setLayoutData( fdStreamingKettleStepsFlushInterval ); lastControl = wStreamingKettleStepsFlushInterval; - + FormData fdGeneralComp = new FormData(); fdGeneralComp.left = new FormAttachment( 0, 0 ); fdGeneralComp.top = new FormAttachment( 0, 0 ); @@ -579,23 +578,47 @@ private void buildFatJar( Event event ) { BeamJobConfig jobConfig = new BeamJobConfig(); getInfo( jobConfig ); - FileDialog dialog = new FileDialog(shell, SWT.SAVE); + FileDialog dialog = new FileDialog( shell, SWT.SAVE ); dialog.setText( "Select the location of the Kettle+Beam+Plugins fat jar" ); - dialog.setFilterNames(new String[] { "Jar files (*.jar)", "All Files (*.*)" }); - dialog.setFilterExtensions(new String[] { "*.jar", "*.*" }); // Windows - if (StringUtils.isNotEmpty( jobConfig.getFatJar() )) { - dialog.setFileName( space.environmentSubstitute(jobConfig.getFatJar()) ); + dialog.setFilterNames( new String[] { "Jar files (*.jar)", "All Files (*.*)" } ); + dialog.setFilterExtensions( new String[] { "*.jar", "*.*" } ); // Windows + if ( StringUtils.isNotEmpty( jobConfig.getFatJar() ) ) { + dialog.setFileName( space.environmentSubstitute( jobConfig.getFatJar() ) ); } String filename = dialog.open(); - if (StringUtils.isEmpty( filename )) { + if ( StringUtils.isEmpty( filename ) ) { return; } List files = BeamConst.findLibraryFilesToStage( null, jobConfig.getPluginsToStage(), true, true ); - files.removeIf( s -> s.contains( "commons-logging" ) || s.contains( 
"log4j" ) || s.contains("xml-apis") ); + files.removeIf( s -> s.contains( "commons-logging" ) || s.startsWith( "log4j" ) || s.contains( "xml-apis" ) ); + + // Find the plugin classes for the specified plugins... + // + String stepPluginClasses = findPluginClasses( Step.class.getName() ); + if (StringUtils.isNotEmpty(jobConfig.getStepPluginClasses())) { + if (StringUtils.isEmpty( stepPluginClasses )) { + stepPluginClasses=""; + } else { + stepPluginClasses+=","; + } + stepPluginClasses+=jobConfig.getStepPluginClasses(); + } + String xpPluginClasses = findPluginClasses( ExtensionPoint.class.getName() ); + if (StringUtils.isNotEmpty(jobConfig.getXpPluginClasses())) { + if (StringUtils.isEmpty( xpPluginClasses )) { + xpPluginClasses=""; + } else { + xpPluginClasses+=","; + } + xpPluginClasses+=jobConfig.getStepPluginClasses(); + } + FatJarBuilder fatJarBuilder = new FatJarBuilder( filename, files ); - Cursor waitCursor = new Cursor(shell.getDisplay(), SWT.CURSOR_WAIT); + fatJarBuilder.setExtraStepPluginClasses( stepPluginClasses ); + fatJarBuilder.setExtraXpPluginClasses( xpPluginClasses ); + Cursor waitCursor = new Cursor( shell.getDisplay(), SWT.CURSOR_WAIT ); Cursor regularCursor = shell.getCursor(); try { @@ -611,76 +634,47 @@ private void buildFatJar( Event event ) { wFatJar.setText( filename ); } catch ( Exception e ) { - new ErrorDialog( shell, "Error", "Error building fat jar: "+e.getMessage(), e); + new ErrorDialog( shell, "Error", "Error building fat jar: " + e.getMessage(), e ); } } private void findStepClasses( Event event ) { String stepPluginClasses = findPluginClasses( Step.class.getName() ); - if (stepPluginClasses!=null) { + if ( stepPluginClasses != null ) { wStepPluginClasses.setText( stepPluginClasses ); } } private void findXpClasses( Event event ) { String xpPluginClasses = findPluginClasses( ExtensionPoint.class.getName() ); - if (xpPluginClasses!=null) { + if ( xpPluginClasses != null ) { wXpPluginClasses.setText( xpPluginClasses ); } } - 
private String findPluginClasses( String pluginClassName) { + private String findPluginClasses( String pluginClassName ) { BeamJobConfig jobConfig = new BeamJobConfig(); getInfo( jobConfig ); - String plugins = jobConfig.getPluginsToStage(); - if (StringUtils.isNotEmpty( plugins )) { - - Set classes = new HashSet<>(); - String[] pluginFolders = plugins.split(","); - for (String pluginFolder : pluginFolders) { - try { - List stepClasses = TransMetaPipelineConverter.findAnnotatedClasses( pluginFolder, pluginClassName ); - for (String stepClass : stepClasses) { - classes.add( stepClass ); - } - } catch(Exception e) { - new ErrorDialog(shell, "Error", "Error find plugin classes of annotation type '"+pluginClassName+"' in folder '"+pluginFolder+"' : ", e); - } - } - - // OK, we now have all the classes... - // Let's sort by name and add them in the dialog comma separated... - // - List classesList = new ArrayList<>( ); - classesList.addAll( classes ); - Collections.sort(classesList); - - StringBuffer all = new StringBuffer( ); - for (String pluginClass : classesList) { - if (all.length()>0) { - all.append( "," ); - } - all.append(pluginClass); - } - - return all.toString(); + try { + return FatJarBuilder.findPluginClasses( pluginClassName, jobConfig.getPluginsToStage() ); + } catch ( Exception e ) { + new ErrorDialog( shell, "Error", "Error find plugin classes of annotation type '" + pluginClassName, e ); + return null; } - - return null; } private void addParametersTab() { wParametersTab = new CTabItem( wTabFolder, SWT.NONE ); - wParametersTab .setText( "Parameters" ); + wParametersTab.setText( "Parameters" ); wParametersComp = new Composite( wTabFolder, SWT.NO_BACKGROUND ); props.setLook( wParametersComp ); - wParametersComp.setLayout( new FormLayout( ) ); + wParametersComp.setLayout( new FormLayout() ); ColumnInfo[] columnInfos = new ColumnInfo[] { - new ColumnInfo("Name", ColumnInfo.COLUMN_TYPE_TEXT, false, false), - new ColumnInfo("Value", 
ColumnInfo.COLUMN_TYPE_TEXT, false, false), + new ColumnInfo( "Name", ColumnInfo.COLUMN_TYPE_TEXT, false, false ), + new ColumnInfo( "Value", ColumnInfo.COLUMN_TYPE_TEXT, false, false ), }; wParameters = new TableView( space, wParametersComp, SWT.BORDER, columnInfos, config.getParameters().size(), null, props ); @@ -688,14 +682,14 @@ private void addParametersTab() { FormData fdParameters = new FormData(); fdParameters.left = new FormAttachment( 0, 0 ); fdParameters.right = new FormAttachment( 100, 0 ); - fdParameters.top = new FormAttachment( 0, 0); + fdParameters.top = new FormAttachment( 0, 0 ); fdParameters.bottom = new FormAttachment( 100, 0 ); wParameters.setLayoutData( fdParameters ); FormData fdParametersComp = new FormData(); fdParametersComp.left = new FormAttachment( 0, 0 ); fdParametersComp.right = new FormAttachment( 100, 0 ); - fdParametersComp.top = new FormAttachment( 0, 0); + fdParametersComp.top = new FormAttachment( 0, 0 ); fdParametersComp.bottom = new FormAttachment( 100, 0 ); wParametersComp.setLayoutData( fdParametersComp ); @@ -707,7 +701,7 @@ private void addDataflowTab() { int middle = Const.MIDDLE_PCT; wDataflowTab = new CTabItem( wTabFolder, SWT.NONE ); - wDataflowTab .setText( " Dataflow " ); + wDataflowTab.setText( " Dataflow " ); wDataflowSComp = new ScrolledComposite( wTabFolder, SWT.V_SCROLL | SWT.H_SCROLL ); wDataflowSComp.setLayout( new FillLayout() ); @@ -757,7 +751,7 @@ private void addDataflowTab() { fdGcpAppName.right = new FormAttachment( 95, 0 ); wGcpAppName.setLayoutData( fdGcpAppName ); lastControl = wGcpAppName; - + // Staging location // Label wlGcpStagingLocation = new Label( wDataflowComp, SWT.RIGHT ); @@ -905,7 +899,7 @@ private void addDataflowTab() { wlGcpRegion.setLayoutData( fdlGcpRegion ); wGcpRegion = new ComboVar( space, wDataflowComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER ); props.setLook( wGcpRegion ); - wGcpRegion.setItems(BeamConst.getGcpRegionDescriptions()); + wGcpRegion.setItems( 
BeamConst.getGcpRegionDescriptions() ); FormData fdGcpRegion = new FormData(); fdGcpRegion.top = new FormAttachment( wlGcpRegion, 0, SWT.CENTER ); fdGcpRegion.left = new FormAttachment( middle, 0 ); // To the right of the label @@ -975,7 +969,7 @@ private void addSparkTab() { int middle = 65; wSparkTab = new CTabItem( wTabFolder, SWT.NONE ); - wSparkTab .setText( " Spark " ); + wSparkTab.setText( " Spark " ); wSparkSComp = new ScrolledComposite( wTabFolder, SWT.V_SCROLL | SWT.H_SCROLL ); wSparkSComp.setLayout( new FillLayout() ); @@ -1007,7 +1001,9 @@ private void addSparkTab() { wSparkLocal.setLayoutData( fdSparkLocal ); Control lastControl = wSparkLocal; - wSparkLocal.addListener( SWT.Selection, e-> { enableFields(); } ); + wSparkLocal.addListener( SWT.Selection, e -> { + enableFields(); + } ); // Spark master // @@ -1114,7 +1110,7 @@ private void addSparkTab() { fdlsparkEnableSparkMetricSinks.left = new FormAttachment( 0, -margin ); // First one in the left top corner fdlsparkEnableSparkMetricSinks.right = new FormAttachment( middle, -margin ); wlsparkEnableSparkMetricSinks.setLayoutData( fdlsparkEnableSparkMetricSinks ); - wsparkEnableSparkMetricSinks = new Button( wSparkComp, SWT.CHECK| SWT.LEFT | SWT.BORDER ); + wsparkEnableSparkMetricSinks = new Button( wSparkComp, SWT.CHECK | SWT.LEFT | SWT.BORDER ); props.setLook( wsparkEnableSparkMetricSinks ); FormData fdsparkEnableSparkMetricSinks = new FormData(); fdsparkEnableSparkMetricSinks.top = new FormAttachment( wlsparkEnableSparkMetricSinks, 0, SWT.CENTER ); @@ -1245,7 +1241,7 @@ private void addFlinkTab() { int middle = 65; wFlinkTab = new CTabItem( wTabFolder, SWT.NONE ); - wFlinkTab .setText( " Flink " ); + wFlinkTab.setText( " Flink " ); wFlinkSComp = new ScrolledComposite( wTabFolder, SWT.V_SCROLL | SWT.H_SCROLL ); wFlinkSComp.setLayout( new FillLayout() ); @@ -1277,7 +1273,9 @@ private void addFlinkTab() { wFlinkLocal.setLayoutData( fdFlinkLocal ); Control lastControl = wFlinkLocal; - 
wFlinkLocal.addListener( SWT.Selection, e-> { enableFields(); } ); + wFlinkLocal.addListener( SWT.Selection, e -> { + enableFields(); + } ); // Flink master // @@ -1297,7 +1295,7 @@ private void addFlinkTab() { fdFlinkMaster.right = new FormAttachment( 95, 0 ); wFlinkMaster.setLayoutData( fdFlinkMaster ); lastControl = wFlinkMaster; - + Label wlFlinkParallelism = new Label( wFlinkComp, SWT.RIGHT ); props.setLook( wlFlinkParallelism ); wlFlinkParallelism.setText( BaseMessages.getString( PKG, "BeamJobConfigDialog.FlinkParallelism.Label" ) ); @@ -1651,94 +1649,94 @@ public void getData() { wDescription.setText( Const.NVL( config.getDescription(), "" ) ); // General - wRunner.setText( Const.NVL(config.getRunnerTypeName(), "") ); - wUserAgent.setText( Const.NVL(config.getUserAgent(), "") ); - wTempLocation.setText( Const.NVL(config.getTempLocation(), "") ); - wPluginsToStage.setText( Const.NVL(config.getPluginsToStage(), "") ); - wStepPluginClasses.setText( Const.NVL(config.getStepPluginClasses(), "") ); - wXpPluginClasses.setText( Const.NVL(config.getXpPluginClasses(), "") ); - wFatJar.setText( Const.NVL(config.getFatJar(), "") ); - wStreamingKettleStepsFlushInterval.setText(Const.NVL(config.getStreamingKettleStepsFlushInterval(), "")); + wRunner.setText( Const.NVL( config.getRunnerTypeName(), "" ) ); + wUserAgent.setText( Const.NVL( config.getUserAgent(), "" ) ); + wTempLocation.setText( Const.NVL( config.getTempLocation(), "" ) ); + wPluginsToStage.setText( Const.NVL( config.getPluginsToStage(), "" ) ); + wStepPluginClasses.setText( Const.NVL( config.getStepPluginClasses(), "" ) ); + wXpPluginClasses.setText( Const.NVL( config.getXpPluginClasses(), "" ) ); + wFatJar.setText( Const.NVL( config.getFatJar(), "" ) ); + wStreamingKettleStepsFlushInterval.setText( Const.NVL( config.getStreamingKettleStepsFlushInterval(), "" ) ); // GCP /* */ - wGcpProjectId.setText( Const.NVL(config.getGcpProjectId(), "") ); - wGcpAppName.setText( Const.NVL(config.getGcpAppName(), "") ); - 
wGcpStagingLocation.setText( Const.NVL(config.getGcpStagingLocation(), "") ); + wGcpProjectId.setText( Const.NVL( config.getGcpProjectId(), "" ) ); + wGcpAppName.setText( Const.NVL( config.getGcpAppName(), "" ) ); + wGcpStagingLocation.setText( Const.NVL( config.getGcpStagingLocation(), "" ) ); String workerCode = config.getGcpWorkerMachineType(); String workerDescription = ""; - if (StringUtils.isNotEmpty( workerCode )) { + if ( StringUtils.isNotEmpty( workerCode ) ) { int index = Const.indexOfString( workerCode, BeamConst.getGcpWorkerMachineTypeCodes() ); - if (index<0) { + if ( index < 0 ) { workerDescription = workerCode; // variable, manually entered in general } else { - workerDescription = BeamConst.getGcpWorkerMachineTypeDescriptions()[index]; + workerDescription = BeamConst.getGcpWorkerMachineTypeDescriptions()[ index ]; } } - wGcpWorkerMachineType.setText(workerDescription); - wGcpWorkerDiskType.setText(Const.NVL(config.getGcpWorkerDiskType(), "")); - wGcpDiskSizeGb.setText(Const.NVL(config.getGcpDiskSizeGb(), "")); - wGcpInitialNumberOfWorkers.setText( Const.NVL(config.getGcpInitialNumberOfWorkers(), "") ); - wGcpMaximumNumberOfWorkers.setText( Const.NVL(config.getGcpMaximumNumberOfWokers(), "") ); - wGcpAutoScalingAlgorithm.setText(Const.NVL(config.getGcpAutoScalingAlgorithm(), "") ); + wGcpWorkerMachineType.setText( workerDescription ); + wGcpWorkerDiskType.setText( Const.NVL( config.getGcpWorkerDiskType(), "" ) ); + wGcpDiskSizeGb.setText( Const.NVL( config.getGcpDiskSizeGb(), "" ) ); + wGcpInitialNumberOfWorkers.setText( Const.NVL( config.getGcpInitialNumberOfWorkers(), "" ) ); + wGcpMaximumNumberOfWorkers.setText( Const.NVL( config.getGcpMaximumNumberOfWokers(), "" ) ); + wGcpAutoScalingAlgorithm.setText( Const.NVL( config.getGcpAutoScalingAlgorithm(), "" ) ); wGcpStreaming.setSelection( config.isGcpStreaming() ); String regionCode = config.getGcpRegion(); String regionDescription = ""; - if (StringUtils.isNotEmpty( regionCode )) { + if ( 
StringUtils.isNotEmpty( regionCode ) ) { int index = Const.indexOfString( regionCode, BeamConst.getGcpRegionCodes() ); - if (index<0) { + if ( index < 0 ) { regionDescription = regionCode; // variable, manually entered in general } else { - regionDescription = BeamConst.getGcpRegionDescriptions()[index]; + regionDescription = BeamConst.getGcpRegionDescriptions()[ index ]; } } - wGcpRegion.setText(regionDescription); - wGcpZone.setText(Const.NVL(config.getGcpZone(), "")); + wGcpRegion.setText( regionDescription ); + wGcpZone.setText( Const.NVL( config.getGcpZone(), "" ) ); // Spark - wSparkLocal.setSelection(config.isSparkLocal()); - wSparkMaster.setText(Const.NVL(config.getSparkMaster(), "")); - wSparkDeployFolder.setText(Const.NVL(config.getSparkDeployFolder(), "")); - wSparkBatchIntervalMillis.setText(Const.NVL(config.getSparkBatchIntervalMillis(), "")); - wSparkCheckpointDir.setText(Const.NVL(config.getSparkCheckpointDir(), "")); - wSparkCheckpointDurationMillis.setText(Const.NVL(config.getSparkCheckpointDurationMillis(), "")); - wsparkEnableSparkMetricSinks.setSelection(config.isSparkEnableSparkMetricSinks()); - wSparkMaxRecordsPerBatch.setText(Const.NVL(config.getSparkMaxRecordsPerBatch(), "")); - wSparkMinReadTimeMillis.setText(Const.NVL(config.getSparkMinReadTimeMillis(), "")); - wSparkReadTimePercentage.setText(Const.NVL(config.getSparkReadTimePercentage(), "")); - wSparkBundleSize.setText(Const.NVL(config.getSparkBundleSize(), "")); - wSparkStorageLevel.setText(Const.NVL(config.getSparkStorageLevel(), "")); + wSparkLocal.setSelection( config.isSparkLocal() ); + wSparkMaster.setText( Const.NVL( config.getSparkMaster(), "" ) ); + wSparkDeployFolder.setText( Const.NVL( config.getSparkDeployFolder(), "" ) ); + wSparkBatchIntervalMillis.setText( Const.NVL( config.getSparkBatchIntervalMillis(), "" ) ); + wSparkCheckpointDir.setText( Const.NVL( config.getSparkCheckpointDir(), "" ) ); + wSparkCheckpointDurationMillis.setText( Const.NVL( 
config.getSparkCheckpointDurationMillis(), "" ) ); + wsparkEnableSparkMetricSinks.setSelection( config.isSparkEnableSparkMetricSinks() ); + wSparkMaxRecordsPerBatch.setText( Const.NVL( config.getSparkMaxRecordsPerBatch(), "" ) ); + wSparkMinReadTimeMillis.setText( Const.NVL( config.getSparkMinReadTimeMillis(), "" ) ); + wSparkReadTimePercentage.setText( Const.NVL( config.getSparkReadTimePercentage(), "" ) ); + wSparkBundleSize.setText( Const.NVL( config.getSparkBundleSize(), "" ) ); + wSparkStorageLevel.setText( Const.NVL( config.getSparkStorageLevel(), "" ) ); // Flink wFlinkLocal.setSelection( config.isFlinkLocal() ); - wFlinkMaster.setText( Const.NVL(config.getFlinkMaster(), "") ); - wFlinkParallelism.setText( Const.NVL(config.getFlinkParallelism(), "") ); - wFlinkCheckpointingInterval.setText( Const.NVL(config.getFlinkCheckpointingInterval(), "") ); - wFlinkCheckpointingMode.setText( Const.NVL(config.getFlinkCheckpointingMode(), "") ); - wFlinkCheckpointTimeoutMillis.setText( Const.NVL(config.getFlinkCheckpointTimeoutMillis(), "") ); - wFlinkMinPauseBetweenCheckpoints.setText( Const.NVL(config.getFlinkMinPauseBetweenCheckpoints(), "") ); - wFlinkNumberOfExecutionRetries.setText( Const.NVL(config.getFlinkNumberOfExecutionRetries(), "") ); - wFlinkExecutionRetryDelay.setText( Const.NVL(config.getFlinkExecutionRetryDelay(), "") ); - wFlinkObjectReuse.setText( Const.NVL(config.getFlinkObjectReuse(), "") ); - wFlinkStateBackend.setText( Const.NVL(config.getFlinkStateBackend(), "") ); - wFlinkEnableMetrics.setText( Const.NVL(config.getFlinkEnableMetrics(), "") ); - wFlinkExternalizedCheckpointsEnabled.setText( Const.NVL(config.getFlinkExternalizedCheckpointsEnabled(), "") ); - wFlinkRetainExternalizedCheckpointsOnCancellation.setText( Const.NVL(config.getFlinkRetainExternalizedCheckpointsOnCancellation(), "") ); - wFlinkMaxBundleSize.setText( Const.NVL(config.getFlinkMaxBundleSize(), "") ); - wFlinkMaxBundleTimeMills.setText( 
Const.NVL(config.getFlinkMaxBundleTimeMills(), "") ); - wFlinkShutdownSourcesOnFinalWatermark.setText( Const.NVL(config.getFlinkShutdownSourcesOnFinalWatermark(), "") ); - wFlinkLatencyTrackingInterval.setText( Const.NVL(config.getFlinkLatencyTrackingInterval(), "") ); - wFlinkAutoWatermarkInterval.setText( Const.NVL(config.getFlinkAutoWatermarkInterval(), "") ); - wFlinkExecutionModeForBatch.setText( Const.NVL(config.getFlinkExecutionModeForBatch(), "") ); + wFlinkMaster.setText( Const.NVL( config.getFlinkMaster(), "" ) ); + wFlinkParallelism.setText( Const.NVL( config.getFlinkParallelism(), "" ) ); + wFlinkCheckpointingInterval.setText( Const.NVL( config.getFlinkCheckpointingInterval(), "" ) ); + wFlinkCheckpointingMode.setText( Const.NVL( config.getFlinkCheckpointingMode(), "" ) ); + wFlinkCheckpointTimeoutMillis.setText( Const.NVL( config.getFlinkCheckpointTimeoutMillis(), "" ) ); + wFlinkMinPauseBetweenCheckpoints.setText( Const.NVL( config.getFlinkMinPauseBetweenCheckpoints(), "" ) ); + wFlinkNumberOfExecutionRetries.setText( Const.NVL( config.getFlinkNumberOfExecutionRetries(), "" ) ); + wFlinkExecutionRetryDelay.setText( Const.NVL( config.getFlinkExecutionRetryDelay(), "" ) ); + wFlinkObjectReuse.setText( Const.NVL( config.getFlinkObjectReuse(), "" ) ); + wFlinkStateBackend.setText( Const.NVL( config.getFlinkStateBackend(), "" ) ); + wFlinkEnableMetrics.setText( Const.NVL( config.getFlinkEnableMetrics(), "" ) ); + wFlinkExternalizedCheckpointsEnabled.setText( Const.NVL( config.getFlinkExternalizedCheckpointsEnabled(), "" ) ); + wFlinkRetainExternalizedCheckpointsOnCancellation.setText( Const.NVL( config.getFlinkRetainExternalizedCheckpointsOnCancellation(), "" ) ); + wFlinkMaxBundleSize.setText( Const.NVL( config.getFlinkMaxBundleSize(), "" ) ); + wFlinkMaxBundleTimeMills.setText( Const.NVL( config.getFlinkMaxBundleTimeMills(), "" ) ); + wFlinkShutdownSourcesOnFinalWatermark.setText( Const.NVL( config.getFlinkShutdownSourcesOnFinalWatermark(), "" ) ); + 
wFlinkLatencyTrackingInterval.setText( Const.NVL( config.getFlinkLatencyTrackingInterval(), "" ) ); + wFlinkAutoWatermarkInterval.setText( Const.NVL( config.getFlinkAutoWatermarkInterval(), "" ) ); + wFlinkExecutionModeForBatch.setText( Const.NVL( config.getFlinkExecutionModeForBatch(), "" ) ); // Parameters // - int rowNr=0; - for ( JobParameter parameter : config.getParameters()) { + int rowNr = 0; + for ( JobParameter parameter : config.getParameters() ) { TableItem item = wParameters.table.getItem( rowNr++ ); - item.setText( 1, Const.NVL(parameter.getVariable(), "") ); - item.setText( 2, Const.NVL(parameter.getValue(), "") ); + item.setText( 1, Const.NVL( parameter.getVariable(), "" ) ); + item.setText( 2, Const.NVL( parameter.getValue(), "" ) ); } wParameters.setRowNums(); wParameters.optWidth( true ); @@ -1775,8 +1773,8 @@ private void getInfo( BeamJobConfig cfg ) { cfg.setUserAgent( wUserAgent.getText() ); cfg.setTempLocation( wTempLocation.getText() ); cfg.setPluginsToStage( wPluginsToStage.getText() ); - cfg.setStepPluginClasses( (wStepPluginClasses.getText()) ); - cfg.setXpPluginClasses( (wXpPluginClasses.getText()) ); + cfg.setStepPluginClasses( ( wStepPluginClasses.getText() ) ); + cfg.setXpPluginClasses( ( wXpPluginClasses.getText() ) ); cfg.setStreamingKettleStepsFlushInterval( wStreamingKettleStepsFlushInterval.getText() ); cfg.setFatJar( wFatJar.getText() ); cfg.setGcpProjectId( wGcpProjectId.getText() ); @@ -1789,12 +1787,12 @@ private void getInfo( BeamJobConfig cfg ) { String workerMachineDesciption = wGcpWorkerMachineType.getText(); String workerMachineCode = null; - if (StringUtils.isNotEmpty( workerMachineDesciption )) { + if ( StringUtils.isNotEmpty( workerMachineDesciption ) ) { int index = Const.indexOfString( workerMachineDesciption, BeamConst.getGcpWorkerMachineTypeDescriptions() ); - if (index<0) { + if ( index < 0 ) { workerMachineCode = workerMachineDesciption; // Variable or manually entered } else { - workerMachineCode = 
BeamConst.getGcpWorkerMachineTypeCodes()[index]; + workerMachineCode = BeamConst.getGcpWorkerMachineTypeCodes()[ index ]; } } cfg.setGcpWorkerMachineType( workerMachineCode ); @@ -1805,12 +1803,12 @@ private void getInfo( BeamJobConfig cfg ) { String regionDesciption = wGcpRegion.getText(); String regionCode = null; - if (StringUtils.isNotEmpty( regionDesciption )) { + if ( StringUtils.isNotEmpty( regionDesciption ) ) { int index = Const.indexOfString( regionDesciption, BeamConst.getGcpRegionDescriptions() ); - if (index<0) { + if ( index < 0 ) { regionCode = regionDesciption; // Variable or manually entered } else { - regionCode = BeamConst.getGcpRegionCodes()[index]; + regionCode = BeamConst.getGcpRegionCodes()[ index ]; } } cfg.setGcpRegion( regionCode ); @@ -1828,12 +1826,12 @@ private void getInfo( BeamJobConfig cfg ) { cfg.setSparkBundleSize( wSparkBundleSize.getText() ); cfg.setSparkStorageLevel( wSparkStorageLevel.getText() ); - cfg.setFlinkLocal(wFlinkLocal.getSelection()); - cfg.setFlinkMaster(wFlinkMaster.getText()); - cfg.setFlinkParallelism(wFlinkParallelism.getText()); - cfg.setFlinkCheckpointingInterval(wFlinkCheckpointingInterval.getText()); - cfg.setFlinkCheckpointingMode(wFlinkCheckpointingMode.getText()); - cfg.setFlinkCheckpointTimeoutMillis(wFlinkCheckpointTimeoutMillis.getText()); + cfg.setFlinkLocal( wFlinkLocal.getSelection() ); + cfg.setFlinkMaster( wFlinkMaster.getText() ); + cfg.setFlinkParallelism( wFlinkParallelism.getText() ); + cfg.setFlinkCheckpointingInterval( wFlinkCheckpointingInterval.getText() ); + cfg.setFlinkCheckpointingMode( wFlinkCheckpointingMode.getText() ); + cfg.setFlinkCheckpointTimeoutMillis( wFlinkCheckpointTimeoutMillis.getText() ); cfg.setFlinkMinPauseBetweenCheckpoints( wFlinkMinPauseBetweenCheckpoints.getText() ); cfg.setFlinkNumberOfExecutionRetries( wFlinkNumberOfExecutionRetries.getText() ); cfg.setFlinkExecutionRetryDelay( wFlinkExecutionRetryDelay.getText() ); @@ -1844,15 +1842,15 @@ private void getInfo( 
BeamJobConfig cfg ) { cfg.setFlinkRetainExternalizedCheckpointsOnCancellation( wFlinkRetainExternalizedCheckpointsOnCancellation.getText() ); cfg.setFlinkMaxBundleSize( wFlinkMaxBundleSize.getText() ); cfg.setFlinkMaxBundleTimeMills( wFlinkMaxBundleTimeMills.getText() ); - cfg.setFlinkShutdownSourcesOnFinalWatermark( wFlinkShutdownSourcesOnFinalWatermark.getText() );; + cfg.setFlinkShutdownSourcesOnFinalWatermark( wFlinkShutdownSourcesOnFinalWatermark.getText() ); ; cfg.setFlinkLatencyTrackingInterval( wFlinkLatencyTrackingInterval.getText() ); cfg.setFlinkAutoWatermarkInterval( wFlinkAutoWatermarkInterval.getText() ); cfg.setFlinkExecutionModeForBatch( wFlinkExecutionModeForBatch.getText() ); cfg.getParameters().clear(); - for (int i=0;i factory = new MetaStoreFactory<>( BeamJobConfig.class, spoon.getMetaStore(), PentahoDefaults.NAMESPACE ); + VariableSpace space = Variables.getADefaultVariableSpace(); + String pluginFolders; + String filename; try { - IRunnableWithProgress op = new IRunnableWithProgress() { - public void run( IProgressMonitor monitor ) throws InvocationTargetException, InterruptedException { - try { + List elementNames = factory.getElementNames(); + Collections.sort( elementNames ); + String[] names = elementNames.toArray( new String[ 0 ] ); - VariableSpace space = Variables.getADefaultVariableSpace(); - List files = BeamConst.findLibraryFilesToStage( null, pluginFolders, true, true ); - files.removeIf( s -> s.contains( "commons-logging" ) || s.contains( "log4j" ) || s.contains("xml-apis") ); + EnterSelectionDialog selectionDialog = new EnterSelectionDialog( spoon.getShell(), names, + BaseMessages.getString( PKG, "BeamHelper.SelectJobConfigForFatJar.Title" ), + BaseMessages.getString( PKG, "BeamHelper.SelectJobConfigForFatJar.Message" ) + ); + String choice = selectionDialog.open(); + if ( choice != null ) { - FatJarBuilder fatJarBuilder = new FatJarBuilder( filename, files ); - fatJarBuilder.buildTargetJar(); + BeamJobConfig jobConfig = 
factory.loadElement( choice ); + pluginFolders = jobConfig.getPluginsToStage(); - } catch ( Exception e ) { - throw new InvocationTargetException( e, "Error building fat jar: "+e.getMessage()); - } + FileDialog dialog = new FileDialog( shell, SWT.SAVE ); + dialog.setText( "Select the location of the Kettle+Beam+Plugins fat jar" ); + dialog.setFilterNames( new String[] { "Jar files (*.jar)", "All Files (*.*)" } ); + dialog.setFilterExtensions( new String[] { "*.jar", "*.*" } ); // Windows + if ( StringUtils.isNotEmpty( jobConfig.getFatJar() ) ) { + dialog.setFileName( space.environmentSubstitute( jobConfig.getFatJar() ) ); + } + filename = dialog.open(); + if ( StringUtils.isEmpty( filename ) ) { + return; } - }; - ProgressMonitorDialog pmd = new ProgressMonitorDialog( shell ); - pmd.run( true, true, op ); + IRunnableWithProgress op = new IRunnableWithProgress() { + public void run( IProgressMonitor monitor ) throws InvocationTargetException, InterruptedException { + try { - MessageBox box = new MessageBox( shell, SWT.CLOSE | SWT.ICON_INFORMATION ); - box.setText( "Fat jar created" ); - box.setMessage( "A fat jar was successfully created : "+filename+Const.CR+"Included plugin folders: "+pluginFolders ); - box.open(); + List files = BeamConst.findLibraryFilesToStage( null, jobConfig.getPluginsToStage(), true, true ); + files.removeIf( s -> s.contains( "commons-logging" ) || s.contains( "log4j" ) || s.contains( "xml-apis" ) ); + + // Find the plugin classes for the specified plugins... 
+ // + String stepPluginClasses = FatJarBuilder.findPluginClasses( Step.class.getName(), pluginFolders ); + if (StringUtils.isNotEmpty(jobConfig.getStepPluginClasses())) { + if (StringUtils.isEmpty( stepPluginClasses )) { + stepPluginClasses=""; + } else { + stepPluginClasses+=","; + } + stepPluginClasses+=jobConfig.getStepPluginClasses(); + } + String xpPluginClasses = FatJarBuilder.findPluginClasses( ExtensionPoint.class.getName(), pluginFolders ); + if (StringUtils.isNotEmpty(jobConfig.getXpPluginClasses())) { + if (StringUtils.isEmpty( xpPluginClasses )) { + xpPluginClasses=""; + } else { + xpPluginClasses+=","; + } + xpPluginClasses+=jobConfig.getXpPluginClasses(); + } + + FatJarBuilder fatJarBuilder = new FatJarBuilder( filename, files ); + fatJarBuilder.setExtraStepPluginClasses( stepPluginClasses ); + fatJarBuilder.setExtraXpPluginClasses( xpPluginClasses ); + fatJarBuilder.buildTargetJar(); + + } catch ( Exception e ) { + throw new InvocationTargetException( e, "Error building fat jar: " + e.getMessage() ); + } + } + }; + + ProgressMonitorDialog pmd = new ProgressMonitorDialog( shell ); + pmd.run( true, true, op ); + MessageBox box = new MessageBox( shell, SWT.CLOSE | SWT.ICON_INFORMATION ); + box.setText( "Fat jar created" ); + box.setMessage( "A fat jar was successfully created : " + filename + Const.CR + "Included plugin folders: " + pluginFolders ); + box.open(); + } } catch(Exception e) { new ErrorDialog( shell, "Error", "Error creating fat jar", e ); } diff --git a/src/main/java/org/kettle/beam/perspective/messages/messages_en_US.properties b/src/main/java/org/kettle/beam/perspective/messages/messages_en_US.properties index a3889fb..a692c2d 100644 --- a/src/main/java/org/kettle/beam/perspective/messages/messages_en_US.properties +++ b/src/main/java/org/kettle/beam/perspective/messages/messages_en_US.properties @@ -41,3 +41,6 @@ BeamHelper.SelectBeamJobConfigToRun.Message = Select the Beam Job Configuration BeamHelper.ErrorRunningTransOnBeam.Title =
Error BeamHelper.ErrorRunningTransOnBeam.Message = There was an error running your transformation on Beam + +BeamHelper.SelectJobConfigForFatJar.Title = Select Beam Job Config +BeamHelper.SelectJobConfigForFatJar.Message = Select the job config to know which plugins to include in the fat jar: diff --git a/src/main/java/org/kettle/beam/pipeline/fatjar/FatJarBuilder.java b/src/main/java/org/kettle/beam/pipeline/fatjar/FatJarBuilder.java index c05119b..d238c3e 100644 --- a/src/main/java/org/kettle/beam/pipeline/fatjar/FatJarBuilder.java +++ b/src/main/java/org/kettle/beam/pipeline/fatjar/FatJarBuilder.java @@ -1,16 +1,35 @@ package org.kettle.beam.pipeline.fatjar; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.eclipse.swt.widgets.Event; +import org.kettle.beam.metastore.BeamJobConfig; +import org.kettle.beam.pipeline.TransMetaPipelineConverter; import org.pentaho.di.core.Const; +import org.pentaho.di.core.annotations.Step; import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.extension.ExtensionPoint; +import org.pentaho.di.core.extension.ExtensionPointInterface; +import org.pentaho.di.core.extension.ExtensionPointPluginType; +import org.pentaho.di.core.plugins.JobEntryPluginType; +import org.pentaho.di.core.plugins.PluginInterface; +import org.pentaho.di.core.plugins.PluginRegistry; +import org.pentaho.di.core.plugins.PluginTypeInterface; +import org.pentaho.di.core.plugins.StepPluginType; +import org.pentaho.di.core.xml.XMLHandler; +import org.pentaho.di.job.entry.JobEntryInterface; +import org.pentaho.di.trans.step.StepMetaInterface; +import org.pentaho.di.ui.core.dialog.ErrorDialog; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.StringWriter; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.zip.ZipEntry; import 
java.util.zip.ZipException; import java.util.zip.ZipInputStream; @@ -20,14 +39,19 @@ public class FatJarBuilder { private String targetJarFile; private List jarFiles; + private String extraStepPluginClasses; + private String extraXpPluginClasses; private transient Map fileContentMap; public FatJarBuilder() { jarFiles = new ArrayList<>(); + extraStepPluginClasses = null; + extraXpPluginClasses = null; } public FatJarBuilder( String targetJarFile, List jarFiles ) { + this(); this.targetJarFile = targetJarFile; this.jarFiles = jarFiles; } @@ -67,6 +91,15 @@ public void buildTargetJar() throws KettleException { if ( entryName.startsWith( "META-INF" ) && entryName.endsWith( ".RSA" ) ) { skip = true; } + if ( entryName.equals(Const.XML_FILE_KETTLE_STEPS)) { + skip = true; + } + if ( entryName.equals(Const.XML_FILE_KETTLE_EXTENSION_POINTS)) { + skip = true; + } + if ( entryName.equals(Const.XML_FILE_KETTLE_JOB_ENTRIES)) { + skip = true; + } if ( entryName.startsWith( "META-INF/services/" ) ) { merge = true; skip = true; @@ -82,13 +115,13 @@ public void buildTargetJar() throws KettleException { } } - if (merge) { + if ( merge ) { String fileContent = IOUtils.toString( zipInputStream, "UTF-8" ); String previousContent = fileContentMap.get( entryName ); - if (previousContent==null) { - fileContentMap.put(entryName, fileContent); + if ( previousContent == null ) { + fileContentMap.put( entryName, fileContent ); } else { - fileContentMap.put(entryName, previousContent+ Const.CR+fileContent); + fileContentMap.put( entryName, previousContent + Const.CR + fileContent ); } } else { int len; @@ -113,14 +146,20 @@ public void buildTargetJar() throws KettleException { // Add the META-INF/services files... 
// - for (String entryName : fileContentMap.keySet()) { - System.out.println("Entry merged: "+entryName); - String fileContent = fileContentMap.get(entryName); + for ( String entryName : fileContentMap.keySet() ) { + System.out.println( "Entry merged: " + entryName ); + String fileContent = fileContentMap.get( entryName ); zipOutputStream.putNextEntry( new ZipEntry( entryName ) ); zipOutputStream.write( fileContent.getBytes( "UTF-8" ) ); zipOutputStream.closeEntry(); } + // Add Steps, job entries and extension point plugins in XML files + // + addPluginsXmlFile(zipOutputStream, Const.XML_FILE_KETTLE_STEPS, "steps", "step", StepPluginType.class, StepMetaInterface.class, extraStepPluginClasses); + addPluginsXmlFile(zipOutputStream, Const.XML_FILE_KETTLE_JOB_ENTRIES, "job-entries", "job-entry", JobEntryPluginType.class, JobEntryInterface.class, null); + addPluginsXmlFile(zipOutputStream, Const.XML_FILE_KETTLE_EXTENSION_POINTS, "extension-points", "extension-point", ExtensionPointPluginType.class, ExtensionPointInterface.class, extraXpPluginClasses); + zipOutputStream.close(); } catch ( Exception e ) { throw new KettleException( "Unable to build far jar file '" + targetJarFile + "'", e ); @@ -130,6 +169,102 @@ public void buildTargetJar() throws KettleException { } + private void addPluginsXmlFile( ZipOutputStream zipOutputStream, String filename, String mainTag, String pluginTag, Class pluginTypeClass, Class mainPluginClass, String extraClasses ) throws Exception { + + // Write all the internal steps plus the selected classes... 
+ // + StringBuilder xml = new StringBuilder( ); + xml.append( XMLHandler.openTag( mainTag ) ); + PluginRegistry registry = PluginRegistry.getInstance(); + List plugins = registry.getPlugins( pluginTypeClass ); + for (PluginInterface plugin : plugins) { + if (plugin.isNativePlugin()) { + addPluginToXml(xml, pluginTag, plugin, mainPluginClass); + } + } + if ( StringUtils.isNotEmpty(extraClasses)) { + for (String extraPluginClass : extraClasses.split( "," )) { + PluginInterface plugin = findPluginWithMainClass(extraPluginClass, pluginTypeClass, mainPluginClass); + if (plugin!=null) { + addPluginToXml( xml, pluginTag, plugin, mainPluginClass ); + } + } + } + + xml.append( XMLHandler.closeTag( mainTag ) ); + + zipOutputStream.putNextEntry( new ZipEntry( filename ) ); + zipOutputStream.write( xml.toString().getBytes( "UTF-8" ) ); + zipOutputStream.closeEntry(); + } + + private PluginInterface findPluginWithMainClass( String extraPluginClass, Class pluginTypeClass, Class mainClass) { + List plugins = PluginRegistry.getInstance().getPlugins( pluginTypeClass ); + for (PluginInterface plugin : plugins) { + String check = plugin.getClassMap().get( mainClass ); + if (check!=null && check.equals(extraPluginClass)) { + return plugin; + } + } + return null; + + } + + private void addPluginToXml( StringBuilder xml, String pluginTag, PluginInterface plugin, Class mainClass ) { + xml.append("<").append(pluginTag).append(" id=\""); + xml.append(plugin.getIds()[0]); + xml.append("\">"); + xml.append(XMLHandler.addTagValue( "description", plugin.getName() )); + xml.append(XMLHandler.addTagValue( "tooltip", plugin.getDescription() )); + xml.append(XMLHandler.addTagValue( "classname", plugin.getClassMap().get( mainClass ) )); + xml.append(XMLHandler.addTagValue( "category", plugin.getCategory() )); + xml.append(XMLHandler.addTagValue( "iconfile", plugin.getImageFile() )); + xml.append(XMLHandler.addTagValue( "documentation_url", plugin.getDocumentationUrl() )); + 
xml.append(XMLHandler.addTagValue( "cases_url", plugin.getCasesUrl() )); + xml.append(XMLHandler.addTagValue( "forum_url", plugin.getForumUrl() )); + xml.append(XMLHandler.closeTag( pluginTag )); + } + + public static String findPluginClasses( String pluginClassName, String pluginsToInclude) throws KettleException { + String plugins = pluginsToInclude; + + if ( StringUtils.isEmpty( plugins ) ) { + plugins = "kettle-beam"; + } else { + plugins += ",kettle-beam"; + } + + Set classes = new HashSet<>(); + String[] pluginFolders = plugins.split( "," ); + for ( String pluginFolder : pluginFolders ) { + try { + List stepClasses = TransMetaPipelineConverter.findAnnotatedClasses( pluginFolder, pluginClassName ); + for ( String stepClass : stepClasses ) { + classes.add( stepClass ); + } + } catch ( Exception e ) { + throw new KettleException( "Error finding plugin classes of annotation type '" + pluginClassName + "' in folder '" + pluginFolder + "'", e ); + } + } + + // OK, we now have all the classes... + // Let's sort by name and add them in the dialog comma separated...
+ // + List classesList = new ArrayList<>(); + classesList.addAll( classes ); + Collections.sort( classesList ); + + StringBuffer all = new StringBuffer(); + for ( String pluginClass : classesList ) { + if ( all.length() > 0 ) { + all.append( "," ); + } + all.append( pluginClass ); + } + + return all.toString(); + } + /** * Gets targetJarFile @@ -162,4 +297,52 @@ public List getJarFiles() { public void setJarFiles( List jarFiles ) { this.jarFiles = jarFiles; } + + /** + * Gets extraStepPluginClasses + * + * @return value of extraStepPluginClasses + */ + public String getExtraStepPluginClasses() { + return extraStepPluginClasses; + } + + /** + * @param extraStepPluginClasses The extraStepPluginClasses to set + */ + public void setExtraStepPluginClasses( String extraStepPluginClasses ) { + this.extraStepPluginClasses = extraStepPluginClasses; + } + + /** + * Gets extraXpPluginClasses + * + * @return value of extraXpPluginClasses + */ + public String getExtraXpPluginClasses() { + return extraXpPluginClasses; + } + + /** + * @param extraXpPluginClasses The extraXpPluginClasses to set + */ + public void setExtraXpPluginClasses( String extraXpPluginClasses ) { + this.extraXpPluginClasses = extraXpPluginClasses; + } + + /** + * Gets fileContentMap + * + * @return value of fileContentMap + */ + public Map getFileContentMap() { + return fileContentMap; + } + + /** + * @param fileContentMap The fileContentMap to set + */ + public void setFileContentMap( Map fileContentMap ) { + this.fileContentMap = fileContentMap; + } } diff --git a/src/main/java/org/kettle/beam/pipeline/main/MainBeam.java b/src/main/java/org/kettle/beam/pipeline/main/MainBeam.java index 36d801a..67da97f 100644 --- a/src/main/java/org/kettle/beam/pipeline/main/MainBeam.java +++ b/src/main/java/org/kettle/beam/pipeline/main/MainBeam.java @@ -10,6 +10,7 @@ import org.kettle.beam.metastore.BeamJobConfig; import org.kettle.beam.pipeline.KettleBeamPipelineExecutor; import org.kettle.beam.util.BeamConst; +import 
org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.LogChannel; import org.pentaho.di.core.plugins.PluginInterface; @@ -36,8 +37,6 @@ public static final int mainMethod( final String[] args, final String environmen System.out.println( "Transformation ktr / args[0] : " + args[ 0 ] ); System.out.println( "MetaStore JSON / args[1] : " + args[ 1 ] ); System.out.println( "Beam Job Config / args[2] : " + args[ 2 ] ); - System.out.println( "Step plugins / args[3] : " + args[ 3 ] ); - System.out.println( "XP plugins / args[4] : " + args[ 4 ] ); // Read the transformation XML and MetaStore from Hadoop FS // @@ -49,17 +48,16 @@ public static final int mainMethod( final String[] args, final String environmen // String jobConfigName = args[ 2 ]; - // Extra plugins to load from the fat jar file... - // - String stepPlugins = args[ 3 ]; - String xpPlugins = args[ 4 ]; - // Inflate the metaStore... // IMetaStore metaStore = new SerializableMetaStore( metaStoreJson ); - List stepPluginsList = new ArrayList<>( Arrays.asList( stepPlugins.split( "," ) ) ); - List xpPluginsList = new ArrayList<>( Arrays.asList( xpPlugins.split( "," ) ) ); + System.out.println( ">>>>>> Loading Kettle Beam Job Config '" + jobConfigName + "'" ); + MetaStoreFactory configFactory = new MetaStoreFactory<>( BeamJobConfig.class, metaStore, PentahoDefaults.NAMESPACE ); + BeamJobConfig jobConfig = configFactory.loadElement( jobConfigName ); + + List stepPluginsList = new ArrayList<>( Arrays.asList( Const.NVL(jobConfig.getStepPluginClasses(), "").split( "," ) ) ); + List xpPluginsList = new ArrayList<>( Arrays.asList( Const.NVL(jobConfig.getXpPluginClasses(), "").split( "," ) ) ); System.out.println( ">>>>>> Initializing Kettle runtime (" + stepPluginsList.size() + " step classes, " + xpPluginsList.size() + " XP classes)" ); @@ -69,9 +67,6 @@ public static final int mainMethod( final String[] args, final String environmen TransMeta transMeta = new 
TransMeta( XMLHandler.loadXMLString( transMetaXml, TransMeta.XML_TAG ), null ); transMeta.setMetaStore( metaStore ); - System.out.println( ">>>>>> Loading Kettle Beam Job Config '" + jobConfigName + "'" ); - MetaStoreFactory configFactory = new MetaStoreFactory<>( BeamJobConfig.class, metaStore, PentahoDefaults.NAMESPACE ); - BeamJobConfig jobConfig = configFactory.loadElement( jobConfigName ); String hadoopConfDir = System.getenv( "HADOOP_CONF_DIR" ); System.out.println( ">>>>>> HADOOP_CONF_DIR='" + hadoopConfDir + "'" );