Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

the context update #51

Merged
merged 76 commits into from
Mar 15, 2024
Merged

the context update #51

merged 76 commits into from
Mar 15, 2024

Conversation

jxsl13
Copy link
Owner

@jxsl13 jxsl13 commented Feb 26, 2024

  • add context to every session method, every start method
  • remove queue and replace with buffered channel
  • add context to TopologyFunc and to (Batch)HandlerFunc,
  • add special errors for rejecting messages
  • rework flow control handling (WIP)
  • improve session/connection pool closing
  • remove publish/await timeouts and replace with the single context that's passed to publish
  • and more ...

- add context to every sesion methos, every start method
- remove queue and replace with buffered channel
- add context to TopoloyFunc and to HandlerFunc,
- add special errors for rejecting messages
- rework flow control handling (WIP)
- improve pool closing
- and more ...
- remove publish/await timeouts and replace with the single context that's passed to publish
@jxsl13
Copy link
Owner Author

jxsl13 commented Feb 27, 2024

TODO:

  • move context.Canceled and context.DeadlineExceeded into central recoverable function as not recoverable errors.

Add options to add additional not recoverable errors.

@jxsl13
Copy link
Owner Author

jxsl13 commented Feb 28, 2024

TODO:

/Users/john/Desktop/Development/amqpx/connection.go:375: time=2024-02-28 23:41:55.244805 +0100 CET m=+0.122429876 level=info, msg=closed, connection=amqpx-pub-cached-connection-0
    /Users/john/Desktop/Development/amqpx/connection_pool.go:302: time=2024-02-28 23:41:55.244871 +0100 CET m=+0.122495543 level=info, msg=closed, connectionPool=amqpx-pub
    /Users/john/Desktop/Development/amqpx/amqpx_test.go:272:
        	Error Trace:	/Users/john/Desktop/Development/amqpx/amqpx_test.go:272
        	Error:      	Received unexpected error:
        	            	"queue-01" does not exist but is supposed to be deleted: context canceled
        	Test:       	TestAMQPXSubAndPubMulti
--- FAIL: TestAMQPXSubAndPubMulti (0.12s)

internal/testutils/rand.go Dismissed Show dismissed Hide dismissed
internal/testutils/rand.go Dismissed Show dismissed Hide dismissed
internal/testutils/jitter.go Dismissed Show dismissed Hide dismissed
pool/session.go Fixed Show fixed Hide fixed
pool/session.go Fixed Show fixed Hide fixed
@jxsl13
Copy link
Owner Author

jxsl13 commented Mar 14, 2024

 go test -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt ./...
  shell: /usr/bin/bash -e {0}
  env:
    CODEQL_ACTION_FEATURE_MULTI_LANGUAGE: false
    CODEQL_ACTION_FEATURE_SANDWICH: false
    CODEQL_ACTION_FEATURE_SARIF_COMBINE: true
    CODEQL_ACTION_FEATURE_WILL_UPLOAD: true
    CODEQL_ACTION_VERSION: 3.24.7
    CODEQL_ACTION_ANALYSIS_KEY: .github/workflows/test.yaml:test
    CODEQL_WORKFLOW_STARTED_AT: 2024-03-14T15:13:44.971Z
    CODEQL_UPLOAD_SARIF__GITHUB_WORKFLOWS_TEST_YAML_TEST_GO_VERSION_OLDSTABLE_PLATFORM_UBUNTU_LATEST__GOSEC: CODEQL_UPLOAD_SARIF__GITHUB_WORKFLOWS_TEST_YAML_TEST_GO_VERSION_OLDSTABLE_PLATFORM_UBUNTU_LATEST__GOSEC
?   	github.com/jxsl13/amqpx/logging	[no test files]
ok  	github.com/jxsl13/amqpx	26.787s	coverage: 60.7% of statements
ok  	github.com/jxsl13/amqpx/internal/testutils	1.022s	coverage: 10.0% of statements
coverage: 62.9% of statements
panic: test timed out after 15m0s
running tests:
	TestBatchSubscriberMaxBytes (15m0s)

goroutine 7192 [running]:
testing.(*M).startAlarm.func1()
	/opt/hostedtoolcache/go/1.21.8/x64/src/testing/testing.go:2259 +0x259
created by time.goFunc
	/opt/hostedtoolcache/go/1.21.8/x64/src/time/sleep.go:176 +0x45

goroutine 1 [chan receive, 12 minutes]:
testing.tRunner.func1()
	/opt/hostedtoolcache/go/1.21.8/x64/src/testing/testing.go:1561 +0x9ac
