Skip to content

Commit

Permalink
conn, pool, examples/session: catch errors while waiting for response…
Browse files Browse the repository at this point in the history
…, and add example for using monte.Session and monte.SessionConn
  • Loading branch information
lithdew committed Jun 12, 2020
1 parent 15ff088 commit 02975bf
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 20 deletions.
3 changes: 2 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *Conn) Request(dst []byte, payload []byte) ([]byte, error) {

pr.wg.Wait()

return pr.dst, nil
return pr.dst, pr.err
}

func (c *Conn) init() {
Expand Down Expand Up @@ -369,6 +369,7 @@ func (c *Conn) close(err error) {

for seq := range c.reqs {
pr := c.reqs[seq]
pr.err = err
pr.wg.Done()

delete(c.reqs, seq)
Expand Down
48 changes: 48 additions & 0 deletions examples/session/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"encoding/hex"
"fmt"
"github.com/lithdew/monte"
"net"
)

func main() {
check := func(err error) {
if err != nil {
panic(err)
}
}

ln, err := net.Listen("tcp", ":4444")
check(err)
defer ln.Close()

conn, err := ln.Accept()
check(err)
defer conn.Close()

sess, err := monte.NewSession()
check(err)

check(sess.DoServer(conn))

fmt.Println(hex.EncodeToString(sess.SharedKey()))

sc := monte.NewSessionConn(sess.Suite(), conn)

buf := make([]byte, 1024)

for i := 0; i < 100; i++ {
n, err := sc.Read(buf)
check(err)

fmt.Println("Decrypted:", string(buf[:n]))
}

for i := 0; i < 100; i++ {
_, err = sc.Write([]byte(fmt.Sprintf("[%d] Hello from Go!", i)))
check(err)
check(sc.Flush())
}
}
39 changes: 20 additions & 19 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,6 @@ func acquireContext(conn *Conn, seq uint32, buf []byte) *Context {

func releaseContext(ctx *Context) { contextPool.Put(ctx) }

type pendingRequest struct {
dst []byte // dst to copy response to
wg sync.WaitGroup // signals the caller that the response has been received
}

var pendingRequestPool sync.Pool

func acquirePendingRequest(dst []byte) *pendingRequest {
v := pendingRequestPool.Get()
if v == nil {
v = &pendingRequest{}
}
pr := v.(*pendingRequest)
pr.dst = dst
return pr
}

func releasePendingRequest(pr *pendingRequest) { pendingRequestPool.Put(pr) }

type pendingWrite struct {
buf *bytebufferpool.ByteBuffer // payload
wait bool // signal to caller if they're waiting
Expand All @@ -73,6 +54,26 @@ func acquirePendingWrite(buf *bytebufferpool.ByteBuffer, wait bool) *pendingWrit

func releasePendingWrite(pw *pendingWrite) { pendingWritePool.Put(pw) }

type pendingRequest struct {
dst []byte // dst to copy response to
err error // error while waiting for response
wg sync.WaitGroup // signals the caller that the response has been received
}

var pendingRequestPool sync.Pool

func acquirePendingRequest(dst []byte) *pendingRequest {
v := pendingRequestPool.Get()
if v == nil {
v = &pendingRequest{}
}
pr := v.(*pendingRequest)
pr.dst = dst
return pr
}

func releasePendingRequest(pr *pendingRequest) { pendingRequestPool.Put(pr) }

var zeroTime time.Time

var timerPool sync.Pool
Expand Down

0 comments on commit 02975bf

Please sign in to comment.