diff --git a/.gitignore b/.gitignore index e45d049..ec6db2f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ /classpath/ build/ .idea +*.iml +example/config.yml diff --git a/README.md b/README.md index 9b8ff8f..b20662a 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,59 @@ # Sftp file output plugin for Embulk -TODO: Write short description here and build.gradle file. +Stores files on a SFTP Server ## Overview * **Plugin type**: file output * **Load all or nothing**: no * **Resume supported**: no -* **Cleanup supported**: yes +* **Cleanup supported**: no ## Configuration -- **option1**: description (integer, required) -- **option2**: description (string, default: `"myvalue"`) -- **option3**: description (string, default: `null`) +- **host**: (string, required) +- **port**: (string, default: `22`) +- **user**: (string, required) +- **password**: (string, default: `null`) +- **secret_key_file**: (string, default: `null`) +- **secret_key_passphrase**: (string, default: `""`) +- **user_directory_is_root**: (boolean, default: `true`) +- **timeout**: sftp connection timeout seconds (integer, default: `600`) +- **path_prefix**: Prefix of output paths (string, required) +- **file_ext**: Extension of output files (string, required) +- **sequence_format**: Format for sequence part of output files (string, default: `".%03d.%02d"`) ## Example ```yaml out: type: sftp - option1: example1 - option2: example2 + host: 127.0.0.1 + port: 22 + user: civitaspo + secret_key_file: /Users/civitaspo/.ssh/id_rsa + secret_key_passphrase: secret_pass + user_directory_is_root: false + timeout: 600 + path_prefix: /data/sftp + file_ext: _20151020.tsv + sequence_format: ".%01d%01d" ``` +## Run Example +replace settings in `example/sample.yml` before running. + +``` +$ ./gradlew classpath +$ embulk run -Ilib example/sample.yml +``` ## Build ``` $ ./gradlew gem # -t to watch change of files and rebuild continuously ``` + +## Note + +This plugin uses "org.apache.commons:commons-vfs" and the library uses the logger "org.apache.commons.logging.Log". So, this plugin suppress the logger's message except when embulk log level is debug. diff --git a/build.gradle b/build.gradle index 0ca8543..d0ad60b 100644 --- a/build.gradle +++ b/build.gradle @@ -12,12 +12,16 @@ configurations { provided } -version = "0.1.0" +version = "0.0.1" +sourceCompatibility = 1.7 +targetCompatibility = 1.7 dependencies { compile "org.embulk:embulk-core:0.7.4" provided "org.embulk:embulk-core:0.7.4" // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" + compile "org.apache.commons:commons-vfs2:2.+" + compile "com.jcraft:jsch:0.1.53" testCompile "junit:junit:4.+" } @@ -57,7 +61,7 @@ Gem::Specification.new do |spec| spec.description = %[Stores files on Sftp.] spec.email = ["civitaspo@gmail.com"] spec.licenses = ["MIT"] - # TODO set this: spec.homepage = "https://github.com/civitaspo/embulk-output-sftp" + spec.homepage = "https://github.com/civitaspo/embulk-output-sftp" spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"] spec.test_files = spec.files.grep(%r"^(test|spec)/") diff --git a/example/data.csv b/example/data.csv new file mode 100644 index 0000000..bfdf207 --- /dev/null +++ b/example/data.csv @@ -0,0 +1,100 @@ +time,id,name,score +2015-07-13,0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY,1370 +2015-07-13,1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ,3962 +2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE,7323 +2015-07-13,3,Prr0_u_T1ts4myUofBorOJFpCYcOTLOmNBMuRmKIPJU,5905 +2015-07-13,4,AEGIhHVW5cV6Xlb62uvx3TVl3kmh3Do8AvvtLDS7MDw,8378 +2015-07-13,5,eupqWLrnCHr_1UaX4dUInLRxx5Q_cyQ4t0oSJBcw0MA,275 +2015-07-13,6,BN8cQ47EXRb_oCGOoN96bhBldoiyoCp5O_vGHwg0XCg,9303 +2015-07-13,7,RvV35-6jY6MC9_Wnm4nPsmyyfqcr-hlnBt88sXtn1nU,6130 +2015-07-13,8,6OZiuPiJKjWNLMPgiEbJarB0F80lTPYkkePP8LMliv0,6652 +2015-07-13,9,13CgEU_ApAMVE6Ll6Y-mSu-aubskNgHbynj2rj8f8oE,6822 +2015-07-13,10,j1evoWRzKrJR0sfo014ZxhZtKigWDkRip5FwpAHAsmU,1311 +2015-07-13,11,4vBBBcArfMGhediXV5Sn80hj4KkI4nUCllECNKxNgnI,4748 +2015-07-13,12,6LSLQGjv46TWsvXrxYCfM5yIz4JGiGd1eEQI4TC-4yc,43 +2015-07-13,13,bgLJeacIPOMH6sDb5tEmca1oYyaMdfqZomGEI2uby7k,1214 +2015-07-13,14,bRHc-42RqKVv3ORxhVCA4T4dLEXyBzBCQoed8VOrDCo,7048 +2015-07-13,15,ysiB3w-K5jb3FxpQY61OHYTlK9qklz3nW84RLvBnh9s,8795 +2015-07-13,16,Rvn7-tMbQM3q0yWQD8AUdURhFB0ZkzLGdIiDg-AJokM,7838 +2015-07-13,17,FDEI99QVJ8xRTOiQ-UDVlPMOBfuA0IwIAbJ872XnKOo,9507 +2015-07-13,18,lZUazYHDEGbQbzN7vEFeLjmnzp1wsjR0D8r8f7Cs6x0,3378 +2015-07-13,19,WmDFEQsDPSnVs8AiAdO3QJqlSFer1K0I8z7F0cl_WRk,1661 +2015-07-13,20,OEDSi7YIj4OjMNqTw12EA04BNtNuVWva6YRhokxL4xQ,5934 +2015-07-13,21,fXYhm19m2FsbWcRQGqJvVOSl2ZIRSNhWTfke-iG8e7Q,680 +2015-07-13,22,LK59zfxizCwr5CI2Wu88B8gY8-G4OeyAXZobplwGzKk,8758 +2015-07-13,23,8i5TVZorCp4YATsaxgybkdOHcmDywvb35Sf-Eb-sl9E,8392 +2015-07-13,24,MrM9vy1U-9_OEYOQAxbshenvvUGdCZfqjx7l3KKBQ2I,8708 +2015-07-13,25,miVWwEwur_7baTxIBHUT9y351AU3tnAcCXgBzvyUR5I,2843 +2015-07-13,26,_vxViqC02KVb7RRBeDGYs9VZ52KB8QmvguzSXUYGfwI,6681 +2015-07-13,27,Ui6BqkQDipo5kQEeVUuC2OFFIB1O4T8ALlM2GI_zvtk,7542 +2015-07-13,28,OT3VLH-RdK0sIgQM3f6LIbBa_rt0YzCD5YOw4qpu6p8,5791 +2015-07-13,29,vassmNeEo_jbn88g7QP58mTxH-b1jhHfwFhy-FL6T8c,9613 +2015-07-13,30,VjzTphngC6V5fphi9fkGeYGCPIQNpDajfkHxrJopF6k,3064 +2015-07-13,31,aqw27tMVvSsLJ8EEY3hphHMb0BRLm-LZysjVV3aX7pQ,7862 +2015-07-13,32,ZXepGbCv7Yw_ejNQyAPjrqG_VwNH_RZoG8lKODl-f9c,397 +2015-07-13,33,-yRoubVSa0oPfg0E1Gh7zYBQfBO8dIxZvQH9c5OsZAU,5003 +2015-07-13,34,UkhBEKU7G0rV58Urs6JTAgC0UF5Y2kP-dffmE6H4nGs,6514 +2015-07-13,35,ktLO3RTpHLZon7AhE9XMwPPh0t_GiOpS8vwCCqoPPnk,8634 +2015-07-13,36,3ktjc_W87j3S8qLOJ0CVEVSSpz_nUAEQVBsqOMabrp0,3679 +2015-07-13,37,KscV-oPqhG_CZXYUgdCmekKdR9FIT5tSt7rd3wpQDcU,1013 +2015-07-13,38,VFiC8YyBk6zZk5bpfZG8s1a3kYfMA1zvnbs6DDSplGY,1556 +2015-07-13,39,s0bxCQyW048GkhEAoEzXYGcTV8BZo6MLnRhL62nepYk,2844 +2015-07-13,40,aWbMyvSxxTqrVONKAeQVvqi_bGqROu9UeR5NqPPlI4A,8035 +2015-07-13,41,qfjEvEY8XSgMEmc-vIZLinOeIdIz6xprQbsYAe0i2WU,3205 +2015-07-13,42,NK2ddaghTrUTS6Y7U1e-l57922ccVOKnqlODcA6lyBQ,302 +2015-07-13,43,JRQpF1luRmNk2stUaZzDQDj93hy4RSW_iWybVgsgzJA,6534 +2015-07-13,44,lz7bs1xZi4qdWLE7fQwpykWDNgp_o9oUuCZXipSLSqw,9250 +2015-07-13,45,TxcwVGwelHKJws_6Q0Nk6I4Eeo9sSThM7M9KorqIGhA,5549 +2015-07-13,46,u_uy6k3TgUIp3NWMFJ8EOH1mKtFhozGBD208z9um88s,3624 +2015-07-13,47,RaI9xr82f0D7Jjuc4QY8Rz-UlCg3V5tw7KgJtczEo44,5278 +2015-07-13,48,u6Nqudxl6vrbKGemO8xXgYojhtBGK3SQkTRPSYcaZuI,9588 +2015-07-13,49,r-IgfD4fE9TiQWarsVxO_4AdieYIUZ9cczPD44_snQ4,4795 +2015-07-13,50,KIiUpd04d3zYDul1mFlcJ3934AYvA_YeXDYG089ub-M,4344 +2015-07-13,51,zZs0iuqm7liPKKHHn8wz-kNvd2zLCqRdXAng4B3gL0A,2116 +2015-07-13,52,Rg7T2IsH0-HIvhgq0mNRC-4q5JoZ5Rcjq4tP7dz_3Ew,5323 +2015-07-13,53,uBNgdXPL6kZGXP-gTic2N-uDRCxAtmI-KixkJWgrObA,9 +2015-07-13,54,fQ_TLG3oByt3sDqM3Kruo69fBd1qLMXbbg10myfFXkQ,2471 +2015-07-13,55,0uNd4TrRpEA1lY_zWikyELZ3MmCTzON_5ftfi-45wic,9831 +2015-07-13,56,Jfp4VCtsFElA6UzzZyPxOwegfGqsYwrimSFp59YshTs,3177 +2015-07-13,57,KAHSwcCwblbPRysuImbzUxx0SLAMIMb6LmMAXJBjUww,1182 +2015-07-13,58,wuyDbV5ljr5275eGWhAe8wkElCzd2d_gRW3SpBkLIyY,183 +2015-07-13,59,R3KTTvKRvPn6vu4qtooBbqYmwdOCC9vjmcsnf_fyu5g,5001 +2015-07-13,60,Pgsf32JIv2cUMdTE9Vydh2Y36B_Xi4T1ufIy7QiKFSU,6182 +2015-07-13,61,EZmz-tWhPPAsXsDZms_HHsDLKBOuZisUDotr72xXQnI,5228 +2015-07-13,62,mk4y32O73DU2z65dFuW1PvIokdB7bB7btUnCoDlSVxM,8094 +2015-07-13,63,fs1HvYjpOvAHnT5W1rCPU9A3k8_Px2XwfprrLrkQibM,5849 +2015-07-13,64,x8WAAde6AqG2YaOEIpCFMzItRrfUXqgc8bwcoWSiMEo,6076 +2015-07-13,65,zuvlwNyn8AgPEvg6qIxzkUp_ClPkMn5A__YyksWbxTo,6439 +2015-07-13,66,ZWWjbJAqVtZz3AzCpacgEabm7SMloLHPBTlS3NMk7GA,6531 +2015-07-13,67,wdHfAVpHp9rFaGhZOC81AusTsZX0KHxTf5RkFBw6gpI,8088 +2015-07-13,68,hw8HUkIQMSS-2gAT7rvA2kgdhXfhHlySxKtssINvcFc,7808 +2015-07-13,69,x1_SLENL6-M1y0n5qmfBF1-GCslEHpVM4Fo1Rdz9Ofg,3617 +2015-07-13,70,E2Uj3TGAwd_B3FOS6KQ1Gjyql2YpoNtbdzWBTUOWmxY,8401 +2015-07-13,71,WkpwSIP4fA42gYd1H3ohw7EmtqdQSqh4ooA7aX8v_7o,1309 +2015-07-13,72,xDdMCHpSKFSZWQBJJgNzNh1R4hXouCsUfKFZpio5cgY,7867 +2015-07-13,73,l0QVMlih2NmGSajDXytku9Em9p61erNKe1LEyk1VZ-Q,7964 +2015-07-13,74,R8G5juHaD9sit1oujjp4FoXzXJT7hdIjEY3Lhu-ep6o,5680 +2015-07-13,75,Ckpy3y166odB33VVWb27XNG_Wi65_qyikeL7dGHceSE,8603 +2015-07-13,76,elFu5tPgUNzhuyswgr3QS7TXR2fInI4PWVZIEffxq6c,4972 +2015-07-13,77,kz663CgkMh9VfcJrfMZb735vJJWYUPAuaskNeg7xRDk,8396 +2015-07-13,78,evuBVl0RR1XQfJHN4jxSBpLcKxjZ7RtpDGYrU2ONYZA,6433 +2015-07-13,79,ZbIJwmWRWscOurtrCam-iLB2mIqREwQwGFRfVYzGxwk,2917 +2015-07-13,80,mzCWiiJFzo1R_anxGFALosK0eKvGfv_RT7iRGZnL790,3162 +2015-07-13,81,JyrXoXLq5RpRwwXNpiW1NFK6ZkVmS55hJsNBGsuY7xY,2385 +2015-07-13,82,fO7A_MQGh3Zojp6HlVZayvJHLu_RQ082ix3Y6BlRCu0,5965 +2015-07-13,83,ib-pOMBLU1sN5fyyJbAElIdWEJgkoqRcBuwo6CVVYsk,3265 +2015-07-13,84,X_6Ren6P7TpqyiWViO72kEwIulMqbTU_v8eAGfEo8k0,8049 +2015-07-13,85,hNI30i9IYx7EreMyG7rI56Y-ZtrRe4sBYjzKMnSrL5I,9222 +2015-07-13,86,kzokOacUOXELAeIHfPbnl-Er8rnHYq2JnksqN1roOSQ,2972 +2015-07-13,87,qKIfkhQObWMadIi5vshcDRv95je4TYcAPSYITfwVTRk,5390 +2015-07-13,88,9xKf3bfWj8Gr1NNocYHZuL0kIkAVD750LCMYDZ-R1tA,4759 +2015-07-13,89,ohbmpvNy7aaaIVZ74SlHSfm0ffdwV-AqJP1bfDSjNUU,2279 +2015-07-13,90,l6lTsvxdlcTfcqx2c0lQSd9HejVQg40W25f0wGNQViY,9034 +2015-07-13,91,XoALSEQg9ycuGqrEWHOb8vdrLbheZSgFO53Wr3mciXY,3945 +2015-07-13,92,0hgDRI_mijs5w7rkiLIe__LEayOOLxL0qVT1IHa5QBw,8109 +2015-07-13,93,KjCRAc-AVcS-R13toBUR6pK_7d9Y8Gl4TRdYYMaSirc,4774 +2015-07-13,94,fyQVGlT8Bqmu_LiajPlgfbmavoNyAqXaBsBP_e4OnN8,7253 +2015-07-13,95,FpBYRPWKu6DmLpx5tsB25URWfj3sNCbcydNAXULaiD8,3166 +2015-07-13,96,9ikvnUqp1Rf2yVwLvs5bBvxQP-KyqxGi4gZRSZ8c1d4,3695 +2015-07-13,97,RRNYDAzKaq4Trtt96Bxgk3N0fXLIV8hXoK0qQ7uw_Wc,5065 +,,,9170 diff --git a/example/sample.yml b/example/sample.yml new file mode 100644 index 0000000..bee13cc --- /dev/null +++ b/example/sample.yml @@ -0,0 +1,39 @@ +in: + type: file + path_prefix: example/data.csv + parser: + type: csv + charset: UTF-8 + newline: CRLF + null_string: 'NULL' + skip_header_lines: 1 + comment_line_marker: '#' + columns: + - {name: time, type: timestamp, format: "%Y-%m-%d"} + - {name: id, type: long} + - {name: name, type: string} + - {name: score, type: double} +out: + type: sftp + host: 127.0.0.1 + port: 22 + user: your_name + password: your_password + secret_key_file: your_secret_key_file + secret_key_passphrase: your_secret_key_passphrase + user_directory_is_root: true + path_prefix: /tmp/embulk_output_sftp/data + file_ext: .tsv + sequence_format: ".%01d%01d" + formatter: + type: csv + delimiter: "\t" + newline: CRLF + newline_in_field: LF + header_line: true + charset: UTF-8 + quote_policy: NONE + quote: "\"" + escape: "\\" + null_string: "" + default_timezone: 'UTC' \ No newline at end of file diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java new file mode 100644 index 0000000..a73e23b --- /dev/null +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -0,0 +1,209 @@ +package org.embulk.output.sftp; + +import com.google.common.base.Throwables; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemOptions; +import org.apache.commons.vfs2.Selectors; +import org.apache.commons.vfs2.impl.StandardFileSystemManager; +import org.apache.commons.vfs2.provider.sftp.IdentityInfo; +import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder; +import org.embulk.config.TaskReport; +import org.embulk.spi.Buffer; +import org.embulk.spi.Exec; +import org.embulk.spi.FileOutput; +import org.embulk.spi.TransactionalFileOutput; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; + +import static org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask; + +/** + * Created by takahiro.nakayama on 10/20/15. + */ +public class SftpFileOutput + implements FileOutput, TransactionalFileOutput +{ + private final Logger logger = Exec.getLogger(SftpFileOutput.class); + private final StandardFileSystemManager manager; + private final FileSystemOptions fsOptions; + private final String userInfo; + private final String host; + private final int port; + private final String pathPrefix; + private final String sequenceFormat; + private final String fileNameExtension; + + private final int taskIndex; + private int fileIndex = 0; + private FileObject currentFile; + private OutputStream currentFileOutputStream; + + private StandardFileSystemManager initializeStandardFileSystemManager() + { + if (logger.isDebugEnabled()) { + // TODO: change logging format: org.apache.commons.logging.Log + System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog"); + } + StandardFileSystemManager manager = new StandardFileSystemManager(); + try { + manager.init(); + } + catch (FileSystemException e) { + logger.error(e.getMessage()); + throw new RuntimeException(e); + } + + return manager; + } + + private String initializeUserInfo(PluginTask task) + { + String userInfo = task.getUser(); + if (task.getPassword().isPresent()) { + userInfo += ":" + task.getPassword().get(); + } + return userInfo; + } + + private FileSystemOptions initializeFsOptions(PluginTask task) + { + FileSystemOptions fsOptions = new FileSystemOptions(); + + try { + SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(fsOptions, task.getUserDirIsRoot()); + SftpFileSystemConfigBuilder.getInstance().setTimeout(fsOptions, task.getSftpConnectionTimeout()); + SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(fsOptions, "no"); + if (task.getSecretKeyFilePath().isPresent()) { + IdentityInfo identityInfo = new IdentityInfo(new File((task.getSecretKeyFilePath().get())), task.getSecretKeyPassphrase().getBytes()); + SftpFileSystemConfigBuilder.getInstance().setIdentityInfo(fsOptions, identityInfo); + } + } + catch (FileSystemException e) { + logger.error(e.getMessage()); + throw new RuntimeException(e); + } + + return fsOptions; + } + + SftpFileOutput(PluginTask task, int taskIndex) + { + this.manager = initializeStandardFileSystemManager(); + this.userInfo = initializeUserInfo(task); + this.fsOptions = initializeFsOptions(task); + this.host = task.getHost(); + this.port = task.getPort(); + this.pathPrefix = task.getPathPrefix(); + this.sequenceFormat = task.getSequenceFormat(); + this.fileNameExtension = task.getFileNameExtension(); + this.taskIndex = taskIndex; + } + + @Override + public void nextFile() + { + closeCurrentFile(); + + try { + currentFile = newSftpFile(getSftpFileUri(getOutputFilePath())); + currentFileOutputStream = currentFile.getContent().getOutputStream(); + logger.info("new sftp file: {}", currentFile.getPublicURIString()); + } + catch (FileSystemException e) { + logger.error(e.getMessage()); + Throwables.propagate(e); + } + } + + @Override + public void add(Buffer buffer) + { + if (currentFile == null) { + throw new IllegalStateException("nextFile() must be called before poll()"); + } + + try { + currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit()); + } + catch (IOException e) { + logger.error(e.getMessage()); + Throwables.propagate(e); + } + buffer.release(); + } + + @Override + public void finish() + { + closeCurrentFile(); + } + + @Override + public void close() + { + closeCurrentFile(); + manager.close(); + } + + @Override + public void abort() + { + } + + @Override + public TaskReport commit() + { + return null; + } + + + private void closeCurrentFile() + { + if (currentFile == null) { + return; + } + + try { + currentFileOutputStream.close(); + currentFile.getContent().close(); + currentFile.close(); + } + catch (IOException e) { + logger.error(e.getMessage()); + Throwables.propagate(e); + } + finally { + fileIndex++; + currentFile = null; + currentFileOutputStream = null; + } + } + + private URI getSftpFileUri(String remoteFilePath) + { + try { + return new URI("sftp", userInfo, host, port, remoteFilePath, null, null); + } + catch (URISyntaxException e) { + logger.error(e.getMessage()); + throw new RuntimeException(e); + } + } + + private String getOutputFilePath() + { + return pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + fileNameExtension; + } + + private FileObject newSftpFile(URI sftpUri) + throws FileSystemException + { + return manager.resolveFile(sftpUri.toString(), fsOptions); + } +} diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java b/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java index 231bff2..08e0bf8 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java @@ -1,53 +1,68 @@ package org.embulk.output.sftp; -import java.util.List; import com.google.common.base.Optional; -import org.embulk.config.TaskReport; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; +import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileOutputPlugin; import org.embulk.spi.TransactionalFileOutput; +import org.slf4j.Logger; + +import java.util.List; public class SftpFileOutputPlugin implements FileOutputPlugin { + private Logger logger = Exec.getLogger(SftpFileOutputPlugin.class); + public interface PluginTask extends Task { - // configuration option 1 (required integer) - @Config("option1") - public int getOption1(); + @Config("host") + public String getHost(); + + @Config("port") + @ConfigDefault("22") + public int getPort(); - // configuration option 2 (optional string, null is not allowed) - @Config("optoin2") - @ConfigDefault("\"myvalue\"") - public String getOption2(); + @Config("user") + public String getUser(); - // configuration option 3 (optional string, null is allowed) - @Config("optoin3") + @Config("password") @ConfigDefault("null") - public Optional getOption3(); + public Optional getPassword(); - // usually, run() method needs to write multiple files because size of a file - // can be very large. So, file name will be: - // - // path_prefix + String.format(sequence_format, taskIndex, sequenceCounterInRunMethod) + file_ext - // + @Config("secret_key_file") + @ConfigDefault("null") + public Optional getSecretKeyFilePath(); + + @Config("secret_key_passphrase") + @ConfigDefault("\"\"") + public String getSecretKeyPassphrase(); + + @Config("user_directory_is_root") + @ConfigDefault("true") + public Boolean getUserDirIsRoot(); + + @Config("timeout") + @ConfigDefault("600") // 10 minutes + public int getSftpConnectionTimeout(); - //@Config("path_prefix") - //public String getPathPrefix(); + @Config("path_prefix") + public String getPathPrefix(); - //@Config("file_ext") - //public String getFileNameExtension(); + @Config("file_ext") + public String getFileNameExtension(); + + @Config("sequence_format") + @ConfigDefault("\"%03d.%02d.\"") + public String getSequenceFormat(); - //@Config("sequence_format") - //@ConfigDefault("\"%03d.%02d.\"") - //public String getSequenceFormat(); } @Override @@ -82,12 +97,7 @@ public void cleanup(TaskSource taskSource, @Override public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) { - PluginTask task = taskSource.loadTask(PluginTask.class); - - // Write your code here :) - throw new UnsupportedOperationException("SftpFileOutputPlugin.open method is not implemented yet"); - - // See LocalFileOutputPlugin as an example implementation: - // https://github.com/embulk/embulk/blob/master/embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java + final PluginTask task = taskSource.loadTask(PluginTask.class); + return new SftpFileOutput(task, taskIndex); } }