testing.tRunner(0xc000106b60, 0xc0001798f8)
	/opt/hostedtoolcache/go/1.21.8/x64/src/testing/testing.go:1601 +0x295
testing.runTests(0xc00011cfa0?, {0xd0c880, 0x21, 0x21}, {0x442dc7?, 0xc0001799d0?, 0xd122a0?})
	/opt/hostedtoolcache/go/1.21.8/x64/src/testing/testing.go:2052 +0x8ae
testing.(*M).Run(0xc00011cfa0)
	/opt/hostedtoolcache/go/1.21.8/x64/src/testing/testing.go:1925 +0xcd8
go.uber.org/goleak.VerifyTestMain({0xa936a0, 0xc00011cfa0}, {0xc000179e18, 0x3, 0x3})
	/home/runner/go/pkg/mod/go.uber.org/[email protected]/testmain.go:53 +0x65
github.com/jxsl13/amqpx/internal/testutils.VerifyLeak(0x9da95c?)
	/home/runner/work/amqpx/amqpx/internal/testutils/leak.go:10 +0x325
github.com/jxsl13/amqpx/pool_test.TestMain(...)
	/home/runner/work/amqpx/amqpx/pool/pool_test.go:16
main.main()
	_testmain.go:149 +0x329

goroutine 67 [semacquire, 15 minutes]:
sync.runtime_Semacquire(0xc000204008?)
	/opt/hostedtoolcache/go/1.21.8/x64/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0xc000204000)
	/opt/hostedtoolcache/go/1.21.8/x64/src/sync/waitgroup.go:116 +0xa5
github.com/jxsl13/amqpx/pool_test.TestBatchSubscriberMaxBytes(0xc0001b1860)
	/home/runner/work/amqpx/amqpx/pool/subscriber_test.go:163 +0x175
testing.tRunner(0xc0001b1860, 0xa10b30)
	/opt/hostedtoolcache/go/1.21.8/x64/src/testing/testing.go:1595 +0x262
created by testing.(*T).Run in goroutine 1
	/opt/hostedtoolcache/go/1.21.8/x64/src/testing/testing.go:1648 +0x846

goroutine 87 [chan receive, 14 minutes]:
github.com/jxsl13/amqpx/pool.(*SessionPool).Close(0xc00023eb40)
	/home/runner/work/amqpx/amqpx/pool/session_pool.go:289 +0x269
github.com/jxsl13/amqpx/pool.(*Pool).Close(0xc000376fc0)
	/home/runner/work/amqpx/amqpx/pool/pool.go:77 +0x8b
github.com/jxsl13/amqpx/pool_test.testBatchSubscriberMaxBytes(0xc0001b1860, 0x3f, 0xc000204000)
	/home/runner/work/amqpx/amqpx/pool/subscriber_test.go:308 +0x68a
created by github.com/jxsl13/amqpx/pool_test.TestBatchSubscriberMaxBytes in goroutine 67
	/home/runner/work/amqpx/amqpx/pool/subscriber_test.go:160 +0x6d

goroutine 695 [IO wait]:
internal/poll.runtime_pollWait(0x756ef2184508, 0x72)
	/opt/hostedtoolcache/go/1.21.8/x64/src/runtime/netpoll.go:343 +0x85
internal/poll.(*pollDesc).wait(0xc0003181a0, 0xc0001cd000?, 0x0)
	/opt/hostedtoolcache/go/1.21.8/x64/src/internal/poll/fd_poll_runtime.go:84 +0xb1
internal/poll.(*pollDesc).waitRead(...)
	/opt/hostedtoolcache/go/1.21.8/x64/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0xc000318180, {0xc0001cd000, 0x1000, 0x1000})
	/opt/hostedtoolcache/go/1.21.8/x64/src/internal/poll/fd_unix.go:164 +0x405
net.(*netFD).Read(0xc000318180, {0xc0001cd000, 0x1000, 0x1000})
	/opt/hostedtoolcache/go/1.21.8/x64/src/net/fd_posix.go:55 +0x4b
net.(*conn).Read(0xc000054060, {0xc0001cd000, 0x1000, 0x1000})
	/opt/hostedtoolcache/go/1.21.8/x64/src/net/net.go:179 +0xad
bufio.(*Reader).Read(0xc00042f380, {0xc000316279, 0x7, 0x7})
	/opt/hostedtoolcache/go/1.21.8/x64/src/bufio/bufio.go:244 +0x4be
io.ReadAtLeast({0xa93180, 0xc00042f380}, {0xc000316279, 0x7, 0x7}, 0x7)
	/opt/hostedtoolcache/go/1.21.8/x64/src/io/io.go:335 +0xd0
