Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Commit

Permalink
Issue #8 : Beam Run Config support
Browse files Browse the repository at this point in the history
  • Loading branch information
mattcasters committed Dec 16, 2018
1 parent d7f1986 commit bf21d21
Show file tree
Hide file tree
Showing 5 changed files with 387 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,5 +274,5 @@


<groupId>kettle-beam</groupId>
<version>0.0.4-SNAPSHOT</version>
<version>0.0.5-SNAPSHOT</version>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.kettle.beam.pipeline;

import org.apache.beam.sdk.PipelineResult;

public interface BeamMetricsUpdatedListener {

void beamMetricsUpdated( PipelineResult pipelineResult );
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,49 @@
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.plugins.KettleURLClassLoader;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.metastore.api.IMetaStore;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class KettleBeamPipelineExecutor {

private final LogChannelInterface log;
private LogChannelInterface log;
private TransMeta transMeta;
private BeamJobConfig jobConfig;
private final IMetaStore metaStore;
private IMetaStore metaStore;
private ClassLoader classLoader;
private List<BeamMetricsUpdatedListener> updatedListeners;
private boolean loggingMetrics;

private KettleBeamPipelineExecutor() {
this.updatedListeners = new ArrayList<>( );
}

public KettleBeamPipelineExecutor( LogChannelInterface log, TransMeta transMeta, BeamJobConfig jobConfig, IMetaStore metaStore, ClassLoader classLoader ) {
this();
this.log = log;
this.transMeta = transMeta;
this.jobConfig = jobConfig;
this.metaStore = metaStore;
this.classLoader = classLoader;
this.loggingMetrics = true;
}

public void execute() throws KettleException {
public PipelineResult execute() throws KettleException {
ClassLoader oldContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
// Explain to various classes in the Beam API (@see org.apache.beam.sdk.io.FileSystems)
// what the context classloader is.
// Set it back when we're done here.
//

Thread.currentThread().setContextClassLoader( classLoader );

final Pipeline pipeline = getPipeline( transMeta, jobConfig );
Expand All @@ -74,13 +78,24 @@ public void execute() throws KettleException {
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override public void run() {
logMetrics( pipelineResult );

// Log the metrics...
//
if (isLoggingMetrics()) {
logMetrics( pipelineResult );
}

// Update the listeners.
//
updateListeners( pipelineResult );
}
};
// Every 10 seconds
timer.schedule( timerTask, 10000, 10000 );
// Every 5 seconds
//
timer.schedule( timerTask, 5000, 5000 );

// Wait until we're done
//
pipelineResult.waitUntilFinish();

timer.cancel();
Expand All @@ -89,7 +104,13 @@ public void execute() throws KettleException {
// Log the metrics at the end.
logMetrics( pipelineResult );

// Update a last time
//
updateListeners( pipelineResult );

log.logBasic( " ----------------- End of Beam job " + pipeline.getOptions().getJobName() + " -----------------------" );

return pipelineResult;
} finally {
Thread.currentThread().setContextClassLoader( oldContextClassLoader );
}
Expand All @@ -107,6 +128,12 @@ private void logMetrics( PipelineResult pipelineResult ) {
}
}

public void updateListeners(PipelineResult pipelineResult) {
for (BeamMetricsUpdatedListener listener : updatedListeners) {
listener.beamMetricsUpdated(pipelineResult);
}
}

public Pipeline getPipeline( TransMeta transMeta, BeamJobConfig config ) throws KettleException {

try {
Expand Down Expand Up @@ -307,4 +334,36 @@ private void configureFlinkOptions( BeamJobConfig config, FlinkPipelineOptions o

}


/**
* Gets updatedListeners
*
* @return value of updatedListeners
*/
public List<BeamMetricsUpdatedListener> getUpdatedListeners() {
return updatedListeners;
}

/**
* @param updatedListeners The updatedListeners to set
*/
public void setUpdatedListeners( List<BeamMetricsUpdatedListener> updatedListeners ) {
this.updatedListeners = updatedListeners;
}

/**
* Gets loggingMetrics
*
* @return value of loggingMetrics
*/
public boolean isLoggingMetrics() {
return loggingMetrics;
}

/**
* @param loggingMetrics The loggingMetrics to set
*/
public void setLoggingMetrics( boolean loggingMetrics ) {
this.loggingMetrics = loggingMetrics;
}
}
25 changes: 25 additions & 0 deletions src/main/java/org/kettle/beam/xp/BeamDummyTrans.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.kettle.beam.xp;

import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.dummytrans.DummyTrans;

public class BeamDummyTrans extends DummyTrans implements StepInterface {

protected boolean init;

public BeamDummyTrans( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans ) {
super( stepMeta, stepDataInterface, copyNr, transMeta, trans );
}

public void setInit( boolean init ) {
this.init = init;
}

@Override public boolean isInitialising() {
return init;
}
}
Loading

0 comments on commit bf21d21

Please sign in to comment.