From f19b2037f19525ad5fe10c7b71d837c68b786b0b Mon Sep 17 00:00:00 2001
From: dvitiiuk <dmitriivitiiuk@gmail.com>
Date: Sun, 24 Nov 2019 21:30:12 +0200
Subject: [PATCH] PLUGIN-68. Integration tests for Google Drive plugins (SaaS
 source).

---
 .../app/etl/google/drive/GoogleDriveTest.java | 769 ++++++++++++++++++
 .../google/drive/UserCredentialsTestBase.java |  52 ++
 pom.xml                                       |   6 +
 3 files changed, 827 insertions(+)
 create mode 100644 integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/GoogleDriveTest.java
 create mode 100644 integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/UserCredentialsTestBase.java

diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/GoogleDriveTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/GoogleDriveTest.java
new file mode 100644
index 000000000..74bc99c61
--- /dev/null
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/GoogleDriveTest.java
@@ -0,0 +1,769 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.app.etl.google.drive;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.ByteArrayContent;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.cdap.cdap.api.artifact.ArtifactScope;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.common.ArtifactNotFoundException;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.common.utils.Tasks;
+import io.cdap.cdap.datapipeline.SmartWorkflow;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.proto.ArtifactSelectorConfig;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.ProgramRunStatus;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.artifact.PluginSummary;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.WorkflowManager;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Tests reading to and writing from Google Drive within a sandbox cluster.
+ */
+public class GoogleDriveTest extends UserCredentialsTestBase {
+  protected static final ArtifactSelectorConfig GOOGLE_DRIVE_ARTIFACT =
+    new ArtifactSelectorConfig("SYSTEM", "google-drive-plugins", "[0.0.0, 100.0.0)");
+  protected static final ArtifactSelectorConfig FILE_ARTIFACT =
+    new ArtifactSelectorConfig("SYSTEM", "core-plugins", "[0.0.0, 100.0.0)");
+  private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
+  private static final String GOOGLE_DRIVE_PLUGIN_NAME = "GoogleDrive";
+  private static final String FILE_PLUGIN_NAME = "File";
+  private static final int GENERATED_NAME_LENGTH = 16;
+  private static final String TEXT_PLAIN_MIME = "text/plain";
+  private static final String TEXT_CSV_MIME = "text/csv";
+  private static final String UNDEFINED_MIME = "application/octet-stream";
+
+  private static final String TEST_TEXT_FILE_NAME = "textFile";
+  private static final String TEST_DOC_FILE_NAME = "docFile";
+  private static final String TEST_SHEET_FILE_NAME = "sheetFile";
+  private static final String TEST_TEXT_FILE_CONTENT = "text file content";
+  private static final String TEST_DOC_FILE_CONTENT = "Google Document file content";
+  private static final String TEST_SHEET_FILE_CONTENT = "a,b,c\r\n,d,e";
+  public static final String TMP_FOLDER_NAME = "googleDriveTestFolder";
+
+  private static Drive service;
+  private String sourceFolderId;
+  private String sinkFolderId;
+  private String testTextFileId;
+  private String testDocFileId;
+  private String testSheetFileId;
+  private Path tmpFolder;
+
+  @BeforeClass
+  public static void setupDrive() throws GeneralSecurityException, IOException {
+    final NetHttpTransport HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
+
+    GoogleCredential credential = new GoogleCredential.Builder()
+      .setTransport(HTTP_TRANSPORT)
+      .setJsonFactory(JSON_FACTORY)
+      .setClientSecrets(getClientId(),
+                        getClientSecret())
+      .build();
+    credential.setRefreshToken(getRefreshToken());
+
+    service = new Drive.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential).build();
+  }
+
+  @Before
+  public void testClassSetup() throws IOException {
+    ImmutableList.of(ImmutableList.of(GOOGLE_DRIVE_PLUGIN_NAME, BatchSource.PLUGIN_TYPE, "cdap-data-pipeline"),
+                     ImmutableList.of(GOOGLE_DRIVE_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, "cdap-data-pipeline"))
+      .forEach((pluginInfo) -> checkPluginExists(pluginInfo.get(0), pluginInfo.get(1), pluginInfo.get(2)));
+
+    String sourceFolderName = RandomStringUtils.randomAlphanumeric(16);
+    String sinkFolderName = RandomStringUtils.randomAlphanumeric(16);
+
+    sourceFolderId = createFolder(service, sourceFolderName);
+    sinkFolderId = createFolder(service, sinkFolderName);
+
+    testTextFileId = createFile(service, TEST_TEXT_FILE_CONTENT.getBytes(), TEST_TEXT_FILE_NAME,
+                                TEXT_PLAIN_MIME, null, sourceFolderId);
+    testDocFileId = createFile(service, TEST_DOC_FILE_CONTENT.getBytes(), TEST_DOC_FILE_NAME,
+                               "application/vnd.google-apps.document", TEXT_PLAIN_MIME, sourceFolderId);
+    testSheetFileId = createFile(service, TEST_SHEET_FILE_CONTENT.getBytes(), TEST_SHEET_FILE_NAME,
+                                 "application/vnd.google-apps.spreadsheet", TEXT_CSV_MIME, sourceFolderId);
+    tmpFolder = createFileSystemFolder(TMP_FOLDER_NAME);
+  }
+
+  @After
+  public void removeFolders() throws IOException {
+    removeFile(service, testTextFileId);
+    removeFile(service, testDocFileId);
+    removeFile(service, testSheetFileId);
+    removeFile(service, sourceFolderId);
+    removeFile(service, sinkFolderId);
+
+    Files.walk(tmpFolder)
+      .sorted(Comparator.reverseOrder())
+      .map(Path::toFile)
+      .forEach(java.io.File::delete);
+  }
+
+  @Test
+  public void testBinaryOnly() throws Exception {
+    Map<String, String> sourceProps = getDriveSourceMinimalDefaultConfigs();
+    sourceProps.put("fileTypesToPull", "binary");
+    Map<String, String> sinkProps = getDriveSinkMinimalDefaultConfigs();
+
+    DeploymentDetails deploymentDetails =
+      deployGoogleDriveApplication(sourceProps, sinkProps,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testBinaryOnly");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 1);
+
+    List<File> destFiles = getFiles(sinkFolderId);
+    Assert.assertEquals(1, destFiles.size());
+
+    File textFile = destFiles.get(0);
+
+    Assert.assertEquals(UNDEFINED_MIME, textFile.getMimeType());
+    Assert.assertNotEquals(TEST_TEXT_FILE_NAME, textFile.getName());
+    Assert.assertEquals(GENERATED_NAME_LENGTH, textFile.getName().length());
+
+    String content = getFileContent(textFile.getId());
+    Assert.assertEquals(TEST_TEXT_FILE_CONTENT, content);
+  }
+
+  @Test
+  public void testDocFileOnly() throws Exception {
+    Map<String, String> sourceProps = getDriveSourceMinimalDefaultConfigs();
+    sourceProps.put("fileTypesToPull", "documents");
+    Map<String, String> sinkProps = getDriveSinkMinimalDefaultConfigs();
+
+    DeploymentDetails deploymentDetails =
+      deployGoogleDriveApplication(sourceProps, sinkProps,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testDocFileOnly");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 1);
+
+    List<File> destFiles = getFiles(sinkFolderId);
+    Assert.assertEquals(1, destFiles.size());
+
+    File docFile = destFiles.get(0);
+
+    Assert.assertEquals(UNDEFINED_MIME, docFile.getMimeType());
+    Assert.assertNotEquals(TEST_TEXT_FILE_NAME, docFile.getName());
+    Assert.assertEquals(GENERATED_NAME_LENGTH, docFile.getName().length());
+
+    String content = getFileContent(docFile.getId());
+    // check BOM
+    Assert.assertEquals('\uFEFF', content.charAt(0));
+    Assert.assertEquals(TEST_DOC_FILE_CONTENT, content.replace("\uFEFF", ""));
+  }
+
+  @Test
+  public void testAllFileTypes() throws Exception {
+    Map<String, String> sourceProps = getDriveSourceMinimalDefaultConfigs();
+    sourceProps.put("fileTypesToPull", "binary,documents,spreadsheets");
+    Map<String, String> sinkProps = getDriveSinkMinimalDefaultConfigs();
+
+    DeploymentDetails deploymentDetails =
+      deployGoogleDriveApplication(sourceProps, sinkProps,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testAllFileTypes");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 3);
+
+    List<File> destFiles = getFiles(sinkFolderId);
+    Assert.assertEquals(3, destFiles.size());
+
+    destFiles.forEach(file -> {
+      Assert.assertEquals(UNDEFINED_MIME, file.getMimeType());
+      Assert.assertNotEquals(TEST_TEXT_FILE_NAME, file.getName());
+      Assert.assertNotEquals(TEST_DOC_FILE_NAME, file.getName());
+      Assert.assertNotEquals(TEST_SHEET_FILE_NAME, file.getName());
+      Assert.assertEquals(GENERATED_NAME_LENGTH, file.getName().length());
+    });
+  }
+
+  @Test
+  public void testAllFileTypesNamed() throws Exception {
+    Map<String, String> sourceProps = getDriveSourceMinimalDefaultConfigs();
+    sourceProps.put("fileTypesToPull", "binary,documents,spreadsheets");
+    sourceProps.put("fileMetadataProperties", "name");
+    Map<String, String> sinkProps = getDriveSinkMinimalDefaultConfigs();
+    sinkProps.put("schemaNameFieldName", "name");
+
+    DeploymentDetails deploymentDetails =
+      deployGoogleDriveApplication(sourceProps, sinkProps,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testAllFileTypesNamed");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 3);
+
+    List<File> destFiles = getFiles(sinkFolderId);
+    Assert.assertEquals(3, destFiles.size());
+
+    destFiles.forEach(file -> {
+      Assert.assertEquals(UNDEFINED_MIME, file.getMimeType());
+      Assert.assertNotNull(file.getName());
+      try {
+        String fileName = file.getName();
+        String content = getFileContent(file.getId());
+        switch (fileName) {
+          case TEST_TEXT_FILE_NAME:
+            Assert.assertEquals(TEST_TEXT_FILE_CONTENT, content);
+            break;
+          case TEST_DOC_FILE_NAME:
+            // check BOM
+            Assert.assertEquals('\uFEFF', content.charAt(0));
+            Assert.assertEquals(TEST_DOC_FILE_CONTENT, content.replace("\uFEFF", ""));
+            break;
+          case TEST_SHEET_FILE_NAME:
+            Assert.assertEquals(TEST_SHEET_FILE_CONTENT, content);
+            break;
+          default:
+            Assert.fail(String.format("Invalid file name after pipeline completion: '%s', content: '%s'", fileName, content));
+        }
+      } catch (IOException e) {
+        Assert.fail(String.format("Exception during test results check: [%s]", e.getMessage()));
+      }
+    });
+  }
+
+  @Test
+  public void testAllFileTypesNamedAndMimed() throws Exception {
+    Map<String, String> sourceProps = getDriveSourceMinimalDefaultConfigs();
+    sourceProps.put("fileTypesToPull", "binary,documents,spreadsheets");
+    sourceProps.put("fileMetadataProperties", "name,mimeType");
+    Map<String, String> sinkProps = getDriveSinkMinimalDefaultConfigs();
+    sinkProps.put("schemaNameFieldName", "name");
+    sinkProps.put("schemaMimeFieldName", "mimeType");
+
+    DeploymentDetails deploymentDetails =
+      deployGoogleDriveApplication(sourceProps, sinkProps,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testAllFileTypesNamedAndMimed");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 3);
+
+    List<File> destFiles = getFiles(sinkFolderId);
+    Assert.assertEquals(3, destFiles.size());
+
+    destFiles.forEach(file -> {
+      Assert.assertNotNull(file.getName());
+      String fileName = null;
+      String mimeType = null;
+      try {
+        fileName = file.getName();
+        mimeType = file.getMimeType();
+        String content = getFileContent(file.getId());
+        switch (fileName) {
+          case TEST_TEXT_FILE_NAME:
+            Assert.assertEquals(TEST_TEXT_FILE_CONTENT, content);
+            Assert.assertEquals(TEXT_PLAIN_MIME, mimeType);
+            break;
+          case TEST_DOC_FILE_NAME:
+            // check BOM
+            Assert.assertEquals('\uFEFF', content.charAt(0));
+            Assert.assertEquals(TEST_DOC_FILE_CONTENT, content.replace("\uFEFF", ""));
+            Assert.assertEquals(TEXT_PLAIN_MIME, mimeType);
+            break;
+          case TEST_SHEET_FILE_NAME:
+            Assert.assertEquals(TEST_SHEET_FILE_CONTENT, content);
+            Assert.assertEquals(TEXT_CSV_MIME, mimeType);
+            break;
+          default:
+            Assert.fail(String.format("Invalid file name after pipeline completion: '%s', content: '%s', mime type: '%s'",
+                               fileName, content, mimeType));
+        }
+      } catch (IOException e) {
+        Assert.fail(String.format("Exception during test results check: [%s], file name '%s', mimeType '%s'",
+                           e.getMessage(),
+                           fileName == null ? "unknown" : fileName,
+                           mimeType == null ? "unknown" : mimeType));
+      }
+    });
+  }
+
+  @Test
+  public void testPartitionSize() throws Exception {
+    int testMaxPartitionSize = 10;
+
+    Map<String, String> sourceProps = getDriveSourceMinimalDefaultConfigs();
+    sourceProps.put("fileTypesToPull", "binary,documents,spreadsheets");
+    sourceProps.put("fileMetadataProperties", "name,mimeType");
+    sourceProps.put("maxPartitionSize", Integer.toString(testMaxPartitionSize));
+    Map<String, String> sinkProps = getDriveSinkMinimalDefaultConfigs();
+    sinkProps.put("schemaNameFieldName", "name");
+    sinkProps.put("schemaMimeFieldName", "mimeType");
+
+    DeploymentDetails deploymentDetails =
+      deployGoogleDriveApplication(sourceProps, sinkProps,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testAllFileTypesNamedAndMimed");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 4);
+
+    List<File> destFiles = getFiles(sinkFolderId);
+    Assert.assertEquals(4, destFiles.size());
+
+    // flags to check partitioning work
+    boolean firstTextPart = false;
+    boolean secondTextPart = false;
+    List<String> parts = new ArrayList<>();
+
+    // Document and Sheets don't support partitioning
+    for (File file : destFiles) {
+      Assert.assertNotNull(file.getName());
+      try {
+        String fileName = file.getName();
+        String mimeType = file.getMimeType();
+        String content = getFileContent(file.getId());
+        switch (fileName) {
+          case TEST_TEXT_FILE_NAME:
+            Assert.assertNotEquals(TEST_TEXT_FILE_CONTENT, content);
+            Assert.assertEquals(TEXT_PLAIN_MIME, mimeType);
+            parts.add(content);
+            if (content.equals(TEST_TEXT_FILE_CONTENT.substring(0, testMaxPartitionSize))) {
+              firstTextPart = true;
+            }
+            if (content.equals(TEST_TEXT_FILE_CONTENT.substring(testMaxPartitionSize))) {
+              secondTextPart = true;
+            }
+            break;
+          case TEST_DOC_FILE_NAME:
+            // check BOM
+            Assert.assertEquals('\uFEFF', content.charAt(0));
+            Assert.assertEquals(TEST_DOC_FILE_CONTENT, content.replace("\uFEFF", ""));
+            Assert.assertEquals(TEXT_PLAIN_MIME, mimeType);
+            break;
+          case TEST_SHEET_FILE_NAME:
+            Assert.assertEquals(TEST_SHEET_FILE_CONTENT, content);
+            Assert.assertEquals(TEXT_CSV_MIME, mimeType);
+            break;
+          default:
+            Assert.fail(String.format("Invalid file name after pipeline completion: '%s', content: '%s', mime type: '%s'",
+                               fileName, content, mimeType));
+        }
+      } catch (IOException e) {
+        Assert.fail(String.format("Exception during test results check: [%s]", e.getMessage()));
+      }
+    }
+    Assert.assertTrue(String.format("Text file was separated incorrectly: %s", parts.toString()), firstTextPart);
+    Assert.assertTrue(String.format("Text file was separated incorrectly: %s", parts.toString()), secondTextPart);
+  }
+
+  @Test
+  public void testWithFileSource() throws Exception {
+    // create test file
+    createFileSystemTextFile(tmpFolder, TEST_TEXT_FILE_NAME, TEST_TEXT_FILE_CONTENT);
+
+    Map<String, String> sourceProps = getFileSourceMinimalDefaultConfigs();
+    Map<String, String> sinkProps = getDriveSinkMinimalDefaultConfigs();
+
+    DeploymentDetails deploymentDetails =
+      deployApplication(sourceProps, sinkProps, "FileSource", "GoogleDriveSink",
+                        FILE_PLUGIN_NAME, GOOGLE_DRIVE_PLUGIN_NAME, FILE_ARTIFACT, GOOGLE_DRIVE_ARTIFACT,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testWithFileSource");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 1);
+
+    List<File> destFiles = getFiles(sinkFolderId);
+    Assert.assertEquals(1, destFiles.size());
+
+    File textFile = destFiles.get(0);
+
+    Assert.assertEquals(UNDEFINED_MIME, textFile.getMimeType());
+    Assert.assertNotEquals(TEST_TEXT_FILE_NAME, textFile.getName());
+    Assert.assertEquals(GENERATED_NAME_LENGTH, textFile.getName().length());
+
+    String content = getFileContent(textFile.getId());
+    Assert.assertEquals(TEST_TEXT_FILE_CONTENT, content);
+  }
+
+  @Test
+  public void testWithFileSink() throws Exception {
+    int testMaxPartitionSize = 50;
+    String testFileName = "Image.png";
+    String testFileMime = "image/png";
+    byte[] testPNGContent = new byte[]{-119,80,78,71,13,10,26,10,0,0,0,13,73,72,68,82,0,0,0,5,0,0,0,5,8,2,0,0,0,2,13,
+      -79,-78,0,0,0,9,112,72,89,115,0,0,11,19,0,0,11,19,1,0,-102,-100,24,0,0,0,7,116,73,77,69,7,-29,10,18,13,43,
+      15,2,-77,55,-110,0,0,0,25,116,69,88,116,67,111,109,109,101,110,116,0,67,114,101,97,116,101,100,32,
+      119,105,116,104,32,71,73,77,80,87,-127,14,23,0,0,0,40,73,68,65,84,8,-41,93,-117,65,10,0,48,12,-62,
+      -30,-1,31,-99,29,108,-95,-52,-125,72,-44,-88,73,84,0,8,-85,65,106,83,-3,44,-5,-6,-6,7,-62,-105,32,
+      -23,115,33,-2,-49,0,0,0,0,73,69,78,68,-82,66,96,-126};
+    int contentLength = testPNGContent.length;
+
+    // create png file with metadata in Google Drive
+    // size: 174 bytes
+    createFile(service, testPNGContent, testFileName, "image/png", null, sourceFolderId);
+
+    Map<String, String> sourceProps = getDriveSourceMinimalDefaultConfigs();
+    sourceProps.put("fileTypesToPull", "binary,documents,spreadsheets");
+    sourceProps.put("bodyFormat", "bytes");
+    sourceProps.put("filter", String.format("name='%s'", testFileName));
+    sourceProps.put("fileMetadataProperties", "name,mimeType,size,imageMediaMetadata.width," +
+      "imageMediaMetadata.height,imageMediaMetadata.rotation");
+    sourceProps.put("maxPartitionSize", Integer.toString(testMaxPartitionSize));
+    Map<String, String> sinkProps = getFileSinkMinimalDefaultConfigs();
+
+    DeploymentDetails deploymentDetails =
+      deployApplication(sourceProps, sinkProps, "GoogleDriveSource", "FileSink",
+                        GOOGLE_DRIVE_PLUGIN_NAME, FILE_PLUGIN_NAME, GOOGLE_DRIVE_ARTIFACT, FILE_ARTIFACT,
+                        GOOGLE_DRIVE_PLUGIN_NAME + "-testWithFileSink");
+    startWorkFlow(deploymentDetails.getAppManager(), ProgramRunStatus.COMPLETED);
+
+    // check number of rows in and out
+    checkRowsNumber(deploymentDetails, 4);
+
+    Assert.assertTrue(Files.isDirectory(tmpFolder));
+
+    List<Path> allDeploysResults = Files.list(tmpFolder).collect(Collectors.toList());
+    Assert.assertEquals(1, allDeploysResults.size());
+
+    Path deployResult = allDeploysResults.get(0);
+    Assert.assertTrue(Files.isDirectory(deployResult));
+    Assert.assertEquals(1,
+                 Files.list(deployResult).filter(p -> p.getFileName().toString().equals("_SUCCESS")).count());
+
+    List<Path> destFiles =
+      Files.list(deployResult).filter(p -> p.getFileName().toString().startsWith("part")).collect(Collectors.toList());
+    Assert.assertEquals(4, destFiles.size());
+
+    JsonParser jsonParser = new JsonParser();
+
+    Map<Integer, byte[]> partitionedContent = new HashMap<>();
+    for (Path destFile : destFiles) {
+      List<String> fileLines = null;
+      try {
+        fileLines = Files.readAllLines(destFile);
+      } catch (IOException e) {
+        Assert.fail(String.format("Exception during reading file '%s': %s", destFile.toString(), e.getMessage()));
+      }
+      String fileContent = String.join(",", fileLines);
+      JsonElement rootElement = jsonParser.parse(fileContent);
+      Assert.assertTrue(rootElement.isJsonObject());
+
+      JsonObject rootObject = rootElement.getAsJsonObject();
+
+      // Entries: name, mimeType, size, imageMediaMetadata, offset, body
+      Assert.assertEquals(6, rootObject.entrySet().size());
+      Assert.assertEquals(testFileName, rootObject.get("name").getAsString());
+      Assert.assertEquals(testFileMime, rootObject.get("mimeType").getAsString());
+      Assert.assertEquals(contentLength, rootObject.get("size").getAsInt());
+
+      JsonObject imageMediaMetadataObject = rootObject.get("imageMediaMetadata").getAsJsonObject();
+
+      // Image metadata entries: width, height, rotation
+      Assert.assertEquals(3, imageMediaMetadataObject.entrySet().size());
+      Assert.assertEquals(5, imageMediaMetadataObject.get("width").getAsInt());
+      Assert.assertEquals(5, imageMediaMetadataObject.get("height").getAsInt());
+      Assert.assertEquals(0, imageMediaMetadataObject.get("rotation").getAsInt());
+
+      // collect bodies for next check
+      int resultOffset = rootObject.get("offset").getAsInt();
+      JsonArray bytes = rootObject.get("body").getAsJsonArray();
+      byte[]  resultBody = new byte[bytes.size()];
+      for (int i = 0; i < bytes.size(); i++) {
+        resultBody[i] = bytes.get(i).getAsByte();
+      }
+      partitionedContent.put(resultOffset, resultBody);
+    }
+    byte[] assembledContent = new byte[contentLength];
+    ByteBuffer buffer = ByteBuffer.wrap(assembledContent);
+    for (Map.Entry<Integer, byte[]> part : partitionedContent.entrySet()) {
+      buffer.position(part.getKey());
+      buffer.put(part.getValue());
+    }
+    Assert.assertArrayEquals(testPNGContent, buffer.array());
+  }
+
+  private Map<String, String> getDriveSourceMinimalDefaultConfigs() {
+    Map<String, String> sourceProps = new HashMap<>();
+    sourceProps.put("referenceName", "ref");
+    sourceProps.put("directoryIdentifier", sourceFolderId);
+    sourceProps.put("modificationDateRange", "lifetime");
+    sourceProps.put("fileTypesToPull", "binary");
+    sourceProps.put("maxPartitionSize", "0");
+    sourceProps.put("bodyFormat", "bytes");
+    sourceProps.put("docsExportingFormat", "text/plain");
+    sourceProps.put("sheetsExportingFormat", "text/csv");
+    sourceProps.put("drawingsExportingFormat", "image/svg+xml");
+    sourceProps.put("presentationsExportingFormat", "text/plain");
+    sourceProps.put("authType", "oAuth2");
+    sourceProps.put("clientId", getClientId());
+    sourceProps.put("clientSecret", getClientSecret());
+    sourceProps.put("refreshToken", getRefreshToken());
+    sourceProps.put("maxRetryCount", "8");
+    sourceProps.put("maxRetryWait", "200");
+    sourceProps.put("maxRetryJitterWait", "100");
+    return sourceProps;
+  }
+
+  private Map<String, String> getFileSourceMinimalDefaultConfigs() {
+    Set<Schema.Field> schemaFields = new HashSet<>();
+    schemaFields.add(Schema.Field.of("body", Schema.nullableOf(Schema.of(Schema.Type.BYTES))));
+    Schema fileSchema = Schema.recordOf(
+      "blob",
+      schemaFields);
+    Map<String, String> sourceProps = new HashMap<>();
+    sourceProps.put("path", tmpFolder.toString());
+    sourceProps.put("referenceName", "fileref");
+    sourceProps.put("format", "blob");
+    sourceProps.put("schema", fileSchema.toString());
+    return sourceProps;
+  }
+
+  private Map<String, String> getDriveSinkMinimalDefaultConfigs() {
+    Map<String, String> sinkProps = new HashMap<>();
+    sinkProps.put("referenceName", "refd");
+    sinkProps.put("directoryIdentifier", sinkFolderId);
+    sinkProps.put("schemaBodyFieldName", "body");
+    sinkProps.put("authType", "oAuth2");
+    sinkProps.put("clientId", getClientId());
+    sinkProps.put("clientSecret", getClientSecret());
+    sinkProps.put("refreshToken", getRefreshToken());
+    sinkProps.put("maxRetryCount", "8");
+    sinkProps.put("maxRetryWait", "200");
+    sinkProps.put("maxRetryJitterWait", "100");
+    return sinkProps;
+  }
+
+  private Map<String, String> getFileSinkMinimalDefaultConfigs() {
+    Map<String, String> sinkProps = new HashMap<>();
+    sinkProps.put("suffix", "yyyy-MM-dd-HH-mm");
+    sinkProps.put("path", tmpFolder.toString());
+    sinkProps.put("referenceName", "fileref");
+    sinkProps.put("format", "json");
+    return sinkProps;
+  }
+
+  protected void startWorkFlow(ApplicationManager appManager, ProgramRunStatus expectedStatus) throws Exception {
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    workflowManager.startAndWaitForRun(expectedStatus, 5, TimeUnit.MINUTES);
+  }
+
+  private void checkRowsNumber(DeploymentDetails deploymentDetails, int expectedCount) throws Exception {
+    ApplicationId appId = deploymentDetails.getAppId();
+    Map<String, String> tags = ImmutableMap.of(Constants.Metrics.Tag.NAMESPACE, appId.getNamespace(),
+                                               Constants.Metrics.Tag.APP, appId.getEntityName());
+    checkMetric(tags, "user." + deploymentDetails.getSource().getName() + ".records.out",
+                expectedCount, 10);
+    checkMetric(tags, "user." + deploymentDetails.getSink().getName() + ".records.in",
+                expectedCount, 10);
+  }
+
+  void checkPluginExists(String pluginName, String pluginType, String artifact) {
+    Preconditions.checkNotNull(pluginName);
+    Preconditions.checkNotNull(pluginType);
+    Preconditions.checkNotNull(artifact);
+
+    try {
+      Tasks.waitFor(true, () -> {
+        try {
+          final ArtifactId artifactId = TEST_NAMESPACE.artifact(artifact, version);
+          List<PluginSummary> plugins =
+            artifactClient.getPluginSummaries(artifactId, pluginType, ArtifactScope.SYSTEM);
+          return plugins.stream().anyMatch(pluginSummary -> pluginName.equals(pluginSummary.getName()));
+        } catch (ArtifactNotFoundException e) {
+          // happens if the relevant artifact(s) were not added yet
+          return false;
+        }
+      }, 5, TimeUnit.MINUTES, 3, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private DeploymentDetails deployGoogleDriveApplication(Map<String, String> sourceProperties,
+                                              Map<String, String> sinkProperties,
+                                              String applicationName) throws Exception {
+    return deployApplication(sourceProperties, sinkProperties,
+                             "GoogleDriveSource", "GoogleDriveSink",
+                             GOOGLE_DRIVE_PLUGIN_NAME, GOOGLE_DRIVE_PLUGIN_NAME,
+                             GOOGLE_DRIVE_ARTIFACT, GOOGLE_DRIVE_ARTIFACT, applicationName);
+  }
+
+  private DeploymentDetails deployApplication(Map<String, String> sourceProperties, Map<String, String> sinkProperties,
+                                              String sourceStageName, String sinkStageName,
+                                              String sourcePluginName, String sinkPluginName,
+                                              ArtifactSelectorConfig sourceArtifact, ArtifactSelectorConfig sinkArtifact,
+                                              String applicationName) throws Exception {
+    ETLStage source = new ETLStage(sourceStageName,
+                                   new ETLPlugin(sourcePluginName,
+                                                 BatchSource.PLUGIN_TYPE,
+                                                 sourceProperties,
+                                                 sourceArtifact));
+    ETLStage sink = new ETLStage(sinkStageName, new ETLPlugin(sinkPluginName,
+                                                              BatchSink.PLUGIN_TYPE,
+                                                              sinkProperties,
+                                                              sinkArtifact));
+
+    ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+      .addStage(source)
+      .addStage(sink)
+      .addConnection(source.getName(), sink.getName())
+      .build();
+
+    AppRequest<ETLBatchConfig> appRequest = getBatchAppRequestV2(etlConfig);
+    ApplicationId appId = TEST_NAMESPACE.app(applicationName);
+    ApplicationManager applicationManager = deployApplication(appId, appRequest);
+    return new DeploymentDetails(source, sink, appId, applicationManager);
+  }
+
+  private static String createFile(Drive service, byte[] content, String name, String mime, String subMime,
+                                   String folderId) throws IOException {
+    File fileToWrite = new File();
+    fileToWrite.setName(name);
+    fileToWrite.setParents(Collections.singletonList(folderId));
+    fileToWrite.setMimeType(mime);
+    ByteArrayContent fileContent = new ByteArrayContent(subMime, content);
+
+    File file = service.files().create(fileToWrite, fileContent)
+      .setFields("id, parents, mimeType")
+      .execute();
+    return file.getId();
+  }
+
+  private static String createFolder(Drive service, String folderName) throws IOException {
+    File fileMetadata = new File();
+    fileMetadata.setName(folderName);
+    fileMetadata.setMimeType("application/vnd.google-apps.folder");
+
+    File createdFolder = service.files().create(fileMetadata).setFields("id").execute();
+    return createdFolder.getId();
+  }
+
+  private static void removeFile(Drive service, String fileId) throws IOException {
+    service.files().delete(fileId).execute();
+  }
+
+  private List<File> getFiles(String parentFolderId) {
+    try {
+      List<File> files = new ArrayList<>();
+      String nextToken = "";
+      Drive.Files.List request = service.files().list()
+        .setQ(String.format("'%s' in parents", parentFolderId))
+        .setFields("nextPageToken, files(id, name, size, mimeType)");
+      while (nextToken != null) {
+        FileList result = request.execute();
+        files.addAll(result.getFiles());
+        nextToken = result.getNextPageToken();
+        request.setPageToken(nextToken);
+      }
+      return files;
+    } catch (IOException e) {
+      throw new RuntimeException("Issue during retrieving summary for files.", e);
+    }
+  }
+
+  private String getFileContent(String fileId) throws IOException {
+    OutputStream outputStream = new ByteArrayOutputStream();
+    Drive.Files.Get get = service.files().get(fileId);
+
+    get.executeMediaAndDownloadTo(outputStream);
+    return ((ByteArrayOutputStream) outputStream).toString();
+  }
+
+  private Path createFileSystemFolder(String path) throws IOException {
+    return Files.createTempDirectory(path);
+  }
+
+  private void createFileSystemTextFile(Path dirPath, String name, String content) throws IOException {
+    Path createdFile = Files.createTempFile(dirPath, name, null);
+    Files.write(createdFile, content.getBytes());
+  }
+
+  private class DeploymentDetails {
+
+    private final ApplicationId appId;
+    private final ETLStage source;
+    private final ETLStage sink;
+    private final ApplicationManager appManager;
+
+    DeploymentDetails(ETLStage source, ETLStage sink, ApplicationId appId, ApplicationManager appManager) {
+      this.appId = appId;
+      this.source = source;
+      this.sink = sink;
+      this.appManager = appManager;
+    }
+
+    public ApplicationId getAppId() {
+      return appId;
+    }
+
+    public ETLStage getSource() {
+      return source;
+    }
+
+    public ETLStage getSink() {
+      return sink;
+    }
+
+    public ApplicationManager getAppManager() {
+      return appManager;
+    }
+  }
+}
diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/UserCredentialsTestBase.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/UserCredentialsTestBase.java
new file mode 100644
index 000000000..adb10e5b1
--- /dev/null
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/google/drive/UserCredentialsTestBase.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.app.etl.google.drive;
+
+import org.junit.BeforeClass;
+
+import io.cdap.cdap.app.etl.ETLTestBase;
+
+/**
+ * An abstract class used for running integration tests with Google OAuth2 user account credentials.
+ */
+public abstract class UserCredentialsTestBase extends ETLTestBase {
+  private static String clientId;
+  private static String clientSecret;
+  private static String refreshToken;
+
+  @BeforeClass
+  public static void userCredentialsSetup() {
+    clientId = System.getProperty("google.application.clientId");
+    clientSecret = System.getProperty("google.application.clientSecret");
+    refreshToken = System.getProperty("google.application.refreshToken");
+    if (clientId == null || clientSecret== null || refreshToken == null) {
+      throw new IllegalArgumentException("Invalid user credential parameters");
+    }
+  }
+
+  public static String getClientId() {
+    return clientId;
+  }
+
+  public static String getClientSecret() {
+    return clientSecret;
+  }
+
+  public static String getRefreshToken() {
+    return refreshToken;
+  }
+}
diff --git a/pom.xml b/pom.xml
index 28c285bdc..413aba7a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
     <!-- cdap.examples.version is overridden in the pre-stage of upgrade tests -->
     <cdap.examples.version>${cdap.version}</cdap.examples.version>
     <cdap.common.version>0.12.0</cdap.common.version>
+    <drive-api.version>v3-rev173-1.25.0</drive-api.version>
     <junit.version>4.12</junit.version>
     <kafka.version>0.8.2.2</kafka.version>
     <snappy.version>1.1.1.7</snappy.version>
@@ -99,6 +100,11 @@
       <artifactId>google-cloud-bigquery</artifactId>
       <version>1.36.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-drive</artifactId>
+      <version>${drive-api.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>