From 2c93836310cb787937c2f03f5f864c2b640eadc8 Mon Sep 17 00:00:00 2001 From: Matt Casters Date: Mon, 14 Jan 2019 09:37:27 +0100 Subject: [PATCH] Issue #21 (DateTime fixes) for 0.2.0 --- .../handler/BeamTimestampStepHandler.java | 1 + .../beam/steps/bq/BeamBQInputDialog.java | 88 +++++++++++++------ .../kettle/beam/steps/bq/BeamBQInputMeta.java | 47 ++++++---- .../bq/messages/messages_en_US.properties | 2 +- 4 files changed, 93 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/kettle/beam/pipeline/handler/BeamTimestampStepHandler.java b/src/main/java/org/kettle/beam/pipeline/handler/BeamTimestampStepHandler.java index bd24905..a567325 100644 --- a/src/main/java/org/kettle/beam/pipeline/handler/BeamTimestampStepHandler.java +++ b/src/main/java/org/kettle/beam/pipeline/handler/BeamTimestampStepHandler.java @@ -70,6 +70,7 @@ public boolean isOutput() { xpPluginClasses )) ); + // Save this in the map // stepCollectionMap.put( stepMeta.getName(), stepPCollection ); diff --git a/src/main/java/org/kettle/beam/steps/bq/BeamBQInputDialog.java b/src/main/java/org/kettle/beam/steps/bq/BeamBQInputDialog.java index b8de8b9..43ae742 100644 --- a/src/main/java/org/kettle/beam/steps/bq/BeamBQInputDialog.java +++ b/src/main/java/org/kettle/beam/steps/bq/BeamBQInputDialog.java @@ -1,6 +1,14 @@ package org.kettle.beam.steps.bq; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableDefinition; +import org.apache.commons.lang.StringUtils; import org.eclipse.swt.SWT; import org.eclipse.swt.events.SelectionAdapter; import org.eclipse.swt.events.SelectionEvent; @@ -10,18 +18,17 @@ import org.eclipse.swt.layout.FormData; import org.eclipse.swt.layout.FormLayout; import org.eclipse.swt.widgets.Button; -import org.eclipse.swt.widgets.Combo; import org.eclipse.swt.widgets.Control; import org.eclipse.swt.widgets.Display; -import org.eclipse.swt.widgets.Event; import org.eclipse.swt.widgets.Label; -import org.eclipse.swt.widgets.Listener; import org.eclipse.swt.widgets.Shell; import org.eclipse.swt.widgets.TableItem; import org.eclipse.swt.widgets.Text; -import org.kettle.beam.metastore.FileDefinition; +import org.kettle.beam.core.fn.BQSchemaAndRecordToKettleFn; import org.pentaho.di.core.Const; import org.pentaho.di.core.Props; +import org.pentaho.di.core.row.RowMeta; +import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.value.ValueMetaFactory; import org.pentaho.di.core.util.Utils; import org.pentaho.di.core.variables.Variables; @@ -29,15 +36,11 @@ import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.BaseStepMeta; import org.pentaho.di.trans.step.StepDialogInterface; +import org.pentaho.di.ui.core.dialog.ErrorDialog; import org.pentaho.di.ui.core.widget.ColumnInfo; import org.pentaho.di.ui.core.widget.TableView; import org.pentaho.di.ui.core.widget.TextVar; import org.pentaho.di.ui.trans.step.BaseStepDialog; -import org.pentaho.metastore.persist.MetaStoreFactory; -import org.pentaho.metastore.util.PentahoDefaults; - -import java.util.Collections; -import java.util.List; public class BeamBQInputDialog extends BaseStepDialog implements StepDialogInterface { @@ -148,19 +151,19 @@ public String open() { wTableId.setLayoutData( fdTableId ); lastControl = wTableId; - Label wlQuery = new Label( shell, SWT.RIGHT | SWT.MULTI | SWT.H_SCROLL | SWT.V_SCROLL ); + Label wlQuery = new Label( shell, SWT.LEFT ); wlQuery.setText( BaseMessages.getString( PKG, "BeamBQInputDialog.Query" ) ); props.setLook( wlQuery ); FormData fdlQuery = new FormData(); fdlQuery.left = new FormAttachment( 0, 0 ); fdlQuery.top = new FormAttachment( lastControl, margin ); - fdlQuery.right = new FormAttachment( middle, -margin ); + fdlQuery.right = new FormAttachment( 100, 0 ); wlQuery.setLayoutData( fdlQuery ); - wQuery = new TextVar( transMeta, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER ); + wQuery = new TextVar( transMeta, shell, SWT.LEFT | SWT.MULTI | SWT.H_SCROLL | SWT.V_SCROLL ); props.setLook( wQuery, Props.WIDGET_STYLE_FIXED); FormData fdQuery = new FormData(); - fdQuery.left = new FormAttachment( middle, 0 ); - fdQuery.top = new FormAttachment( wlQuery, 0, SWT.CENTER ); + fdQuery.left = new FormAttachment( 0, 0 ); + fdQuery.top = new FormAttachment( wlQuery, margin ); fdQuery.right = new FormAttachment( 100, 0 ); fdQuery.bottom = new FormAttachment( wlQuery, 250); wQuery.setLayoutData( fdQuery ); @@ -177,11 +180,12 @@ public String open() { wOK = new Button( shell, SWT.PUSH ); wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) ); - + wGet = new Button(shell, SWT.PUSH); + wGet.setText(BaseMessages.getString( PKG, "System.Button.GetFields" ) ); wCancel = new Button( shell, SWT.PUSH ); wCancel.setText( BaseMessages.getString( PKG, "System.Button.Cancel" ) ); - setButtonPositions( new Button[] { wOK, wCancel }, margin, null ); + setButtonPositions( new Button[] { wOK, wGet, wCancel }, margin, null ); ColumnInfo[] columns = new ColumnInfo[] { new ColumnInfo( BaseMessages.getString( PKG, "BeamBQInputDialog.Fields.Column.Name" ), ColumnInfo.COLUMN_TYPE_TEXT, false, false ), @@ -199,18 +203,11 @@ public String open() { lastControl = wFields; // Add listeners - lsOK = new Listener() { - public void handleEvent( Event e ) { - ok(); - } - }; - lsCancel = new Listener() { - public void handleEvent( Event e ) { - cancel(); - } - }; + lsOK = e -> ok(); + lsCancel = e -> cancel(); wOK.addListener( SWT.Selection, lsOK ); + wGet.addListener( SWT.Selection, e-> getFields() ); wCancel.addListener( SWT.Selection, lsCancel ); lsDef = new SelectionAdapter() { @@ -244,6 +241,45 @@ public void shellClosed( ShellEvent e ) { return stepname; } + public void getFields() { + try { + + BeamBQInputMeta meta = new BeamBQInputMeta(); + getInfo(meta); + + BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService(); + + if ( StringUtils.isNotEmpty( meta.getDatasetId() ) && + StringUtils.isNotEmpty( meta.getTableId() )) { + + Table table = bigQuery.getTable( + transMeta.environmentSubstitute( meta.getDatasetId()), + transMeta.environmentSubstitute( meta.getTableId() ) + ); + + TableDefinition definition = table.getDefinition(); + Schema schema = definition.getSchema(); + FieldList fieldList = schema.getFields(); + + RowMetaInterface rowMeta = new RowMeta(); + for ( int i = 0; i< fieldList.size(); i++) { + Field field = fieldList.get( i ); + + String name = field.getName(); + String type = field.getType().name(); + + int kettleType = BQSchemaAndRecordToKettleFn.AvroType.valueOf( type ).getKettleType(); + rowMeta.addValueMeta( ValueMetaFactory.createValueMeta( name, kettleType ) ); + } + + BaseStepDialog.getFieldsFromPrevious( rowMeta, wFields, 1, new int[] { 1 }, new int[] { 3 }, -1, -1, true, null ); + } + + } catch ( Exception e ) { + new ErrorDialog( shell, "Error", "Error getting BQ fields", e ); + } + } + /** * Populate the widgets. diff --git a/src/main/java/org/kettle/beam/steps/bq/BeamBQInputMeta.java b/src/main/java/org/kettle/beam/steps/bq/BeamBQInputMeta.java index 267b305..5d0341d 100644 --- a/src/main/java/org/kettle/beam/steps/bq/BeamBQInputMeta.java +++ b/src/main/java/org/kettle/beam/steps/bq/BeamBQInputMeta.java @@ -1,10 +1,21 @@ package org.kettle.beam.steps.bq; +import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableDefinition; +import org.apache.commons.lang.StringUtils; import org.pentaho.di.core.annotations.Step; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleStepException; import org.pentaho.di.core.exception.KettleXMLException; +import org.pentaho.di.core.logging.LogChannel; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMetaInterface; import org.pentaho.di.core.row.value.ValueMetaFactory; @@ -77,7 +88,7 @@ public BeamBQInputMeta() { valueMeta.setOrigin( name ); inputRowMeta.addValueMeta( valueMeta ); } - } catch(Exception e) { + } catch ( Exception e ) { throw new KettleStepException( "Error getting Beam BQ Input step output", e ); } } @@ -88,18 +99,18 @@ public BeamBQInputMeta() { xml.append( XMLHandler.addTagValue( PROJECT_ID, projectId ) ); xml.append( XMLHandler.addTagValue( DATASET_ID, datasetId ) ); - xml.append( XMLHandler.addTagValue( TABLE_ID, tableId) ); - xml.append( XMLHandler.addTagValue( QUERY, query) ); - - xml.append( XMLHandler.openTag( "fields") ); - for (BQField field : fields ) { - xml.append( XMLHandler.openTag( "field") ); - xml.append( XMLHandler.addTagValue( "name", field.getName()) ); - xml.append( XMLHandler.addTagValue( "new_name", field.getNewName()) ); - xml.append( XMLHandler.addTagValue( "type", field.getKettleType()) ); - xml.append( XMLHandler.closeTag( "field") ); + xml.append( XMLHandler.addTagValue( TABLE_ID, tableId ) ); + xml.append( XMLHandler.addTagValue( QUERY, query ) ); + + xml.append( XMLHandler.openTag( "fields" ) ); + for ( BQField field : fields ) { + xml.append( XMLHandler.openTag( "field" ) ); + xml.append( XMLHandler.addTagValue( "name", field.getName() ) ); + xml.append( XMLHandler.addTagValue( "new_name", field.getNewName() ) ); + xml.append( XMLHandler.addTagValue( "type", field.getKettleType() ) ); + xml.append( XMLHandler.closeTag( "field" ) ); } - xml.append( XMLHandler.closeTag( "fields") ); + xml.append( XMLHandler.closeTag( "fields" ) ); return xml.toString(); } @@ -107,18 +118,18 @@ public BeamBQInputMeta() { @Override public void loadXML( Node stepNode, List databases, IMetaStore metaStore ) throws KettleXMLException { projectId = XMLHandler.getTagValue( stepNode, PROJECT_ID ); - datasetId= XMLHandler.getTagValue( stepNode, DATASET_ID ); - tableId= XMLHandler.getTagValue( stepNode, TABLE_ID); - query= XMLHandler.getTagValue( stepNode, QUERY); + datasetId = XMLHandler.getTagValue( stepNode, DATASET_ID ); + tableId = XMLHandler.getTagValue( stepNode, TABLE_ID ); + query = XMLHandler.getTagValue( stepNode, QUERY ); Node fieldsNode = XMLHandler.getSubNode( stepNode, "fields" ); List fieldNodes = XMLHandler.getNodes( fieldsNode, "field" ); - fields =new ArrayList<>( ); - for (Node fieldNode : fieldNodes) { + fields = new ArrayList<>(); + for ( Node fieldNode : fieldNodes ) { String name = XMLHandler.getTagValue( fieldNode, "name" ); String newName = XMLHandler.getTagValue( fieldNode, "new_name" ); String kettleType = XMLHandler.getTagValue( fieldNode, "type" ); - fields.add(new BQField( name, newName, kettleType )); + fields.add( new BQField( name, newName, kettleType ) ); } } diff --git a/src/main/java/org/kettle/beam/steps/bq/messages/messages_en_US.properties b/src/main/java/org/kettle/beam/steps/bq/messages/messages_en_US.properties index 81a1150..9b5bb8c 100644 --- a/src/main/java/org/kettle/beam/steps/bq/messages/messages_en_US.properties +++ b/src/main/java/org/kettle/beam/steps/bq/messages/messages_en_US.properties @@ -2,7 +2,7 @@ BeamBQInputDialog.DialogTitle = Beam BigQuery Input BeamBQInputDialog.ProjectId = Project ID BeamBQInputDialog.DatasetId = Data Set ID BeamBQInputDialog.TableId = Table ID -BeamBQInputDialog.Query = Query (blank mean everything) +BeamBQInputDialog.Query = Query. Blank means everything. Manually specify result fields below: BeamBQInputDialog.Fields = Return fields selection: BeamBQInputDialog.Fields.Column.Name = BQ Field name BeamBQInputDialog.Fields.Column.NewName = Rename to... (optional)