Skip to content

Commit

Permalink
Make handlers Deferrable-aware
Browse files Browse the repository at this point in the history
If a handler returns an EM::Deferrable, wait for the EM result to be ready
before considering the handler complete, raising and error result as an
exception in the current context just like a non-async handler would have done.
This allows async filters/handlers to run in order.

To be able to sync, handle_data is wrapped in a Fiber.  To aid in testing or
other cases that want to wait until the whole process is done, handle_data
returns an EM::Deferrable that will signal the success or failure of the whole
process (for both the sync and async cases).  This can be safely ignored by most
callers.
  • Loading branch information
singpolyma committed Jun 30, 2022
1 parent a6cf7e5 commit e51f153
Showing 1 changed file with 41 additions and 8 deletions.
49 changes: 41 additions & 8 deletions lib/blather/client/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,23 @@ def receive_data(stanza)
end

def handle_data(stanza)
catch(:halt) do
run_filters :before, stanza
handle_stanza stanza
run_filters :after, stanza
end
df = EM::DefaultDeferrable.new

Fiber.new {
begin
catch(:halt) do
run_filters :before, stanza
handle_stanza stanza
run_filters :after, stanza
end
df.succeed
rescue => e
df.fail(e) # Fail in case anyone is listening
raise e # But still raise so it's not a silent failure
end
}.resume

df
end

# @private
Expand Down Expand Up @@ -334,12 +346,33 @@ def call_handler_for(type, stanza)
end

def call_handler(handler, guards, stanza)
if guards.first.respond_to?(:to_str)
result = stanza.find(*guards)
handler.call(stanza, result) unless result.empty?
result = if guards.first.respond_to?(:to_str)
found = stanza.find(*guards)
handler.call(stanza, found) unless found.empty?
else
handler.call(stanza) unless guarded?(guards, stanza)
end

return result unless result.is_a?(EM::Deferrable)

unless EM.reactor_thread == Thread.current
raise "Cannot sync EM::Deferrable across threads. " \
"Did you forget to setup with async: true?"
end

fiber = Fiber.current
EM.next_tick do
result.callback(&fiber.method(:resume))
result.errback do |e|
if e.is_a?(Exception)
fiber.raise(e)
else
fiber.raise(e.to_s)
end
end
end

Fiber.yield
end

# If any of the guards returns FALSE this returns true
Expand Down

0 comments on commit e51f153

Please sign in to comment.