Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Commit

Permalink
Issue #21 (DateTime fixes) for 0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mattcasters committed Jan 14, 2019
1 parent 4b34111 commit 2c93836
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public boolean isOutput() {
xpPluginClasses
)) );


// Save this in the map
//
stepCollectionMap.put( stepMeta.getName(), stepPCollection );
Expand Down
88 changes: 62 additions & 26 deletions src/main/java/org/kettle/beam/steps/bq/BeamBQInputDialog.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@

package org.kettle.beam.steps.bq;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import org.apache.commons.lang.StringUtils;
import org.eclipse.swt.SWT;
import org.eclipse.swt.events.SelectionAdapter;
import org.eclipse.swt.events.SelectionEvent;
Expand All @@ -10,34 +18,29 @@
import org.eclipse.swt.layout.FormData;
import org.eclipse.swt.layout.FormLayout;
import org.eclipse.swt.widgets.Button;
import org.eclipse.swt.widgets.Combo;
import org.eclipse.swt.widgets.Control;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Event;
import org.eclipse.swt.widgets.Label;
import org.eclipse.swt.widgets.Listener;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.swt.widgets.TableItem;
import org.eclipse.swt.widgets.Text;
import org.kettle.beam.metastore.FileDefinition;
import org.kettle.beam.core.fn.BQSchemaAndRecordToKettleFn;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.Props;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaFactory;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStepMeta;
import org.pentaho.di.trans.step.StepDialogInterface;
import org.pentaho.di.ui.core.dialog.ErrorDialog;
import org.pentaho.di.ui.core.widget.ColumnInfo;
import org.pentaho.di.ui.core.widget.TableView;
import org.pentaho.di.ui.core.widget.TextVar;
import org.pentaho.di.ui.trans.step.BaseStepDialog;
import org.pentaho.metastore.persist.MetaStoreFactory;
import org.pentaho.metastore.util.PentahoDefaults;

import java.util.Collections;
import java.util.List;