io.ReadFull(...)
	/opt/hostedtoolcache/go/1.21.8/x64/src/io/io.go:354
github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc0000dbf18)
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/read.go:49 +0x98
github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0000f2160, {0xa93500?, 0xc000054060})
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:726 +0x2f2
created by github.com/rabbitmq/amqp091-go.Open in goroutine 301
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:271 +0x67a

goroutine 364 [select]:
github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc0003c06e0, 0x1bf08eb00, 0xc0003225a0)
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:761 +0x28d
created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 87
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:1016 +0xbac

goroutine 606 [select]:
github.com/rabbitmq/amqp091-go.(*Connection).heartbeater(0xc0000f2160, 0x1bf08eb00, 0xc00042f3e0)
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:761 +0x28d
created by github.com/rabbitmq/amqp091-go.(*Connection).openTune in goroutine 301
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:1016 +0xbac

goroutine 423 [IO wait]:
internal/poll.runtime_pollWait(0x756ef2184220, 0x72)
	/opt/hostedtoolcache/go/1.21.8/x64/src/runtime/netpoll.go:343 +0x85
internal/poll.(*pollDesc).wait(0xc0002d42a0, 0xc0003cb000?, 0x0)
	/opt/hostedtoolcache/go/1.21.8/x64/src/internal/poll/fd_poll_runtime.go:84 +0xb1
internal/poll.(*pollDesc).waitRead(...)
	/opt/hostedtoolcache/go/1.21.8/x64/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0xc0002d4280, {0xc0003cb000, 0x1000, 0x1000})
	/opt/hostedtoolcache/go/1.21.8/x64/src/internal/poll/fd_unix.go:164 +0x405
net.(*netFD).Read(0xc0002d4280, {0xc0003cb000, 0x1000, 0x1000})
	/opt/hostedtoolcache/go/1.21.8/x64/src/net/fd_posix.go:55 +0x4b
net.(*conn).Read(0xc0002020c8, {0xc0003cb000, 0x1000, 0x1000})
	/opt/hostedtoolcache/go/1.21.8/x64/src/net/net.go:179 +0xad
bufio.(*Reader).Read(0xc0003b88a0, {0xc000316229, 0x7, 0x7})
	/opt/hostedtoolcache/go/1.21.8/x64/src/bufio/bufio.go:244 +0x4be
io.ReadAtLeast({0xa93180, 0xc0003b88a0}, {0xc000316229, 0x7, 0x7}, 0x7)
	/opt/hostedtoolcache/go/1.21.8/x64/src/io/io.go:335 +0xd0
io.ReadFull(...)
	/opt/hostedtoolcache/go/1.21.8/x64/src/io/io.go:354
github.com/rabbitmq/amqp091-go.(*reader).ReadFrame(0xc000325f18)
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/read.go:49 +0x98
github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0003c06e0, {0xa93500?, 0xc0002020c8})
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:726 +0x2f2
created by github.com/rabbitmq/amqp091-go.Open in goroutine 87
	/home/runner/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:271 +0x67a

goroutine 2376 [syscall, 13 minutes]:
os/signal.signal_recv()
	/opt/hostedtoolcache/go/1.21.8/x64/src/runtime/sigqueue.go:152 +0x29
os/signal.loop()
	/opt/hostedtoolcache/go/1.21.8/x64/src/os/signal/signal_unix.go:23 +0x1d
created by os/signal.Notify.func1.1 in goroutine 37
	/opt/hostedtoolcache/go/1.21.8/x64/src/os/signal/signal.go:151 +0x47
FAIL	github.com/jxsl13/amqpx/pool	900.075s
FAIL
Error: Process completed with exit code 1.

@jxsl13
Copy link
Owner Author

jxsl13 commented Mar 14, 2024

22000 INFO: session passed out at: github.com/jxsl13/amqpx/pool.(*Publisher).publish
1100 INFO: session passed out at: github.com/jxsl13/amqpx/pool.(*Subscriber).batchConsume
22000 INFO: session returned at: github.com/jxsl13/amqpx/pool.(*Publisher).publish.func2
1035 INFO: session returned at: github.com/jxsl13/amqpx/pool.(*Subscriber).returnSession

@jxsl13
Copy link
Owner Author

jxsl13 commented Mar 15, 2024

Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"

@jxsl13 jxsl13 changed the title [WIP] the context update the context update Mar 15, 2024
@jxsl13 jxsl13 marked this pull request as ready for review March 15, 2024 19:52
@jxsl13 jxsl13 merged commit 469f1f9 into main Mar 15, 2024
5 checks passed
@jxsl13 jxsl13 deleted the feat/the-context-update branch March 15, 2024 20:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant