Skip to content

Commit

Permalink
initial mvp
Browse files Browse the repository at this point in the history
  • Loading branch information
actatum committed Jun 19, 2022
0 parents commit 10ad39d
Show file tree
Hide file tree
Showing 19 changed files with 2,037 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cmd
.idea
58 changes: 58 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package stormrpc

import (
"errors"

"github.com/nats-io/nats.go"
)

type Client struct {
nc *nats.Conn
}

func NewClient(natsURL string, opts ...ClientOption) (*Client, error) {
nc, err := nats.Connect(natsURL)
if err != nil {
return nil, err
}

return &Client{
nc: nc,
}, nil
}

type clientOptions struct{}

type ClientOption interface {
apply(*clientOptions)
}

func (c *Client) Do(r *Request) Response {
msg, err := c.nc.RequestMsgWithContext(r.Context, r.Msg)
if errors.Is(err, nats.ErrNoResponders) {
return Response{
Msg: msg,
Err: Errorf(ErrorCodeInternal, "no servers available for subject: %s", r.Subject()),
}
}
if err != nil {
return Response{
Msg: msg,
Err: err, // TODO: probably use errorf and inspect different error types from nats.
}
}

// Inspect headers and set error if appropriate
rpcErr := parseErrorHeader(msg.Header)
if rpcErr != nil {
return Response{
Msg: msg,
Err: rpcErr,
}
}

return Response{
Msg: msg,
Err: nil,
}
}
237 changes: 237 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package stormrpc

import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)

func TestNewClient(t *testing.T) {
t.Run("no nats server running", func(t *testing.T) {
_, err := NewClient(nats.DefaultURL)
if err == nil {
t.Fatal("expected error got nil")
}
})

t.Run("nats server running", func(t *testing.T) {
ns, err := server.NewServer(&server.Options{
Port: 41397,
})
if err != nil {
t.Fatal(err)
}
go ns.Start()
t.Cleanup(func() {
ns.Shutdown()
ns.WaitForShutdown()
})

if !ns.ReadyForConnections(1 * time.Second) {
t.Error("timeout waiting for nats server")
return
}

_, err = NewClient(ns.ClientURL())
if err != nil {
t.Fatal(err)
}
})
}

