Hey there,

We're seeing transient network issues cause some unexpected behavior with this connector. While I'm not sure of the exact network issue that triggers it, I can reproduce it locally with Docker Compose by manipulating the Docker networks.
I have a setup with a Redis instance plus a Kafka broker and Connect. If I do the following:
docker stop redis
docker start redis
I see instant attempts to reconnect in the connector logs (as expected):
[2023-10-10 00:24:45,127] INFO Reconnecting, last destination was redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
[2023-10-10 00:24:45,128] WARN Cannot reconnect to [redis:6379]: redis (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
java.net.UnknownHostException: redis
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at java.base/java.net.InetAddress.getByName(InetAddress.java:1256)
at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
at com.redis.kafka.connect.shaded.io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
at com.redis.kafka.connect.shaded.io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
at com.redis.kafka.connect.shaded.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at com.redis.kafka.connect.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at com.redis.kafka.connect.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
When Redis is started again, the connector reconnects and carries on as normal:
[2023-10-10 00:25:25,028] INFO Reconnected to redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ReconnectionHandler)
[2023-10-10 00:25:22,876] INFO Wrote 0 records (com.redis.kafka.connect.sink.RedisSinkTask)
However, if I instead disconnect Redis from the Docker network that provides connectivity between the Redis instance and Connect, restart Redis, and re-connect it to the network:
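Something along these lines, where connect-net is just a placeholder for whatever network my compose file creates:
docker network disconnect connect-net redis
docker restart redis
docker network connect connect-net redis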
The connector then carries on as if nothing is wrong until the next time a poll returns records, at which point it fails:
[2023-10-10 20:12:56,941] WARN Could not write 1 records (com.redis.kafka.connect.sink.RedisSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
at io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:109)
at io.lettuce.core.internal.Futures.awaitAll(Futures.java:226)
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:59)
at com.redis.spring.batch.RedisItemWriter.write(RedisItemWriter.java:74)
at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
[2023-10-10 20:12:56,947] ERROR WorkerSinkTask{id=debug_redis_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.io.IOException: Connection reset by peer (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:250)
at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
at com.sun.proxy.$Proxy52.mset(Unknown Source)
at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:335)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
My guess is that stopping the container tears the connection down outright, so lettuce's ConnectionWatchdog starts reconnecting straight away, whereas detaching Redis from the network leaves the existing connection half-open and the client only notices on the next write. Either way, my hope would be that the connector behaves the same in both scenarios and attempts to reconnect before failing. I've had a look at the sink connector code and, to be honest, I'm not quite sure where a fix would belong (whether in this repo, lettuce, or spring-batch-redis).
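One thought, purely a sketch on my part and not based on the actual RedisSinkTask internals, is that put() could rethrow connection-level failures as Kafka Connect's RetriableException so the framework retries the batch after its backoff instead of killing the task, which would give lettuce's reconnect machinery time to recover. Roughly:

// Hypothetical sketch only: names and structure are illustrative, not the real RedisSinkTask code.
import java.io.IOException;
import java.util.Collection;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import io.lettuce.core.RedisException;

public abstract class ReconnectAwareSinkTaskSketch {

    // Stand-in for the real write path (e.g. the RedisItemWriter.write call).
    protected abstract void writeToRedis(Collection<SinkRecord> records);

    public void put(Collection<SinkRecord> records) {
        try {
            writeToRedis(records);
        } catch (RedisException e) {
            if (e.getCause() instanceof IOException) {
                // Connection-level failure (e.g. "Connection reset by peer"):
                // ask the framework to retry this batch after backoff instead of
                // killing the task, giving the client time to reconnect.
                throw new RetriableException("Redis connection failure, retrying batch", e);
            }
            // Anything else stays fatal, as it is today.
            throw new ConnectException("Could not write records to Redis", e);
        }
    }
}

Alternatively (or additionally), enabling TCP keepalive on the lettuce connection via SocketOptions.builder().keepAlive(true) might let a half-open connection be detected proactively rather than only on the next write, though I haven't tried wiring that through the connector config.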
Any help appreciated, cheers. 😃