public class BeamBQInputDialog extends BaseStepDialog implements StepDialogInterface {
Expand Down Expand Up @@ -148,19 +151,19 @@ public String open() {
wTableId.setLayoutData( fdTableId );
lastControl = wTableId;

Label wlQuery = new Label( shell, SWT.RIGHT | SWT.MULTI | SWT.H_SCROLL | SWT.V_SCROLL );
Label wlQuery = new Label( shell, SWT.LEFT );
wlQuery.setText( BaseMessages.getString( PKG, "BeamBQInputDialog.Query" ) );
props.setLook( wlQuery );
FormData fdlQuery = new FormData();
fdlQuery.left = new FormAttachment( 0, 0 );
fdlQuery.top = new FormAttachment( lastControl, margin );
fdlQuery.right = new FormAttachment( middle, -margin );
fdlQuery.right = new FormAttachment( 100, 0 );
wlQuery.setLayoutData( fdlQuery );
wQuery = new TextVar( transMeta, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
wQuery = new TextVar( transMeta, shell, SWT.LEFT | SWT.MULTI | SWT.H_SCROLL | SWT.V_SCROLL );
props.setLook( wQuery, Props.WIDGET_STYLE_FIXED);
FormData fdQuery = new FormData();
fdQuery.left = new FormAttachment( middle, 0 );
fdQuery.top = new FormAttachment( wlQuery, 0, SWT.CENTER );
fdQuery.left = new FormAttachment( 0, 0 );
fdQuery.top = new FormAttachment( wlQuery, margin );
fdQuery.right = new FormAttachment( 100, 0 );
fdQuery.bottom = new FormAttachment( wlQuery, 250);
wQuery.setLayoutData( fdQuery );
Expand All @@ -177,11 +180,12 @@ public String open() {

wOK = new Button( shell, SWT.PUSH );
wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) );

wGet = new Button(shell, SWT.PUSH);
wGet.setText(BaseMessages.getString( PKG, "System.Button.GetFields" ) );
wCancel = new Button( shell, SWT.PUSH );
wCancel.setText( BaseMessages.getString( PKG, "System.Button.Cancel" ) );

setButtonPositions( new Button[] { wOK, wCancel }, margin, null );
setButtonPositions( new Button[] { wOK, wGet, wCancel }, margin, null );

ColumnInfo[] columns = new ColumnInfo[] {
new ColumnInfo( BaseMessages.getString( PKG, "BeamBQInputDialog.Fields.Column.Name" ), ColumnInfo.COLUMN_TYPE_TEXT, false, false ),
Expand All @@ -199,18 +203,11 @@ public String open() {
lastControl = wFields;

// Add listeners
lsOK = new Listener() {
public void handleEvent( Event e ) {
ok();
}
};
lsCancel = new Listener() {
public void handleEvent( Event e ) {
cancel();
}
};
lsOK = e -> ok();
lsCancel = e -> cancel();

wOK.addListener( SWT.Selection, lsOK );
wGet.addListener( SWT.Selection, e-> getFields() );
wCancel.addListener( SWT.Selection, lsCancel );

lsDef = new SelectionAdapter() {
Expand Down Expand Up @@ -244,6 +241,45 @@ public void shellClosed( ShellEvent e ) {
return stepname;
}

public void getFields() {
try {

BeamBQInputMeta meta = new BeamBQInputMeta();
getInfo(meta);

BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

if ( StringUtils.isNotEmpty( meta.getDatasetId() ) &&
StringUtils.isNotEmpty( meta.getTableId() )) {

Table table = bigQuery.getTable(
transMeta.environmentSubstitute( meta.getDatasetId()),
transMeta.environmentSubstitute( meta.getTableId() )
);

TableDefinition definition = table.getDefinition();
Schema schema = definition.getSchema();
FieldList fieldList = schema.getFields();

RowMetaInterface rowMeta = new RowMeta();
for ( int i = 0; i< fieldList.size(); i++) {
Field field = fieldList.get( i );

String name = field.getName();
String type = field.getType().name();

int kettleType = BQSchemaAndRecordToKettleFn.AvroType.valueOf( type ).getKettleType();
rowMeta.addValueMeta( ValueMetaFactory.createValueMeta( name, kettleType ) );
}

BaseStepDialog.getFieldsFromPrevious( rowMeta, wFields, 1, new int[] { 1 }, new int[] { 3 }, -1, -1, true, null );
}

} catch ( Exception e ) {
new ErrorDialog( shell, "Error", "Error getting BQ fields", e );
}
}


/**
* Populate the widgets.
Expand Down
47 changes: 29 additions & 18 deletions src/main/java/org/kettle/beam/steps/bq/BeamBQInputMeta.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package org.kettle.beam.steps.bq;

import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.annotations.Step;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaFactory;
Expand Down Expand Up @@ -77,7 +88,7 @@ public BeamBQInputMeta() {
valueMeta.setOrigin( name );
inputRowMeta.addValueMeta( valueMeta );
}
} catch(Exception e) {
} catch ( Exception e ) {
throw new KettleStepException( "Error getting Beam BQ Input step output", e );
}
}
Expand All @@ -88,37 +99,37 @@ public BeamBQInputMeta() {

xml.append( XMLHandler.addTagValue( PROJECT_ID, projectId ) );
xml.append( XMLHandler.addTagValue( DATASET_ID, datasetId ) );
xml.append( XMLHandler.addTagValue( TABLE_ID, tableId) );
xml.append( XMLHandler.addTagValue( QUERY, query) );

xml.append( XMLHandler.openTag( "fields") );
for (BQField field : fields ) {
xml.append( XMLHandler.openTag( "field") );
xml.append( XMLHandler.addTagValue( "name", field.getName()) );
xml.append( XMLHandler.addTagValue( "new_name", field.getNewName()) );
xml.append( XMLHandler.addTagValue( "type", field.getKettleType()) );
xml.append( XMLHandler.closeTag( "field") );
xml.append( XMLHandler.addTagValue( TABLE_ID, tableId ) );
xml.append( XMLHandler.addTagValue( QUERY, query ) );

xml.append( XMLHandler.openTag( "fields" ) );
for ( BQField field : fields ) {
xml.append( XMLHandler.openTag( "field" ) );
xml.append( XMLHandler.addTagValue( "name", field.getName() ) );
xml.append( XMLHandler.addTagValue( "new_name", field.getNewName() ) );
xml.append( XMLHandler.addTagValue( "type", field.getKettleType() ) );
xml.append( XMLHandler.closeTag( "field" ) );
}
xml.append( XMLHandler.closeTag( "fields") );
xml.append( XMLHandler.closeTag( "fields" ) );

return xml.toString();
}

@Override public void loadXML( Node stepNode, List<DatabaseMeta> databases, IMetaStore metaStore ) throws KettleXMLException {

projectId = XMLHandler.getTagValue( stepNode, PROJECT_ID );
datasetId= XMLHandler.getTagValue( stepNode, DATASET_ID );
tableId= XMLHandler.getTagValue( stepNode, TABLE_ID);
query= XMLHandler.getTagValue( stepNode, QUERY);
datasetId = XMLHandler.getTagValue( stepNode, DATASET_ID );
tableId = XMLHandler.getTagValue( stepNode, TABLE_ID );
query = XMLHandler.getTagValue( stepNode, QUERY );

Node fieldsNode = XMLHandler.getSubNode( stepNode, "fields" );
List<Node> fieldNodes = XMLHandler.getNodes( fieldsNode, "field" );
fields =new ArrayList<>( );
for (Node fieldNode : fieldNodes) {
fields = new ArrayList<>();
for ( Node fieldNode : fieldNodes ) {
String name = XMLHandler.getTagValue( fieldNode, "name" );
String newName = XMLHandler.getTagValue( fieldNode, "new_name" );
String kettleType = XMLHandler.getTagValue( fieldNode, "type" );
fields.add(new BQField( name, newName, kettleType ));
fields.add( new BQField( name, newName, kettleType ) );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ BeamBQInputDialog.DialogTitle = Beam BigQuery Input
BeamBQInputDialog.ProjectId = Project ID
BeamBQInputDialog.DatasetId = Data Set ID
BeamBQInputDialog.TableId = Table ID
BeamBQInputDialog.Query = Query (blank mean everything)
BeamBQInputDialog.Query = Query. Blank means everything. Manually specify result fields below:
BeamBQInputDialog.Fields = Return fields selection:
BeamBQInputDialog.Fields.Column.Name = BQ Field name
BeamBQInputDialog.Fields.Column.NewName = Rename to... (optional)
Expand Down

0 comments on commit 2c93836

Please sign in to comment.