RecoveryHelper to speed up recovery after restart #471

Open · wants to merge 2 commits into base: master
11 changes: 11 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/FileUtils.java
@@ -140,6 +140,17 @@ public static FileStatus fileStatusWithMaxOffset(
return fileStatusWithMaxOffset;
}

public static TopicPartition extractTopicPartition(String filename) {
Matcher m = HdfsSinkConnectorConstants.COMMITTED_FILENAME_PATTERN.matcher(filename);
// NB: matches() must be invoked before group() can be used; the check also validates the filename
if (!m.matches()) {
throw new IllegalArgumentException(filename + " does not match COMMITTED_FILENAME_PATTERN");
}
String topic = m.group(HdfsSinkConnectorConstants.PATTERN_TOPIC_GROUP);
int partition = Integer.parseInt(m.group(HdfsSinkConnectorConstants.PATTERN_PARTITION_GROUP));
return new TopicPartition(topic, partition);
}

/**
* Obtain the offset of the last record that was written to the specified HDFS file.
* @param filename the name of the HDFS file; may not be null
94 changes: 94 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/RecoveryHelper.java
@@ -0,0 +1,94 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs;

import io.confluent.connect.hdfs.storage.HdfsStorage;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;

/**
 * Collects the names of committed files observed while the WAL is applied, keyed by topic
 * partition, so that a recovery point (the latest committed file and its end offset) can be
 * looked up on restart without listing the topic directory in HDFS.
 */
public class RecoveryHelper {

public static final String RECOVERY_RECORD_KEY = "latestFilename";

static RecoveryHelper instance = new RecoveryHelper();

public static RecoveryHelper getInstance() {
return instance;
}

private final ConcurrentHashMap<TopicPartition, List<String>> files = new ConcurrentHashMap<>();

public List<String> getCommittedFiles(TopicPartition tp) {
return files.get(tp);
}

public void addFile(TopicPartition tp, String name) {
List<String> newList = new ArrayList<>();
newList.add(name);
files.merge(tp, newList, (a, b) -> {
a.addAll(b);
return a;
});
}

public void addFile(String name) {
TopicPartition tp = FileUtils.extractTopicPartition(new Path(name).getName());
addFile(tp, name);
}

public static class RecoveryPoint {
public final TopicPartition tp;
public final long offset;
public final String filename;

public RecoveryPoint(TopicPartition tp, long offset, String filename) {
this.tp = tp;
this.offset = offset;
this.filename = filename;
}
}

public static RecoveryPoint getRecoveryPoint(TopicPartition tp, HdfsStorage storage) {
if (getInstance().getCommittedFiles(tp) != null) {
List<String> files = getInstance().getCommittedFiles(tp);
files.sort(Comparator.comparing(a -> FileUtils.extractOffset(new Path(a).getName())));
long maxOffset = -1;
String latestFilename = null;

// go backward from the latest file until we find a file that exists.
for (int i = files.size() - 1; i >= 0; i--) {
String filename = files.get(i);
if (!storage.exists(filename)) {
continue;
}
long endOffset = FileUtils.extractOffset(new Path(filename).getName());
if (maxOffset < endOffset) {
maxOffset = endOffset;
latestFilename = filename;
}
}
if (maxOffset > 0) {
return new RecoveryPoint(tp, maxOffset, latestFilename);
}
}

return null;
}
}
src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -52,6 +52,7 @@
import io.confluent.connect.hdfs.hive.HiveUtil;
import io.confluent.connect.hdfs.partitioner.Partitioner;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.RecoveryHelper.RecoveryPoint;
import io.confluent.connect.storage.StorageSinkConnectorConfig;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.hive.HiveConfig;
@@ -569,6 +570,14 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
}

private void readOffset() throws ConnectException {
RecoveryPoint recoveryPoint = RecoveryHelper.getRecoveryPoint(tp, storage);
if (recoveryPoint != null) {
offset = recoveryPoint.offset + 1;
// intentionally written without a begin marker so that it is skipped in WAL.apply()
wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, recoveryPoint.filename);
return;
}

String path = FileUtils.topicDirectory(url, topicsDir, tp.topic());
CommittedFileFilter filter = new TopicPartitionCommittedFileFilter(tp);
FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset(
@@ -581,6 +590,7 @@ private void readOffset() throws ConnectException {
fileStatusWithMaxOffset.getPath().getName());
// `offset` represents the next offset to read after the most recent commit
offset = lastCommittedOffsetToHdfs + 1;
wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, fileStatusWithMaxOffset.getPath().toString());
}
}

5 changes: 5 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
@@ -15,6 +15,7 @@

package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.RecoveryHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.kafka.common.TopicPartition;
@@ -120,6 +121,7 @@ public void apply() throws ConnectException {
for (Map.Entry<WALEntry, WALEntry> entry: entries.entrySet()) {
String tempFile = entry.getKey().getName();
String committedFile = entry.getValue().getName();
RecoveryHelper.getInstance().addFile(committedFile);
Review comment from a Contributor:
I am concerned that the lists in the map grow without bounds and eventually may cause OOM if the process is run long enough.
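For context, the growth described here comes from addFile(), which appends to a per-partition list that is never trimmed or cleared. Below is a minimal sketch of one possible mitigation, assuming a fixed cap is acceptable (the constant and method name are hypothetical and not part of this PR); it relies on the fact that getRecoveryPoint() only needs the newest committed file that still exists.

// Hypothetical bounded variant of RecoveryHelper.addFile; illustration only, not part of this PR.
private static final int MAX_FILES_PER_PARTITION = 100; // assumed cap, tune as needed

public void addFileBounded(TopicPartition tp, String name) {
  // compute() is atomic per key on ConcurrentHashMap, so the add and the trim happen together
  files.compute(tp, (key, list) -> {
    List<String> updated = (list == null) ? new ArrayList<>() : list;
    updated.add(name);
    int excess = updated.size() - MAX_FILES_PER_PARTITION;
    if (excess > 0) {
      updated.subList(0, excess).clear(); // drop the oldest names
    }
    return updated;
  });
}

Clearing a partition's entry once its recovery has completed would be another way to keep the map bounded.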

if (!storage.exists(committedFile)) {
storage.commit(tempFile, committedFile);
}
@@ -128,6 +130,9 @@
WALEntry mapKey = new WALEntry(key.getName());
WALEntry mapValue = new WALEntry(value.getName());
entries.put(mapKey, mapValue);
if (key.getName().equals(RecoveryHelper.RECOVERY_RECORD_KEY)) {
RecoveryHelper.getInstance().addFile(value.getName());
}
}
}
} catch (IOException e) {
90 changes: 80 additions & 10 deletions src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
@@ -25,13 +25,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.avro.AvroFileReader;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.storage.StorageFactory;
import io.confluent.connect.storage.common.StorageCommonConfig;
@@ -176,6 +178,62 @@ public void testSinkTaskStartWithRecovery() throws Exception {
task.stop();
}

@Test
public void testSinkTaskStartWithRecoveryHelper() throws Exception {
setUp();
Map<TopicPartition, List<String>> tempfiles = new HashMap<>();

Map<TopicPartition, String> recoveryPoints = new HashMap<>();
Map<TopicPartition, List<String>> committedFiles = new HashMap<>();

tempfiles.put(TOPIC_PARTITION, new ArrayList<>());

String fileName1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 200, 300,
extension, ZERO_PAD_FMT);
String fileName2 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 301, 400,
extension, ZERO_PAD_FMT);

committedFiles.put(TOPIC_PARTITION, new ArrayList<>());

tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
fs.createNewFile(new Path(fileName1));
committedFiles.get(TOPIC_PARTITION).add(fileName1);
tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
committedFiles.get(TOPIC_PARTITION).add(fileName2);
fs.createNewFile(new Path(fileName2));

// simulate deleted temp file
tempfiles.get(TOPIC_PARTITION).add(FileUtils.tempFileName(url, topicsDir, DIRECTORY1, extension));
committedFiles.get(TOPIC_PARTITION)
.add(FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 401, 500, extension, ZERO_PAD_FMT));

recoveryPoints.put(TOPIC_PARTITION, fileName1);

fileName1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY2, TOPIC_PARTITION2, 400, 500,
extension, ZERO_PAD_FMT);
fileName2 = FileUtils.committedFileName(url, topicsDir, DIRECTORY2, TOPIC_PARTITION2, 501, 800,
extension, ZERO_PAD_FMT);
fs.createNewFile(new Path(fileName1));
fs.createNewFile(new Path(fileName2));

recoveryPoints.put(TOPIC_PARTITION2, fileName2);

createWALs(tempfiles, committedFiles, recoveryPoints);
HdfsSinkTask task = new HdfsSinkTask();

task.initialize(context);
task.start(properties);

Map<TopicPartition, Long> offsets = context.offsets();
assertEquals(2, offsets.size());
assertTrue(offsets.containsKey(TOPIC_PARTITION));
assertEquals(401, (long) offsets.get(TOPIC_PARTITION));
assertTrue(offsets.containsKey(TOPIC_PARTITION2));
assertEquals(801, (long) offsets.get(TOPIC_PARTITION2));

task.stop();
}

@Test
public void testSinkTaskPut() throws Exception {
setUp();
@@ -277,8 +335,13 @@ private void createCommittedFiles() throws IOException {
}

private void createWALs(Map<TopicPartition, List<String>> tempfiles,
Map<TopicPartition, List<String>> committedFiles) throws Exception {
createWALs(tempfiles, committedFiles, Collections.emptyMap());
}

private void createWALs(Map<TopicPartition, List<String>> tempfiles,
Map<TopicPartition, List<String>> committedFiles,
Map<TopicPartition, String> recoveryRecords) throws Exception {
@SuppressWarnings("unchecked")
Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>)
connectorConfig.getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
HdfsStorage storage = StorageFactory.createStorage(
@@ -288,15 +351,22 @@ private void createWALs(Map<TopicPartition, List<String>> tempfiles,
url
);

Set<TopicPartition> tps = new HashSet<>(tempfiles.keySet());
tps.addAll(recoveryRecords.keySet());
for (TopicPartition tp: tps) {
WAL wal = storage.wal(logsDir, tp);
if (recoveryRecords.containsKey(tp)) {
wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, recoveryRecords.get(tp));
}
if (tempfiles.containsKey(tp)) {
List<String> tempList = tempfiles.get(tp);
List<String> committedList = committedFiles.get(tp);
wal.append(WAL.beginMarker, "");
for (int i = 0; i < tempList.size(); ++i) {
wal.append(tempList.get(i), committedList.get(i));
}
wal.append(WAL.endMarker, "");
}
wal.close();
}
}