Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jroimartin committed Feb 28, 2015
0 parents commit a302db5
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.swp
9 changes: 9 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This is the official list of rpcmq authors for copyright purposes.

# Names should be added to this file as
# Name or Organization <email address>
# The email address is not required for organizations.

# Please keep the list sorted.

Roi Martin (@nibble_ds) <[email protected]>
23 changes: 23 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Copyright (c) 2014 The rpcmq Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the rpcmq Authors nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# rpcmq

RPC over AMQP for RabbitMQ
59 changes: 59 additions & 0 deletions amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2014 The rpcmq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package rpcmq

import (
"fmt"

"github.com/streadway/amqp"
)

type amqpClient struct {
uri string
consumerTag string

conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
done chan bool
}

func newAmqpRpc(uri string) *amqpClient {
r := &amqpClient{
done: make(chan bool),
uri: uri,
}
return r
}

func (r *amqpClient) init() error {
var err error
r.conn, err = amqp.Dial(r.uri)
if err != nil {
return fmt.Errorf("Dial: %v", err)
}

r.channel, err = r.conn.Channel()
if err != nil {
return fmt.Errorf("Channel: %v", err)
}

return nil
}

func (r *amqpClient) shutdown() error {
if r.consumerTag != "" {
if err := r.channel.Cancel(r.consumerTag, false); err != nil {
return fmt.Errorf("Channel Close: %v", err)
}
}

if err := r.conn.Close(); err != nil {
return fmt.Errorf("Connection Close: %v", err)
}

<-r.done
return nil
}
139 changes: 139 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2014 The rpcmq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package rpcmq

import (
"encoding/json"
"fmt"

"github.com/streadway/amqp"
)

type Client struct {
queue string
ac *amqpClient
deliveries <-chan amqp.Delivery
results chan Result
}

type Result struct {
UUID string
Val interface{}
}

type rpcMsg struct {
Method string
Args []interface{}
}

func NewClient(uri, queue string) *Client {
c := &Client{
queue: queue,
ac: newAmqpRpc(uri),
results: make(chan Result),
}
return c
}

func (c *Client) Init() error {
if err := c.ac.init(); err != nil {
return err
}

var err error
c.ac.queue, err = c.ac.channel.QueueDeclare(
"", // name
true, // durable
false, // autoDelete
true, // exclusive
false, // noWait
nil, // args
)
if err != nil {
return fmt.Errorf("Queue Declare: %v", err)
}

c.ac.consumerTag, err = UUID()
if err != nil {
return fmt.Errorf("UUID: %v", err)
}

c.deliveries, err = c.ac.channel.Consume(
c.ac.queue.Name, // name
c.ac.consumerTag, // consumer
false, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // args
)
if err != nil {
return fmt.Errorf("Queue Consume: %v", err)
}

go c.getDeliveries()

return nil
}

func (c *Client) getDeliveries() {
for d := range c.deliveries {
if d.CorrelationId == "" {
d.Nack(false, false)
continue
}
var r Result
if err := json.Unmarshal(d.Body, &r); err != nil {
d.Nack(false, false)
continue
}
c.results <- r
d.Ack(false)
}
c.ac.done <- true
}

func (c *Client) Shutdown() error {
return c.ac.shutdown()
}

func (c *Client) Call(method string, args ...interface{}) (uuid string, err error) {
uuid, err = UUID()
if err != nil {
return "", fmt.Errorf("UUID: %v", err)
}

msg := &rpcMsg{
Method: method,
Args: args,
}
body, err := json.Marshal(msg)
if err != nil {
return "", fmt.Errorf("Marshal: %v", err)
}

err = c.ac.channel.Publish(
"", // exchange
c.queue, // key
false, // mandatory
false, // immediate
amqp.Publishing{ // msg
CorrelationId: uuid,
ReplyTo: c.ac.queue.Name,
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent, // TODO(jrm): Configurable mode
},
)
if err != nil {
return "", err
}

return uuid, nil
}

func (c *Client) Results() <-chan Result {
return (<-chan Result)(c.results)
}
30 changes: 30 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2014 The rpcmq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package rpcmq

import (
"log"
"testing"
)

func TestClient(t *testing.T) {
c := NewClient("amqp://127.0.0.1:5672", "rcp-queue")
if err := c.Init(); err != nil {
t.Fatalf("Init: %v", err)
}
defer c.Shutdown()
uuid, err := c.Call("sum", 1, 2, 3, 4, 5, -12)
if err != nil {
t.Fatalf("Call: %v", err)
}
r := <-c.Results()
if r.UUID == uuid {
if ret, ok := r.Val.(float64); ok {
log.Printf("Received: %v (%v)\n", ret, r.UUID)
} else {
t.Fatalf("return values is not int")
}
}
}
Loading

0 comments on commit a302db5

Please sign in to comment.