Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: adapt source task shutdown logic to work with newer versions of … #334

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 65 additions & 61 deletions src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ public class JdbcSourceTask extends SourceTask {
// Visible for testing
public static final int MAX_QUERY_SLEEP_MS = 100;

private Time time;
private final Time time;
private JdbcSourceTaskConfig config;
private DatabaseDialect dialect;
private CachedConnectionProvider cachedConnectionProvider;
private PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<TableQuerier>();
private final PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<TableQuerier>();
private final AtomicBoolean running = new AtomicBoolean(false);
private final Object pollLock = new Object();

public JdbcSourceTask() {
this.time = Time.SYSTEM;
Expand Down Expand Up @@ -269,8 +270,14 @@ private List<Map<String, String>> possibleTablePartitions(final String table) {
public void stop() throws ConnectException {
// Shutdown sequence: log the stop request, clear the 'running' flag that the loop in
// poll() checks, then close resources while holding pollLock.
log.info("Stopping JDBC source task");
running.set(false);
// NOTE(review): the next two comment lines appear to be the pre-change wording (removed side
// of this diff). They no longer match the code below, which closes resources here in stop()
// rather than at the end of poll() - confirm and drop them if so.
// All resources are closed at the end of 'poll()' when no longer running or
// if there is an error
// Wait for any in-progress polls to stop before closing resources.
// On older versions of Kafka Connect, SourceTask::stop and SourceTask::poll may
// be called concurrently on different threads.
// On more recent versions, SourceTask::stop is always called after the last invocation
// of SourceTask::poll.
// poll() synchronizes on the same pollLock, so entering this block waits until a
// concurrent poll() has left its synchronized section before closeResources() runs.
synchronized (pollLock) {
closeResources();
}
}

protected void closeResources() {
Expand All @@ -297,72 +304,69 @@ protected void closeResources() {

@Override
public List<SourceRecord> poll() throws InterruptedException {
log.trace("Polling for new data");

while (running.get()) {
final TableQuerier querier = tableQueue.peek();

if (!querier.querying()) {
// If not in the middle of an update, wait for next update time
final long nextUpdate = querier.getLastUpdate()
+ config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
final long untilNext = nextUpdate - time.milliseconds();
final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS);
if (sleepMs > 0) {
log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)",
sleepMs, querier.toString(), untilNext);
time.sleep(sleepMs);
// Return control to the Connect runtime periodically
// See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll():
// "If no data is currently available, this method should block but return control to the caller
// regularly (by returning null)"
return null;
synchronized (pollLock) {
log.trace("Polling for new data");

while (running.get()) {
final TableQuerier querier = tableQueue.peek();

if (!querier.querying()) {
// If not in the middle of an update, wait for next update time
final long nextUpdate = querier.getLastUpdate()
+ config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
final long untilNext = nextUpdate - time.milliseconds();
final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS);
if (sleepMs > 0) {
log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)",
sleepMs, querier.toString(), untilNext);
time.sleep(sleepMs);
// Return control to the Connect runtime periodically
// See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll():
// "If no data is currently available, this method should block but return control to the caller
// regularly (by returning null)"
return null;
}
}
}

final List<SourceRecord> results = new ArrayList<>();
try {
log.debug("Checking for next block of results from {}", querier.toString());
querier.maybeStartQuery(cachedConnectionProvider.getConnection());
final List<SourceRecord> results = new ArrayList<>();
try {
log.debug("Checking for next block of results from {}", querier.toString());
querier.maybeStartQuery(cachedConnectionProvider.getConnection());

final int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG);
boolean hadNext = true;
while (results.size() < batchMaxRows && (hadNext = querier.next())) {
results.add(querier.extractRecord());
}
final int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG);
boolean hadNext = true;
while (results.size() < batchMaxRows && (hadNext = querier.next())) {
results.add(querier.extractRecord());
}

if (!hadNext) {
// If we finished processing the results from the current query, we can reset and send
// the querier to the tail of the queue
resetAndRequeueHead(querier);
}
if (!hadNext) {
// If we finished processing the results from the current query, we can reset and send
// the querier to the tail of the queue
resetAndRequeueHead(querier);
}

if (results.isEmpty()) {
log.trace("No updates for {}", querier.toString());
continue;
}
if (results.isEmpty()) {
log.trace("No updates for {}", querier.toString());
continue;
}

log.debug("Returning {} records for {}", results.size(), querier.toString());
return results;
} catch (final SQLException sqle) {
log.error("Failed to run query for table {}: {}", querier.toString(), sqle);
resetAndRequeueHead(querier);
return null;
} catch (final Throwable t) {
resetAndRequeueHead(querier);
// This task has failed, so close any resources (may be reopened if needed) before throwing
closeResources();
throw t;
log.debug("Returning {} records for {}", results.size(), querier.toString());
return results;
} catch (final SQLException sqle) {
log.error("Failed to run query for table {}: {}", querier.toString(), sqle);
resetAndRequeueHead(querier);
return null;
} catch (final Throwable t) {
resetAndRequeueHead(querier);
// This task has failed, so close any resources (may be reopened if needed) before throwing
closeResources();
throw t;
}
}
}

// Only in case of shutdown
final TableQuerier querier = tableQueue.peek();
if (querier != null) {
resetAndRequeueHead(querier);
// Only in case of shutdown
return null;
}
closeResources();
return null;
}

private void resetAndRequeueHead(final TableQuerier expectedHead) {
Expand Down
Loading