
Retries only attempted on some connectivity issues #29

Open
jakedorne opened this issue Oct 10, 2023 · 1 comment

Comments

@jakedorne
Hey there,

We're seeing transient network issues cause some unexpected behavior with this connector. While I'm not sure of the exact network issues causing this, I can reproduce it using a local Docker Compose setup and playing around with the Docker networks.

I have a setup where I've spun up a Redis instance plus a Kafka broker and Connect, and if I do the following:

docker stop redis
docker start redis

I see instant attempts to reconnect in the connector logs (as expected):

[2023-10-10 00:24:45,127] INFO Reconnecting, last destination was redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
[2023-10-10 00:24:45,128] WARN Cannot reconnect to [redis:6379]: redis (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
java.net.UnknownHostException: redis
	at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
	at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
	at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
	at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
	at java.base/java.net.InetAddress.getByName(InetAddress.java:1256)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
	at com.redis.kafka.connect.shaded.io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
	at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
	at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
	at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
	at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
	at com.redis.kafka.connect.shaded.io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
	at com.redis.kafka.connect.shaded.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at com.redis.kafka.connect.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

When Redis is started again, the connector reconnects and carries on as normal:

[2023-10-10 00:25:25,028] INFO Reconnected to redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ReconnectionHandler)
[2023-10-10 00:25:22,876] INFO Wrote 0 records (com.redis.kafka.connect.sink.RedisSinkTask)

However, if I disconnect Redis from the Docker network I'm using for connectivity between the Redis instance and Connect, restart Redis, and reconnect it to the network:

docker network disconnect kafka-connect-local_redis redis
docker stop redis
docker start redis
docker network connect kafka-connect-local_redis redis

then the connector carries on as if nothing is wrong until the next time a poll returns records, at which point the task fails:

[2023-10-10 20:12:56,941] WARN Could not write 1 records (com.redis.kafka.connect.sink.RedisSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
	at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
	at io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:109)
	at io.lettuce.core.internal.Futures.awaitAll(Futures.java:226)
	at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:59)
	at com.redis.spring.batch.RedisItemWriter.write(RedisItemWriter.java:74)
	at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:304)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more
[2023-10-10 20:12:56,947] ERROR WorkerSinkTask{id=debug_redis_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.io.IOException: Connection reset by peer (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
	at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
	at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:250)
	at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
	at com.sun.proxy.$Proxy52.mset(Unknown Source)
	at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:335)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more
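For what it's worth, this seems consistent with plain TCP semantics rather than anything connector-specific: after the peer goes away, the client's first write still "succeeds" (the kernel just buffers the bytes), and the error only surfaces on a later write. A minimal stdlib sketch of that behavior (Python purely for illustration; the sockets and names here are mine, not from the connector):

```python
import socket
import time

# Stand-in for the Redis server: accept one connection, then vanish.
srv = socket.socket()
srv.bind(("127.0.0.1", 0))
srv.listen(1)
client = socket.create_connection(srv.getsockname())
server_side, _ = srv.accept()

# Peer goes away (here via a clean close; a container yanked off the
# network is even quieter -- no FIN or RST at all).
server_side.close()
time.sleep(0.2)

# First write after the peer is gone still succeeds: the kernel accepts
# the bytes locally and only then learns (via RST) that nobody is there.
client.send(b"PING\r\n")
first_write_ok = True

time.sleep(0.2)

# Only a subsequent write surfaces the failure (broken pipe / connection
# reset) -- which is exactly when the sink task blows up above.
try:
    client.send(b"PING\r\n")
    second_write_error = False
except OSError:
    second_write_error = True

print(first_write_ok, second_write_error)
```

So in the network-disconnect scenario the connection is half-open and nothing notices until the next put, whereas a plain `docker stop` produces an immediate disconnect event that the ConnectionWatchdog reacts to.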

My hope would be that the connector behaves the same in both scenarios and attempts to reconnect before failing. I've had a look at the sink connector code and, to be honest, I'm not quite sure where a fix would belong (whether in this repo, Lettuce, or spring-batch-redis).
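In case it helps whoever picks this up: one mitigation that could work regardless of which layer owns the fix is TCP keepalive, so the OS itself probes idle connections and tears down dead ones (I believe Lettuce exposes this through `SocketOptions` / `keepAlive`, though I haven't tried wiring that through the connector). At the raw-socket level it looks like the following sketch; the tuning constants are Linux-specific, hence the guard:

```python
import socket

s = socket.socket()
# Ask the kernel to probe this connection while it is idle.
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

# Tune the probe schedule where the platform exposes it (Linux):
# start probing after 30s idle, probe every 10s, give up after 3 misses.
if hasattr(socket, "TCP_KEEPIDLE"):
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)

keepalive_enabled = bool(s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
print(keepalive_enabled)
```

With something like that in place, a half-open connection would be detected within roughly a minute instead of lingering until the next write.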

Any help appreciated, cheers. 😃

@jruaux
Collaborator

jruaux commented Oct 17, 2023

Hi, this looks like a lower-level networking issue. It might be related to this one in Lettuce: redis/lettuce#1466