func TestClient_Do(t *testing.T) {
t.Parallel()

rand.Seed(time.Now().UnixNano())
ns, err := server.NewServer(&server.Options{
Port: 41397,
})
if err != nil {
t.Fatal(err)
}
go ns.Start()
t.Cleanup(func() {
ns.Shutdown()
ns.WaitForShutdown()
})

if !ns.ReadyForConnections(1 * time.Second) {
t.Error("timeout waiting for nats server")
return
}

t.Run("deadline exceeded", func(t *testing.T) {
t.Parallel()

timeout := 50 * time.Millisecond
subject := strconv.Itoa(rand.Int())
srv, err := NewServer("test", ns.ClientURL())
if err != nil {
t.Fatal(err)
}
srv.Handle(subject, func(r Request) Response {
time.Sleep(timeout + 10*time.Millisecond)
return Response{Msg: &nats.Msg{Subject: r.Reply}}
})
go srv.Run()
t.Cleanup(func() {
_ = srv.Shutdown(context.Background())
})

client, err := NewClient(ns.ClientURL())
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
r, err := NewRequest(ctx, subject, map[string]string{"howdy": "partner"})
if err != nil {
t.Fatal(err)
}

resp := client.Do(r)
if resp.Err == nil {
t.Fatal("expected error got nil")
}

if !errors.Is(resp.Err, context.DeadlineExceeded) {
t.Fatalf("got = %v, want %v", resp.Err, context.DeadlineExceeded)
}
})

t.Run("rpc error", func(t *testing.T) {
t.Parallel()

timeout := 50 * time.Millisecond
subject := strconv.Itoa(rand.Int())
srv, err := NewServer("test", ns.ClientURL())
if err != nil {
t.Fatal(err)
}
srv.Handle(subject, func(r Request) Response {
return NewErrorResponse(r.Reply, Errorf(ErrorCodeNotFound, "thingy not found"))
})
go srv.Run()
t.Cleanup(func() {
_ = srv.Shutdown(context.Background())
})

client, err := NewClient(ns.ClientURL())
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
r, err := NewRequest(ctx, subject, map[string]string{"howdy": "partner"})
if err != nil {
t.Fatal(err)
}

resp := client.Do(r)
if resp.Err == nil {
t.Fatal("expected error got nil")
}

code := CodeFromErr(resp.Err)
if code != ErrorCodeNotFound {
t.Fatalf("got = %v, want %v", code, ErrorCodeNotFound)
}
msg := MessageFromErr(resp.Err)
if msg != "thingy not found" {
t.Fatalf("got = %v, want %v", msg, "thingy not found")
}
})

t.Run("no servers", func(t *testing.T) {
t.Parallel()

subject := strconv.Itoa(rand.Int())

client, err := NewClient(ns.ClientURL())
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

req, err := NewRequest(ctx, subject, map[string]string{"x": "D"})
if err != nil {
t.Fatal(err)
}
resp := client.Do(req)
if resp.Err == nil {
t.Fatal("expected error got nil")
}

code := CodeFromErr(resp.Err)
if code != ErrorCodeInternal {
t.Fatalf("got = %v, want %v", code, ErrorCodeInternal)
}
msg := MessageFromErr(resp.Err)
if msg != fmt.Sprintf("no servers available for subject: %s", subject) {
t.Fatalf(
"got = %v, want %v",
msg,
fmt.Sprintf("no servers available for subject: %s", subject),
)
}
})

t.Run("successful request", func(t *testing.T) {
t.Parallel()

timeout := 50 * time.Millisecond
subject := strconv.Itoa(rand.Int())
srv, err := NewServer("test", ns.ClientURL())
if err != nil {
t.Fatal(err)
}
srv.Handle(subject, func(r Request) Response {
resp, err := NewResponse(r.Reply, map[string]string{"hello": "world"})
if err != nil {
return NewErrorResponse(r.Reply, err)
}
return resp
})
go srv.Run()
t.Cleanup(func() {
_ = srv.Shutdown(context.Background())
})

client, err := NewClient(ns.ClientURL())
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
r, err := NewRequest(ctx, subject, map[string]string{"howdy": "partner"})
if err != nil {
t.Fatal(err)
}

resp := client.Do(r)
if resp.Err != nil {
t.Fatal(resp.Err)
}

var result map[string]string
if err = resp.Decode(&result); err != nil {
t.Fatal(err)
}

if result["hello"] != "world" {
t.Fatalf("got = %v, want %v", result["hello"], "world")
}
})
}
68 changes: 68 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package stormrpc

import (
"errors"
"fmt"
)

type ErrorCode int

const (
ErrorCodeUnknown ErrorCode = 0
ErrorCodeInternal ErrorCode = 1
ErrorCodeNotFound ErrorCode = 2
)

func (c ErrorCode) String() string {
switch c {
case ErrorCodeInternal:
return "STORMRPC_CODE_INTERNAL"
case ErrorCodeNotFound:
return "STORMRPC_CODE_NOT_FOUND"
default:
return "STORMRPC_CODE_UNKNOWN"
}
}

type Error struct {
Code ErrorCode
Message string
}

func (e Error) Error() string {
return fmt.Sprintf("%s: %s", e.Code.String(), e.Message)
}

func Errorf(code ErrorCode, format string, args ...any) *Error {
return &Error{
Code: code,
Message: fmt.Sprintf(format, args...),
}
}

func CodeFromErr(err error) ErrorCode {
var e *Error
if errors.As(err, &e) {
return e.Code
}
return ErrorCodeUnknown
}

func MessageFromErr(err error) string {
var e *Error
if errors.As(err, &e) {
return e.Message
}
return "unknown error"
}

func codeFromString(s string) ErrorCode {
switch s {
case "STORMRPC_CODE_INTERNAL":
return ErrorCodeInternal
case "STORMRPC_CODE_NOT_FOUND":
return ErrorCodeNotFound
default:
return ErrorCodeUnknown
}
}
Loading

0 comments on commit 10ad39d

Please sign in to comment.