From 6291ef501adc69727fb735902e1c088f305a3b76 Mon Sep 17 00:00:00 2001 From: jxsl13 Date: Wed, 15 Nov 2023 15:54:48 +0100 Subject: [PATCH] [WIP] refactor pause and resume for handler & batch handler --- DEBUG.md | 453 ++++++++++++++++++++++++++ Makefile | 5 +- amqpx.go | 24 +- amqpx_test.go | 446 ++++++++++++++----------- helpers_test.go | 4 +- pool/connection.go | 26 -- pool/connection_options.go | 12 - pool/connection_pool.go | 9 +- pool/connection_pool_options.go | 11 - pool/connection_pool_test.go | 4 +- pool/connection_test.go | 12 +- pool/helpers_context.go | 256 ++++++++++++++- pool/pool.go | 2 - pool/pool_options.go | 8 - pool/pool_test.go | 14 +- pool/publisher_test.go | 2 +- pool/session_pool_test.go | 2 +- pool/session_test.go | 9 +- pool/subscriber.go | 249 +++++++------- pool/subscriber_batch_handler.go | 193 +++-------- pool/subscriber_batch_handler_view.go | 12 +- pool/subscriber_handler.go | 187 +++-------- pool/subscriber_handler_view.go | 11 +- pool/subscriber_options.go | 13 + pool/subscriber_test.go | 4 +- 25 files changed, 1250 insertions(+), 718 deletions(-) create mode 100644 DEBUG.md diff --git a/DEBUG.md b/DEBUG.md new file mode 100644 index 0000000..89d8e25 --- /dev/null +++ b/DEBUG.md @@ -0,0 +1,453 @@ +=== RUN TestHandlerPauseAndResume + amqpx_test.go:430: + Error Trace: /home/behm015/Development/amqpx/amqpx_test.go:430 + /home/behm015/Development/amqpx/pool/subscriber.go:287 + /home/behm015/Development/amqpx/pool/subscriber.go:222 + /usr/local/go/src/runtime/asm_amd64.s:1650 + Error: Not equal: + expected: false + actual : true + Test: TestHandlerPauseAndResume + Messages: expected active to be false + + + +panic: test timed out after 10m0s +running tests: + TestBatchHandlerPauseAndResume (8m49s) + +goroutine 316 [running]: +testing.(*M).startAlarm.func1() + /usr/local/go/src/testing/testing.go:2259 +0x1fc +created by time.goFunc + /usr/local/go/src/time/sleep.go:176 +0x45 + +goroutine 1 [chan receive, 8 minutes]: +testing.(*T).Run(0xc000082ea0, {0x89c94f, 0x1e}, 0x8c4010) + /usr/local/go/src/testing/testing.go:1649 +0x856 +testing.runTests.func1(0x0?) + /usr/local/go/src/testing/testing.go:2054 +0x85 +testing.tRunner(0xc000082ea0, 0xc0000f9908) + /usr/local/go/src/testing/testing.go:1595 +0x239 +testing.runTests(0xc00009abe0?, {0xb33520, 0x9, 0x9}, {0x4a8459?, 0x4a9c31?, 0xb397c0?}) + /usr/local/go/src/testing/testing.go:2052 +0x897 +testing.(*M).Run(0xc00009abe0) + /usr/local/go/src/testing/testing.go:1925 +0xb58 +go.uber.org/goleak.VerifyTestMain({0x929a20, 0xc00009abe0}, {0xc0000f9e18, 0x3, 0x3}) + /home/behm015/go/pkg/mod/go.uber.org/goleak@v1.3.0/testmain.go:53 +0x65 +github.com/jxsl13/amqpx_test.TestMain(0xfdfb0802185865db?) + /home/behm015/Development/amqpx/amqpx_test.go:24 +0x2e9 +main.main() + _testmain.go:67 +0x308 + +goroutine 170 [semacquire, 7 minutes]: +sync.runtime_Semacquire(0xc0001287e8?) + /usr/local/go/src/runtime/sema.go:62 +0x25 +sync.(*WaitGroup).Wait(0xc0001287e0) + /usr/local/go/src/sync/waitgroup.go:116 +0xa5 +github.com/jxsl13/amqpx/pool.(*Subscriber).Close(0xc000128780) + /home/behm015/Development/amqpx/pool/subscriber.go:35 +0x12a +github.com/jxsl13/amqpx.(*AMQPX).Close.(*AMQPX).close.func1() + /home/behm015/Development/amqpx/amqpx.go:236 +0x96 +sync.(*Once).doSlow(0xb398d4, 0xc0000ad888) + /usr/local/go/src/sync/once.go:74 +0xf1 +sync.(*Once).Do(0xb398d4, 0xc0000ad878?) + /usr/local/go/src/sync/once.go:65 +0x45 +github.com/jxsl13/amqpx.(*AMQPX).close(...) 
+ /home/behm015/Development/amqpx/amqpx.go:233 +github.com/jxsl13/amqpx.(*AMQPX).Close(0xb39840) + /home/behm015/Development/amqpx/amqpx.go:229 +0xf2 +github.com/jxsl13/amqpx.Close(...) + /home/behm015/Development/amqpx/amqpx.go:351 +github.com/jxsl13/amqpx_test.testBatchHandlerPauseAndResume(0xc00029a4e0) + /home/behm015/Development/amqpx/amqpx_test.go:751 +0x1331 +github.com/jxsl13/amqpx_test.TestBatchHandlerPauseAndResume(0x0?) + /home/behm015/Development/amqpx/amqpx_test.go:539 +0x31 +testing.tRunner(0xc00029a4e0, 0x8c4010) + /usr/local/go/src/testing/testing.go:1595 +0x239 +created by testing.(*T).Run in goroutine 1 + /usr/local/go/src/testing/testing.go:1648 +0x82b + +goroutine 34 [syscall, 9 minutes]: +os/signal.signal_recv() + /usr/local/go/src/runtime/sigqueue.go:152 +0x29 +os/signal.loop() + /usr/local/go/src/os/signal/signal_unix.go:23 +0x1d +created by os/signal.Notify.func1.1 in goroutine 19 + /usr/local/go/src/os/signal/signal.go:151 +0x47 + +goroutine 33 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc00017e6e0, 0x1bf08eb00, 0xc0002d82a0) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 291 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce04e68, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc00014a4a0, 0xc000495000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc00014a480, {0xc000495000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc00014a480, {0xc000495000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc0002da170, {0xc000495000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc0002d95c0, {0xc00023a159, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc0002d95c0}, {0xc00023a159, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc000165f18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0000566e0, {0x929fa0?, 0xc0002da170}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 41 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce04d70, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc0001283a0, 0xc00029f000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) 
+ /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc000128380, {0xc00029f000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc000128380, {0xc00029f000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc000158090, {0xc00029f000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc00014c780, {0xc00013c259, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc00014c780}, {0xc00013c259, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc0000a9f18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc00017e2c0, {0x929fa0?, 0xc000158090}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 21 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc00017e2c0, 0x1bf08eb00, 0xc00008e660) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 58 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce04c78, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc00034c120, 0xc000365000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc00034c100, {0xc000365000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc00034c100, {0xc000365000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc000346030, {0xc000365000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc000344240, {0xc0003cc6d9, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc000344240}, {0xc0003cc6d9, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) 
+ /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc00016bf18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc00017e160, {0x929fa0?, 0xc000346030}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 137 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc000056000, 0x1bf08eb00, 0xc00008e420) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 238 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc000056840, 0x1bf08eb00, 0xc00008f3e0) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 198 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc000056160, 0x1bf08eb00, 0xc000183b60) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 162 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce04a88, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc000002120, 0xc0000d8000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc000002100, {0xc0000d8000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc000002100, {0xc0000d8000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc000346048, {0xc0000d8000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc00008e240, {0xc0004db449, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc00008e240}, {0xc0004db449, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc0003ddf18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc000056000, {0x929fa0?, 0xc000346048}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 294 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce04898, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc00014a620, 0xc0001c5000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) 
+ /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc00014a600, {0xc0001c5000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc00014a600, {0xc0001c5000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc0002da1d0, {0xc0001c5000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc0002d98c0, {0xc0003cc459, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc0002d98c0}, {0xc0003cc459, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc000169f18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc000056840, {0x929fa0?, 0xc0002da1d0}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 77 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc00017e160, 0x1bf08eb00, 0xc000183200) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 123 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce04b80, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc000128520, 0xc00033d000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc000128500, {0xc00033d000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc000128500, {0xc00033d000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc00017c110, {0xc00033d000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc000182e40, {0xc00052b5d9, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc000182e40}, {0xc00052b5d9, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc0003d9f18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc00017e6e0, {0x929fa0?, 0xc00017c110}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 212 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce04990, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc000128620, 0xc000241000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) 
+ /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc000128600, {0xc000241000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc000128600, {0xc000241000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc0003461b8, {0xc000241000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc00014cd80, {0xc000431469, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc00014cd80}, {0xc000431469, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc0003e3f18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc000056160, {0x929fa0?, 0xc0003461b8}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 19 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 308 [sync.Mutex.Lock]: +sync.runtime_SemacquireMutex(0x929960?, 0x0?, 0xc0004252c0?) + /usr/local/go/src/runtime/sema.go:77 +0x25 +sync.(*Mutex).lockSlow(0xc0003c81d8) + /usr/local/go/src/sync/mutex.go:171 +0x213 +sync.(*Mutex).Lock(0xc0003c81d8) + /usr/local/go/src/sync/mutex.go:90 +0x55 +github.com/jxsl13/amqpx/pool.(*Connection).Recover(0xc0003c8160) + /home/behm015/Development/amqpx/pool/connection.go:300 +0x48 +github.com/jxsl13/amqpx/pool.(*Session).recover(0xc00014a100) + /home/behm015/Development/amqpx/pool/session.go:211 +0x46 +github.com/jxsl13/amqpx/pool.(*Session).Recover(0xc00014a100) + /home/behm015/Development/amqpx/pool/session.go:193 +0x89 +github.com/jxsl13/amqpx/pool.(*SessionPool).ReturnSession(0xc00014d260, 0xc00014a100, 0x1) + /home/behm015/Development/amqpx/pool/session_pool.go:155 +0x77 +github.com/jxsl13/amqpx/pool.(*Pool).ReturnSession(...) 
+ /home/behm015/Development/amqpx/pool/pool.go:107 +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsume.func1() + /home/behm015/Development/amqpx/pool/subscriber.go:393 +0x22c +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsume(0xc000128780, 0xc00014c240) + /home/behm015/Development/amqpx/pool/subscriber.go:409 +0x6dc +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsumer(0xc000128780, 0xc00014c240, 0xc0001287e0) + /home/behm015/Development/amqpx/pool/subscriber.go:359 +0x3e5 +created by github.com/jxsl13/amqpx/pool.(*Subscriber).Start in goroutine 170 + /home/behm015/Development/amqpx/pool/subscriber.go:200 +0x5cf + +goroutine 223 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc0000562c0, 0x1bf08eb00, 0xc0002d8480) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 278 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc0000566e0, 0x1bf08eb00, 0xc0001820c0) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 306 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce046a8, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc0001285a0, 0xc000262000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc000128580, {0xc000262000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc000128580, {0xc000262000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc0002da0d0, {0xc000262000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc00014cf00, {0xc0003cc819, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc00014cf00}, {0xc0003cc819, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc0003dff18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc000056580, {0x929fa0?, 0xc0002da0d0}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 255 [IO wait]: +internal/poll.runtime_pollWait(0x7f8b9ce047a0, 0x72) + /usr/local/go/src/runtime/netpoll.go:343 +0x85 +internal/poll.(*pollDesc).wait(0xc000128120, 0xc0003ea000?, 0x0) + /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0xb1 +internal/poll.(*pollDesc).waitRead(...) 
+ /usr/local/go/src/internal/poll/fd_poll_runtime.go:89 +internal/poll.(*FD).Read(0xc000128100, {0xc0003ea000, 0x1000, 0x1000}) + /usr/local/go/src/internal/poll/fd_unix.go:164 +0x3e5 +net.(*netFD).Read(0xc000128100, {0xc0003ea000, 0x1000, 0x1000}) + /usr/local/go/src/net/fd_posix.go:55 +0x4b +net.(*conn).Read(0xc0002da030, {0xc0003ea000, 0x1000, 0x1000}) + /usr/local/go/src/net/net.go:179 +0xad +bufio.(*Reader).Read(0xc0002d8420, {0xc00046d2a9, 0x7, 0x7}) + /usr/local/go/src/bufio/bufio.go:244 +0x4be +io.ReadAtLeast({0x929c40, 0xc0002d8420}, {0xc00046d2a9, 0x7, 0x7}, 0x7) + /usr/local/go/src/io/io.go:335 +0xd0 +io.ReadFull(...) + /usr/local/go/src/io/io.go:354 +github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc0000abf18) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/read.go:49 +0x98 +github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0000562c0, {0x929fa0?, 0xc0002da030}) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:726 +0x2ab +created by github.com/rabbitmq/amqp091-go.Open in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x67a + +goroutine 307 [select]: +github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc000056580, 0x1bf08eb00, 0xc000345da0) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:761 +0x26d +created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 170 + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:1016 +0xb8c + +goroutine 310 [sync.Mutex.Lock]: +sync.runtime_SemacquireMutex(0x4b3cce?, 0xd8?, 0x4a9c69?) + /usr/local/go/src/runtime/sema.go:77 +0x25 +sync.(*Mutex).lockSlow(0xc0003c81d8) + /usr/local/go/src/sync/mutex.go:171 +0x213 +sync.(*Mutex).Lock(0xc0003c81d8) + /usr/local/go/src/sync/mutex.go:90 +0x55 +github.com/jxsl13/amqpx/pool.(*Connection).channel(0xc0003c8160) + /home/behm015/Development/amqpx/pool/connection.go:356 +0x59 +github.com/jxsl13/amqpx/pool.(*Session).connect(0xc000128700) + /home/behm015/Development/amqpx/pool/session.go:164 +0x156 +github.com/jxsl13/amqpx/pool.(*Session).recover(0xc000128700) + /home/behm015/Development/amqpx/pool/session.go:219 +0x55 +github.com/jxsl13/amqpx/pool.(*Session).Recover(0xc000128700) + /home/behm015/Development/amqpx/pool/session.go:193 +0x89 +github.com/jxsl13/amqpx/pool.(*SessionPool).ReturnSession(0xc00014d260, 0xc000128700, 0x1) + /home/behm015/Development/amqpx/pool/session_pool.go:155 +0x77 +github.com/jxsl13/amqpx/pool.(*Pool).ReturnSession(...) 
+ /home/behm015/Development/amqpx/pool/pool.go:107 +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsume.func1() + /home/behm015/Development/amqpx/pool/subscriber.go:393 +0x22c +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsume(0xc000128780, 0xc00014c300) + /home/behm015/Development/amqpx/pool/subscriber.go:409 +0x6dc +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsumer(0xc000128780, 0xc00014c300, 0xc0001287e0) + /home/behm015/Development/amqpx/pool/subscriber.go:359 +0x3e5 +created by github.com/jxsl13/amqpx/pool.(*Subscriber).Start in goroutine 170 + /home/behm015/Development/amqpx/pool/subscriber.go:200 +0x5cf + +goroutine 309 [runnable]: +github.com/rabbitmq/amqp091-go.(*allocator).reserve(0xc000505fa0, 0x6d0) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/allocator.go:103 +0x105 +github.com/rabbitmq/amqp091-go.(*allocator).next(0xc000505fa0) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/allocator.go:84 +0x176 +github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel(0xc000056580) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:819 +0xf6 +github.com/rabbitmq/amqp091-go.(*Connection).openChannel(0xc0003c81d8?) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x33 +github.com/rabbitmq/amqp091-go.(*Connection).Channel(...) + /home/behm015/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +github.com/jxsl13/amqpx/pool.(*Connection).channel(0xc0003c8160) + /home/behm015/Development/amqpx/pool/connection.go:358 +0xb6 +github.com/jxsl13/amqpx/pool.(*Session).connect(0xc00014a180) + /home/behm015/Development/amqpx/pool/session.go:164 +0x156 +github.com/jxsl13/amqpx/pool.(*Session).recover(0xc00014a180) + /home/behm015/Development/amqpx/pool/session.go:219 +0x55 +github.com/jxsl13/amqpx/pool.(*Session).Recover(0xc00014a180) + /home/behm015/Development/amqpx/pool/session.go:193 +0x89 +github.com/jxsl13/amqpx/pool.(*SessionPool).ReturnSession(0xc00014d260, 0xc00014a180, 0x1) + /home/behm015/Development/amqpx/pool/session_pool.go:155 +0x77 +github.com/jxsl13/amqpx/pool.(*Pool).ReturnSession(...) + /home/behm015/Development/amqpx/pool/pool.go:107 +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsume.func1() + /home/behm015/Development/amqpx/pool/subscriber.go:393 +0x22c +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsume(0xc000128780, 0xc00014c2a0) + /home/behm015/Development/amqpx/pool/subscriber.go:409 +0x6dc +github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsumer(0xc000128780, 0xc00014c2a0, 0xc0001287e0) + /home/behm015/Development/amqpx/pool/subscriber.go:359 +0x3e5 +created by github.com/jxsl13/amqpx/pool.(*Subscriber).Start in goroutine 170 + /home/behm015/Development/amqpx/pool/subscriber.go:200 +0x5cf +FAIL github.com/jxsl13/amqpx 600.024s +FAIL \ No newline at end of file diff --git a/Makefile b/Makefile index 281401c..22ff27e 100644 --- a/Makefile +++ b/Makefile @@ -4,5 +4,8 @@ environment: docker-compose up -d +down: + docker-compose down + test: - go test -v -race -count=1 ./... \ No newline at end of file + go test -v -race -count=1 ./... 
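The goroutine dump captured in DEBUG.md above reduces to one blocking shape: `Subscriber.Close` sits in `wg.Wait()` (goroutine 170) while the batch consumers it is waiting for are stuck returning their sessions, serialized on a single connection's mutex during recovery (goroutines 308, 309, 310 in `ReturnSession` → `Session.Recover`). The toy program below is a hypothetical, self-contained sketch of that shape for illustration only; it is not part of this patch and not amqpx code.

```go
// Toy reproduction of the hang seen in the dump: Close waits on a WaitGroup
// while the workers it waits for are queued on a shared "recovery" mutex
// that is never released.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		wg         sync.WaitGroup
		recoveryMu sync.Mutex
	)

	// Stands in for a connection recovery that never completes and
	// keeps holding the connection mutex.
	recoveryMu.Lock()

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Like batchConsume -> ReturnSession -> Session.Recover:
			// every consumer queues up on the same connection mutex.
			recoveryMu.Lock()
			recoveryMu.Unlock()
			fmt.Println("consumer", id, "recovered")
		}(i)
	}

	done := make(chan struct{})
	go func() {
		wg.Wait() // like Subscriber.Close waiting for its consumers
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("closed cleanly")
	case <-time.After(2 * time.Second):
		fmt.Println("Close would hang: consumers still blocked in recovery")
	}
}
```

Presumably any pause/resume refactor has to break this cycle: either Close stops waiting for consumers that are blocked in session recovery, or recovery becomes cancelable during shutdown.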
diff --git a/amqpx.go b/amqpx.go index e05e963..ae158a8 100644 --- a/amqpx.go +++ b/amqpx.go @@ -2,6 +2,7 @@ package amqpx import ( "context" + "errors" "fmt" "strings" "sync" @@ -210,14 +211,20 @@ func (a *AMQPX) Start(connectUrl string, options ...Option) (err error) { if err != nil { return } - a.sub = pool.NewSubscriber(subPool, pool.SubscriberWithAutoClosePool(true)) + a.sub = pool.NewSubscriber(subPool, + pool.SubscriberWitCloseTimeout(a.closeTimeout), + pool.SubscriberWithAutoClosePool(true), + ) for _, h := range a.handlers { a.sub.RegisterHandler(h) } for _, bh := range a.batchHandlers { a.sub.RegisterBatchHandler(bh) } - a.sub.Start() + err = a.sub.Start() + if err != nil { + return + } } }) return err @@ -233,15 +240,14 @@ func (a *AMQPX) close() (err error) { a.closeOnce.Do(func() { if a.sub != nil { - a.sub.Close() + err = errors.Join(err, a.sub.Close()) } if a.pub != nil { a.pub.Close() } - if a.pubPool != nil { - + if a.pubPool != nil && len(a.topologyDeleters) > 0 { ctx, cancel := context.WithTimeout(context.Background(), a.closeTimeout) defer cancel() @@ -251,13 +257,11 @@ func (a *AMQPX) close() (err error) { pool.TopologerWithTransientSessions(true), ) for _, f := range a.topologyDeleters { - e := f(topologer) - if e != nil { - err = e - return - } + err = errors.Join(err, f(topologer)) } + } + if a.pubPool != nil { // finally close the publisher pool // which is also used for topology. a.pubPool.Close() diff --git a/amqpx_test.go b/amqpx_test.go index f0c6a19..2e8ff2f 100644 --- a/amqpx_test.go +++ b/amqpx_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os/signal" + "sync" "syscall" "testing" "time" @@ -21,7 +22,12 @@ var ( ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) + goleak.VerifyTestMain( + m, + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), + goleak.IgnoreTopFunction("github.com/rabbitmq/amqp091-go.(*Connection).heartbeater"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + ) } func TestExchangeDeclarePassive(t *testing.T) { @@ -43,7 +49,6 @@ func TestExchangeDeclarePassive(t *testing.T) { amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(2), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leak tests ) assert.NoError(t, err) } @@ -67,7 +72,6 @@ func TestQueueDeclarePassive(t *testing.T) { amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(2), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leak tests ) assert.NoError(t, err) } @@ -84,7 +88,6 @@ func TestAMQPXPub(t *testing.T) { amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(2), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leak tests ) if err != nil { assert.NoError(t, err) @@ -156,7 +159,6 @@ func TestAMQPXSubAndPub(t *testing.T) { amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(5), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leaks tests ) if err != nil { assert.NoError(t, err) @@ -237,7 +239,6 @@ func TestAMQPXSubAndPubMulti(t *testing.T) { amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(5), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leak tests ) if err != nil { assert.NoError(t, err) @@ -285,7 +286,6 @@ func TestAMQPXSubHandler(t *testing.T) { amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(5), - 
amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leaks tests ) if err != nil { assert.NoError(t, err) @@ -322,12 +322,96 @@ func TestCreateDeleteTopology(t *testing.T) { amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(2), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leak tests ) assert.NoError(t, err) } +func TestPauseResumeHandlerNoProcessing(t *testing.T) { + queueName := "testPauseResumeHandler-01" + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGINT) + defer cancel() + + log := logging.NewNoOpLogger() + transientPool, err := pool.New( + connectURL, + 1, + 1, + pool.WithLogger(log), + pool.WithContext(ctx), + ) + require.NoError(t, err) + defer transientPool.Close() + + ts, err := transientPool.GetTransientSession(ctx) + require.NoError(t, err) + defer func() { + transientPool.ReturnSession(ts, false) + }() + + amqp := amqpx.New() + amqp.RegisterTopologyCreator(func(t *pool.Topologer) error { + _, err := t.QueueDeclare(queueName) + if err != nil { + return err + } + return nil + }) + + amqp.RegisterTopologyDeleter(func(t *pool.Topologer) error { + _, err := t.QueueDelete(queueName) + if err != nil { + return err + } + return nil + }) + + handler := amqp.RegisterHandler(queueName, func(d pool.Delivery) error { + log.Info("received message") + return nil + }) + + err = amqp.Start(connectURL, amqpx.WithLogger(log)) + if err != nil { + assert.NoError(t, err) + return + } + defer func() { + err = amqp.Close() + assert.NoError(t, err) + }() + + for i := 0; i < 5; i++ { + + assertConsumers(t, ts, queueName, 1) + assertActive(t, handler, true) + + err = handler.Pause(context.Background()) + if err != nil { + assert.NoError(t, err) + return + } + + assertConsumers(t, ts, queueName, 0) + assertActive(t, handler, false) + + err = handler.Resume(context.Background()) + if err != nil { + assert.NoError(t, err) + return + } + + assertActive(t, handler, true) + assertConsumers(t, ts, queueName, 1) + } +} + func TestHandlerPauseAndResume(t *testing.T) { + for i := 0; i < 5; i++ { + testHandlerPauseAndResume(t) + } +} + +func testHandlerPauseAndResume(t *testing.T) { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGINT) defer cancel() @@ -337,8 +421,7 @@ func TestHandlerPauseAndResume(t *testing.T) { options := []amqpx.Option{ amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), - amqpx.WithPublisherSessions(5), // only slow close once in the transient pool - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leaks tests + amqpx.WithPublisherSessions(5), // only slow close once in the transient pool } transientPool, err := pool.New( @@ -347,7 +430,6 @@ func TestHandlerPauseAndResume(t *testing.T) { 1, pool.WithLogger(log), pool.WithContext(ctx), - pool.WithSlowClose(true), // needed for goroutine leaks tests ) require.NoError(t, err) defer transientPool.Close() @@ -388,16 +470,12 @@ func TestHandlerPauseAndResume(t *testing.T) { // step 2 - process messages, pause, wait, resume, process rest, cancel context handler01 := amqpx.RegisterHandler("queue-01", func(msg pool.Delivery) (err error) { cnt++ - if cnt == publish/2 { + if cnt == publish/3 || cnt == publish/3*2 { err = amqpx.Publish("exchange-02", "event-02", pool.Publishing{ ContentType: "application/json", Body: []byte(fmt.Sprintf("%s: hit %d messages, toggling processing", eventContent, cnt)), }) - require.NoError(t, err) - } - - if cnt == publish { - cancel() + 
assert.NoError(t, err) } return nil @@ -406,120 +484,87 @@ func TestHandlerPauseAndResume(t *testing.T) { running := true amqpx.RegisterHandler("queue-02", func(msg pool.Delivery) (err error) { log.Infof("received toggle request: %s", string(msg.Body)) - if running { - beforePause01, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) - if err != nil { - return nil - } - assert.Equal(t, 1, beforePause01.Consumers, "should have one consumer at p1 before pausing") + queue := handler01.Queue() - err = handler01.Pause(context.Background()) - require.NoError(t, err) - log.Infof("paused processing of %s", handler01.Queue()) + if running { running = false - active, err := handler01.IsActive(context.Background()) - if err != nil { - assert.NoError(t, err, "IsActive after pause failed") - return nil - } - - assert.Equal(t, running, active, "expected active to be false") - - // rabbitmq broker needs some time to update its internal consumer state - // we already know that we stopped consuming before the broker can update its internal state. - time.Sleep(5 * time.Second) - afterPause01, err := ts.QueueDeclarePassive("queue-01") - if err != nil { - assert.NoError(t, err) - return nil - } - - // after pause, there should be no more consumers on that queue - assert.Equal(t, 0, afterPause01.Consumers, "should have no consumers at p2 after pausing") + assertActive(t, handler01, true) + assertConsumers(t, ts, queue, 1) - err = amqpx.Publish("exchange-03", "event-03", pool.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf("%s: delayed toggle back", eventContent)), - }) + err = handler01.Pause(context.Background()) assert.NoError(t, err) + assertActive(t, handler01, false) + assertConsumers(t, ts, queue, 0) } else { + running = true - beforeResume01, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) - if err != nil { - return nil - } - assert.Equal(t, 0, beforeResume01.Consumers, "should have no consumers at r1 before resuming") + assertActive(t, handler01, false) + assertConsumers(t, ts, queue, 0) err = handler01.Resume(context.Background()) - require.NoError(t, err) - log.Infof("resumed processing of %s", handler01.Queue()) - running = true - - active, err := handler01.IsActive(context.Background()) - if err != nil { - assert.NoError(t, err, "failed to check IsActive after resuming") - // do not return any errors or we will bounce - return nil - } + assert.NoError(t, err) + log.Infof("resumed processing of %s", queue) - assert.Equal(t, running, active, "expected active to be true after resume") + assertActive(t, handler01, true) + assertConsumers(t, ts, queue, 1) - afterResume01, err := ts.QueueDeclarePassive("queue-01") + // trigger cancelation + err = amqpxPublish.Publish("exchange-03", "event-03", pool.Publishing{ + ContentType: "application/json", + Body: []byte(fmt.Sprintf("%s: delayed toggle back", eventContent)), + }) assert.NoError(t, err) - if err != nil { - return nil - } - assert.Equal(t, 1, afterResume01.Consumers, "should have 1 consumer at r2 after resuming") } return nil }) amqpx.RegisterHandler("queue-03", func(msg pool.Delivery) (err error) { - defer func() { - // always resume processing - err = amqpx.Publish("exchange-02", "event-02", pool.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf("%s: delayed toggle", eventContent)), - }) - require.NoError(t, err) - }() - q1, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) + queue := handler01.Queue() + + assertActive(t, handler01, true) + err = 
handler01.Pause(context.Background()) + if err != nil { + assert.NoError(t, err) + return nil + } + assertActive(t, handler01, false) + + q1, err := ts.QueueDeclarePassive(queue) if err != nil { + assert.NoError(t, err) return nil } // wait for potential further processing time.Sleep(3 * time.Second) - q2, err := ts.QueueDeclarePassive("queue-01") + q2, err := ts.QueueDeclarePassive(queue) assert.NoError(t, err) if err != nil { return nil } - assert.Equal(t, q1, q2) - assert.Equal(t, 0, q1.Consumers, "should have no consumers at q1") - assert.Equal(t, 0, q2.Consumers, "should have no consumers at q2") + assert.Equal(t, q1, q2) // message count should also be equal + assertConsumers(t, ts, queue, 0) + go func() { + // delay cancelation (due to ack) + time.Sleep(3 * time.Second) + cancel() + }() return nil }) - active, err := handler01.IsActive(context.Background()) - require.NoError(t, err) - assert.False(t, active, "handler should not be active before amqpx has been started") + assertActive(t, handler01, false) err = amqpx.Start( connectURL, amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(5), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leaks tests ) if err != nil { assert.NoError(t, err) @@ -533,7 +578,14 @@ func TestHandlerPauseAndResume(t *testing.T) { assert.NoError(t, err) } + func TestBatchHandlerPauseAndResume(t *testing.T) { + for i := 0; i < 5; i++ { + testBatchHandlerPauseAndResume(t) + } +} + +func testBatchHandlerPauseAndResume(t *testing.T) { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGINT) defer cancel() @@ -543,8 +595,7 @@ func TestBatchHandlerPauseAndResume(t *testing.T) { options := []amqpx.Option{ amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), - amqpx.WithPublisherSessions(5), // only slow close once in the transient pool - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leaks tests + amqpx.WithPublisherSessions(5), // only slow close once in the transient pool } transientPool, err := pool.New( @@ -553,7 +604,6 @@ func TestBatchHandlerPauseAndResume(t *testing.T) { 1, pool.WithLogger(log), pool.WithContext(ctx), - pool.WithSlowClose(true), // needed for goroutine leaks tests ) require.NoError(t, err) defer transientPool.Close() @@ -570,7 +620,7 @@ func TestBatchHandlerPauseAndResume(t *testing.T) { eventContent := "TestHandlerPauseAndResume - event content" var ( - publish = 5000 + publish = 10000 cnt = 0 ) @@ -593,144 +643,109 @@ func TestBatchHandlerPauseAndResume(t *testing.T) { // step 2 - process messages, pause, wait, resume, process rest, cancel context handler01 := amqpx.RegisterBatchHandler("queue-01", func(msgs []pool.Delivery) (err error) { - _ = msgs[0] - cnt += (len(msgs)) - if cnt == publish/2 { - err = amqpx.Publish("exchange-02", "event-02", pool.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf("%s: hit %d messages, toggling processing", eventContent, cnt)), - }) - require.NoError(t, err) - } - - if cnt == publish { - cancel() + for _, msg := range msgs { + cnt++ + if cnt == publish/3 || cnt == publish/3*2 { + err = amqpx.Publish("exchange-02", "event-02", pool.Publishing{ + ContentType: "application/json", + Body: []byte(fmt.Sprintf("%s: hit %d messages, toggling processing: %s", eventContent, cnt, string(msg.Body))), + }) + assert.NoError(t, err) + } } - return nil - }, pool.WithMaxBatchSize(1), pool.WithBatchFlushTimeout(10*time.Second)) + }) running := true 
amqpx.RegisterBatchHandler("queue-02", func(msgs []pool.Delivery) (err error) { - msg := msgs[0] + queue := handler01.Queue() - log.Infof("received toggle request: %s", string(msg.Body)) - if running { - beforePause01, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) - if err != nil { - return nil - } - assert.Equal(t, 1, beforePause01.Consumers, "should have one consumer at p1 before pausing") + for _, msg := range msgs { + log.Infof("received toggle request: %s", string(msg.Body)) + if running { + running = false - err = handler01.Pause(context.Background()) - require.NoError(t, err) - log.Infof("paused processing of %s", handler01.Queue()) - running = false + assertActive(t, handler01, true) + assertConsumers(t, ts, queue, 1) - active, err := handler01.IsActive(context.Background()) - if err != nil { - assert.NoError(t, err, "IsActive after pause failed") - return nil - } + err = handler01.Pause(context.Background()) + assert.NoError(t, err) - assert.Equal(t, running, active, "expected active to be false") + assertActive(t, handler01, false) + assertConsumers(t, ts, queue, 0) + } else { + running = true - // rabbitmq broker needs some time to update its internal consumer state - // we already know that we stopped consuming before the broker can update its internal state. - time.Sleep(5 * time.Second) - afterPause01, err := ts.QueueDeclarePassive("queue-01") - if err != nil { + assertActive(t, handler01, false) + assertConsumers(t, ts, queue, 0) + + err = handler01.Resume(context.Background()) assert.NoError(t, err) - return nil - } + log.Infof("resumed processing of %s", queue) - // after pause, there should be no more consumers on that queue - assert.Equal(t, 0, afterPause01.Consumers, "should have no consumers at p2 after pausing") + assertActive(t, handler01, true) + assertConsumers(t, ts, queue, 1) - err = amqpx.Publish("exchange-03", "event-03", pool.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf("%s: delayed toggle back", eventContent)), - }) - assert.NoError(t, err) + // trigger cancelation + err = amqpxPublish.Publish("exchange-03", "event-03", pool.Publishing{ + ContentType: "application/json", + Body: []byte(fmt.Sprintf("%s: delayed toggle back", eventContent)), + }) + assert.NoError(t, err) + } + } + return nil + }) - } else { + var once sync.Once + amqpx.RegisterBatchHandler("queue-03", func(msgs []pool.Delivery) (err error) { + _ = msgs[0] + once.Do(func() { + queue := handler01.Queue() - beforeResume01, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) + assertActive(t, handler01, true) + err = handler01.Pause(context.Background()) if err != nil { - return nil + assert.NoError(t, err) + return } - assert.Equal(t, 0, beforeResume01.Consumers, "should have no consumers at r1 before resuming") + assertActive(t, handler01, false) - err = handler01.Resume(context.Background()) - require.NoError(t, err) - log.Infof("resumed processing of %s", handler01.Queue()) - running = true - - active, err := handler01.IsActive(context.Background()) + q1, err := ts.QueueDeclarePassive(queue) if err != nil { - assert.NoError(t, err, "failed to check IsActive after resuming") - // do not return any errors or we will bounce - return nil + assert.NoError(t, err) + return } - assert.Equal(t, running, active, "expected active to be true after resume") + // wait for potential further processing + time.Sleep(3 * time.Second) - afterResume01, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) + q2, err := 
ts.QueueDeclarePassive(queue) if err != nil { - return nil + assert.NoError(t, err) + return } - assert.Equal(t, 1, afterResume01.Consumers, "should have 1 consumer at r2 after resuming") - } - return nil - }, pool.WithMaxBatchSize(1), pool.WithBatchFlushTimeout(10*time.Second)) - - amqpx.RegisterBatchHandler("queue-03", func(msgs []pool.Delivery) (err error) { - _ = msgs[0] - - defer func() { - // always resume processing - err = amqpx.Publish("exchange-02", "event-02", pool.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf("%s: delayed toggle", eventContent)), - }) - require.NoError(t, err) - }() - q1, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) - if err != nil { - return nil - } - - // wait for potential further processing - time.Sleep(3 * time.Second) - - q2, err := ts.QueueDeclarePassive("queue-01") - assert.NoError(t, err) - if err != nil { - return nil - } + assert.Equal(t, q1, q2) // message count should also be equal + assertConsumers(t, ts, queue, 0) - assert.Equal(t, q1, q2) - assert.Equal(t, 0, q1.Consumers, "should have no consumers at q1") - assert.Equal(t, 0, q2.Consumers, "should have no consumers at q2") + go func() { + // delay cancelation (due to ack) + time.Sleep(3 * time.Second) + cancel() + }() + }) return nil - }, pool.WithMaxBatchSize(1), pool.WithBatchFlushTimeout(10*time.Second)) + }) - active, err := handler01.IsActive(context.Background()) - require.NoError(t, err) - assert.False(t, active, "handler should not be active before amqpx has been started") + assertActive(t, handler01, false) err = amqpx.Start( connectURL, amqpx.WithLogger(log), amqpx.WithPublisherConnections(1), amqpx.WithPublisherSessions(5), - amqpx.WithPoolOption(pool.WithSlowClose(true)), // needed for goroutine leaks tests ) if err != nil { assert.NoError(t, err) @@ -743,3 +758,40 @@ func TestBatchHandlerPauseAndResume(t *testing.T) { err = amqpx.Close() assert.NoError(t, err) } + +func assertConsumers(t *testing.T, ts *pool.Session, queueName string, expected int) { + // rabbitMQ needs some time before it updates its consumer count + time.Sleep(time.Second) + queue, err := ts.QueueDeclarePassive(queueName) + if err != nil { + assert.NoError(t, err) + return + } + + if expected != queue.Consumers { + assert.Equal(t, expected, queue.Consumers, "consumer count of queue %s should be %d", queueName, expected) + return + } +} + +func assertActive(t *testing.T, handler handlerStats, expected bool) { + active, err := handler.IsActive(context.Background()) + if err != nil { + assert.NoError(t, err) + return + } + + if expected != active { + as := "active" + if !expected { + as = "inactive" + } + assert.Equalf(t, expected, active, "expected handler of queue %s to be %q", handler.Queue(), as) + return + } +} + +type handlerStats interface { + Queue() string + IsActive(ctx context.Context) (active bool, err error) +} diff --git a/helpers_test.go b/helpers_test.go index 180a842..40eb281 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -97,7 +97,7 @@ func createQueue(name string, t *pool.Topologer) (err error) { if err != nil { return fmt.Errorf("queue %s was found even tho it should not exist: %w", name, err) } - return fmt.Errorf("queue %s was found even tho it should not exist: %v", name, err) + return fmt.Errorf("queue %s was found even tho it should not exist", name) } _, err = t.QueueDeclare(name) @@ -136,7 +136,7 @@ func createExchange(name string, t *pool.Topologer) (err error) { if err != nil { return fmt.Errorf("exchange %s was found even tho it 
should not exist: %w", name, err) } - return fmt.Errorf("exchange %s was found even tho it should not exist: %v", name, err) + return fmt.Errorf("exchange %s was found even tho it should not exist", name) } err = t.ExchangeDeclare(name, pool.ExchangeKindTopic) diff --git a/pool/connection.go b/pool/connection.go index efe04b0..f9b5fa2 100644 --- a/pool/connection.go +++ b/pool/connection.go @@ -39,8 +39,6 @@ type Connection struct { cancel context.CancelFunc log logging.Logger - - slowClose bool } // NewConnection creates a connection wrapper. @@ -54,7 +52,6 @@ func NewConnection(connectUrl, name string, options ...ConnectionOption) (*Conne ConnectionTimeout: 30 * time.Second, BackoffPolicy: newDefaultBackoffPolicy(time.Second, 15*time.Second), Ctx: context.Background(), - SlowClose: false, } // apply options @@ -96,8 +93,6 @@ func NewConnection(connectUrl, name string, options ...ConnectionOption) (*Conne log: option.Logger, lastConnLoss: time.Now(), - - slowClose: option.SlowClose, // for leak tests } err = conn.Connect() @@ -133,28 +128,7 @@ func (ch *Connection) Close() (err error) { ch.cancel() // close derived context if !ch.isClosed() { - // wait for dangling goroutines to timeout before closing. - // upon recovery the standard library still has some goroutines open - // that are only closed upon some tcp connection timeout. - // Those routinges poll the network. - awaitTimeout := time.Until(ch.lastConnLoss.Add(ch.conn.Config.Heartbeat)) - if ch.slowClose && awaitTimeout > 0 { - // in long running applications that were able to reestablish their connection - // this sleep should not affect their shutdown duration much. - // in short runing applications like the tests, shutdown takes longer, as we - // frequently kill the network connection in order to test the reconnects - // which requires to wait for the background network connections to timeout - // in order to prevent dangling goroutines from being killed. - time.Sleep(awaitTimeout) - } - return ch.conn.Close() // close internal channel - } else { - // same wait as above case - awaitTimeout := time.Until(ch.lastConnLoss.Add(ch.heartbeat)) - if ch.slowClose && awaitTimeout > 0 { - time.Sleep(awaitTimeout) - } } return nil diff --git a/pool/connection_options.go b/pool/connection_options.go index 4e4482c..badbc85 100644 --- a/pool/connection_options.go +++ b/pool/connection_options.go @@ -16,8 +16,6 @@ type connectionOption struct { BackoffPolicy BackoffFunc Ctx context.Context TLSConfig *tls.Config - - SlowClose bool // set to true for goleak tests } type ConnectionOption func(*connectionOption) @@ -81,13 +79,3 @@ func ConnectionWithTLS(config *tls.Config) ConnectionOption { co.TLSConfig = config } } - -// ConnectionWithSlowClose is only needed for integration tests. -// It waits for standard library tcp connection goroutines to properly timeout. -// So that we don't get false positives in our leak tests. -// Set to true in order to wait for dangling goroutines to timeout before closing the connection. 
-func ConnectionWithSlowClose(slowClose bool) ConnectionOption { - return func(co *connectionOption) { - co.SlowClose = slowClose - } -} diff --git a/pool/connection_pool.go b/pool/connection_pool.go index 8788b81..23a6b2e 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -32,8 +32,6 @@ type ConnectionPool struct { cancel context.CancelFunc log logging.Logger - - slowClose bool } // NewConnectionPool creates a new connection pool which has a maximum size it @@ -54,8 +52,7 @@ func NewConnectionPool(connectUrl string, numConns int, options ...ConnectionPoo ConnTimeout: 30 * time.Second, TLSConfig: nil, - Logger: logging.NewNoOpLogger(), - SlowClose: false, // for leak tests + Logger: logging.NewNoOpLogger(), } // apply options @@ -94,8 +91,6 @@ func newConnectionPoolFromOption(connectUrl string, option connectionPoolOption) cancel: cancel, log: option.Logger, - - slowClose: option.SlowClose, } cp.debug("initializing pool connections") @@ -139,7 +134,6 @@ func (cp *ConnectionPool) deriveCachedConnection(id int) (*Connection, error) { ConnectionWithTLS(cp.tls), ConnectionWithCached(true), ConnectionWithLogger(cp.log), - ConnectionWithSlowClose(cp.slowClose), ) } @@ -172,7 +166,6 @@ func (cp *ConnectionPool) GetTransientConnection(ctx context.Context) (*Connecti ConnectionWithHeartbeatInterval(cp.heartbeat), ConnectionWithTLS(cp.tls), ConnectionWithCached(false), - ConnectionWithSlowClose(cp.slowClose), ) if err == nil { return conn, nil diff --git a/pool/connection_pool_options.go b/pool/connection_pool_options.go index 454c47d..c9d1d4e 100644 --- a/pool/connection_pool_options.go +++ b/pool/connection_pool_options.go @@ -22,8 +22,6 @@ type connectionPoolOption struct { TLSConfig *tls.Config Logger logging.Logger - - SlowClose bool // for leak tests } type ConnectionPoolOption func(*connectionPoolOption) @@ -142,12 +140,3 @@ func maxi(a, b int) int { } return b } - -// ConnectionPoolWithSlowCLose is onl yinteresting for integration tests of this library. -// We want to wait for dangling tcp standard library goroutines to timeout before we signal that -// the connection pool is closed. This option affects every connection in the connection pool. 
-func ConnectionPoolWithSlowClose(slowClose bool) ConnectionPoolOption { - return func(po *connectionPoolOption) { - po.SlowClose = slowClose - } -} diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index 959715d..c480cb4 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -12,7 +12,7 @@ import ( func TestNewConnectionPool(t *testing.T) { connections := 5 - p, err := pool.NewConnectionPool("amqp://admin:password@localhost:5672", connections, + p, err := pool.NewConnectionPool(connectURL, connections, pool.ConnectionPoolWithName("TestNewConnectionPool"), pool.ConnectionPoolWithLogger(logging.NewTestLogger(t)), ) @@ -42,7 +42,7 @@ func TestNewConnectionPool(t *testing.T) { func TestNewConnectionPoolDisconnect(t *testing.T) { connections := 100 - p, err := pool.NewConnectionPool("amqp://admin:password@localhost:5672", connections, + p, err := pool.NewConnectionPool(connectURL, connections, pool.ConnectionPoolWithName("TestNewConnectionPoolDisconnect"), pool.ConnectionPoolWithLogger(logging.NewTestLogger(t)), ) diff --git a/pool/connection_test.go b/pool/connection_test.go index b33d190..12d95de 100644 --- a/pool/connection_test.go +++ b/pool/connection_test.go @@ -14,10 +14,9 @@ import ( func TestNewSingleConnection(t *testing.T) { c, err := pool.NewConnection( - "amqp://admin:password@localhost:5672", + connectURL, "TestNewSingleConnection", pool.ConnectionWithLogger(logging.NewTestLogger(t)), - pool.ConnectionWithSlowClose(true), ) if err != nil { @@ -35,10 +34,9 @@ func TestNewSingleConnectionWithDisconnect(t *testing.T) { started() defer stopped() c, err := pool.NewConnection( - "amqp://admin:password@localhost:5672", + connectURL, "TestNewSingleConnection", pool.ConnectionWithLogger(logging.NewTestLogger(t)), - pool.ConnectionWithSlowClose(true), ) if err != nil { @@ -61,10 +59,9 @@ func TestNewConnection(t *testing.T) { defer wg.Done() c, err := pool.NewConnection( - "amqp://admin:password@localhost:5672", + connectURL, fmt.Sprintf("TestNewConnection-%d", id), pool.ConnectionWithLogger(logging.NewTestLogger(t)), - pool.ConnectionWithSlowClose(true), ) if err != nil { assert.NoError(t, err) @@ -98,10 +95,9 @@ func TestNewConnectionDisconnect(t *testing.T) { defer wg.Done() c, err := pool.NewConnection( - "amqp://admin:password@localhost:5672", + connectURL, fmt.Sprintf("TestNewConnectionDisconnect-%d", id), //pool.ConnectionWithLogger(logging.NewTestLogger(t)), - pool.ConnectionWithSlowClose(true), ) if err != nil { assert.NoError(t, err) diff --git a/pool/helpers_context.go b/pool/helpers_context.go index f26ee94..c79a90b 100644 --- a/pool/helpers_context.go +++ b/pool/helpers_context.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" ) @@ -115,12 +116,255 @@ func (c *cancelContext) Reset(parentCtx context.Context) error { return nil } -type done interface { - Done() <-chan struct{} - Context() context.Context +type stateContext struct { + mu sync.Mutex + + parentCtx context.Context + + closed bool + + // canceled upon pausing + pausing *cancelContext + + // canceled upon resuming + resuming *cancelContext + + // back channel to handler + // called from consumer + paused *cancelContext + + // back channel to handler + // called from consumer + resumed *cancelContext +} + +func newStateContext(ctx context.Context) *stateContext { + sc := &stateContext{ + parentCtx: ctx, + pausing: newCancelContext(ctx), + resuming: newCancelContext(ctx), + paused: newCancelContext(ctx), + resumed: newCancelContext(ctx), + } + + 
sc.pausing.Cancel()
+	sc.paused.Cancel()
+	return sc
+}
+
+// Start creates the initial state of the object.
+// The initial state is the transitional state resuming (= startup and resuming after pause).
+// The passed context is the parent context of all new contexts that spawn from this.
+// After Start has been called, all contexts are alive except for the resuming context, which is canceled by default.
+func (sc *stateContext) Start(ctx context.Context) (err error) {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+
+	defer func() {
+		if err != nil {
+			sc.closeUnguarded()
+		}
+	}()
+
+	// override upon startup
+	sc.parentCtx = ctx
+	sc.closed = false
+
+	// reset context
+	err = sc.pausing.Reset(sc.parentCtx)
+	if err != nil {
+		return err
+	}
+	err = sc.paused.Reset(sc.parentCtx)
+	if err != nil {
+		return err
+	}
+	err = sc.resuming.Reset(sc.parentCtx)
+	if err != nil {
+		return err
+	}
+	err = sc.resumed.Reset(sc.parentCtx)
+	if err != nil {
+		return err
+	}
+
+	// cancel last context to indicate the running state
+	sc.resuming.Cancel() // called last
+	return nil
 }
 
-type cancel interface {
-	Cancel()
-	CancelWithContext(context.Context) error
+func (sc *stateContext) Paused() {
+	// explicitly NO mutex lock
+	sc.paused.Cancel()
+}
+
+func (sc *stateContext) Resumed() {
+	// explicitly NO mutex lock
+	sc.resumed.Cancel()
+}
+
+func (sc *stateContext) Resuming() doner {
+	return sc.resuming
+}
+
+func (sc *stateContext) Pausing() doner {
+	return sc.pausing
+}
+
+func (sc *stateContext) Pause(ctx context.Context) error {
+	select {
+	case <-sc.paused.Done():
+		// already paused
+		return nil
+	default:
+		// continue
+	}
+
+	err := func() error {
+		sc.mu.Lock()
+		defer sc.mu.Unlock()
+		err := sc.resuming.Reset(sc.parentCtx)
+		if err != nil {
+			return fmt.Errorf("%w: %v", ErrPauseFailed, err)
+		}
+		err = sc.resumed.Reset(sc.parentCtx)
+		if err != nil {
+			return fmt.Errorf("%w: %v", ErrPauseFailed, err)
+		}
+		err = sc.pausing.CancelWithContext(ctx) // must be called last
+		if err != nil {
+			return fmt.Errorf("%w: %v", ErrPauseFailed, err)
+		}
+		return nil
+	}()
+	if err != nil {
+		return err
+	}
+
+	select {
+	case <-sc.paused.Done():
+		// wait until paused
+		return nil
+	case <-ctx.Done():
+		return fmt.Errorf("%w: %v", ErrPauseFailed, ctx.Err())
+	}
+}
+
+// Resume allows the processing of a queue to continue after it has been paused using Pause.
+func (sc *stateContext) Resume(ctx context.Context) error {
+	select {
+	case <-sc.resumed.Done():
+		// already resumed
+		return nil
+	default:
+		// continue
+	}
+
+	err := func() error {
+		sc.mu.Lock()
+		defer sc.mu.Unlock()
+		err := sc.pausing.Reset(sc.parentCtx)
+		if err != nil {
+			return fmt.Errorf("%w: %v", ErrResumeFailed, err)
+		}
+		err = sc.paused.Reset(sc.parentCtx)
+		if err != nil {
+			return fmt.Errorf("%w: %v", ErrResumeFailed, err)
+		}
+
+		err = sc.resuming.CancelWithContext(sc.parentCtx) // must be called last
+		if err != nil {
+			return fmt.Errorf("%w: %v", ErrResumeFailed, err)
+		}
+		return nil
+	}()
+	if err != nil {
+		return err
+	}
+
+	select {
+	case <-sc.resumed.Done():
+		// wait until resumed
+		return nil
+	case <-ctx.Done():
+		return fmt.Errorf("%w: %v", ErrResumeFailed, ctx.Err())
+	}
+}
+
+func (sc *stateContext) IsActive(ctx context.Context) (active bool, err error) {
+	closed := func() bool {
+		sc.mu.Lock()
+		defer sc.mu.Unlock()
+		return sc.closed
+	}()
+	if closed {
+		return false, nil
+	}
+
+	select {
+	case <-sc.resumed.Done():
+		return true, nil
+	case <-sc.paused.Done():
+		return false, nil
+	case <-ctx.Done():
+		return false, 
fmt.Errorf("failed to check state: %w", ctx.Err()) + } +} + +func (sc *stateContext) AwaitResumed(ctx context.Context) (err error) { + closed := func() bool { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.closed + }() + if closed { + return ErrClosed + } + + select { + case <-sc.resumed.Done(): + return nil + case <-ctx.Done(): + return fmt.Errorf("failed to check state: %w", ctx.Err()) + } +} + +func (sc *stateContext) AwaitPaused(ctx context.Context) (err error) { + closed := func() bool { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.closed + }() + if closed { + return ErrClosed + } + + select { + case <-sc.paused.Done(): + return nil + case <-ctx.Done(): + return fmt.Errorf("failed to check state: %w", ctx.Err()) + } +} + +// close closes all active contexts +// in order to prevent dangling goroutines +// When closing you may want to use pause first and then close for the final cleanup +func (sc *stateContext) Close() { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.closeUnguarded() +} + +func (sc *stateContext) closeUnguarded() { + sc.pausing.Cancel() + sc.paused.Cancel() + sc.resuming.Cancel() + sc.resumed.Cancel() + sc.closed = true +} + +type doner interface { + Done() <-chan struct{} + Context() context.Context } diff --git a/pool/pool.go b/pool/pool.go index 3faaee6..bff25c9 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -44,8 +44,6 @@ func New(connectUrl string, numConns, numSessions int, options ...Option) (*Pool TLSConfig: nil, Logger: logger, - - SlowClose: false, // needed for goroutine leak tests }, spo: sessionPoolOption{ Size: numSessions, diff --git a/pool/pool_options.go b/pool/pool_options.go index a9f1161..7c42c56 100644 --- a/pool/pool_options.go +++ b/pool/pool_options.go @@ -75,14 +75,6 @@ func WithTLS(config *tls.Config) Option { } } -// WithSlowClose is only needed for connection pool and for integration tests. -// This option enables the waiting for goroutine timeouts for every connection it handles. 
-func WithSlowClose(slowClose bool) Option { - return func(po *poolOption) { - ConnectionPoolWithSlowClose(slowClose)(&po.cpo) - } -} - // WithBufferSize allows to configurethe size of // the confirmation, error & blocker buffers of all sessions func WithBufferSize(size int) Option { diff --git a/pool/pool_test.go b/pool/pool_test.go index 3ea8d31..d0ff8b6 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -5,21 +5,31 @@ import ( "testing" "time" + "github.com/jxsl13/amqpx" "github.com/jxsl13/amqpx/logging" "github.com/jxsl13/amqpx/pool" "github.com/stretchr/testify/assert" "go.uber.org/goleak" ) +var ( + connectURL = amqpx.NewURL("localhost", 5672, "admin", "password") +) + func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) + goleak.VerifyTestMain( + m, + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), + goleak.IgnoreTopFunction("github.com/rabbitmq/amqp091-go.(*Connection).heartbeater"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + ) } func TestNew(t *testing.T) { connections := 2 sessions := 10 - p, err := pool.New("amqp://admin:password@localhost:5672", connections, sessions, + p, err := pool.New(connectURL, connections, sessions, pool.WithName("TestNew"), pool.WithLogger(logging.NewTestLogger(t)), ) diff --git a/pool/publisher_test.go b/pool/publisher_test.go index 41f95b5..36d881c 100644 --- a/pool/publisher_test.go +++ b/pool/publisher_test.go @@ -14,7 +14,7 @@ import ( func TestPublisher(t *testing.T) { connections := 1 sessions := 10 // publisher sessions + consumer sessions - p, err := pool.New("amqp://admin:password@localhost:5672", + p, err := pool.New(connectURL, connections, sessions, pool.WithName("TestPublisher"), diff --git a/pool/session_pool_test.go b/pool/session_pool_test.go index 7ce0243..ec9a219 100644 --- a/pool/session_pool_test.go +++ b/pool/session_pool_test.go @@ -13,7 +13,7 @@ import ( func TestNewSessionPool(t *testing.T) { connections := 1 sessions := 10 - p, err := pool.NewConnectionPool("amqp://admin:password@localhost:5672", connections, + p, err := pool.NewConnectionPool(connectURL, connections, pool.ConnectionPoolWithName("TestNewConnectionPool"), pool.ConnectionPoolWithLogger(logging.NewTestLogger(t)), ) diff --git a/pool/session_test.go b/pool/session_test.go index 8103201..0d481de 100644 --- a/pool/session_test.go +++ b/pool/session_test.go @@ -15,10 +15,9 @@ import ( func TestNewSession(t *testing.T) { c, err := pool.NewConnection( - "amqp://admin:password@localhost:5672", + connectURL, "TestNewSession", pool.ConnectionWithLogger(logging.NewTestLogger(t)), - pool.ConnectionWithSlowClose(true), ) if err != nil { assert.NoError(t, err) @@ -135,10 +134,9 @@ func TestNewSession(t *testing.T) { func TestNewSessionDisconnect(t *testing.T) { c, err := pool.NewConnection( - "amqp://admin:password@localhost:5672", + connectURL, "TestNewSessionDisconnect", pool.ConnectionWithLogger(logging.NewTestLogger(t)), - pool.ConnectionWithSlowClose(true), ) if err != nil { assert.NoError(t, err) @@ -326,10 +324,9 @@ func TestNewSessionQueueDeclarePassive(t *testing.T) { }() c, err := pool.NewConnection( - "amqp://admin:password@localhost:5672", + connectURL, "TestNewSessionQueueDeclarePassive", pool.ConnectionWithLogger(logging.NewTestLogger(t)), - pool.ConnectionWithSlowClose(true), ) if err != nil { assert.NoError(t, err) diff --git a/pool/subscriber.go b/pool/subscriber.go index 8d5a7ae..0a10294 100644 --- a/pool/subscriber.go +++ b/pool/subscriber.go @@ -25,18 +25,33 @@ type Subscriber struct { wg sync.WaitGroup log 
logging.Logger + + closeTimeout time.Duration } -func (s *Subscriber) Close() { +func (s *Subscriber) Close() (err error) { s.debugSimple("closing subscriber...") defer s.infoSimple("closed") + ctx, cancel := context.WithTimeout(context.Background(), s.closeTimeout) + defer cancel() + for _, h := range s.handlers { + err = errors.Join(err, h.Pause(ctx)) + h.close() + } + + for _, bh := range s.batchHandlers { + err = errors.Join(err, bh.Pause(ctx)) + bh.close() + } + s.cancel() s.wg.Wait() if s.autoClosePool { s.pool.Close() } + return err } // Wait waits until all consumers have been closed. @@ -55,6 +70,7 @@ func NewSubscriber(p *Pool, options ...SubscriberOption) *Subscriber { option := subscriberOption{ Ctx: p.Context(), AutoClosePool: false, + CloseTimeout: 5 * time.Second, Logger: p.sp.log, // derive logger from session pool } @@ -68,9 +84,9 @@ func NewSubscriber(p *Pool, options ...SubscriberOption) *Subscriber { sub := &Subscriber{ pool: p, autoClosePool: option.AutoClosePool, - - ctx: ctx, - cancel: cancel, + closeTimeout: option.CloseTimeout, + ctx: ctx, + cancel: cancel, log: option.Logger, } @@ -159,13 +175,13 @@ func (s *Subscriber) RegisterBatchHandler(handler *BatchHandler) { defer s.mu.Unlock() s.batchHandlers = append(s.batchHandlers, handler) - view := handler.view() + opts := handler.Options() s.log.WithFields(withConsumerIfSet(handler.ConsumeOptions().ConsumerTag, map[string]any{ "subscriber": s.pool.Name(), - "queue": view.Queue, - "maxBatchSize": view.MaxBatchSize, // TODO: optimize so that we don't call getters multiple times (mutex contention) + "queue": opts.Queue, + "maxBatchSize": opts.MaxBatchSize, // TODO: optimize so that we don't call getters multiple times (mutex contention) "flushTimeout": handler.FlushTimeout, })).Info("registered batch message handler") } @@ -173,57 +189,65 @@ func (s *Subscriber) RegisterBatchHandler(handler *BatchHandler) { // Start starts the consumers for all registered handler functions // This method is not blocking. Use Wait() to wait for all routines to shut down // via context cancelation (e.g. 
via a signal) -func (s *Subscriber) Start() { +func (s *Subscriber) Start() (err error) { s.mu.Lock() defer s.mu.Unlock() - if s.started { panic("subscriber cannot be started more than once") } s.debugSimple("starting subscriber...") defer func() { - // after starting everything we want to set started to true - s.started = true - s.infoSimple("started") + if err != nil { + s.Close() + } else { + // after starting everything we want to set started to true + s.started = true + s.infoSimple("started") + } }() s.debugSimple(fmt.Sprintf("starting %d handler routine(s)", len(s.handlers))) - s.wg.Add(len(s.handlers)) for _, h := range s.handlers { + s.wg.Add(1) go s.consumer(h, &s.wg) + err = h.awaitResumed(s.ctx) + if err != nil { + return fmt.Errorf("failed to start consumer for queue %s: %w", h.Queue(), err) + } } s.debugSimple(fmt.Sprintf("starting %d batch handler routine(s)", len(s.batchHandlers))) - s.wg.Add(len(s.batchHandlers)) for _, bh := range s.batchHandlers { + s.wg.Add(1) go s.batchConsumer(bh, &s.wg) + err = bh.awaitResumed(s.ctx) + if err != nil { + return fmt.Errorf("failed to start batch consumer for queue %s: %w", bh.Queue(), err) + } } + return nil } func (s *Subscriber) consumer(h *Handler, wg *sync.WaitGroup) { defer wg.Done() - var err error - - // initialize all handler contexts - // to be in state resuming - err = h.start(s.ctx) + var err error + // trigger initial startup + // channel below + opts, err := h.start(s.ctx) if err != nil { - view := h.view() - s.error(view.ConsumerTag, view.Queue, err, "failed to start handler consumer") + s.error(opts.ConsumerTag, opts.Queue, err, "failed to start consumer") + return } - // close all contexts defer h.close() for { - // immutable view for every new loop iteration - hv := h.view() select { case <-s.catchShutdown(): return - case <-hv.resuming.Done(): - err = s.consume(hv) + case <-h.resuming().Done(): + err = s.consume(h) if errors.Is(err, ErrClosed) { return } @@ -231,52 +255,53 @@ func (s *Subscriber) consumer(h *Handler, wg *sync.WaitGroup) { } } -func (s *Subscriber) consume(view handlerView) (err error) { +func (s *Subscriber) consume(h *Handler) (err error) { + opts, err := h.start(s.ctx) + if err != nil { + return err + } + defer h.paused() - s.debugConsumer(view.ConsumerTag, "starting consumer...") + s.debugConsumer(opts.ConsumerTag, "starting consumer...") session, err := s.pool.GetSession() if err != nil { return err } defer func() { - if err == nil { - // no error - s.pool.ReturnSession(session, false) - s.infoConsumer(view.ConsumerTag, "closed") - } else if errors.Is(err, ErrClosed) { + if errors.Is(err, ErrClosed) { // graceful shutdown s.pool.ReturnSession(session, false) - s.infoConsumer(view.ConsumerTag, "closed") - } else { - select { - case <-view.pausing.Done(): - // expected closing due to context cancelation - // cancel errors the underlying channel - // A canceled session is an erred session. - s.pool.ReturnSession(session, true) - view.paused.Cancel() - s.infoConsumer(view.ConsumerTag, "paused") - default: - // actual error - s.pool.ReturnSession(session, true) - s.warnConsumer(view.ConsumerTag, err, "closed unexpectedly") - } + s.infoConsumer(opts.ConsumerTag, "closed") + return + } + + select { + case <-h.pausing().Done(): + // expected closing due to context cancelation + // cancel errors the underlying channel + // A canceled session is an erred session. 
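+ // the session is returned to the pool flagged as erred; the deferred
+ // h.paused() at the top of consume then signals the paused back channel,
+ // which is what unblocks a pending Pause(ctx) call on the handler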
+ s.pool.ReturnSession(session, true) + s.infoConsumer(opts.ConsumerTag, "paused") + default: + // actual error + s.pool.ReturnSession(session, true) + s.warnConsumer(opts.ConsumerTag, err, "closed unexpectedly") } }() // got a working session delivery, err := session.ConsumeWithContext( - view.pausing.Context(), - view.Queue, - view.ConsumeOptions, + h.pausing().Context(), + opts.Queue, + opts.ConsumeOptions, ) if err != nil { return err } - view.resumed.Cancel() - s.infoConsumer(view.ConsumerTag, "started") + h.resumed() + s.infoConsumer(opts.ConsumerTag, "started") for { select { case <-s.catchShutdown(): @@ -286,17 +311,17 @@ func (s *Subscriber) consume(view handlerView) (err error) { return ErrDeliveryClosed } - s.infoHandler(view.ConsumerTag, msg.Exchange, msg.RoutingKey, view.Queue, "received message") - err = view.HandlerFunc(msg) - if view.AutoAck { + s.infoHandler(opts.ConsumerTag, msg.Exchange, msg.RoutingKey, opts.Queue, "received message") + err = opts.HandlerFunc(msg) + if opts.AutoAck { if err != nil { // we cannot really do anything to recover from a processing error in this case - s.errorHandler(view.ConsumerTag, msg.Exchange, msg.RoutingKey, view.Queue, fmt.Errorf("processing failed: dropping message: %w", err)) + s.errorHandler(opts.ConsumerTag, msg.Exchange, msg.RoutingKey, opts.Queue, fmt.Errorf("processing failed: dropping message: %w", err)) } else { - s.infoHandler(view.ConsumerTag, msg.Exchange, msg.RoutingKey, view.Queue, "processed message") + s.infoHandler(opts.ConsumerTag, msg.Exchange, msg.RoutingKey, opts.Queue, "processed message") } } else { - poolErr := s.ackPostHandle(view, msg.DeliveryTag, msg.Exchange, msg.RoutingKey, session, err) + poolErr := s.ackPostHandle(opts, msg.DeliveryTag, msg.Exchange, msg.RoutingKey, session, err) if poolErr != nil { return poolErr } @@ -306,7 +331,7 @@ func (s *Subscriber) consume(view handlerView) (err error) { } // (n)ack delivery and signal that message was processed by the service -func (s *Subscriber) ackPostHandle(view handlerView, deliveryTag uint64, exchange, routingKey string, session *Session, handlerErr error) (err error) { +func (s *Subscriber) ackPostHandle(opts HandlerView, deliveryTag uint64, exchange, routingKey string, session *Session, handlerErr error) (err error) { var ackErr error if handlerErr != nil { // requeue message if possible @@ -318,7 +343,7 @@ func (s *Subscriber) ackPostHandle(view handlerView, deliveryTag uint64, exchang // if (n)ack fails, we know that the connection died // potentially before processing already. 
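// in that case the session is recovered below; since the delivery was never
// (n)acked on the broken channel, the broker redelivers it, so the message is
// not lost (it may be processed a second time)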
if ackErr != nil { - s.warnHandler(view.ConsumerTag, exchange, routingKey, view.Queue, ackErr, "(n)ack failed") + s.warnHandler(opts.ConsumerTag, exchange, routingKey, opts.Queue, ackErr, "(n)ack failed") poolErr := session.Recover() if poolErr != nil { // only returns an error upon shutdown @@ -332,9 +357,9 @@ func (s *Subscriber) ackPostHandle(view handlerView, deliveryTag uint64, exchang // (n)acked successfully if handlerErr != nil { - s.infoHandler(view.ConsumerTag, exchange, routingKey, view.Queue, "nacked message") + s.infoHandler(opts.ConsumerTag, exchange, routingKey, opts.Queue, "nacked message") } else { - s.infoHandler(view.ConsumerTag, exchange, routingKey, view.Queue, "acked message") + s.infoHandler(opts.ConsumerTag, exchange, routingKey, opts.Queue, "acked message") } // successfully handled message return nil @@ -346,22 +371,20 @@ func (s *Subscriber) batchConsumer(h *BatchHandler, wg *sync.WaitGroup) { // initialize all handler contexts // to be in state resuming - err = h.start(s.ctx) + opts, err := h.start(s.ctx) if err != nil { - view := h.view() - s.error(view.ConsumerTag, view.Queue, err, "failed to start batch handler consumer") + s.error(opts.ConsumerTag, opts.Queue, err, "failed to start batch handler consumer") + return } // close all contexts defer h.close() for { - // immutable view for every new loop iteration - hv := h.view() select { case <-s.catchShutdown(): return - case <-hv.resuming.Done(): - err = s.batchConsume(hv) + case <-h.resuming().Done(): + err = s.batchConsume(h) if errors.Is(err, ErrClosed) { return } @@ -369,54 +392,56 @@ func (s *Subscriber) batchConsumer(h *BatchHandler, wg *sync.WaitGroup) { } } -func (s *Subscriber) batchConsume(view batchHandlerView) (err error) { - s.debugConsumer(view.ConsumerTag, "starting batch consumer...") +func (s *Subscriber) batchConsume(h *BatchHandler) (err error) { + opts, err := h.start(s.ctx) + if err != nil { + return err + } + defer h.paused() + + s.debugConsumer(opts.ConsumerTag, "starting batch consumer...") session, err := s.pool.GetSession() if err != nil { return err } defer func() { - if err == nil { - // no error - s.pool.ReturnSession(session, false) - s.infoConsumer(view.ConsumerTag, "closed") - } else if errors.Is(err, ErrClosed) { + if errors.Is(err, ErrClosed) { // graceful shutdown s.pool.ReturnSession(session, false) - s.infoConsumer(view.ConsumerTag, "closed") - } else { - select { - case <-view.pausing.Done(): - // expected closing due to context cancelation - // cancel errors the underlying channel - // A canceled session is an erred session. - s.pool.ReturnSession(session, true) - view.paused.Cancel() - s.infoConsumer(view.ConsumerTag, "paused") - default: - // actual error - s.pool.ReturnSession(session, true) - s.warnConsumer(view.ConsumerTag, err, "closed unexpectedly") - } + s.infoConsumer(opts.ConsumerTag, "closed") + return + } + + select { + case <-h.pausing().Done(): + // expected closing due to context cancelation + // cancel errors the underlying channel + // A canceled session is an erred session. 
+ s.pool.ReturnSession(session, true) + s.infoConsumer(opts.ConsumerTag, "paused") + default: + // actual error + s.pool.ReturnSession(session, true) + s.warnConsumer(opts.ConsumerTag, err, "closed unexpectedly") } }() // got a working session delivery, err := session.ConsumeWithContext( - view.pausing.Context(), - view.Queue, - view.ConsumeOptions, + h.pausing().Context(), + opts.Queue, + opts.ConsumeOptions, ) if err != nil { return err } - view.resumed.Cancel() - s.infoConsumer(view.ConsumerTag, "started") + h.resumed() + s.infoConsumer(opts.ConsumerTag, "started") // preallocate memory for batch - batch := make([]Delivery, 0, maxi(1, view.MaxBatchSize)) + batch := make([]Delivery, 0, maxi(1, opts.MaxBatchSize)) defer func() { if len(batch) > 0 && errors.Is(err, ErrClosed) { // requeue all not yet processed messages in batch slice @@ -426,13 +451,13 @@ func (s *Subscriber) batchConsume(view batchHandlerView) (err error) { // There is no way to recover form this state in case an error is returned from the Nack call. nackErr := batch[len(batch)-1].Nack(true, true) if nackErr != nil { - s.warnBatchHandler(view.ConsumerTag, view.Queue, view.MaxBatchSize, err, "failed to nack and requeue batch upon shutdown") + s.warnBatchHandler(opts.ConsumerTag, opts.Queue, opts.MaxBatchSize, err, "failed to nack and requeue batch upon shutdown") } } }() var ( - timer = time.NewTimer(view.FlushTimeout) + timer = time.NewTimer(opts.FlushTimeout) drained = false ) defer closeTimer(timer, &drained) @@ -446,7 +471,7 @@ func (s *Subscriber) batchConsume(view batchHandlerView) (err error) { for { // reset the timer - resetTimer(timer, view.FlushTimeout, &drained) + resetTimer(timer, opts.FlushTimeout, &drained) select { case <-s.catchShutdown(): @@ -456,7 +481,7 @@ func (s *Subscriber) batchConsume(view batchHandlerView) (err error) { return ErrDeliveryClosed } batch = append(batch, msg) - if len(batch) == view.MaxBatchSize { + if len(batch) == opts.MaxBatchSize { break collectBatch } @@ -479,18 +504,18 @@ func (s *Subscriber) batchConsume(view batchHandlerView) (err error) { lastDeliveryTag = batch[len(batch)-1].DeliveryTag ) - s.infoBatchHandler(view.ConsumerTag, view.Queue, batchSize, "received batch") - err = view.HandlerFunc(batch) + s.infoBatchHandler(opts.ConsumerTag, opts.Queue, batchSize, "received batch") + err = opts.HandlerFunc(batch) // no acks required - if view.AutoAck { + if opts.AutoAck { if err != nil { // we cannot really do anything to recover from a processing error in this case - s.errorBatchHandler(view.ConsumerTag, view.Queue, batchSize, fmt.Errorf("processing failed: dropping batch: %w", err)) + s.errorBatchHandler(opts.ConsumerTag, opts.Queue, batchSize, fmt.Errorf("processing failed: dropping batch: %w", err)) } else { - s.infoBatchHandler(view.ConsumerTag, view.Queue, batchSize, "processed batch") + s.infoBatchHandler(opts.ConsumerTag, opts.Queue, batchSize, "processed batch") } } else { - poolErr := s.ackBatchPostHandle(view, lastDeliveryTag, batchSize, session, err) + poolErr := s.ackBatchPostHandle(opts, lastDeliveryTag, batchSize, session, err) if poolErr != nil { return poolErr } @@ -498,7 +523,7 @@ func (s *Subscriber) batchConsume(view batchHandlerView) (err error) { } } -func (s *Subscriber) ackBatchPostHandle(view batchHandlerView, lastDeliveryTag uint64, currentBatchSize int, session *Session, handlerErr error) (err error) { +func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerView, lastDeliveryTag uint64, currentBatchSize int, session *Session, handlerErr error) (err 
error) { var ackErr error // processing failed if handlerErr != nil { @@ -512,7 +537,7 @@ func (s *Subscriber) ackBatchPostHandle(view batchHandlerView, lastDeliveryTag u // if (n)ack fails, we know that the connection died // potentially before processing already. if ackErr != nil { - s.warnBatchHandler(view.ConsumerTag, view.Queue, currentBatchSize, ackErr, "batch (n)ack failed") + s.warnBatchHandler(opts.ConsumerTag, opts.Queue, currentBatchSize, ackErr, "batch (n)ack failed") poolErr := session.Recover() if poolErr != nil { // only returns an error upon shutdown @@ -526,9 +551,9 @@ func (s *Subscriber) ackBatchPostHandle(view batchHandlerView, lastDeliveryTag u // (n)acked successfully if handlerErr != nil { - s.infoBatchHandler(view.ConsumerTag, view.Queue, currentBatchSize, "nacked batch") + s.infoBatchHandler(opts.ConsumerTag, opts.Queue, currentBatchSize, "nacked batch") } else { - s.infoBatchHandler(view.ConsumerTag, view.Queue, currentBatchSize, "acked batch") + s.infoBatchHandler(opts.ConsumerTag, opts.Queue, currentBatchSize, "acked batch") } // successfully handled message return nil diff --git a/pool/subscriber_batch_handler.go b/pool/subscriber_batch_handler.go index 5abe0cb..e09f99e 100644 --- a/pool/subscriber_batch_handler.go +++ b/pool/subscriber_batch_handler.go @@ -2,7 +2,6 @@ package pool import ( "context" - "fmt" "sync" "time" ) @@ -16,14 +15,13 @@ const ( // handler function and the queue name from which the handler fetches messages and processes those. // Additionally, the handler allows you to pause and resume processing from the provided queue. func NewBatchHandler(queue string, hf BatchHandlerFunc, options ...BatchHandlerOption) *BatchHandler { + if hf == nil { + panic("handlerFunc must not be nil") + } + // sane defaults - ctx := context.Background() h := &BatchHandler{ - parentCtx: ctx, - pausing: newCancelContext(ctx), - paused: newCancelContext(ctx), - resuming: newCancelContext(ctx), - resumed: newCancelContext(ctx), + sc: newStateContext(context.Background()), queue: queue, handlerFunc: hf, maxBatchSize: defaultMaxBatchSize, @@ -45,11 +43,9 @@ func NewBatchHandler(queue string, hf BatchHandlerFunc, options ...BatchHandlerO return h } -// BatchHandler is a struct that contains all parameter sneeded i order to register a batch handler function. +// BatchHandler is a struct that contains all parameters needed in order to register a batch handler function. 
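+// Pause, Resume and the handshake with the subscriber's consumer are delegated
+// to the internal stateContext; the handler's own mutex only guards the
+// configuration fields (queue, handler func, batch size, flush timeout and
+// consume options).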
type BatchHandler struct { - mu sync.Mutex - closed bool - + mu sync.Mutex queue string handlerFunc BatchHandlerFunc consumeOpts ConsumeOptions @@ -65,150 +61,76 @@ type BatchHandler struct { // when <= 0, will be set to 5s flushTimeout time.Duration - // untouched throughout the object's lifetime - parentCtx context.Context - - // canceled upon pausing - pausing *cancelContext - - // canceled upon resuming - resuming *cancelContext - - // back channel to handler - // called from consumer - paused *cancelContext + sc *stateContext +} - // back channel to handler - // called from consumer - resumed *cancelContext +func (h *BatchHandler) close() { + h.sc.Close() } // reset creates the initial state of the object // initial state is the transitional state resuming (= startup and resuming after pause) // the passed context is the parent context of all new contexts that spawn from this -func (h *BatchHandler) start(ctx context.Context) (err error) { +func (h *BatchHandler) start(ctx context.Context) (opts BatchHandlerView, err error) { h.mu.Lock() defer h.mu.Unlock() - defer func() { - if err != nil { - h.pausing.Cancel() - h.paused.Cancel() - h.resuming.Cancel() - h.resumed.Cancel() - } - }() - - h.parentCtx = ctx - - // reset contextx - err = h.pausing.Reset(h.parentCtx) - if err != nil { - return err - } - err = h.paused.Reset(h.parentCtx) - if err != nil { - return err - } - err = h.resuming.Reset(h.parentCtx) - if err != nil { - return err - } - err = h.resumed.Reset(h.parentCtx) - if err != nil { - return err - } - - // cancel last context to indicate the running state - h.resuming.Cancel() // called last - return nil + opts = h.optionsUnguarded() + err = h.sc.Start(ctx) + return opts, err } -// Pause allows to halt the processing of a queue after the processing has been started by the subscriber. -func (h *BatchHandler) Pause(ctx context.Context) error { +func (h *BatchHandler) Options() BatchHandlerView { h.mu.Lock() defer h.mu.Unlock() - err := h.resuming.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, err) - } - err = h.resumed.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, err) - } - err = h.pausing.CancelWithContext(ctx) // must be called last - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, err) - } + return h.optionsUnguarded() +} - select { - case <-h.paused.Done(): - // waid until paused - return nil - case <-ctx.Done(): - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, ctx.Err()) +func (h *BatchHandler) optionsUnguarded() BatchHandlerView { + return BatchHandlerView{ + Queue: h.queue, + HandlerFunc: h.handlerFunc, + MaxBatchSize: h.maxBatchSize, + FlushTimeout: h.flushTimeout, + ConsumeOptions: h.consumeOpts, } } +// Pause allows to halt the processing of a queue after the processing has been started by the subscriber. 
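+//
+// A typical round trip looks roughly like this (sketch, assuming a batch handler
+// that has been registered with a subscriber and started):
+//
+//	_ = handler.Pause(ctx)             // blocks until the consumer acknowledges the pause
+//	active, _ := handler.IsActive(ctx) // active == false while paused
+//	_ = handler.Resume(ctx)            // blocks until consumption is re-established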
+func (h *BatchHandler) Pause(ctx context.Context) error { + return h.sc.Pause(ctx) +} + +func (h *BatchHandler) pausing() doner { + return h.sc.Pausing() +} + +func (h *BatchHandler) paused() { + h.sc.Paused() +} + // Resume allows to continue the processing of a queue after it has been paused using Pause func (h *BatchHandler) Resume(ctx context.Context) error { - h.mu.Lock() - defer h.mu.Unlock() - err := h.pausing.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, err) - } - err = h.paused.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, err) - } + return h.sc.Resume(ctx) +} - err = h.resuming.CancelWithContext(h.parentCtx) // must be called last - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, err) - } +func (h *BatchHandler) resuming() doner { + return h.sc.Resuming() +} - select { - case <-h.resumed.Done(): - // waid until resumed - return nil - case <-ctx.Done(): - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, ctx.Err()) - } +func (h *BatchHandler) resumed() { + h.sc.Resumed() } func (h *BatchHandler) IsActive(ctx context.Context) (active bool, err error) { - h.mu.Lock() - defer h.mu.Unlock() - if h.closed { - return false, nil - } - - select { - case <-h.resumed.Done(): - return true, nil - case <-h.paused.Done(): - return false, nil - case <-ctx.Done(): - return false, fmt.Errorf("failed to check state of handler of queue %q: %w", h.queue, ctx.Err()) - } + return h.sc.IsActive(ctx) } -func (h *BatchHandler) view() batchHandlerView { - h.mu.Lock() - defer h.mu.Unlock() - - return batchHandlerView{ - pausing: h.pausing, // front channel handler -> consumer - paused: h.paused, // back channel consumer -> handler - resuming: h.resuming, // front channel handler -> consumer - resumed: h.resumed, // back channel consumer-> handler +func (h *BatchHandler) awaitResumed(ctx context.Context) error { + return h.sc.AwaitResumed(ctx) +} - Queue: h.queue, - HandlerFunc: h.handlerFunc, - MaxBatchSize: h.maxBatchSize, - FlushTimeout: h.flushTimeout, - ConsumeOptions: h.consumeOpts, - } +func (h *BatchHandler) awaitPaused(ctx context.Context) error { + return h.sc.AwaitPaused(ctx) } func (h *BatchHandler) Queue() string { @@ -276,16 +198,3 @@ func (h *BatchHandler) SetFlushTimeout(flushTimeout time.Duration) { } h.flushTimeout = flushTimeout } - -// close closes all active contexts -// in order to prevent dangling goroutines -func (h *BatchHandler) close() { - h.mu.Lock() - defer h.mu.Unlock() - - h.pausing.Cancel() - h.paused.Cancel() - h.resuming.Cancel() - h.resumed.Cancel() - h.closed = true -} diff --git a/pool/subscriber_batch_handler_view.go b/pool/subscriber_batch_handler_view.go index 6695e6a..ffe4afa 100644 --- a/pool/subscriber_batch_handler_view.go +++ b/pool/subscriber_batch_handler_view.go @@ -4,16 +4,8 @@ import ( "time" ) -// batchHandlerView is a read only snapshot of the current handler's configuration and runtime state. -// This internal data structure is used in the corresponsing consumer. -type batchHandlerView struct { - // called in the consumer function & wrapper - pausing done - paused cancel - - resuming done - resumed cancel - +// BatchHandlerView is a read only snapshot of the current handler's configuration. 
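+// It is what Options returns and what the subscriber's batch consumer works with
+// for a single consume iteration; because it is a copy, mutating a view does not
+// affect the handler it was taken from.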
+type BatchHandlerView struct { Queue string HandlerFunc BatchHandlerFunc MaxBatchSize int diff --git a/pool/subscriber_handler.go b/pool/subscriber_handler.go index 3f016ba..70524d2 100644 --- a/pool/subscriber_handler.go +++ b/pool/subscriber_handler.go @@ -3,7 +3,6 @@ package pool import ( "context" "errors" - "fmt" "sync" ) @@ -34,174 +33,95 @@ func NewHandler(queue string, hf HandlerFunc, option ...ConsumeOptions) *Handler copt = option[0] } - ctx := context.Background() h := &Handler{ - parentCtx: ctx, - pausing: newCancelContext(ctx), - paused: newCancelContext(ctx), - resuming: newCancelContext(ctx), - resumed: newCancelContext(ctx), queue: queue, handlerFunc: hf, consumeOpts: copt, + sc: newStateContext(context.Background()), } - // initial state is paused - h.pausing.Cancel() - h.paused.Cancel() - return h } // Handler is a struct that contains all parameters needed in order to register a handler function // to the provided queue. Additionally, the handler allows you to pause and resume processing or messages. type Handler struct { - mu sync.Mutex - closed bool - + mu sync.Mutex queue string handlerFunc HandlerFunc consumeOpts ConsumeOptions - // untouched throughout the object's lifetime - parentCtx context.Context - - // canceled upon pausing - pausing *cancelContext - - // canceled upon resuming - resuming *cancelContext - - // back channel to handler - // called from consumer - paused *cancelContext + // not guarded by mutex + sc *stateContext +} - // back channel to handler - // called from consumer - resumed *cancelContext +func (h *Handler) close() { + h.sc.Close() } // reset creates the initial state of the object // initial state is the transitional state resuming (= startup and resuming after pause) -// the passed context is the parent context of all new contexts that spawn from this -func (h *Handler) start(ctx context.Context) (err error) { +// the passed context is the parent context of all new contexts that spawn from this. +// After start has been called, all contexts are alive except for the resuming context which is canceled by default. +func (h *Handler) start(ctx context.Context) (opts HandlerView, err error) { h.mu.Lock() defer h.mu.Unlock() - defer func() { - if err != nil { - h.pausing.Cancel() - h.paused.Cancel() - h.resuming.Cancel() - h.resumed.Cancel() - } - }() + opts = h.optionsUnguarded() + err = h.sc.Start(ctx) + return opts, err - // reset contextx - err = h.pausing.Reset(h.parentCtx) - if err != nil { - return err - } - err = h.paused.Reset(h.parentCtx) - if err != nil { - return err - } - err = h.resuming.Reset(h.parentCtx) - if err != nil { - return err - } - err = h.resumed.Reset(h.parentCtx) - if err != nil { - return err - } - - // cancel last context to indicate the running state - h.resuming.Cancel() // called last - return nil } -// Pause allows to halt the processing of a queue after the processing has been started by the subscriber. 
-func (h *Handler) Pause(ctx context.Context) error { +func (h *Handler) Options() HandlerView { h.mu.Lock() defer h.mu.Unlock() - err := h.resuming.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, err) - } - err = h.resumed.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, err) - } - err = h.pausing.CancelWithContext(ctx) // must be called last - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, err) - } + return h.optionsUnguarded() +} - select { - case <-h.paused.Done(): - // waid until paused - return nil - case <-ctx.Done(): - return fmt.Errorf("%w: queue: %s: %v", ErrPauseFailed, h.queue, ctx.Err()) +func (h *Handler) optionsUnguarded() HandlerView { + return HandlerView{ + Queue: h.queue, + HandlerFunc: h.handlerFunc, + ConsumeOptions: h.consumeOpts, } } +// Pause allows to halt the processing of a queue after the processing has been started by the subscriber. +func (h *Handler) Pause(ctx context.Context) error { + return h.sc.Pause(ctx) +} + +func (h *Handler) pausing() doner { + return h.sc.Pausing() +} + +func (h *Handler) paused() { + h.sc.Paused() +} + // Resume allows to continue the processing of a queue after it has been paused using Pause func (h *Handler) Resume(ctx context.Context) error { - h.mu.Lock() - defer h.mu.Unlock() - err := h.pausing.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, err) - } - err = h.paused.Reset(h.parentCtx) - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, err) - } + return h.sc.Resume(ctx) +} - err = h.resuming.CancelWithContext(h.parentCtx) // must be called last - if err != nil { - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, err) - } +func (h *Handler) resuming() doner { + return h.sc.Resuming() +} - select { - case <-h.resumed.Done(): - // waid until resumed - return nil - case <-ctx.Done(): - return fmt.Errorf("%w: queue: %s: %v", ErrResumeFailed, h.queue, ctx.Err()) - } +func (h *Handler) resumed() { + h.sc.Resumed() } func (h *Handler) IsActive(ctx context.Context) (active bool, err error) { - h.mu.Lock() - defer h.mu.Unlock() - if h.closed { - return false, nil - } - - select { - case <-h.resumed.Done(): - return true, nil - case <-h.paused.Done(): - return false, nil - case <-ctx.Done(): - return false, fmt.Errorf("failed to check state of handler of queue %q: %w", h.queue, ctx.Err()) - } + return h.sc.IsActive(ctx) } -func (h *Handler) view() handlerView { - h.mu.Lock() - defer h.mu.Unlock() +func (h *Handler) awaitResumed(ctx context.Context) error { + return h.sc.AwaitResumed(ctx) +} - return handlerView{ - pausing: h.pausing, // front channel handler -> consumer - paused: h.paused, // back channel consumer -> handler - resuming: h.resuming, // front channel handler -> consumer - resumed: h.resumed, // back channel consumer-> handler - Queue: h.queue, - HandlerFunc: h.handlerFunc, - ConsumeOptions: h.consumeOpts, - } +func (h *Handler) awaitPaused(ctx context.Context) error { + return h.sc.AwaitPaused(ctx) } func (h *Handler) Queue() string { @@ -242,16 +162,3 @@ func (h *Handler) SetConsumeOptions(consumeOpts ConsumeOptions) { defer h.mu.Unlock() h.consumeOpts = consumeOpts } - -// close closes all active contexts -// in order to prevent dangling goroutines -func (h *Handler) close() { - h.mu.Lock() - defer h.mu.Unlock() - - h.pausing.Cancel() - h.paused.Cancel() - h.resuming.Cancel() 
- h.resumed.Cancel() - h.closed = true -} diff --git a/pool/subscriber_handler_view.go b/pool/subscriber_handler_view.go index 424348c..7ecb5f1 100644 --- a/pool/subscriber_handler_view.go +++ b/pool/subscriber_handler_view.go @@ -1,15 +1,8 @@ package pool -// handlerView is a read only snapshot of the current handler's configuration and runtime state. +// HandlerView is a read only snapshot of the current handler's configuration. // This internal data structure is used in the corresponsing consumer. -type handlerView struct { - // called in the consumer function & wrapper - pausing done - paused cancel - - resuming done - resumed cancel - +type HandlerView struct { Queue string HandlerFunc HandlerFunc ConsumeOptions diff --git a/pool/subscriber_options.go b/pool/subscriber_options.go index 5adba3e..03a2fc8 100644 --- a/pool/subscriber_options.go +++ b/pool/subscriber_options.go @@ -2,6 +2,7 @@ package pool import ( "context" + "time" "github.com/jxsl13/amqpx/logging" ) @@ -10,6 +11,8 @@ type subscriberOption struct { Ctx context.Context AutoClosePool bool + CloseTimeout time.Duration + Logger logging.Logger } @@ -32,3 +35,13 @@ func SubscriberWithAutoClosePool(autoClose bool) SubscriberOption { co.AutoClosePool = autoClose } } + +func SubscriberWitCloseTimeout(timeout time.Duration) SubscriberOption { + return func(co *subscriberOption) { + if timeout <= 0 { + co.CloseTimeout = 5 * time.Second + } else { + co.CloseTimeout = timeout + } + } +} diff --git a/pool/subscriber_test.go b/pool/subscriber_test.go index 836d7bc..986ad4c 100644 --- a/pool/subscriber_test.go +++ b/pool/subscriber_test.go @@ -15,7 +15,7 @@ import ( func TestSubscriber(t *testing.T) { sessions := 2 // publisher sessions + consumer sessions - p, err := pool.New("amqp://admin:password@localhost:5672", 1, sessions, pool.WithConfirms(true), pool.WithLogger(logging.NewTestLogger(t))) + p, err := pool.New(connectURL, 1, sessions, pool.WithConfirms(true), pool.WithLogger(logging.NewTestLogger(t))) if err != nil { assert.NoError(t, err) return @@ -123,7 +123,7 @@ func TestBatchSubscriber(t *testing.T) { numMessages = 50 batchTimeout = 10 * time.Second // keep this at a higher number for slow machines ) - p, err := pool.New("amqp://admin:password@localhost:5672", 1, sessions, pool.WithConfirms(true), pool.WithLogger(logging.NewTestLogger(t))) + p, err := pool.New(connectURL, 1, sessions, pool.WithConfirms(true), pool.WithLogger(logging.NewTestLogger(t))) if err != nil { assert.NoError(t, err) return