diff --git a/src/main/java/io/confluent/connect/hdfs/FileUtils.java b/src/main/java/io/confluent/connect/hdfs/FileUtils.java
index 184236562..20162de8b 100644
--- a/src/main/java/io/confluent/connect/hdfs/FileUtils.java
+++ b/src/main/java/io/confluent/connect/hdfs/FileUtils.java
@@ -140,6 +140,17 @@ public static FileStatus fileStatusWithMaxOffset(
     return fileStatusWithMaxOffset;
   }
 
+  public static TopicPartition extractTopicPartition(String filename) {
+    Matcher m = HdfsSinkConnectorConstants.COMMITTED_FILENAME_PATTERN.matcher(filename);
+    // NB: if statement has side effect of enabling group() call
+    if (!m.matches()) {
+      throw new IllegalArgumentException(filename + " does not match COMMITTED_FILENAME_PATTERN");
+    }
+    String topic = m.group(HdfsSinkConnectorConstants.PATTERN_TOPIC_GROUP);
+    int partition = Integer.parseInt(m.group(HdfsSinkConnectorConstants.PATTERN_PARTITION_GROUP));
+    return new TopicPartition(topic, partition);
+  }
+
   /**
    * Obtain the offset of the last record that was written to the specified HDFS file.
    * @param filename the name of the HDFS file; may not be null
diff --git a/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java b/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
new file mode 100644
index 000000000..42b1a7c86
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs;
+
+import io.confluent.connect.hdfs.storage.HdfsStorage;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+
+public class RecoveryHelper {
+
+  public static final String RECOVERY_RECORD_KEY = "latestFilename";
+
+  static RecoveryHelper instance = new RecoveryHelper();
+
+  public static RecoveryHelper getInstance() {
+    return instance;
+  }
+
+  private final ConcurrentHashMap<TopicPartition, List<String>> files = new ConcurrentHashMap<>();
+
+  public List<String> getCommittedFiles(TopicPartition tp) {
+    return files.get(tp);
+  }
+
+  public void addFile(TopicPartition tp, String name) {
+    List<String> newList = new ArrayList<>();
+    newList.add(name);
+    files.merge(tp, newList, (a, b) -> {
+      a.addAll(b);
+      return a;
+    });
+  }
+
+  public void addFile(String name) {
+    TopicPartition tp = FileUtils.extractTopicPartition(new Path(name).getName());
+    addFile(tp, name);
+  }
+
+  public static class RecoveryPoint {
+    public final TopicPartition tp;
+    public final long offset;
+    public final String filename;
+
+    public RecoveryPoint(TopicPartition tp, long offset, String filename) {
+      this.tp = tp;
+      this.offset = offset;
+      this.filename = filename;
+    }
+  }
+
+  public static RecoveryPoint getRecoveryPoint(TopicPartition tp, HdfsStorage storage) {
+    if (getInstance().getCommittedFiles(tp) != null) {
+      List<String> files = getInstance().getCommittedFiles(tp);
+      files.sort(Comparator.comparing(a -> FileUtils.extractOffset(new Path(a).getName())));
+      long maxOffset = -1;
+      String latestFilename = null;
+
+      // go backward from the latest file until we find a file that exists.
+      for (int i = files.size() - 1; i >= 0; i--) {
+        String filename = files.get(i);
+        if (!storage.exists(filename)) {
+          continue;
+        }
+        long endOffset = FileUtils.extractOffset(new Path(filename).getName());
+        if (maxOffset < endOffset) {
+          maxOffset = endOffset;
+          latestFilename = filename;
+        }
+      }
+      if (maxOffset > 0) {
+        return new RecoveryPoint(tp, maxOffset, latestFilename);
+      }
+    }
+
+    return null;
+  }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index 1a0be1e64..9c3cab9ca 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -52,6 +52,7 @@
 import io.confluent.connect.hdfs.hive.HiveUtil;
 import io.confluent.connect.hdfs.partitioner.Partitioner;
 import io.confluent.connect.hdfs.storage.HdfsStorage;
+import io.confluent.connect.hdfs.RecoveryHelper.RecoveryPoint;
 import io.confluent.connect.storage.StorageSinkConnectorConfig;
 import io.confluent.connect.storage.common.StorageCommonConfig;
 import io.confluent.connect.storage.hive.HiveConfig;
@@ -569,6 +570,14 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
   }
 
   private void readOffset() throws ConnectException {
+    RecoveryPoint recoveryPoint = RecoveryHelper.getRecoveryPoint(tp, storage);
+    if (recoveryPoint != null) {
+      offset = recoveryPoint.offset + 1;
+      // yes, we intentionally do not write begin marker here, so it is skipped in WAL.apply()
+      wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, recoveryPoint.filename);
+      return;
+    }
+
     String path = FileUtils.topicDirectory(url, topicsDir, tp.topic());
     CommittedFileFilter filter = new TopicPartitionCommittedFileFilter(tp);
     FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset(
@@ -581,6 +590,7 @@ private void readOffset() throws ConnectException {
           fileStatusWithMaxOffset.getPath().getName());
       // `offset` represents the next offset to read after the most recent commit
       offset = lastCommittedOffsetToHdfs + 1;
+      wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, fileStatusWithMaxOffset.getPath().toString());
     }
   }
 
diff --git a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
index cde9b1bb5..61943a281 100644
--- a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
+++ b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
@@ -15,6 +15,7 @@
 
 package io.confluent.connect.hdfs.wal;
 
+import io.confluent.connect.hdfs.RecoveryHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.kafka.common.TopicPartition;
@@ -120,6 +121,7 @@ public void apply() throws ConnectException {
          for (Map.Entry<WALEntry, WALEntry> entry: entries.entrySet()) {
            String tempFile = entry.getKey().getName();
            String committedFile = entry.getValue().getName();
+            RecoveryHelper.getInstance().addFile(committedFile);
            if (!storage.exists(committedFile)) {
              storage.commit(tempFile, committedFile);
            }
@@ -128,6 +130,9 @@
          WALEntry mapKey = new WALEntry(key.getName());
          WALEntry mapValue = new WALEntry(value.getName());
          entries.put(mapKey, mapValue);
+          if (key.getName().equals(RecoveryHelper.RECOVERY_RECORD_KEY)) {
+            RecoveryHelper.getInstance().addFile(value.getName());
+          }
        }
      }
    } catch (IOException e) {
diff --git a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
index 43d770528..ae066ff0b 100644
--- a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
@@ -25,13 +25,15 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import io.confluent.connect.avro.AvroData;
 import io.confluent.connect.hdfs.avro.AvroDataFileReader;
-import io.confluent.connect.hdfs.avro.AvroFileReader;
 import io.confluent.connect.hdfs.storage.HdfsStorage;
 import io.confluent.connect.storage.StorageFactory;
 import io.confluent.connect.storage.common.StorageCommonConfig;
@@ -176,6 +178,62 @@ public void testSinkTaskStartWithRecovery() throws Exception {
     task.stop();
   }
 
+  @Test
+  public void testSinkTaskStartWithRecoveryHelper() throws Exception {
+    setUp();
+    Map<TopicPartition, List<String>> tempfiles = new HashMap<>();
+
+    Map<TopicPartition, String> recoveryPoints = new HashMap<>();
+    Map<TopicPartition, List<String>> committedFiles = new HashMap<>();
+
+    tempfiles.put(TOPIC_PARTITION, new ArrayList<>());
+
+    String fileName1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 200, 300,
+        extension, ZERO_PAD_FMT);
+    String fileName2 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 301, 400,
+        extension, ZERO_PAD_FMT);
+
+    committedFiles.put(TOPIC_PARTITION, new ArrayList<>());
+
+    tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
+    fs.createNewFile(new Path(fileName1));
+    committedFiles.get(TOPIC_PARTITION).add(fileName1);
+    tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
+    committedFiles.get(TOPIC_PARTITION).add(fileName2);
+    fs.createNewFile(new Path(fileName2));
+
+    // simulate deleted temp file
+    tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
+    committedFiles.get(TOPIC_PARTITION)
+        .add(FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 401, 500, extension, ZERO_PAD_FMT));
+
+    recoveryPoints.put(TOPIC_PARTITION, fileName1);
+
+    fileName1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY2, TOPIC_PARTITION2, 400, 500,
+        extension, ZERO_PAD_FMT);
+    fileName2 = FileUtils.committedFileName(url, topicsDir, DIRECTORY2, TOPIC_PARTITION2, 501, 800,
+        extension, ZERO_PAD_FMT);
+    fs.createNewFile(new Path(fileName1));
+    fs.createNewFile(new Path(fileName2));
+
+    recoveryPoints.put(TOPIC_PARTITION2, fileName2);
+
+    createWALs(tempfiles, committedFiles, recoveryPoints);
+    HdfsSinkTask task = new HdfsSinkTask();
+
+    task.initialize(context);
+    task.start(properties);
+
+    Map<TopicPartition, Long> offsets = context.offsets();
+    assertEquals(2, offsets.size());
+    assertTrue(offsets.containsKey(TOPIC_PARTITION));
+    assertEquals(401, (long) offsets.get(TOPIC_PARTITION));
+    assertTrue(offsets.containsKey(TOPIC_PARTITION2));
+    assertEquals(801, (long) offsets.get(TOPIC_PARTITION2));
+
+    task.stop();
+  }
+
   @Test
   public void testSinkTaskPut() throws Exception {
     setUp();
@@ -277,8 +335,13 @@ private void createCommittedFiles() throws IOException {
   }
 
   private void createWALs(Map<TopicPartition, List<String>> tempfiles,
-                          Map<TopicPartition, List<String>> committedFiles) throws Exception {
-    @SuppressWarnings("unchecked")
+                          Map<TopicPartition, List<String>> committedFiles) throws Exception {
+    createWALs(tempfiles, committedFiles, Collections.emptyMap());
+  }
+  private void createWALs(Map<TopicPartition, List<String>> tempfiles,
+                          Map<TopicPartition, List<String>> committedFiles,
+                          Map<TopicPartition, String> recoveryRecords) throws Exception {
+    @SuppressWarnings("unchecked")
     Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>)
         connectorConfig.getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
     HdfsStorage storage = StorageFactory.createStorage(
@@ -288,15 +351,22 @@ private void createWALs(Map<TopicPartition, List<String>> tempfiles,
         url
     );
 
-    for (TopicPartition tp: tempfiles.keySet()) {
+    Set<TopicPartition> tps = new HashSet<>(tempfiles.keySet());
+    tps.addAll(recoveryRecords.keySet());
+    for (TopicPartition tp: tps) {
      WAL wal = storage.wal(logsDir, tp);
-      List<String> tempList = tempfiles.get(tp);
-      List<String> committedList = committedFiles.get(tp);
-      wal.append(WAL.beginMarker, "");
-      for (int i = 0; i < tempList.size(); ++i) {
-        wal.append(tempList.get(i), committedList.get(i));
+      if (recoveryRecords.containsKey(tp)) {
+        wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, recoveryRecords.get(tp));
+      }
+      if (tempfiles.containsKey(tp)) {
+        List<String> tempList = tempfiles.get(tp);
+        List<String> committedList = committedFiles.get(tp);
+        wal.append(WAL.beginMarker, "");
+        for (int i = 0; i < tempList.size(); ++i) {
+          wal.append(tempList.get(i), committedList.get(i));
+        }
+        wal.append(WAL.endMarker, "");
      }
-      wal.append(WAL.endMarker, "");
      wal.close();
    }
  }
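
Note (reviewer sketch, not part of the patch): a minimal, standalone illustration of how committed-file names feed the new recovery path. It assumes the file-name layout produced by FileUtils.committedFileName (topic+partition+startOffset+endOffset.extension, offsets zero-padded); the HDFS URL, directory names, and class name below are made up for the example.

import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;

import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.RecoveryHelper;

public class RecoveryHelperSketch {
  public static void main(String[] args) {
    // Hypothetical committed data file in the connector's usual layout:
    // <topic>+<partition>+<startOffset>+<endOffset>.<extension>
    String committed = "hdfs://namenode/topics/test-topic/partition=12/"
        + "test-topic+12+0000000200+0000000300.avro";

    // COMMITTED_FILENAME_PATTERN is matched against the file name only, so strip the
    // directory first, mirroring what RecoveryHelper.addFile(String) does internally.
    TopicPartition tp = FileUtils.extractTopicPartition(new Path(committed).getName());
    System.out.println(tp);  // test-topic-12

    // In the patch, WAL.apply() calls addFile() for every committed file and every
    // "latestFilename" record it replays; here we do it by hand. The file then becomes
    // a candidate recovery point for that partition.
    RecoveryHelper.getInstance().addFile(committed);
    System.out.println(RecoveryHelper.getInstance().getCommittedFiles(tp));
  }
}

In the patch itself this bookkeeping happens inside FSWAL.apply(), and TopicPartitionWriter.readOffset() consults RecoveryHelper.getRecoveryPoint() first, falling back to listing the topic directory only when no recovery point is known.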