This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

issue #51 : Beam Kafka Consume
mattcasters committed Feb 10, 2020
1 parent 9a9a0c5 commit 994db07
Showing 8 changed files with 640 additions and 75 deletions.
10 changes: 9 additions & 1 deletion pom.xml
@@ -7,7 +7,7 @@
<artifactId>kettle-beam</artifactId>
<name>Kettle Beam</name>
<description>Kettle runtime plugins for Apache Beam</description>
<version>1.0.1-SNAPSHOT</version>
<version>1.0.2</version>

<parent>
<groupId>org.pentaho</groupId>
@@ -200,6 +200,14 @@


<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<argLine>-Xmx2g</argLine>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
src/main/java/org/kettle/beam/core/transform/BeamKafkaInputTransform.java
@@ -1,5 +1,6 @@
package org.kettle.beam.core.transform;

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -9,17 +10,21 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.kettle.beam.core.BeamKettle;
import org.kettle.beam.core.KettleRow;
import org.kettle.beam.core.fn.KVStringStringToKettleRowFn;
import org.kettle.beam.steps.kafka.ConfigOption;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.row.RowMetaInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BeamKafkaInputTransform extends PTransform<PBegin, PCollection<KettleRow>> {

@@ -28,6 +33,14 @@ public class BeamKafkaInputTransform extends PTransform<PBegin, PCollection<KettleRow>> {
private String stepname;
private String bootstrapServers;
private String topics;
private String groupId;
private boolean usingProcessingTime; // default
private boolean usingLogAppendTime;
private boolean usingCreateTime;
private boolean restrictedToCommitted;
private boolean allowingCommitOnConsumedOffset;
private List<ConfigOption> configOptions;

private String rowMetaJson;
private List<String> stepPluginClasses;
private List<String> xpPluginClasses;
@@ -41,11 +54,25 @@ public class BeamKafkaInputTransform extends PTransform<PBegin, PCollection<KettleRow>> {
public BeamKafkaInputTransform() {
}

public BeamKafkaInputTransform( @Nullable String name, String stepname, String bootstrapServers, String topics, String rowMetaJson, List<String> stepPluginClasses, List<String> xpPluginClasses ) {
public BeamKafkaInputTransform( @Nullable String name, String stepname, String bootstrapServers, String topics, String groupId,
boolean usingProcessingTime, boolean usingLogAppendTime, boolean usingCreateTime,
boolean restrictedToCommitted, boolean allowingCommitOnConsumedOffset,
String[] configOptionParameters, String[] configOptionValues, String configOptionTypes[],
String rowMetaJson, List<String> stepPluginClasses, List<String> xpPluginClasses ) {
super( name );
this.stepname = stepname;
this.bootstrapServers = bootstrapServers;
this.topics = topics;
this.groupId = groupId;
this.usingProcessingTime = usingProcessingTime;
this.usingLogAppendTime = usingLogAppendTime;
this.usingCreateTime = usingCreateTime;
this.restrictedToCommitted = restrictedToCommitted;
this.allowingCommitOnConsumedOffset = allowingCommitOnConsumedOffset;
this.configOptions = new ArrayList<>( );
for (int i=0;i<configOptionParameters.length;i++) {
this.configOptions.add(new ConfigOption( configOptionParameters[i], configOptionValues[i], ConfigOption.Type.getTypeFromName( configOptionTypes[i] ) ));
}
this.rowMetaJson = rowMetaJson;
this.stepPluginClasses = stepPluginClasses;
this.xpPluginClasses = xpPluginClasses;
@@ -64,23 +91,58 @@ public BeamKafkaInputTransform( @Nullable String name, String stepname, String bootstrapServers,
topicList.add( Const.trim( topic ) );
}

PTransform<PBegin, PCollection<KV<String, String>>> kafkaReadTransform = KafkaIO.<String, String>read()
// TODO: add custom configuration options to this map:
Map<String, Object> consumerConfigUpdates = new HashMap<>( );
consumerConfigUpdates.put( "group.id", groupId );
for (ConfigOption configOption : configOptions) {
Object value;
String optionValue = configOption.getValue();
switch(configOption.getType()) {
case String:value=optionValue; break;
case Short: value=Short.valueOf( optionValue ); break;
case Int: value = Integer.valueOf( optionValue ); break;
case Long: value = Long.valueOf( optionValue ); break;
case Double: value = Double.valueOf( optionValue ); break;
case Boolean: value = Boolean.valueOf( optionValue ); break;
default:
throw new RuntimeException( "Config option parameter "+configOption.getParameter()+" uses unsupported type "+configOption.getType().name() );
}
consumerConfigUpdates.put(configOption.getParameter(), value);
}

KafkaIO.Read<String, String> io = KafkaIO.<String, String>read()
.withBootstrapServers( bootstrapServers )
.withConsumerConfigUpdates( consumerConfigUpdates )
.withTopics( topicList )
.withKeyDeserializer( StringDeserializer.class )
.withValueDeserializer( StringDeserializer.class )
.withoutMetadata();
.withValueDeserializer( StringDeserializer.class );

if (usingProcessingTime) {
io = io.withProcessingTime();
}
if (usingLogAppendTime) {
io = io.withLogAppendTime();
}
if (usingCreateTime) {
io = io.withCreateTime( Duration.ZERO ); // TODO Configure this
}

if (restrictedToCommitted) {
io = io.withReadCommitted();
}
if (allowingCommitOnConsumedOffset) {
io = io.commitOffsetsInFinalize();
}

// Read keys and values from Kafka
//
PCollection<KV<String, String>> kafkaConsumerOutput = input
.apply( kafkaReadTransform );
PCollection<KV<String, String>> kafkaConsumerOutput = input.apply( io.withoutMetadata() );

// Now convert this into Kettle rows with a single String value in them
//
PCollection<KettleRow> output = kafkaConsumerOutput.apply( ParDo.of(
new KVStringStringToKettleRowFn( stepname, rowMetaJson, stepPluginClasses, xpPluginClasses )
) );
PCollection<KettleRow> output = kafkaConsumerOutput.apply(
ParDo.of(new KVStringStringToKettleRowFn( stepname, rowMetaJson, stepPluginClasses, xpPluginClasses ))
);

return output;

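For orientation, here is a minimal, hypothetical sketch of how the extended constructor might be invoked from pipeline-building code. The broker address, topic, group id, consumer option and row-metadata JSON are placeholder values, not taken from this commit:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.kettle.beam.core.KettleRow;
import org.kettle.beam.core.transform.BeamKafkaInputTransform;

import java.util.Collections;
import java.util.List;

public class KafkaConsumeSketch {
  public static void main( String[] args ) {
    Pipeline pipeline = Pipeline.create( PipelineOptionsFactory.create() );

    // One extra consumer option, typed so the transform can convert it
    // (assuming "Int" matches the ConfigOption.Type constant used in the switch above).
    String[] optionParameters = { "max.poll.records" };
    String[] optionValues     = { "500" };
    String[] optionTypes      = { "Int" };

    List<String> noExtraPlugins = Collections.emptyList();

    BeamKafkaInputTransform kafkaInput = new BeamKafkaInputTransform(
      "Kafka Consume",      // transform name
      "Kafka Consume",      // step name
      "localhost:9092",     // bootstrap servers (placeholder)
      "transactions",       // comma-separated topic list (placeholder)
      "kettle-beam-group",  // group.id (placeholder)
      true, false, false,   // processing time / log-append time / create time
      false, false,         // restrict to committed / commit on consumed offset
      optionParameters, optionValues, optionTypes,
      "{}",                 // serialized row metadata JSON (placeholder)
      noExtraPlugins, noExtraPlugins
    );

    PCollection<KettleRow> rows = pipeline.apply( kafkaInput );
    // ...apply downstream transforms to 'rows' and call pipeline.run() here...
  }
}

The five boolean flags map directly onto KafkaIO.Read's timestamp policy (withProcessingTime, withLogAppendTime, withCreateTime) and offset handling (withReadCommitted, commitOffsetsInFinalize), as wired up in expand() above.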
139 changes: 78 additions & 61 deletions src/main/java/org/kettle/beam/core/transform/StepTransform.java
@@ -249,20 +249,20 @@ public StepFn( List<VariableValue> variableValues, String metastoreJson, List<St
* @param startBundleContext
*/
@StartBundle
public void startBundle(StartBundleContext startBundleContext) {
public void startBundle( StartBundleContext startBundleContext ) {
rowBuffer = new ArrayList<>();
if (startBundleCounter==null) {
if ( startBundleCounter == null ) {
startBundleCounter = Metrics.counter( "startBundle", stepname );
}
startBundleCounter.inc();
if ("ScriptValueMod".equals(stepPluginId) && trans!=null) {
initialize=true;
if ( "ScriptValueMod".equals( stepPluginId ) && trans != null ) {
initialize = true;
}
}

@Teardown
public void tearDown() {
if (timer!=null) {
if ( timer != null ) {
timer.cancel();
}
}
@@ -272,7 +272,7 @@ public void processElement( ProcessContext context, BoundedWindow window ) {

try {

if (initialize) {
if ( initialize ) {
initialize = false;

// Initialize Kettle and load extra plugins as well
@@ -531,29 +531,29 @@ public void processElement( ProcessContext context, BoundedWindow window ) {
// When the first row ends up in the buffer we start the timer.
// If the rows are flushed out we reset back to -1;
//
bufferStartTime = new AtomicLong(-1L);
bufferStartTime = new AtomicLong( -1L );
flushing = new AtomicBoolean( false );

// Install a timer to check every second if the buffer is stale and needs to be flushed...
//
if (flushIntervalSeconds>0) {
if ( flushIntervalSeconds > 0 ) {
TimerTask timerTask = new TimerTask() {
@Override public void run() {
// Check on the state of the buffer, flush if needed...
//
if (!flushing.get() && !rowBuffer.isEmpty() && bufferStartTime.get()>0) {
if ( !flushing.get() && !rowBuffer.isEmpty() && bufferStartTime.get() > 0 ) {
long difference = System.currentTimeMillis() - bufferStartTime.get();
if (difference>flushIntervalSeconds*1000) {
if ( difference > flushIntervalSeconds * 1000 ) {
try {
emptyRowBuffer( new StepProcessContext( context ) );
} catch(Exception e) {
throw new RuntimeException( "Unable to flush row buffer when it got stale after "+difference+" ms", e );
} catch ( Exception e ) {
throw new RuntimeException( "Unable to flush row buffer when it got stale after " + difference + " ms", e );
}
}
}
}
};
timer = new Timer("Flush timer of step "+stepname);
timer = new Timer( "Flush timer of step " + stepname );
timer.schedule( timerTask, 1000, 1000 );
}
}
@@ -566,16 +566,16 @@ public void processElement( ProcessContext context, BoundedWindow window ) {

// Take care of the age of the buffer...
//
if (flushIntervalSeconds>0 && rowBuffer.isEmpty()) {
if ( flushIntervalSeconds > 0 && rowBuffer.isEmpty() ) {
bufferStartTime.set( System.currentTimeMillis() );
}

// Add the row to the buffer.
//
rowBuffer.add(inputRow);
rowBuffer.add( inputRow );
batchWindow = window;

if (rowBuffer.size()>=batchSize) {
if ( rowBuffer.size() >= batchSize ) {
emptyRowBuffer( new StepProcessContext( context ) );
}
} catch ( Exception e ) {
@@ -586,72 +586,89 @@ public void processElement( ProcessContext context, BoundedWindow window ) {
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
public void finishBundle( FinishBundleContext context ) {
try {
if ( rowBuffer.size() > 0 ) {
emptyRowBuffer( new StepFinishBundleContext( context, batchWindow ) );
}
} catch(Exception e) {
} catch ( Exception e ) {
numErrors.inc();
LOG.info( "Step finishing bundle error :" + e.getMessage() );
throw new RuntimeException( "Error finishing bundle of StepFn", e );
}

}

private synchronized void emptyRowBuffer( TupleOutputContext<KettleRow> context ) throws KettleException {
private transient int maxInputBufferSize = 0;
private transient int minInputBufferSize = Integer.MAX_VALUE;

if (!flushing.get()) {
flushing.set( true );
private synchronized void emptyRowBuffer( TupleOutputContext<KettleRow> context ) throws KettleException {

// Empty all the row buffers for another iteration
//
resultRows.clear();
for ( int t = 0; t < targetSteps.size(); t++ ) {
targetResultRowsList.get( t ).clear();
}
if ( !flushing.get() ) {
try {
flushing.set( true );

// Pass the rows in the rowBuffer to the input RowSet
//
if ( !inputStep ) {
for ( KettleRow inputRow : rowBuffer ) {
rowProducer.putRow( inputRowMeta, inputRow.getRow() );
// Empty all the row buffers for another iteration
//
resultRows.clear();
for ( int t = 0; t < targetSteps.size(); t++ ) {
targetResultRowsList.get( t ).clear();
}
}

// Execute all steps in the transformation
//
executor.oneIteration();
// Pass the rows in the rowBuffer to the input RowSet
//
if ( !inputStep ) {
if ( stepname.equals( "(:Beers)" ) ) {

// Evaluate the results...
//
if ( maxInputBufferSize < rowBuffer.size() ) {
Metrics.counter( "maxInputSize", stepname ).inc( rowBuffer.size() - maxInputBufferSize );
maxInputBufferSize = rowBuffer.size();
}
if ( minInputBufferSize > rowBuffer.size() ) {
Metrics.counter( "minInputSize", stepname ).dec( minInputBufferSize - minInputBufferSize );
minInputBufferSize = rowBuffer.size();
}
}

// Pass all rows in the output to the process context
//
// System.out.println("Rows read in main output of step '"+stepname+"' : "+resultRows.size());
for ( Object[] resultRow : resultRows ) {
for ( KettleRow inputRow : rowBuffer ) {
rowProducer.putRow( inputRowMeta, inputRow.getRow() );
}
}

// Pass the row to the process context
// Execute all steps in the transformation
//
context.output( mainTupleTag, new KettleRow( resultRow ) );
writtenCounter.inc();
}
executor.oneIteration();

// Pass whatever ended up on the target nodes
//
for ( int t = 0; t < targetResultRowsList.size(); t++ ) {
List<Object[]> targetRowsList = targetResultRowsList.get( t );
TupleTag<KettleRow> tupleTag = tupleTagList.get( t );
// Evaluate the results...
//

// Pass all rows in the output to the process context
//
// System.out.println("Rows read in main output of step '"+stepname+"' : "+resultRows.size());
for ( Object[] resultRow : resultRows ) {

for ( Object[] targetRow : targetRowsList ) {
context.output( tupleTag, new KettleRow( targetRow ) );
// Pass the row to the process context
//
context.output( mainTupleTag, new KettleRow( resultRow ) );
writtenCounter.inc();
}
}

flushBufferCounter.inc();
rowBuffer.clear();
bufferStartTime.set(-1L);
flushing.set(false);
// Pass whatever ended up on the target nodes
//
for ( int t = 0; t < targetResultRowsList.size(); t++ ) {
List<Object[]> targetRowsList = targetResultRowsList.get( t );
TupleTag<KettleRow> tupleTag = tupleTagList.get( t );

for ( Object[] targetRow : targetRowsList ) {
context.output( tupleTag, new KettleRow( targetRow ) );
}
}

flushBufferCounter.inc();
rowBuffer.clear();
bufferStartTime.set( -1L );
} finally {
flushing.set( false );
}
}
}

@@ -684,10 +701,10 @@ private StepMetaDataCombi findCombi( Trans trans, String stepname ) {
}

private interface TupleOutputContext<T> {
void output( TupleTag<T> tupleTag, T output ) ;
void output( TupleTag<T> tupleTag, T output );
}

private class StepProcessContext implements TupleOutputContext<KettleRow> {
private class StepProcessContext implements TupleOutputContext<KettleRow> {

private DoFn.ProcessContext context;

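Beyond the whitespace cleanup, a key functional change in emptyRowBuffer() is that the flushing flag is now released in a finally block, so an exception thrown while flushing can no longer leave the buffer permanently locked against the next batch or the periodic flush timer. A stripped-down sketch of that guard pattern, with illustrative names rather than the real step machinery:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class FlushGuardSketch {
  private final List<Object[]> rowBuffer = new ArrayList<>();
  private final AtomicBoolean flushing = new AtomicBoolean( false );
  private final AtomicLong bufferStartTime = new AtomicLong( -1L );

  synchronized void emptyRowBuffer() {
    if ( !flushing.get() ) {
      try {
        flushing.set( true );

        // ...push the buffered rows into the engine and emit its output here...

        rowBuffer.clear();
        bufferStartTime.set( -1L );
      } finally {
        // Always release the flag, even if the flush above threw,
        // so the timer task can retry on its next tick.
        flushing.set( false );
      }
    }
  }
}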
