From fc177f12cbbbd59d1716c47196eb76382f9a645a Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 22 Feb 2016 15:02:31 +0900 Subject: [PATCH 1/8] Add more examples --- example/config.yml | 4 +- example/data2.csv.gz | Bin 0 -> 182 bytes ...tream.java => PartialFileInputStream.java} | 6 +- .../embulk/input/hdfs/PartialFileList.java | 346 ++++++++++++++++++ .../org/embulk/input/hdfs/file/HDFSFile.java | 88 ----- .../input/hdfs/file/HDFSPartialFile.java | 103 ------ .../input/hdfs/util/ConfigurationFactory.java | 53 --- .../input/hdfs/util/HDFSFileFactory.java | 34 -- .../input/hdfs/util/HDFSFilePartitioner.java | 37 -- 9 files changed, 352 insertions(+), 319 deletions(-) create mode 100644 example/data2.csv.gz rename src/main/java/org/embulk/input/hdfs/{file/HDFSPartialFileInputStream.java => PartialFileInputStream.java} (95%) create mode 100644 src/main/java/org/embulk/input/hdfs/PartialFileList.java delete mode 100644 src/main/java/org/embulk/input/hdfs/file/HDFSFile.java delete mode 100644 src/main/java/org/embulk/input/hdfs/file/HDFSPartialFile.java delete mode 100644 src/main/java/org/embulk/input/hdfs/util/ConfigurationFactory.java delete mode 100644 src/main/java/org/embulk/input/hdfs/util/HDFSFileFactory.java delete mode 100644 src/main/java/org/embulk/input/hdfs/util/HDFSFilePartitioner.java diff --git a/example/config.yml b/example/config.yml index 982d42c..429d1fb 100644 --- a/example/config.yml +++ b/example/config.yml @@ -16,7 +16,9 @@ local_fs_example: &local_fs_example in: type: hdfs <<: *local_fs_example - path: example/data.csv + path: example/data* + skip_header_lines: 1 + decompression: true parser: charset: UTF-8 newline: CRLF diff --git a/example/data2.csv.gz b/example/data2.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..669b753528c4c7791becb04885e5ac3ed1728e64 GIT binary patch literal 182 zcmV;n07?HJiwFqxoXS=J17u-zVJ>5Hb}|56j6n{AAP@lWyn=9E6Ltj!?!B4b`T(V* zwJmB;n)LTIZH$T6$;|XuXuTivu)xyC08>u>-p&F17-JX~hES5J6p%z+NGz!4XdRVK z*&mZgHH6sZ*OL*TGIVq$uKLmCkCeL)TY0Uu-3!AdIg(D&Ru~Yce; entries = new ArrayList<>(); + private String last = null; + + private int limitCount = Integer.MAX_VALUE; + private long minTaskSize = 1; + private Pattern pathMatchPattern; + + private final ByteBuffer castBuffer = ByteBuffer.allocate(4); + + public Builder(Task task) + { + this(); + this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern()); + this.limitCount = task.getTotalFileCountLimit(); + this.minTaskSize = task.getMinTaskSize(); + } + + public Builder(ConfigSource config) + { + this(); + this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*")); + this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE); + this.minTaskSize = config.get(long.class, "min_task_size", 0L); + } + + public Builder() + { + binary = new ByteArrayOutputStream(); + try { + stream = new BufferedOutputStream(new GZIPOutputStream(binary)); + } + catch (IOException ex) { + throw Throwables.propagate(ex); + } + } + + public Builder limitTotalFileCount(int limitCount) + { + this.limitCount = limitCount; + return this; + } + + public Builder minTaskSize(long bytes) + { + this.minTaskSize = bytes; + return this; + } + + public Builder pathMatchPattern(String pattern) + { + this.pathMatchPattern = Pattern.compile(pattern); + return this; + } + + public int size() + { + return entries.size(); + } + + public boolean needsMore() + { + return size() < limitCount; + } + + // returns true if this file is used + public synchronized 
boolean add(String path, long start, long end)
+ {
+ // TODO throw IllegalStateException if stream is already closed
+
+ if (!needsMore()) {
+ return false;
+ }
+
+ if (!pathMatchPattern.matcher(path).find()) {
+ return false;
+ }
+
+ int index = entries.size();
+ entries.add(new Entry(index, start, end));
+
+ byte[] data = path.getBytes(StandardCharsets.UTF_8);
+ castBuffer.putInt(0, data.length);
+ try {
+ stream.write(castBuffer.array());
+ stream.write(data);
+ }
+ catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+
+ last = path;
+ return true;
+ }
+
+ public PartialFileList build()
+ {
+ try {
+ stream.close();
+ }
+ catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ return new PartialFileList(binary.toByteArray(), getSplits(entries), Optional.fromNullable(last));
+ }
+
+ private List<List<Entry>> getSplits(List<Entry> all)
+ {
+ List<List<Entry>> tasks = new ArrayList<>();
+ long currentTaskSize = 0;
+ List<Entry> currentTask = new ArrayList<>();
+ for (Entry entry : all) {
+ currentTask.add(entry);
+ currentTaskSize += entry.getSize(); // TODO consider to multiply the size by cost_per_byte, and add cost_per_file
+ if (currentTaskSize >= minTaskSize) {
+ tasks.add(currentTask);
+ currentTask = new ArrayList<>();
+ currentTaskSize = 0;
+ }
+ }
+ if (!currentTask.isEmpty()) {
+ tasks.add(currentTask);
+ }
+ return tasks;
+ }
+ }
+
+ private final byte[] data;
+ private final List<List<Entry>> tasks;
+ private final Optional<String> last;
+
+ @JsonCreator
+ @Deprecated
+ public PartialFileList(
+ @JsonProperty("data") byte[] data,
+ @JsonProperty("tasks") List<List<Entry>> tasks,
+ @JsonProperty("last") Optional<String> last)
+ {
+ this.data = data;
+ this.tasks = tasks;
+ this.last = last;
+ }
+
+ @JsonIgnore
+ public Optional<String> getLastPath(Optional<String> lastLastPath)
+ {
+ if (last.isPresent()) {
+ return last;
+ }
+ return lastLastPath;
+ }
+
+ @JsonIgnore
+ public int getTaskCount()
+ {
+ return tasks.size();
+ }
+
+ @JsonIgnore
+ public List<PartialFile> get(int i)
+ {
+ return new EntryList(data, tasks.get(i));
+ }
+
+ private class EntryList
+ extends AbstractList<PartialFile>
+ {
+ private final byte[] data;
+ private final List<Entry> entries;
+ private InputStream stream;
+ private int current;
+
+ private final ByteBuffer castBuffer = ByteBuffer.allocate(4);
+
+ public EntryList(byte[] data, List<Entry> entries)
+ {
+ this.data = data;
+ this.entries = entries;
+ try {
+ this.stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data)));
+ }
+ catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ this.current = 0;
+ }
+
+ @Override
+ public synchronized PartialFile get(int i)
+ {
+ Entry e = entries.get(i);
+ if (e.getIndex() < current) {
+ // rewind to the head
+ try {
+ stream.close();
+ stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data)));
+ }
+ catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ current = 0;
+ }
+
+ while (current < e.getIndex()) {
+ readNext();
+ }
+ // now current == e.getIndex()
+ return readNextString();
+ }
+
+ @Override
+ public int size()
+ {
+ return entries.size();
+ }
+
+ private byte[] readNext()
+ {
+ try {
+ stream.read(castBuffer.array());
+ int n = castBuffer.getInt(0);
+ byte[] b = new byte[n]; // here should be able to use a pooled buffer because read data is ignored if readNextString doesn't call this method
+ stream.read(b);
+
+ current++;
+
+ return b;
+ }
+ catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ private String readNextString()
+ {
+ return new String(readNext(), StandardCharsets.UTF_8);
+ }
+ }
+}
\ No newline at end
of file diff --git a/src/main/java/org/embulk/input/hdfs/file/HDFSFile.java b/src/main/java/org/embulk/input/hdfs/file/HDFSFile.java deleted file mode 100644 index 865d263..0000000 --- a/src/main/java/org/embulk/input/hdfs/file/HDFSFile.java +++ /dev/null @@ -1,88 +0,0 @@ -package org.embulk.input.hdfs.file; - -import org.apache.avro.reflect.Nullable; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Decompressor; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Created by takahiro.nakayama on 2/13/16. - */ -public class HDFSFile -{ - private final Path path; - private final CompressionCodec codec; - private final boolean decompression; - - public HDFSFile(Path path, @Nullable CompressionCodec codec, boolean decompression) - { - this.path = path; - this.codec = codec; - this.decompression = decompression; - } - - public Path getPath() - { - return path; - } - - public CompressionCodec getCodec() - { - return codec; - } - - public boolean getDecompression() - { - return decompression; - } - - public boolean canDecompress() - { - return getCodec() != null && getDecompression(); - } - - public long getLength(FileSystem fs) - throws IOException - { - if (getCodec() == null) { - return fs.getFileStatus(getPath()).getLen(); - } - else if (!decompression) { - return fs.getFileStatus(getPath()).getLen(); - } - else { - long fileLength = 0; - try (InputStream is = createInputStream(fs)) { - while (is.read() > 0) { - fileLength++; - } - } - return fileLength; - } - } - - public InputStream createInputStream(FileSystem fs) - throws IOException - { - if (getCodec() == null) { - return fs.open(getPath()); - } - else if (!getDecompression()) { - return fs.open(getPath()); - } - else { - Decompressor decompressor = CodecPool.getDecompressor(getCodec()); - try { - return getCodec().createInputStream(fs.open(getPath()), decompressor); - } - finally { - CodecPool.returnDecompressor(decompressor); - } - } - } -} diff --git a/src/main/java/org/embulk/input/hdfs/file/HDFSPartialFile.java b/src/main/java/org/embulk/input/hdfs/file/HDFSPartialFile.java deleted file mode 100644 index bd36d7c..0000000 --- a/src/main/java/org/embulk/input/hdfs/file/HDFSPartialFile.java +++ /dev/null @@ -1,103 +0,0 @@ -package org.embulk.input.hdfs.file; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; - -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.SequenceInputStream; - -/** - * Created by takahiro.nakayama on 2/13/16. - * ref. 
https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFile.java - */ -public class HDFSPartialFile -{ - private final HDFSFile hdfsFile; - private final long start; - private final long end; - - public HDFSPartialFile(HDFSFile hdfsFile, long start, long end) - { - this.hdfsFile = hdfsFile; - this.start = start; - this.end = end; - } - - public HDFSFile getHdfsFile() - { - return hdfsFile; - } - - public long getStart() - { - return start; - } - - public long getEnd() - { - return end; - } - - public Path getPath() - { - return getHdfsFile().getPath(); - } - - public CompressionCodec getCodec() - { - return getHdfsFile().getCodec(); - } - - public InputStream createInputStream(FileSystem fs) - throws IOException - { - InputStream original = getHdfsFile().createInputStream(fs); - return new HDFSPartialFileInputStream(original, getStart(), getEnd()); - } - - public InputStream createHeadersInputStream(FileSystem fs, int numHeaderLines) - throws IOException - { - ByteArrayOutputStream header = new ByteArrayOutputStream(); - int skippedHeaders = 0; - InputStream hdfsFileInputStream = createInputStream(fs); - try (BufferedInputStream in = new BufferedInputStream(hdfsFileInputStream)) { - while (true) { - int c = in.read(); - if (c < 0) { - break; - } - - header.write(c); - - if (c == '\n') { - skippedHeaders++; - } - else if (c == '\r') { - int c2 = in.read(); - if (c2 == '\n') { - header.write(c2); - } - skippedHeaders++; - } - - if (skippedHeaders >= numHeaderLines) { - break; - } - } - } - header.close(); - return new ByteArrayInputStream(header.toByteArray()); - } - - public InputStream createInputStreamWithHeaders(FileSystem fs, int numHeaderLines) - throws IOException - { - return new SequenceInputStream(createHeadersInputStream(fs, numHeaderLines), createInputStream(fs)); - } -} diff --git a/src/main/java/org/embulk/input/hdfs/util/ConfigurationFactory.java b/src/main/java/org/embulk/input/hdfs/util/ConfigurationFactory.java deleted file mode 100644 index 8e0794a..0000000 --- a/src/main/java/org/embulk/input/hdfs/util/ConfigurationFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.embulk.input.hdfs.util; - -import org.apache.hadoop.conf.Configuration; -import org.embulk.config.ConfigException; -import org.embulk.input.hdfs.HdfsFileInputPlugin.PluginTask; -import org.embulk.spi.Exec; -import org.slf4j.Logger; - -import java.io.File; -import java.net.MalformedURLException; -import java.util.Map; - -/** - * Created by takahiro.nakayama on 2/9/16. 
- */ - -public class ConfigurationFactory -{ - private static final Logger logger = Exec.getLogger(ConfigurationFactory.class); - - private ConfigurationFactory() - { - } - - public static Configuration fromTask(PluginTask task) - { - Configuration configuration = newConfiguration(); - for (String configFile : task.getConfigFiles()) { - File file = new File(configFile); - try { - configuration.addResource(file.toURI().toURL()); - } - catch (MalformedURLException e) { - throw new ConfigException(e); - } - } - for (Map.Entry entry : task.getConfig().entrySet()) { - configuration.set(entry.getKey(), entry.getValue()); - } - // For debug - for (Map.Entry entry : configuration) { - logger.trace("{}: {}", entry.getKey(), entry.getValue()); - } - logger.trace("Resource Files: {}", configuration); - return configuration; - - } - - public static Configuration newConfiguration() - { - return new Configuration(); - } -} diff --git a/src/main/java/org/embulk/input/hdfs/util/HDFSFileFactory.java b/src/main/java/org/embulk/input/hdfs/util/HDFSFileFactory.java deleted file mode 100644 index 35daf0b..0000000 --- a/src/main/java/org/embulk/input/hdfs/util/HDFSFileFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.embulk.input.hdfs.util; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.embulk.input.hdfs.file.HDFSFile; - -/** - * Created by takahiro.nakayama on 2/15/16. - */ -public class HDFSFileFactory -{ - private final CompressionCodecFactory codecFactory; - - public HDFSFileFactory(Configuration configuration) - { - this.codecFactory = new CompressionCodecFactory(configuration); - } - - private CompressionCodecFactory getCodecFactory() - { - return codecFactory; - } - - public HDFSFile create(Path path) - { - return create(path, true); - } - - public HDFSFile create(Path path, boolean decompression) - { - return new HDFSFile(path, getCodecFactory().getCodec(path), decompression); - } -} diff --git a/src/main/java/org/embulk/input/hdfs/util/HDFSFilePartitioner.java b/src/main/java/org/embulk/input/hdfs/util/HDFSFilePartitioner.java deleted file mode 100644 index 8f5304c..0000000 --- a/src/main/java/org/embulk/input/hdfs/util/HDFSFilePartitioner.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.embulk.input.hdfs.util; - -import com.google.common.collect.ImmutableList; -import org.embulk.input.hdfs.file.HDFSFile; -import org.embulk.input.hdfs.file.HDFSPartialFile; - -import java.util.List; - -/** - * Created by takahiro.nakayama on 2/15/16. 
- */ -public class HDFSFilePartitioner -{ - private final HDFSFile hdfsFile; - private final long fileLength; - private final long numPartitions; - - public HDFSFilePartitioner(HDFSFile hdfsFile, long fileLength, long numPartitions) - { - this.hdfsFile = hdfsFile; - this.fileLength = fileLength; - this.numPartitions = numPartitions; - } - - public List generateHDFSPartialFiles() - { - ImmutableList.Builder builder = ImmutableList.builder(); - for (long i = 0; i < numPartitions; i++) { - long start = fileLength * i / numPartitions; - long end = fileLength * (i + 1) / numPartitions; - if (start < end) { - builder.add(new HDFSPartialFile(hdfsFile, start, end)); - } - } - return builder.build(); - } -} From b2aed1f1649ec2bcc98a22f9db2b704ef9286c9e Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 22 Feb 2016 15:05:04 +0900 Subject: [PATCH 2/8] Use FileList class as PartialFileList from https://github.com/embulk/embulk-input-s3/blob/b546158123a734acf0785d61400c69fcdd910ed6/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java --- .../embulk/input/hdfs/PartialFileList.java | 96 +++++++++++-------- 1 file changed, 55 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/embulk/input/hdfs/PartialFileList.java b/src/main/java/org/embulk/input/hdfs/PartialFileList.java index 08d398b..3950f6f 100644 --- a/src/main/java/org/embulk/input/hdfs/PartialFileList.java +++ b/src/main/java/org/embulk/input/hdfs/PartialFileList.java @@ -26,6 +26,8 @@ /** * Created by takahiro.nakayama on 2/20/16. + * Ported from https://github.com/embulk/embulk-input-s3/blob/master/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java + * and Modified for this package. */ public class PartialFileList { @@ -45,46 +47,31 @@ public interface Task long getMinTaskSize(); } -// public static class Entry -// { -// private int index; -// private long size; -// -// @JsonCreator -// public Entry( -// @JsonProperty("index") int index, -// @JsonProperty("size") long size) -// { -// this.index = index; -// this.size = size; -// } -// -// @JsonProperty("index") -// public int getIndex() { return index; } -// -// @JsonProperty("size") -// public long getSize() { return size; } -// } - public static class Entry { private int index; private long start; private long end; + private boolean canDecompress; @JsonCreator public Entry( @JsonProperty("index") int index, @JsonProperty("start") long start, - @JsonProperty("end") long end) + @JsonProperty("end") long end, + @JsonProperty("can_decompress") boolean canDecompress) { this.index = index; this.start = start; this.end = end; + this.canDecompress = canDecompress; } @JsonProperty("index") - public int getIndex() { return index; } + public int getIndex() + { + return index; + } @JsonProperty("start") public long getStart() @@ -98,8 +85,17 @@ public long getEnd() return end; } + @JsonProperty("can_decompress") + public boolean getCanDecompress() + { + return canDecompress; + } + @JsonIgnore - public long getSize() { return getEnd() - getStart(); } + public long getSize() + { + return getEnd() - getStart(); + } } public static class Builder @@ -171,7 +167,7 @@ public boolean needsMore() } // returns true if this file is used - public synchronized boolean add(String path, long start, long end) + public synchronized boolean add(String path, long start, long end, boolean canDecompress) { // TODO throw IllegalStateException if stream is already closed @@ -184,7 +180,7 @@ public synchronized boolean add(String path, long start, long end) } int index = entries.size(); - 
entries.add(new Entry(index, start, end)); + entries.add(new Entry(index, start, end, canDecompress)); byte[] data = path.getBytes(StandardCharsets.UTF_8); castBuffer.putInt(0, data.length); @@ -192,8 +188,8 @@ public synchronized boolean add(String path, long start, long end) stream.write(castBuffer.array()); stream.write(data); } - catch (IOException ex) { - throw Throwables.propagate(ex); + catch (IOException e) { + throw Throwables.propagate(e); } last = path; @@ -205,8 +201,8 @@ public PartialFileList build() try { stream.close(); } - catch (IOException ex) { - throw Throwables.propagate(ex); + catch (IOException e) { + throw Throwables.propagate(e); } return new PartialFileList(binary.toByteArray(), getSplits(entries), Optional.fromNullable(last)); } @@ -237,7 +233,6 @@ private List> getSplits(List all) private final Optional last; @JsonCreator - @Deprecated public PartialFileList( @JsonProperty("data") byte[] data, @JsonProperty("tasks") List> tasks, @@ -269,6 +264,24 @@ public List get(int i) return new EntryList(data, tasks.get(i)); } + @JsonProperty("data") + public byte[] getData() + { + return data; + } + + @JsonProperty("tasks") + public List> getTasks() + { + return tasks; + } + + @JsonProperty("last") + public Optional getLast() + { + return last; + } + private class EntryList extends AbstractList { @@ -286,8 +299,8 @@ public EntryList(byte[] data, List entries) try { this.stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data))); } - catch (IOException ex) { - throw Throwables.propagate(ex); + catch (IOException e) { + throw Throwables.propagate(e); } this.current = 0; } @@ -295,24 +308,25 @@ public EntryList(byte[] data, List entries) @Override public synchronized PartialFile get(int i) { - Entry e = entries.get(i); - if (e.getIndex() < current) { + Entry entry = entries.get(i); + if (entry.getIndex() < current) { // rewind to the head try { stream.close(); stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data))); } - catch (IOException ex) { - throw Throwables.propagate(ex); + catch (IOException e) { + throw Throwables.propagate(e); } current = 0; } - while (current < e.getIndex()) { + while (current < entry.getIndex()) { readNext(); } // now current == e.getIndex() - return readNextString(); + return new PartialFile(readNextString(), + entry.getStart(), entry.getEnd(), entry.getCanDecompress()); } @Override @@ -333,8 +347,8 @@ private byte[] readNext() return b; } - catch (IOException ex) { - throw Throwables.propagate(ex); + catch (IOException e) { + throw Throwables.propagate(e); } } From eff77cbb31fac370d03a930871934e8009f56493 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 22 Feb 2016 15:05:37 +0900 Subject: [PATCH 3/8] Modify as following PartialFileList --- .../input/hdfs/TestHdfsFileInputPlugin.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java b/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java index ffd499a..f896575 100644 --- a/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java @@ -12,7 +12,6 @@ import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.input.hdfs.HdfsFileInputPlugin.PluginTask; -import org.embulk.input.hdfs.file.HDFSPartialFile; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.FileInputRunner; @@ -31,6 +30,7 
@@ import java.io.File; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import static org.junit.Assert.assertEquals; @@ -102,17 +102,12 @@ public String apply(@Nullable String input) } }); - List resultFList = Lists.transform(plugin.getHDFSPartialFiles(), new Function() - { - @Nullable - @Override - public String apply(@Nullable HDFSPartialFile input) - { - assert input != null; - return input.getPath().toString(); + List resultFList = Lists.newArrayList(); + for (int i = 0; i < task.getPartialFileList().getTaskCount();i++) { + for (PartialFile partialFile : task.getPartialFileList().get(i)) { + resultFList.add(partialFile.getPath().toString()); } - }); - + } assertEquals(fileList.size(), resultFList.size()); assert fileList.containsAll(resultFList); return emptyTaskReports(taskCount); @@ -157,7 +152,7 @@ public void testStrftime() config.set("path", "/tmp/%Y-%m-%d"); config.set("rewind_seconds", 86400); PluginTask task = config.loadConfig(PluginTask.class); - String result = plugin.strftime(task, task.getPath(), task.getRewindSeconds()); + String result = plugin.strftime(task.getJRuby(), task.getPath(), task.getRewindSeconds()); String expected = task.getJRuby().runScriptlet("(Time.now - 86400).strftime('/tmp/%Y-%m-%d')").toString(); assertEquals(expected, result); } From 3427f205a532bcd024cc88f644795ddbb1dd1935 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 22 Feb 2016 15:06:08 +0900 Subject: [PATCH 4/8] Add ConfigurationBuilder --- .../input/hdfs/ConfigurationBuilder.java | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 src/main/java/org/embulk/input/hdfs/ConfigurationBuilder.java diff --git a/src/main/java/org/embulk/input/hdfs/ConfigurationBuilder.java b/src/main/java/org/embulk/input/hdfs/ConfigurationBuilder.java new file mode 100644 index 0000000..3fe0bd3 --- /dev/null +++ b/src/main/java/org/embulk/input/hdfs/ConfigurationBuilder.java @@ -0,0 +1,82 @@ +package org.embulk.input.hdfs; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.embulk.config.ConfigException; +import org.embulk.spi.Exec; +import org.slf4j.Logger; + +import java.io.File; +import java.net.MalformedURLException; +import java.util.List; +import java.util.Map; + +/** + * Created by takahiro.nakayama on 2/22/16. 
+ */
+public class ConfigurationBuilder
+{
+ private static final Logger logger = Exec.getLogger(ConfigurationBuilder.class);
+ private final ImmutableList.Builder<String> configFilesBuilder;
+ private final ImmutableMap.Builder<String, String> configMapBuilder;
+
+ public ConfigurationBuilder()
+ {
+ this.configFilesBuilder = ImmutableList.builder();
+ this.configMapBuilder = ImmutableMap.builder();
+ }
+
+ public ConfigurationBuilder addConfigFiles(List<String> configFiles)
+ {
+ for (String configFile : configFiles) {
+ addConfigFile(configFile);
+ }
+ return this;
+ }
+
+ public ConfigurationBuilder addConfigFile(String configFile)
+ {
+ configFilesBuilder.add(configFile);
+ return this;
+ }
+
+ public ConfigurationBuilder addConfigMap(Map<String, String> configMap)
+ {
+ for (Map.Entry<String, String> entry : configMap.entrySet()) {
+ addConfig(entry.getKey(), entry.getValue());
+ }
+ return this;
+ }
+
+ public ConfigurationBuilder addConfig(String key, String value)
+ {
+ configMapBuilder.put(key, value);
+ return this;
+ }
+
+ public Configuration build()
+ {
+ Configuration configuration = new Configuration();
+ for (String configFile : configFilesBuilder.build()) {
+ File file = new File(configFile);
+ try {
+ configuration.addResource(file.toURI().toURL());
+ }
+ catch (MalformedURLException e) {
+ throw new ConfigException(e);
+ }
+ }
+ for (Map.Entry<String, String> entry : configMapBuilder.build().entrySet()) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+ // For debug
+ for (Map.Entry<String, String> entry : configuration) {
+ logger.trace("{}: {}", entry.getKey(), entry.getValue());
+ }
+ logger.trace("Resource Files: {}", configuration);
+ return configuration;
+ }
+}
From 111ce227fc4fb7e2bc3a4c8695a82f15ac515746 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Mon, 22 Feb 2016 15:07:10 +0900
Subject: [PATCH 5/8] Add PartialFile/PartialFileInputStreamBuilder

---
 .../org/embulk/input/hdfs/PartialFile.java | 48 +++++++
 .../hdfs/PartialFileInputStreamBuilder.java | 125 ++++++++++++++++++
 2 files changed, 173 insertions(+)
 create mode 100644 src/main/java/org/embulk/input/hdfs/PartialFile.java
 create mode 100644 src/main/java/org/embulk/input/hdfs/PartialFileInputStreamBuilder.java

diff --git a/src/main/java/org/embulk/input/hdfs/PartialFile.java b/src/main/java/org/embulk/input/hdfs/PartialFile.java
new file mode 100644
index 0000000..cf5faaf
--- /dev/null
+++ b/src/main/java/org/embulk/input/hdfs/PartialFile.java
@@ -0,0 +1,48 @@
+package org.embulk.input.hdfs;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Created by takahiro.nakayama on 2/20/16.
+ * This is almost the same as PartialFileList.Entry, so this class may not be needed.
+ */
+public class PartialFile
+{
+ private final Path path;
+ private final long start;
+ private final long end;
+ private final boolean canDecompress;
+
+ public PartialFile(String path, long start, long end, boolean canDecompress)
+ {
+ this(new Path(path), start, end, canDecompress);
+ }
+
+ public PartialFile(Path path, long start, long end, boolean canDecompress)
+ {
+ this.path = path;
+ this.start = start;
+ this.end = end;
+ this.canDecompress = canDecompress;
+ }
+
+ public Path getPath()
+ {
+ return path;
+ }
+
+ public long getStart()
+ {
+ return start;
+ }
+
+ public long getEnd()
+ {
+ return end;
+ }
+
+ public boolean getCanDecompress()
+ {
+ return canDecompress;
+ }
+}
diff --git a/src/main/java/org/embulk/input/hdfs/PartialFileInputStreamBuilder.java b/src/main/java/org/embulk/input/hdfs/PartialFileInputStreamBuilder.java
new file mode 100644
index 0000000..78a17db
--- /dev/null
+++ b/src/main/java/org/embulk/input/hdfs/PartialFileInputStreamBuilder.java
@@ -0,0 +1,125 @@
+package org.embulk.input.hdfs;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.embulk.spi.Exec;
+import org.slf4j.Logger;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
+/**
+ * Created by takahiro.nakayama on 2/21/16.
+ */
+public class PartialFileInputStreamBuilder
+{
+ private static final Logger logger = Exec.getLogger(PartialFileInputStreamBuilder.class);
+ private final FileSystem fs;
+ private final PartialFile partialFile;
+ private int numHeaderLines = 0;
+
+ public PartialFileInputStreamBuilder(FileSystem fs, PartialFile partialFile)
+ {
+ this.fs = fs;
+ this.partialFile = partialFile;
+ }
+
+ public InputStream build()
+ throws IOException
+ {
+ logger.trace("path: {}, start: {}, end: {}, num_header_lines: {}",
+ partialFile.getPath(), partialFile.getStart(), partialFile.getEnd(), numHeaderLines);
+ if (partialFile.getStart() > 0 && numHeaderLines > 0) {
+ return new SequenceInputStream(createHeadersInputStream(), createPartialFileInputStream());
+ }
+ else {
+ return createPartialFileInputStream();
+ }
+ }
+
+ public PartialFileInputStreamBuilder withHeaders(int numHeaderLines)
+ {
+ this.numHeaderLines = numHeaderLines;
+ return this;
+ }
+
+ private InputStream createOriginalFileWrappedInputStream()
+ {
+ InputStream original = createOriginalFileInputStream();
+ CompressionCodec codec = new CompressionCodecFactory(fs.getConf()).getCodec(partialFile.getPath());
+ if (partialFile.getCanDecompress() && codec != null) {
+ try {
+ return codec.createInputStream(original);
+ }
+ catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ else {
+ return original;
+ }
+ }
+
+ private InputStream createOriginalFileInputStream()
+ {
+ try {
+ return fs.open(partialFile.getPath());
+ }
+ catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ // memo: it might be good to also have something that creates the InputStream via the CompressionCodec here…
+ // otherwise the headers might also end up broken… or maybe not
+
+ private InputStream createPartialFileInputStream()
+ {
+ InputStream original = createOriginalFileWrappedInputStream();
+ return new PartialFileInputStream(original,
partialFile.getStart(), partialFile.getEnd()); + } + + private InputStream createHeadersInputStream() + throws IOException + { + ByteArrayOutputStream header = new ByteArrayOutputStream(); + int skippedHeaders = 0; + InputStream original = createOriginalFileWrappedInputStream(); + try (BufferedInputStream in = new BufferedInputStream(original)) { + while (true) { + int c = in.read(); + if (c < 0) { + break; + } + + header.write(c); + + if (c == '\n') { + skippedHeaders++; + } + else if (c == '\r') { + int c2 = in.read(); + if (c2 == '\n') { + header.write(c2); + } + skippedHeaders++; + } + + if (skippedHeaders >= numHeaderLines) { + break; + } + } + } + header.close(); + return new ByteArrayInputStream(header.toByteArray()); + } +} From 460737679c803316081742afaf2236225773658f Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 22 Feb 2016 15:07:36 +0900 Subject: [PATCH 6/8] Refactor HdfsFileInputPlugin --- .../input/hdfs/HdfsFileInputPlugin.java | 303 +++++++++++------- 1 file changed, 183 insertions(+), 120 deletions(-) diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java index 219f486..39cd604 100644 --- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java +++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java @@ -2,6 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; @@ -10,6 +11,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; @@ -18,21 +21,17 @@ import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; -import org.embulk.input.hdfs.file.HDFSFile; -import org.embulk.input.hdfs.file.HDFSPartialFile; -import org.embulk.input.hdfs.util.ConfigurationFactory; -import org.embulk.input.hdfs.util.HDFSFileFactory; -import org.embulk.input.hdfs.util.HDFSFilePartitioner; +import org.embulk.spi.BufferAllocator; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; -import org.embulk.spi.util.InputStreamTransactionalFileInput; +import org.embulk.spi.util.InputStreamFileInput; import org.jruby.embed.ScriptingContainer; import org.slf4j.Logger; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -40,7 +39,7 @@ public class HdfsFileInputPlugin implements FileInputPlugin { public interface PluginTask - extends Task + extends Task, PartialFileList.Task { @Config("config_files") @ConfigDefault("[]") @@ -73,114 +72,95 @@ public interface PluginTask @ConfigDefault("false") // when getting FileInputStream. 
boolean getDecompression(); + PartialFileList getPartialFileList(); + void setPartialFileList(PartialFileList partialFileList); + @ConfigInject ScriptingContainer getJRuby(); + + @ConfigInject + BufferAllocator getBufferAllocator(); } private static final Logger logger = Exec.getLogger(HdfsFileInputPlugin.class); private Optional configurationContainer = Optional.absent(); - private Optional fsContainer = Optional.absent(); - private Optional> hdfsPartialFilesContainer = Optional.absent(); - - private void setContainers(PluginTask task) - { - if (!configurationContainer.isPresent()) { - setConfiguration(ConfigurationFactory.fromTask(task)); - } - - if (!fsContainer.isPresent()) { - try { - setFileSystem(FileSystem.get(getConfiguration())); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - private void setConfiguration(Configuration configuration) - { - configurationContainer = Optional.of(configuration); - } - - private Configuration getConfiguration() - { - return configurationContainer.get(); - } - - private void setFileSystem(FileSystem fs) - { - fsContainer = Optional.of(fs); - } - - private FileSystem getFS() - { - return fsContainer.get(); - } - - @VisibleForTesting - void setHDFSPartialFiles(List hdfsPartialFiles) - { - hdfsPartialFilesContainer = Optional.of(hdfsPartialFiles); - } - - @VisibleForTesting - List getHDFSPartialFiles() - { - return hdfsPartialFilesContainer.get(); - } @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); - setContainers(task); + Configuration configuration = getConfiguration(task); // listing Files try { - String pathString = strftime(task, task.getPath(), task.getRewindSeconds()); + FileSystem fs = getFS(configuration); + + String pathString = strftime(task.getJRuby(), task.getPath(), task.getRewindSeconds()); Path rootPath = new Path(pathString); - List originalFileList = buildFileList(getFS(), rootPath); + List originalFileList = buildOriginalFileList(fs, rootPath); if (originalFileList.isEmpty()) { throw new PathNotFoundException(pathString); } logger.debug("embulk-input-hdfs: Loading target files: {}", originalFileList); - setHDFSPartialFiles(allocateHdfsFilesToNumTasks(task, originalFileList)); + PartialFileList list = buildPartialFileList(task, originalFileList); + task.setPartialFileList(list); } catch (IOException e) { logger.error(e.getMessage()); throw new RuntimeException(e); } - // log the detail of partial files. 
- for (HDFSPartialFile partialFile : getHDFSPartialFiles()) { - logger.debug("embulk-input-hdfs: target file: {}, start: {}, end: {}", - partialFile.getPath(), partialFile.getStart(), partialFile.getEnd()); - } - // number of processors is same with number of targets - int taskCount = getHDFSPartialFiles().size(); + int taskCount = task.getPartialFileList().getTaskCount(); logger.info("embulk-input-hdfs: task size: {}", taskCount); return resume(task.dump(), taskCount, control); } + private Configuration getConfiguration(PluginTask task) + { + if (configurationContainer.isPresent()) { + return configurationContainer.get(); + } + + ConfigurationBuilder builder = new ConfigurationBuilder(); + builder.addConfigFiles(task.getConfigFiles()); + builder.addConfigMap(task.getConfig()); + configurationContainer = Optional.of(builder.build()); + return configurationContainer.get(); + } + + private FileSystem getFS(Configuration configuration) + { + try { + return FileSystem.get(configuration); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + @VisibleForTesting - String strftime(final PluginTask task, final String format, final int rewindSeconds) + String strftime(final ScriptingContainer jruby, final String format, final int rewindSeconds) { String script = String.format("(Time.now - %d).strftime('%s')", rewindSeconds, format); - return task.getJRuby().runScriptlet(script).toString(); + return jruby.runScriptlet(script).toString(); } - private List buildFileList(FileSystem fs, Path rootPath) - throws IOException + private List buildOriginalFileList(FileSystem fs, Path rootPath) { List fileList = Lists.newArrayList(); - final FileStatus[] entries = fs.globStatus(rootPath); + final FileStatus[] entries; + try { + entries = fs.globStatus(rootPath); + } + catch (IOException e) { + throw Throwables.propagate(e); + } // `globStatus` does not throw PathNotFoundException. // return null instead. 
// see: https://github.com/apache/hadoop/blob/branch-2.7.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java#L286 @@ -190,7 +170,8 @@ private List buildFileList(FileSystem fs, Path rootPath) for (FileStatus entry : entries) { if (entry.isDirectory()) { - fileList.addAll(listRecursive(fs, entry)); + List subEntries = listRecursive(fs, entry); + fileList.addAll(subEntries); } else { fileList.add(entry.getPath()); @@ -201,11 +182,18 @@ private List buildFileList(FileSystem fs, Path rootPath) } private List listRecursive(final FileSystem fs, FileStatus status) - throws IOException { - List fileList = new ArrayList<>(); + List fileList = Lists.newArrayList(); if (status.isDirectory()) { - for (FileStatus entry : fs.listStatus(status.getPath())) { + FileStatus[] entries; + try { + entries = fs.listStatus(status.getPath()); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + + for (FileStatus entry : entries) { fileList.addAll(listRecursive(fs, entry)); } } @@ -215,30 +203,32 @@ private List listRecursive(final FileSystem fs, FileStatus status) return fileList; } - private List allocateHdfsFilesToNumTasks(PluginTask task, List pathList) - throws IOException + private PartialFileList buildPartialFileList(PluginTask task, List pathList) { - HDFSFileFactory factory = new HDFSFileFactory(getConfiguration()); - FileSystem fs = getFS(); + Configuration configuration = getConfiguration(task); + FileSystem fs = getFS(configuration); + boolean shouldPartition = task.getPartition(); + boolean shouldDecompress = task.getDecompression(); - Map hdfsFileLengthMap = Maps.newHashMap(); + Map pathLengthMap = Maps.newHashMap(); long totalFileLength = 0; for (Path path : pathList) { - HDFSFile file = factory.create(path, task.getDecompression()); - long fileLength = file.getLength(fs); + long fileLength = getHdfsFileLength(fs, path, shouldDecompress); if (fileLength <= 0) { - logger.info("`embulk-input-hdfs`: Skip the 0 byte target file: {}", path); + logger.info("Skip the 0 byte target file: {}", path); continue; } - hdfsFileLengthMap.put(file, fileLength); + pathLengthMap.put(path, fileLength); totalFileLength += fileLength; } if (totalFileLength <= 0) { - throw new PathIOException(task.getPath(), "`embulk-input-hdfs`: All files are empty"); + throw Throwables.propagate(new PathIOException(task.getPath(), "All files are empty")); } + PartialFileList.Builder builder = new PartialFileList.Builder(task); + // TODO: optimum allocation of resources final long approximateNumPartitions; if (task.getApproximateNumPartitions() <= 0) { @@ -253,17 +243,16 @@ private List allocateHdfsFilesToNumTasks(PluginTask task, List< partitionSizeByOneTask = 1; } - List hdfsPartialFiles = Lists.newArrayList(); - for (Map.Entry entry : hdfsFileLengthMap.entrySet()) { - HDFSFile file = entry.getKey(); + for (Map.Entry entry : pathLengthMap.entrySet()) { + Path path = entry.getKey(); long fileLength = entry.getValue(); long numPartitions; - if (task.getPartition()) { - if (file.canDecompress()) { + if (shouldPartition) { + if (shouldDecompress && getHdfsFileCompressionCodec(fs, path) != null) { numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1; } - else if (file.getCodec() != null) { // if not null, the file is compressed. + else if (getHdfsFileCompressionCodec(fs, path) != null) { // if not null, the file is compressed. numPartitions = 1; } else { @@ -274,11 +263,60 @@ else if (file.getCodec() != null) { // if not null, the file is compressed. 
numPartitions = 1; } - HDFSFilePartitioner partitioner = new HDFSFilePartitioner(file, fileLength, numPartitions); - hdfsPartialFiles.addAll(partitioner.generateHDFSPartialFiles()); + for (long i = 0; i < numPartitions; i++) { + long start = fileLength * i / numPartitions; + long end = fileLength * (i + 1) / numPartitions; + if (start < end) { + logger.debug("PartialFile: path {}, start: {}, end: {}", path, start, end); + builder.add(path.toString(), start, end, shouldDecompress && getHdfsFileCompressionCodec(fs, path) != null); + } + } } - return hdfsPartialFiles; + return builder.build(); + } + + private Long getHdfsFileLength(FileSystem fs, Path path, boolean shouldDecompression) + { + CompressionCodec codec = getHdfsFileCompressionCodec(fs, path); + if (codec == null) { + try { + return fs.getFileStatus(path).getLen(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + else if (!shouldDecompression) { + try { + return fs.getFileStatus(path).getLen(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + else { + long fileLength = 0; + try (InputStream is = codec.createInputStream(fs.open(path))) { + while (is.read() > 0) { + fileLength++; + } + } + catch (IOException e) { + throw Throwables.propagate(e); + } + return fileLength; + } + } + + private CompressionCodec getHdfsFileCompressionCodec(FileSystem fs, Path path) + { + return getHdfsFileCompressionCodec(fs.getConf(), path); + } + + private CompressionCodec getHdfsFileCompressionCodec(Configuration configuration, Path path) + { + return new CompressionCodecFactory(configuration).getCodec(path); } @Override @@ -302,34 +340,59 @@ public void cleanup(TaskSource taskSource, public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { final PluginTask task = taskSource.loadTask(PluginTask.class); - setContainers(task); + return new HdfsFileInput(task, taskIndex); + } - InputStream input; - final HDFSPartialFile file = getHDFSPartialFiles().get(taskIndex); - try { - if (file.getStart() > 0 && task.getSkipHeaderLines() > 0) { - input = file.createInputStreamWithHeaders(getFS(), task.getSkipHeaderLines()); - } - else { - input = file.createInputStream(getFS()); - } + public class HdfsFileInput + extends InputStreamFileInput + implements TransactionalFileInput + { + + public HdfsFileInput(PluginTask task, int taskIndex) + { + super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex)); } - catch (IOException e) { - logger.error(e.getMessage()); - throw new RuntimeException(e); + + @Override + public void abort() + { + } + + @Override + public TaskReport commit() + { + return Exec.newTaskReport(); + } + } + + // TODO create single-file InputStreamFileInput utility + private class SingleFileProvider + implements InputStreamFileInput.Provider + { + private final FileSystem fs; + private final int numHeaderLines; + private final Iterator iterator; + + public SingleFileProvider(PluginTask task, int taskIndex) + { + this.fs = getFS(getConfiguration(task)); + this.numHeaderLines = task.getSkipHeaderLines(); + this.iterator = task.getPartialFileList().get(taskIndex).iterator(); } - return new InputStreamTransactionalFileInput(Exec.getBufferAllocator(), input) + @Override + public InputStream openNext() throws IOException { - @Override - public void abort() - { } - - @Override - public TaskReport commit() - { - return Exec.newTaskReport(); + if (!iterator.hasNext()) { + return null; } - }; + PartialFileInputStreamBuilder builder = new PartialFileInputStreamBuilder(fs, 
iterator.next()).withHeaders(numHeaderLines); + return builder.build(); + } + + @Override + public void close() + { + } } } From ca8ef2578bd0507c42441b7a1714884933b99e76 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 22 Feb 2016 19:53:10 +0900 Subject: [PATCH 7/8] Avoid null pointer exception --- example/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/example/config.yml b/example/config.yml index 429d1fb..7200145 100644 --- a/example/config.yml +++ b/example/config.yml @@ -12,6 +12,7 @@ local_fs_example: &local_fs_example fs.defaultFS: 'file:///' fs.hdfs.impl: 'org.apache.hadoop.fs.LocalFileSystem' fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem' + io.compression.codecs: 'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec' in: type: hdfs From 33bcdfcbd7664c88f993e89eaf2ccd8ec8bc82bf Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Thu, 25 Feb 2016 10:47:32 +0900 Subject: [PATCH 8/8] Release v0.2.1 --- CHENGELOG.md | 6 +++++- build.gradle | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHENGELOG.md b/CHENGELOG.md index 83ac1bd..602c2ea 100644 --- a/CHENGELOG.md +++ b/CHENGELOG.md @@ -1,3 +1,7 @@ -0.2.0 (2016-02-xx) +0.2.1 (2016-02-25) +================== +- [Fix] does not work + +0.2.0 (2016-02-15) ================== - [Add] `decompression` option diff --git a/build.gradle b/build.gradle index 1b7e180..89e3677 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ configurations { provided } -version = "0.2.0" +version = "0.2.1" sourceCompatibility = 1.7 targetCompatibility = 1.7