Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Julia NATS.jl client messaging examples #275

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
uses: actions/checkout@v4
with:
ref: ${{ github.head_ref }}
repository: ${{ github.event.pull_request.head.repo.full_name }}

- name: Setup Python
uses: actions/setup-python@v4
Expand Down
3 changes: 3 additions & 0 deletions cmd/nbe/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
Elixir,
Crystal,
C,
Julia,
}
)

Expand Down Expand Up @@ -403,6 +404,8 @@ func chromaFormat(code, lang string) (string, error) {
lang = "js"
case DotNet, CSharp:
lang = "cs"
case Julia:
lang = "jl"
}

lexer := lexers.Get(lang)
Expand Down
7 changes: 6 additions & 1 deletion cmd/nbe/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
Ruby = "ruby"
Elixir = "elixir"
Crystal = "crystal"
Julia = "julia"
)

var (
Expand All @@ -49,6 +50,7 @@ var (
Ruby: "Ruby",
Elixir: "Elixir",
Crystal: "Crystal",
Julia: "Julia",
}

// TODO: add more as they become supported..
Expand All @@ -66,6 +68,7 @@ var (
CSharp: "Main.cs",
Elixir: "main.exs",
C: "main.c",
Julia: "main.jl",
}

languageMultiCommentDelims = map[string][2]string{
Expand All @@ -78,6 +81,7 @@ var (
Deno: {"/*", "*/"},
WebSocket: {"/*", "*/"},
C: {"/*", "*/"},
Julia: {"#=", "=#"},
}

languageLineCommentDelim = map[string]string{
Expand All @@ -95,6 +99,7 @@ var (
Ruby: "#",
Elixir: "#",
Crystal: "#",
Julia: "#",
}
)

Expand Down Expand Up @@ -214,7 +219,7 @@ func parseLineType(lang, line string) LineType {
}
return NormalLine

case Python, Ruby, Elixir, Crystal:
case Python, Ruby, Elixir, Crystal, Julia:
if hashLineCommentRe.MatchString(line) {
return SingleCommentLine
}
Expand Down
7 changes: 7 additions & 0 deletions docker/julia/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM julia:1.10

COPY . .

RUN julia -e 'using Pkg; Pkg.add("NATS"); Pkg.add("JSON3")'

CMD ["julia", "main.jl"]
27 changes: 27 additions & 0 deletions examples/messaging/concurrent/julia/main.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Import NATS.jl package.
using NATS;

# Get the passed NATS_URL or fallback to the default. This can be
# a comma-separated string.
url = get(ENV, "NATS_URL", NATS.DEFAULT_CONNECT_URL)
@info "NATS server url is $url"

# Create a client connection to an available NATS server.
nc = NATS.connect(url)

# Subscribe to a subject and start waiting for messages in the background and
# start processing each message in a separate task. Handler sleeps for some random
# time up to 1 second to simulate doing some work.
subscribe(nc, "numbers"; spawn = true) do msg
sleep(rand())
@info "Received $(payload(msg))"
end

# Publish 50 messages at the same time. Despite expected total processing time is 25
# seconds due to parallel setup of subscription everything completes in less than second.
for i in 1:50
publish(nc, "numbers", string(i))
end

# Close connection.
drain(nc)
44 changes: 44 additions & 0 deletions examples/messaging/concurrent/julia/output.cast
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{"version": 2, "width": 115, "height": 13, "timestamp": 1737200504, "env": {"SHELL": "/bin/zsh", "TERM": "xterm-256color"}, "title": "NATS by Example: messaging/concurrent/julia"}
[2.275171, "o", "[ Info: NATS server url is nats://nats:4222\r\n"]
[6.232629, "o", "[ Info: Received 24\r\n[ Info: Received 50\r\n[ Info: Received 5\r\n[ Info: Received 22\r\n"]
[6.252993, "o", "[ Info: Received 40\r\n"]
[6.317452, "o", "[ Info: Received 38\r\n"]
[6.359956, "o", "[ Info: Received 17\r\n"]
[6.367503, "o", "[ Info: Received 42\r\n"]
[6.373008, "o", "[ Info: Received 9\r\n"]
[6.376266, "o", "[ Info: Received 36\r\n"]
[6.382009, "o", "[ Info: Received 19\r\n"]
[6.402026, "o", "[ Info: Received 16\r\n"]
[6.416241, "o", "[ Info: Received 6\r\n"]
[6.429051, "o", "[ Info: Received 21\r\n"]
[6.431868, "o", "[ Info: Received 8\r\n"]
[6.457978, "o", "[ Info: Received 34\r\n"]
[6.468837, "o", "[ Info: Received 32\r\n"]
[6.508974, "o", "[ Info: Received 27\r\n"]
[6.522829, "o", "[ Info: Received 11\r\n"]
[6.547043, "o", "[ Info: Received 28\r\n"]
[6.557111, "o", "[ Info: Received 35\r\n"]
[6.575991, "o", "[ Info: Received 47\r\n"]
[6.595751, "o", "[ Info: Received 43\r\n"]
[6.597836, "o", "[ Info: Received 7\r\n"]
[6.649043, "o", "[ Info: Received 14\r\n"]
[6.652984, "o", "[ Info: Received 25\r\n"]
[6.694925, "o", "[ Info: Received 20\r\n"]
[6.706942, "o", "[ Info: Received 26\r\n"]
[6.763178, "o", "[ Info: Received 10\r\n[ Info: Received 4\r\n"]
[6.768228, "o", "[ Info: Received 45\r\n"]
[6.802965, "o", "[ Info: Received 23\r\n"]
[6.806929, "o", "[ Info: Received 31\r\n"]
[6.821213, "o", "[ Info: Received 33\r\n"]
[6.856241, "o", "[ Info: Received 2\r\n"]
[6.902989, "o", "[ Info: Received 48\r\n"]
[6.926514, "o", "[ Info: Received 3\r\n"]
[6.94227, "o", "[ Info: Received 46\r\n"]
[6.954194, "o", "[ Info: Received 15\r\n"]
[6.960372, "o", "[ Info: Received 29\r\n[ Info: Received 30\r\n"]
[6.973414, "o", "[ Info: Received 18\r\n[ Info: Received 41\r\n"]
[6.986722, "o", "[ Info: Received 49\r\n"]
[7.042087, "o", "[ Info: Received 13\r\n"]
[7.046769, "o", "[ Info: Received 12\r\n"]
[7.142571, "o", "[ Info: Received 44\r\n[ Info: Received 37\r\n"]
[7.170619, "o", "[ Info: Received 39\r\n[ Info: Received 1\r\n"]
51 changes: 51 additions & 0 deletions examples/messaging/concurrent/julia/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[ Info: NATS server url is nats://nats:4222
[ Info: Received 24
[ Info: Received 50
[ Info: Received 5
[ Info: Received 22
[ Info: Received 40
[ Info: Received 38
[ Info: Received 17
[ Info: Received 42
[ Info: Received 9
[ Info: Received 36
[ Info: Received 19
[ Info: Received 16
[ Info: Received 6
[ Info: Received 21
[ Info: Received 8
[ Info: Received 34
[ Info: Received 32
[ Info: Received 27
[ Info: Received 11
[ Info: Received 28
[ Info: Received 35
[ Info: Received 47
[ Info: Received 43
[ Info: Received 7
[ Info: Received 14
[ Info: Received 25
[ Info: Received 20
[ Info: Received 26
[ Info: Received 10
[ Info: Received 4
[ Info: Received 45
[ Info: Received 23
[ Info: Received 31
[ Info: Received 33
[ Info: Received 2
[ Info: Received 48
[ Info: Received 3
[ Info: Received 46
[ Info: Received 15
[ Info: Received 29
[ Info: Received 30
[ Info: Received 18
[ Info: Received 41
[ Info: Received 49
[ Info: Received 13
[ Info: Received 12
[ Info: Received 44
[ Info: Received 37
[ Info: Received 39
[ Info: Received 1
58 changes: 58 additions & 0 deletions examples/messaging/iterating-multiple-subscriptions/julia/main.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Import NATS.jl package.
using NATS;

# Get the passed NATS_URL or fallback to the default. This can be
# a comma-separated string.
url = get(ENV, "NATS_URL", NATS.DEFAULT_CONNECT_URL)
@info "NATS server url is $url"

# Create a client connection to an available NATS server.
nc = NATS.connect(url)

# Specify list of subjects to listen on.
subjects = ["s1", "s2", "s3", "s4"]

# In Julia Channel is a way to establish communication between tasks.
# See [Channel documentation](https://docs.julialang.org/en/v1/manual/asynchronous-programming/#Communicating-with-Channels)
# for more details.
ch = Channel()

# Create subscriptions for all subjects, subscription handler
# puts received message into channel.
subs = map(subjects) do subject
subscribe(nc, subject) do msg
put!(ch, msg)
end
end

# Create consumer task consuming messages from the channel.
subscribe_task = Threads.@spawn while true
msg = take!(ch)
@info "Received $(payload(msg)) from subject $(msg.subject)"
end

# Create consumer task publishing messages.
# It sends numbers from 1 to 5 to every subject.
publish_task = Threads.@spawn for i in 1:5
for subject in subjects
publish(nc, subject, string(i))
end
sleep(0.5)
end

# Wait for all publications are done.
wait(publish_task)

# Close all subscriptions, `drain` will wait until all messages
# in buffers are processed.
for sub in subs
drain(nc, sub)
end

# Finally close the channel to force consumer task to complete.
close(ch)

# At this moment both spawned tasks should be completed.
# Let's make sure it is true, dangling tasks might cause resource leak.
@show istaskdone(publish_task)
@show istaskdone(subscribe_task)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"version": 2, "width": 115, "height": 17, "timestamp": 1737222971, "env": {"SHELL": "/bin/zsh", "TERM": "xterm-256color"}, "title": "NATS by Example: messaging/iterating-multiple-subscriptions/julia"}
[2.385072, "o", "[ Info: NATS server url is nats://nats:4222\r\n"]
[6.66447, "o", "[ Info: Received 1 from subject s4\r\n"]
[6.668972, "o", "[ Info: Received 1 from subject s2\r\n[ Info: Received 1 from subject s3\r\n[ Info: Received 1 from subject s1\r\n[ Info: Received 2 from subject s4\r\n[ Info: Received 2 from subject s2\r\n[ Info: Received 2 from subject s3\r\n[ Info: Received 2 from subject s1\r\n"]
[7.177356, "o", "[ Info: Received 3 from subject s4\r\n[ Info: Received 3 from subject s2\r\n[ Info: Received 3 from subject s3\r\n[ Info: Received 3 from subject s1\r\n"]
[7.684415, "o", "[ Info: Received 4 from subject s4\r\n[ Info: Received 4 from subject s2\r\n[ Info: Received 4 from subject s3\r\n[ Info: Received 4 from subject s1\r\n"]
[8.185909, "o", "[ Info: Received 5 from subject s4\r\n[ Info: Received 5 from subject s2\r\n[ Info: Received 5 from subject s3\r\n[ Info: Received 5 from subject s1\r\n"]
[9.573481, "o", "istaskdone(publish_task) = "]
[9.573668, "o", "true\r\n"]
[9.574009, "o", "istaskdone(subscribe_task) = true\r\n"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[ Info: NATS server url is nats://nats:4222
[ Info: Received 1 from subject s4
[ Info: Received 1 from subject s2
[ Info: Received 1 from subject s3
[ Info: Received 1 from subject s1
[ Info: Received 2 from subject s4
[ Info: Received 2 from subject s2
[ Info: Received 2 from subject s3
[ Info: Received 2 from subject s1
[ Info: Received 3 from subject s4
[ Info: Received 3 from subject s2
[ Info: Received 3 from subject s3
[ Info: Received 3 from subject s1
[ Info: Received 4 from subject s4
[ Info: Received 4 from subject s2
[ Info: Received 4 from subject s3
[ Info: Received 4 from subject s1
[ Info: Received 5 from subject s4
[ Info: Received 5 from subject s2
[ Info: Received 5 from subject s3
[ Info: Received 5 from subject s1
istaskdone(publish_task) = true
istaskdone(subscribe_task) = true
41 changes: 41 additions & 0 deletions examples/messaging/json/julia/main.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Import NATS.jl and JSON3 packages.
using NATS
using JSON3

# Get the passed NATS_URL or fallback to the default. This can be
# a comma-separated string.
url = get(ENV, "NATS_URL", NATS.DEFAULT_CONNECT_URL)
@info "NATS server url is $url"

# Define a struct that will be serialized and deserialized from JSON.
struct Payload
foo::String
bar::Int64
end

# Create a client connection to an available NATS server.
nc = NATS.connect(url)

# Construct a Payload value and serialize it.
data_object = Payload("bar", 27)
data_json = JSON3.write(data_object)

# Create a subscription. For messages that contain invalid payload error
# will be reported in a separate monitoring task.
sub = subscribe(nc, "foo") do msg
obj = JSON3.read(payload(msg), Payload)
@info "Received object: $obj"
end

# Publish the serialized payload and also some payloads that are not valid.
publish(nc, "foo", data_json)
publish(nc, "foo", "not a json")
publish(nc, "foo", "also not a json")

# Wait for error to be reported from handlers.
# To avoid excessive console output errors from subscrption handlers
# are batched and reported every few seconds.
sleep(10)

# Close connection.
drain(nc)
4 changes: 4 additions & 0 deletions examples/messaging/json/julia/output.cast
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"version": 2, "width": 115, "height": 17, "timestamp": 1737223446, "env": {"SHELL": "/bin/zsh", "TERM": "xterm-256color"}, "title": "NATS by Example: messaging/json/julia"}
[2.408164, "o", "[ Info: NATS server url is nats://nats:4222\r\n"]
[7.842433, "o", "[ Info: Received object: Payload(\"bar\", 27)\r\n"]
[11.993665, "o", "┌ Error: 2 handler errors on \"foo\" in last 5.0 s.\r\n│ err =\r\n│ ArgumentError: invalid JSON at byte position 1 while parsing type Payload: ExpectedOpeningObjectChar\r\n│ also not a json\r\n│ \r\n│ msg =\r\n│ MSG foo 1 15\r\r\n│ also not a json\r\n└ @ NATS ~/.julia/packages/NATS/pozkW/src/pubsub/subscribe.jl:295\r\n"]
11 changes: 11 additions & 0 deletions examples/messaging/json/julia/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[ Info: NATS server url is nats://nats:4222
[ Info: Received object: Payload("bar", 27)
┌ Error: 2 handler errors on "foo" in last 5.0 s.
│ err =
│ ArgumentError: invalid JSON at byte position 1 while parsing type Payload: ExpectedOpeningObjectChar
│ also not a json
│ msg =
│ MSG foo 1 15
│ also not a json
└ @ NATS ~/.julia/packages/NATS/pozkW/src/pubsub/subscribe.jl:295
Expand Down
33 changes: 33 additions & 0 deletions examples/messaging/pub-sub/julia/main.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Import NATS.jl library.
using NATS

# Get the passed NATS_URL or fallback to the default. This can be
# a comma-separated string.
url = get(ENV, "NATS_URL", NATS.DEFAULT_CONNECT_URL)
@info "NATS server url is $url"

# Create a client connection to an available NATS server.
nc = NATS.connect(url)

# To publish a message, simply provide the _subject_ of the message
# and encode the message payload. NATS subjects are hierarchical using
# periods as token delimiters. `greet` and `joe` are two distinct tokens.
publish(nc, "greet.bob", "hello")

# Now we are going to create a subscription and utilize a wildcard on
# the second token. The effect is that this subscription shows _interest_
# in all messages published to a subject with two tokens where the first
# is `greet`.
sub = subscribe(nc, "greet.*") do msg
@info "$(payload(msg)) on subject $(msg.subject)"
end

# Let's publish three more messages which will result in the messages
# being forwarded to the local subscription we have.
for name in ("joe", "pam", "sue")
publish(nc, "greet.$name", "hello")
end

# Finally we drain the connection which waits for any pending
# messages (published or in a subscription) to be flushed.
drain(nc)
Loading
Loading