From 6500722d4bda96ce9c90819d71687d706e8bd107 Mon Sep 17 00:00:00 2001
From: Roman Studenikin
Date: Thu, 26 Sep 2019 11:12:57 +0200
Subject: [PATCH 1/2] RecoveryHelper to speed up recovery after restart

This patch changes the WAL and the recovery process. The idea is to
avoid an expensive scan of all table files and to maintain a recovery
record in the WAL instead. This record is written at the beginning of
the WAL log and is not surrounded by 'begin' and 'end' markers.

The following situations are possible:
a. There is no recovery record, and there are normal records in the WAL
b. There is no recovery record and no other records in the WAL
c. There is a recovery record, and there are normal records in the WAL
d. There is a recovery record and no other records in the WAL

Since the recovery record is written at the beginning, it holds the
latest offset only when there is nothing else in the log, or when the
other records are invalid (their temp files have been deleted). So in
cases a, c and d the recovery process picks the committed file with the
highest offset from the WAL - either from the recovery record or from
the normal records. In case b, when the WAL log is empty or does not
exist, the latest offset is discovered through a full recursive folder
scan.
---
 .../io/confluent/connect/hdfs/FileUtils.java  | 11 +++
 .../connect/hdfs/RecoveryHelper.java          | 91 +++++++++++++++++++
 .../connect/hdfs/TopicPartitionWriter.java    | 10 ++
 .../io/confluent/connect/hdfs/wal/FSWAL.java  |  5 +
 .../connect/hdfs/HdfsSinkTaskTest.java        | 80 ++++++++++++++--
 5 files changed, 187 insertions(+), 10 deletions(-)
 create mode 100644 src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
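To make the offset arithmetic concrete, here is a small illustration (not part
of the patch). It assumes the usual committed-file naming scheme matched by
COMMITTED_FILENAME_PATTERN, <topic>+<partition>+<startOffset>+<endOffset>.<ext>,
and a made-up file name:

    // A committed file covering offsets 501..800 of partition 2 of "mytopic".
    String latest = "mytopic+2+0000000501+0000000800.avro";
    // FileUtils.extractOffset() returns the last offset stored in that file: 800.
    long lastCommitted = FileUtils.extractOffset(latest);
    // Consumption resumes at the next offset: 801 (the same arithmetic the new
    // test below asserts for TOPIC_PARTITION2).
    long nextOffset = lastCommitted + 1;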
diff --git a/src/main/java/io/confluent/connect/hdfs/FileUtils.java b/src/main/java/io/confluent/connect/hdfs/FileUtils.java
index 184236562..20162de8b 100644
--- a/src/main/java/io/confluent/connect/hdfs/FileUtils.java
+++ b/src/main/java/io/confluent/connect/hdfs/FileUtils.java
@@ -140,6 +140,17 @@ public static FileStatus fileStatusWithMaxOffset(
     return fileStatusWithMaxOffset;
   }
 
+  public static TopicPartition extractTopicPartition(String filename) {
+    Matcher m = HdfsSinkConnectorConstants.COMMITTED_FILENAME_PATTERN.matcher(filename);
+    // NB: matches() must be called (here, in the if check) before group() can be used below.
+    if (!m.matches()) {
+      throw new IllegalArgumentException(filename + " does not match COMMITTED_FILENAME_PATTERN");
+    }
+    String topic = m.group(HdfsSinkConnectorConstants.PATTERN_TOPIC_GROUP);
+    int partition = Integer.parseInt(m.group(HdfsSinkConnectorConstants.PATTERN_PARTITION_GROUP));
+    return new TopicPartition(topic, partition);
+  }
+
   /**
    * Obtain the offset of the last record that was written to the specified HDFS file.
    * @param filename the name of the HDFS file; may not be null
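A possible call site for the new helper, using an illustrative file name of the
same shape (again assuming the <topic>+<partition>+<startOffset>+<endOffset>.<ext>
pattern):

    // Recover which topic partition a committed file belongs to.
    TopicPartition tp = FileUtils.extractTopicPartition("mytopic+3+0000000200+0000000300.avro");
    // tp.topic() == "mytopic", tp.partition() == 3
    // A name that does not match COMMITTED_FILENAME_PATTERN throws IllegalArgumentException.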
diff --git a/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java b/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
new file mode 100644
index 000000000..edd03812c
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs;
+
+import io.confluent.connect.hdfs.storage.HdfsStorage;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+
+public class RecoveryHelper {
+
+  public static final String RECOVERY_RECORD_KEY = "latestFilename";
+
+  static RecoveryHelper instance = new RecoveryHelper();
+
+  public static RecoveryHelper getInstance() {
+    return instance;
+  }
+
+  private final Map<TopicPartition, List<String>> files = new HashMap<>();
+
+  public List<String> getCommittedFiles(TopicPartition tp) {
+    return files.get(tp);
+  }
+
+  public void addFile(TopicPartition tp, String name) {
+    files.computeIfAbsent(tp, (x) -> new ArrayList<>());
+    files.get(tp).add(name);
+  }
+
+  public void addFile(String name) {
+    TopicPartition tp = FileUtils.extractTopicPartition(new Path(name).getName());
+    addFile(tp, name);
+  }
+
+  public static class RecoveryPoint {
+    public final TopicPartition tp;
+    public final long offset;
+    public final String filename;
+
+    public RecoveryPoint(TopicPartition tp, long offset, String filename) {
+      this.tp = tp;
+      this.offset = offset;
+      this.filename = filename;
+    }
+  }
+
+  public static RecoveryPoint getRecoveryPoint(TopicPartition tp, HdfsStorage storage) {
+    if (getInstance().getCommittedFiles(tp) != null) {
+      List<String> files = getInstance().getCommittedFiles(tp);
+      files.sort(Comparator.comparing(a -> FileUtils.extractOffset(new Path(a).getName())));
+      long maxOffset = -1;
+      String latestFilename = null;
+
+      // go backward from the latest file until we find a file that exists.
+      for (int i = files.size() - 1; i >= 0; i--) {
+        String filename = files.get(i);
+        if (!storage.exists(filename)) {
+          continue;
+        }
+        long endOffset = FileUtils.extractOffset(new Path(filename).getName());
+        if (maxOffset < endOffset) {
+          maxOffset = endOffset;
+          latestFilename = filename;
+        }
+      }
+      if (maxOffset > 0) {
+        return new RecoveryPoint(tp, maxOffset, latestFilename);
+      }
+    }
+
+    return null;
+  }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index 1a0be1e64..9c3cab9ca 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -52,6 +52,7 @@
 import io.confluent.connect.hdfs.hive.HiveUtil;
 import io.confluent.connect.hdfs.partitioner.Partitioner;
 import io.confluent.connect.hdfs.storage.HdfsStorage;
+import io.confluent.connect.hdfs.RecoveryHelper.RecoveryPoint;
 import io.confluent.connect.storage.StorageSinkConnectorConfig;
 import io.confluent.connect.storage.common.StorageCommonConfig;
 import io.confluent.connect.storage.hive.HiveConfig;
@@ -569,6 +570,14 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
   }
 
   private void readOffset() throws ConnectException {
+    RecoveryPoint recoveryPoint = RecoveryHelper.getRecoveryPoint(tp, storage);
+    if (recoveryPoint != null) {
+      offset = recoveryPoint.offset + 1;
+      // yes, we intentionally do not write a begin marker here, so it is skipped in WAL.apply()
+      wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, recoveryPoint.filename);
+      return;
+    }
+
     String path = FileUtils.topicDirectory(url, topicsDir, tp.topic());
     CommittedFileFilter filter = new TopicPartitionCommittedFileFilter(tp);
     FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset(
@@ -581,6 +590,7 @@ private void readOffset() throws ConnectException {
             fileStatusWithMaxOffset.getPath().getName());
         // `offset` represents the next offset to read after the most recent commit
        offset = lastCommittedOffsetToHdfs + 1;
+       wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, fileStatusWithMaxOffset.getPath().toString());
      }
    }
  }
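With readOffset() writing the recovery record up front, a partition's WAL can
hold both kinds of entries. A rough sketch of the resulting append sequence
(the paths are placeholders, not taken from the patch):

    // Recovery record: deliberately appended without begin/end markers, so
    // WAL.apply() does not replay it as a temp-file-to-committed-file rename;
    // it only tells RecoveryHelper which committed file was last known.
    wal.append(RecoveryHelper.RECOVERY_RECORD_KEY,
               "/topics/mytopic/partition=0/mytopic+0+0000000301+0000000400.avro");

    // Ordinary batch: wrapped in markers and replayed by apply() as commits.
    wal.append(WAL.beginMarker, "");
    wal.append("/topics/+tmp/mytopic/partition=0/<uuid>_tmp.avro",
               "/topics/mytopic/partition=0/mytopic+0+0000000401+0000000500.avro");
    wal.append(WAL.endMarker, "");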
diff --git a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
index cde9b1bb5..e7d79e7a6 100644
--- a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
+++ b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
@@ -15,6 +15,7 @@
 
 package io.confluent.connect.hdfs.wal;
 
+import io.confluent.connect.hdfs.RecoveryHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.kafka.common.TopicPartition;
@@ -120,6 +121,7 @@ public void apply() throws ConnectException {
           for (Map.Entry<WALEntry, WALEntry> entry: entries.entrySet()) {
             String tempFile = entry.getKey().getName();
             String committedFile = entry.getValue().getName();
+            RecoveryHelper.getInstance().addFile(committedFile);
             if (!storage.exists(committedFile)) {
               storage.commit(tempFile, committedFile);
             }
@@ -128,6 +130,9 @@ public void apply() throws ConnectException {
           WALEntry mapKey = new WALEntry(key.getName());
           WALEntry mapValue = new WALEntry(value.getName());
           entries.put(mapKey, mapValue);
+          if (value.getName().equals(RecoveryHelper.RECOVERY_RECORD_KEY)) {
+            RecoveryHelper.getInstance().addFile(value.getName());
+          }
         }
       }
     } catch (IOException e) {
diff --git a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
index 43d770528..8fb952a97 100644
--- a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
@@ -25,13 +25,15 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import io.confluent.connect.avro.AvroData;
 import io.confluent.connect.hdfs.avro.AvroDataFileReader;
-import io.confluent.connect.hdfs.avro.AvroFileReader;
 import io.confluent.connect.hdfs.storage.HdfsStorage;
 import io.confluent.connect.storage.StorageFactory;
 import io.confluent.connect.storage.common.StorageCommonConfig;
@@ -176,6 +178,52 @@ public void testSinkTaskStartWithRecovery() throws Exception {
     task.stop();
   }
 
+  @Test
+  public void testSinkTaskStartWithRecoveryHelper() throws Exception {
+    setUp();
+    Map<TopicPartition, List<String>> tempfiles = new HashMap<>();
+
+    Map<TopicPartition, String> recoveryPoints = new HashMap<>();
+    Map<TopicPartition, List<String>> committedFiles = new HashMap<>();
+    // simulate deleted temp file
+    tempfiles.put(TOPIC_PARTITION, Collections.singletonList(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension)));
+    String fileName1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 200, 300,
+        extension, ZERO_PAD_FMT);
+    String fileName2 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 301, 400,
+        extension, ZERO_PAD_FMT);
+    fs.createNewFile(new Path(fileName1));
+    fs.createNewFile(new Path(fileName2));
+
+    committedFiles.put(TOPIC_PARTITION, Collections.singletonList(FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 401, 500,
+        extension, ZERO_PAD_FMT)));
+
+    recoveryPoints.put(TOPIC_PARTITION, fileName1);
+
+    fileName1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY2, TOPIC_PARTITION2, 400, 500,
+        extension, ZERO_PAD_FMT);
+    fileName2 = FileUtils.committedFileName(url, topicsDir, DIRECTORY2, TOPIC_PARTITION2, 501, 800,
+        extension, ZERO_PAD_FMT);
+    fs.createNewFile(new Path(fileName1));
+    fs.createNewFile(new Path(fileName2));
+
+    recoveryPoints.put(TOPIC_PARTITION2, fileName2);
+
+    createWALs(tempfiles, committedFiles, recoveryPoints);
+    HdfsSinkTask task = new HdfsSinkTask();
+
+    task.initialize(context);
+    task.start(properties);
+
+    Map<TopicPartition, Long> offsets = context.offsets();
+    assertEquals(2, offsets.size());
+    assertTrue(offsets.containsKey(TOPIC_PARTITION));
+    assertEquals(401, (long) offsets.get(TOPIC_PARTITION));
+    assertTrue(offsets.containsKey(TOPIC_PARTITION2));
+    assertEquals(801, (long) offsets.get(TOPIC_PARTITION2));
+
+    task.stop();
+  }
+
   @Test
   public void testSinkTaskPut() throws Exception {
     setUp();
@@ -277,8 +325,13 @@ private void createCommittedFiles() throws IOException {
   }
 
   private void createWALs(Map<TopicPartition, List<String>> tempfiles,
-                          Map<TopicPartition, List<String>> committedFiles) throws Exception {
-    @SuppressWarnings("unchecked")
+                          Map<TopicPartition, List<String>> committedFiles) throws Exception {
+    createWALs(tempfiles, committedFiles, Collections.emptyMap());
+  }
+  private void createWALs(Map<TopicPartition, List<String>> tempfiles,
+                          Map<TopicPartition, List<String>> committedFiles,
+                          Map<TopicPartition, String> recoveryRecords) throws Exception {
+    @SuppressWarnings("unchecked")
     Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>)
         connectorConfig.getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
     HdfsStorage storage = StorageFactory.createStorage(
@@ -288,15 +341,22 @@ private void createWALs(Map<TopicPartition, List<String>> tempfiles,
         url
     );
 
-    for (TopicPartition tp: tempfiles.keySet()) {
+    Set<TopicPartition> tps = new HashSet<>(tempfiles.keySet());
+    tps.addAll(recoveryRecords.keySet());
+    for (TopicPartition tp: tps) {
       WAL wal = storage.wal(logsDir, tp);
-      List<String> tempList = tempfiles.get(tp);
-      List<String> committedList = committedFiles.get(tp);
-      wal.append(WAL.beginMarker, "");
-      for (int i = 0; i < tempList.size(); ++i) {
-        wal.append(tempList.get(i), committedList.get(i));
+      if (recoveryRecords.containsKey(tp)) {
+        wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, recoveryRecords.get(tp));
+      }
+      if (tempfiles.containsKey(tp)) {
+        List<String> tempList = tempfiles.get(tp);
+        List<String> committedList = committedFiles.get(tp);
+        wal.append(WAL.beginMarker, "");
+        for (int i = 0; i < tempList.size(); ++i) {
+          wal.append(tempList.get(i), committedList.get(i));
+        }
+        wal.append(WAL.endMarker, "");
       }
-      wal.append(WAL.endMarker, "");
       wal.close();
     }
   }

From 796129505aea000b5cc5ab1e056ce63975e0812a Mon Sep 17 00:00:00 2001
From: Roman Studenikin
Date: Mon, 23 Dec 2019 16:45:02 +0100
Subject: [PATCH 2/2] Fix after code review

---
 .../confluent/connect/hdfs/RecoveryHelper.java | 13 ++++++++-----
 .../io/confluent/connect/hdfs/wal/FSWAL.java   |  2 +-
 .../connect/hdfs/HdfsSinkTaskTest.java         | 18 ++++++++++++++----
 3 files changed, 23 insertions(+), 10 deletions(-)
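One of the review changes below swaps the two-step computeIfAbsent()/get()
update on a plain HashMap for a single ConcurrentHashMap.merge() call,
presumably so that concurrent addFile() calls for the same partition are
applied safely. A compact sketch of the pattern (variable names mirror the
diff and are otherwise illustrative):

    // Before (patch 1): two separate steps on a HashMap - not safe if two
    // threads call addFile() for the same TopicPartition at once.
    //   files.computeIfAbsent(tp, (x) -> new ArrayList<>());
    //   files.get(tp).add(name);
    //
    // After (this patch): a single merge() on a ConcurrentHashMap; the
    // remapping function runs atomically for the key, so no update is lost.
    List<String> addition = new ArrayList<>();
    addition.add(name);
    files.merge(tp, addition, (existing, incoming) -> {
      existing.addAll(incoming);
      return existing;
    });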
diff --git a/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java b/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
index edd03812c..42b1a7c86 100644
--- a/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
+++ b/src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
@@ -18,9 +18,8 @@
 import io.confluent.connect.hdfs.storage.HdfsStorage;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.common.TopicPartition;
 
@@ -34,15 +33,19 @@ public static RecoveryHelper getInstance() {
     return instance;
   }
 
-  private final Map<TopicPartition, List<String>> files = new HashMap<>();
+  private final ConcurrentHashMap<TopicPartition, List<String>> files = new ConcurrentHashMap<>();
 
   public List<String> getCommittedFiles(TopicPartition tp) {
     return files.get(tp);
   }
 
   public void addFile(TopicPartition tp, String name) {
-    files.computeIfAbsent(tp, (x) -> new ArrayList<>());
-    files.get(tp).add(name);
+    List<String> newList = new ArrayList<>();
+    newList.add(name);
+    files.merge(tp, newList, (a, b) -> {
+      a.addAll(b);
+      return a;
+    });
   }
 
   public void addFile(String name) {
diff --git a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
index e7d79e7a6..61943a281 100644
--- a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
+++ b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
@@ -130,7 +130,7 @@ public void apply() throws ConnectException {
           WALEntry mapKey = new WALEntry(key.getName());
           WALEntry mapValue = new WALEntry(value.getName());
           entries.put(mapKey, mapValue);
-          if (value.getName().equals(RecoveryHelper.RECOVERY_RECORD_KEY)) {
+          if (key.getName().equals(RecoveryHelper.RECOVERY_RECORD_KEY)) {
             RecoveryHelper.getInstance().addFile(value.getName());
           }
         }
diff --git a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
index 8fb952a97..ae066ff0b 100644
--- a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
@@ -185,17 +185,27 @@ public void testSinkTaskStartWithRecoveryHelper() throws Exception {
 
     Map<TopicPartition, String> recoveryPoints = new HashMap<>();
     Map<TopicPartition, List<String>> committedFiles = new HashMap<>();
-    // simulate deleted temp file
-    tempfiles.put(TOPIC_PARTITION, Collections.singletonList(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension)));
+
+    tempfiles.put(TOPIC_PARTITION, new ArrayList<>());
+
     String fileName1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 200, 300,
         extension, ZERO_PAD_FMT);
     String fileName2 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 301, 400,
         extension, ZERO_PAD_FMT);
+
+    committedFiles.put(TOPIC_PARTITION, new ArrayList<>());
+
+    tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
     fs.createNewFile(new Path(fileName1));
+    committedFiles.get(TOPIC_PARTITION).add(fileName1);
+    tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
+    committedFiles.get(TOPIC_PARTITION).add(fileName2);
     fs.createNewFile(new Path(fileName2));
 
-    committedFiles.put(TOPIC_PARTITION, Collections.singletonList(FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 401, 500,
-        extension, ZERO_PAD_FMT)));
+    // simulate deleted temp file
+    tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
+    committedFiles.get(TOPIC_PARTITION)
+        .add(FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 401, 500, extension, ZERO_PAD_FMT));
 
     recoveryPoints.put(TOPIC_PARTITION, fileName1);