Skip to content

Latest commit

 

History

History
66 lines (52 loc) · 2.61 KB

2013-01-07-processing-a-streaming-api-with-ruby-sockets-without-net-http.html.markdown

File metadata and controls

66 lines (52 loc) · 2.61 KB
title
Processing a streaming API with Ruby sockets without Net::HTTP

I recently ran across the use-case of needing to consume the CouchDB _changes stream in a streaming fashion.

Most of the recommendations I found online were singing the praises of EventMachine or some other event-driven solution, but in fact, it's much easier than that, here's the code I came up with:

require 'json'
require 'thread'
require 'socket'

changes = Queue.new
reader  = Thread.new(changes) do |changes|

  s = TCPSocket.new 'localhost', 5984
  s.write "GET /omio_development/_changes?since=0&feed=continuous\r\n"
  s.write "\r\n"

  while line = s.gets
    changes.push JSON.parse(line) if line.chars.first == '{'
  end
  s.close
  changes.push NilClass

end

handler = Thread.new(changes) do |changes|
  while change = changes.pop
    break if change.is_a? NilClass
    warn change.inspect
  end
end

[reader, handler].map(&:join)

This is the simplest thing that can normally be made to work, and is idiomatic using Ruby's Queue class (imported from the thread part of the standard library) to communicate between two threads.

There's nothing groundbreaking here, it should probably be moved into a class, and made so that it can be started, and stopped from the background (I'd run these two threads in a new thread, that a start and stop method could be called easily and conveniently. I'd probably pass a locked mutex to the reader thread to stop it in the first line of it's body before calling start on my class, the second thread wouldn't need a mutex as it would block immediately on the handler

I've taken some liberties here, with the specifics of the couchdb continuous changes protocol, as I simply dont' care about the HTTP headers (they'd tell me it's a chunked response, with no Content-Length.

It would be prudent to test this a bit more thoroughly, but writing a test-case for this would involve writing a server as well, as I'm not aware of any of the web-mocking tools that will allow you to mock streaming bodies, but I haven't researched it extensively.

There's probably a better way of signalling the handler thread that the queue is finished than passing a NilClass, curiously in Ruby there's no way to close a queue, or terminate it, except to send a known end of queue value to cause the thread to break out. In the Go language, queues are called channels and they can be closed(), signalling anyone who is reading the channel that there's nothing else to read, and never will be.