Skip to content

Commit

Permalink
conn, pool, readme: remove remaining memory allocations, fix concurre…
Browse files Browse the repository at this point in the history
…ncy/memory safety for writeNoWait/sendNoWait, pass pooled byte buffers throughout Conn, update readme
  • Loading branch information
lithdew committed Jun 11, 2020
1 parent bf6b63d commit 909501e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 50 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ The bare minimum for high performance, fully-encrypted RPC over TCP in Go.

## Features

1. Send requests, receive responses, or send messages with no expectations for responses.
2. Send from 50MiB/s to 1500MiB/s, with zero to two allocations max per sent message or RPC call.
1. Send requests, receive responses, or send messages without waiting for a response.
2. Send from 50MiB/s to 1500MiB/s, with zero allocations per sent message or RPC call.
3. Gracefully establish multiple client connections to a single endpoint up to a configurable limit.
4. Set the total number of connections that may concurrently be accepted and handled by a single endpoint.
5. Configure read/write timeouts, dial timeouts, handshake timeouts, or customize the handshaking protocol.
Expand Down Expand Up @@ -44,10 +44,10 @@ $ go test -bench=. -benchtime=10s
goos: linux
goarch: amd64
pkg: github.com/lithdew/monte
BenchmarkSend-8 1643733 7082 ns/op 197.70 MB/s 123 B/op 1 allocs/op
BenchmarkSendNoWait-8 14516896 913 ns/op 1533.80 MB/s 152 B/op 0 allocs/op
BenchmarkRequest-8 424249 28276 ns/op 49.51 MB/s 156 B/op 2 allocs/op
BenchmarkParallelSend-8 5316900 2450 ns/op 571.40 MB/s 124 B/op 1 allocs/op
BenchmarkParallelSendNoWait-8 11475540 1072 ns/op 1305.66 MB/s 154 B/op 0 allocs/op
BenchmarkParallelRequest-8 1384652 7824 ns/op 178.93 MB/s 156 B/op 2 allocs/op
BenchmarkSend-8 1814391 6690 ns/op 209.27 MB/s 115 B/op 0 allocs/op
BenchmarkSendNoWait-8 10638730 1153 ns/op 1214.19 MB/s 141 B/op 0 allocs/op
BenchmarkRequest-8 438381 28556 ns/op 49.03 MB/s 140 B/op 0 allocs/op
BenchmarkParallelSend-8 4917001 2876 ns/op 486.70 MB/s 115 B/op 0 allocs/op
BenchmarkParallelSendNoWait-8 10317255 1291 ns/op 1084.78 MB/s 150 B/op 0 allocs/op
BenchmarkParallelRequest-8 1341444 8520 ns/op 164.32 MB/s 140 B/op 0 allocs/op
```
78 changes: 41 additions & 37 deletions conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package monte

import (
"encoding/binary"
"fmt"
"github.com/lithdew/bytesutil"
"github.com/valyala/bytebufferpool"
Expand Down Expand Up @@ -29,23 +30,6 @@ type Conn struct {
seq uint32
}

func (c *Conn) write(buf []byte) error {
c.once.Do(c.init)
pw, err := c.preparePendingWrite(buf, true)
if err != nil {
return err
}
defer releasePendingWrite(pw)
pw.wg.Wait()
return pw.err
}

func (c *Conn) writeNoWait(buf []byte) error {
c.once.Do(c.init)
_, err := c.preparePendingWrite(buf, false)
return err
}

func (c *Conn) NumPendingWrites() int {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -106,24 +90,18 @@ func (c *Conn) SendNoWait(payload []byte) error { c.once.Do(c.init); return c.se
func (c *Conn) Request(dst []byte, payload []byte) ([]byte, error) {
c.once.Do(c.init)

buf := bytebufferpool.Get()
defer bytebufferpool.Put(buf)

seq := c.next()

buf.B = bytesutil.AppendUint32BE(buf.B, seq)
buf.B = append(buf.B, payload...)

pr := acquirePendingRequest(dst)
defer releasePendingRequest(pr)

pr.wg.Add(1)

seq := c.next()

c.mu.Lock()
c.reqs[seq] = pr
c.mu.Unlock()

err := c.writeNoWait(buf.B)
err := c.sendNoWait(seq, payload)

if err != nil {
pr.wg.Done()
Expand All @@ -148,23 +126,37 @@ func (c *Conn) send(seq uint32, payload []byte) error {
buf := bytebufferpool.Get()
defer bytebufferpool.Put(buf)

buf.B = bytesutil.AppendUint32BE(buf.B, seq)
buf.B = append(buf.B, payload...)
buf.B = bytesutil.ExtendSlice(buf.B, 4+len(payload))
binary.BigEndian.PutUint32(buf.B[:4], seq)
copy(buf.B[4:], payload)

return c.write(buf.B)
return c.write(buf)
}

func (c *Conn) sendNoWait(seq uint32, payload []byte) error {
buf := bytebufferpool.Get()
defer bytebufferpool.Put(buf)
buf.B = bytesutil.ExtendSlice(buf.B, 4+len(payload))
binary.BigEndian.PutUint32(buf.B[:4], seq)
copy(buf.B[4:], payload)
return c.writeNoWait(buf)
}

buf.B = bytesutil.AppendUint32BE(buf.B, seq)
buf.B = append(buf.B, payload...)
func (c *Conn) write(buf *bytebufferpool.ByteBuffer) error {
pw, err := c.preparePendingWrite(buf, true)
if err != nil {
return err
}
defer releasePendingWrite(pw)
pw.wg.Wait()
return pw.err
}

return c.writeNoWait(buf.B)
func (c *Conn) writeNoWait(buf *bytebufferpool.ByteBuffer) error {
_, err := c.preparePendingWrite(buf, false)
return err
}

func (c *Conn) preparePendingWrite(buf []byte, wait bool) (*pendingWrite, error) {
func (c *Conn) preparePendingWrite(buf *bytebufferpool.ByteBuffer, wait bool) (*pendingWrite, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -237,15 +229,24 @@ func (c *Conn) next() uint32 {
}

func (c *Conn) writeLoop(conn BufferedConn) error {
var queue []*pendingWrite
var err error

for {
c.mu.Lock()
for !c.writerDone && len(c.writerQueue) == 0 {
c.writerCond.Wait()
}
done, queue := c.writerDone, c.writerQueue
c.writerQueue = nil
done := c.writerDone

if n := len(c.writerQueue) - cap(queue); n > 0 {
queue = append(queue[:cap(queue)], make([]*pendingWrite, n)...)
}
queue = queue[:len(c.writerQueue)]

copy(queue, c.writerQueue)

c.writerQueue = c.writerQueue[:0]
c.mu.Unlock()

if done && len(queue) == 0 {
Expand All @@ -261,6 +262,7 @@ func (c *Conn) writeLoop(conn BufferedConn) error {
pw.err = err
pw.wg.Done()
} else {
bytebufferpool.Put(pw.buf)
releasePendingWrite(pw)
}
}
Expand All @@ -270,12 +272,13 @@ func (c *Conn) writeLoop(conn BufferedConn) error {

for _, pw := range queue {
if err == nil {
_, err = conn.Write(pw.buf)
_, err = conn.Write(pw.buf.B)
}
if pw.wait {
pw.err = err
pw.wg.Done()
} else {
bytebufferpool.Put(pw.buf)
releasePendingWrite(pw)
}
}
Expand Down Expand Up @@ -357,6 +360,7 @@ func (c *Conn) close(err error) {
pw.err = err
pw.wg.Done()
} else {
bytebufferpool.Put(pw.buf)
releasePendingWrite(pw)
}
}
Expand Down
11 changes: 6 additions & 5 deletions pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package monte

import (
"github.com/valyala/bytebufferpool"
"sync"
"time"
)
Expand Down Expand Up @@ -50,15 +51,15 @@ func acquirePendingRequest(dst []byte) *pendingRequest {
func releasePendingRequest(pr *pendingRequest) { pendingRequestPool.Put(pr) }

type pendingWrite struct {
buf []byte // payload
wait bool // signal to caller if they're waiting
err error // keeps track of any socket errors on write
wg sync.WaitGroup // signals the caller that this write is complete
buf *bytebufferpool.ByteBuffer // payload
wait bool // signal to caller if they're waiting
err error // keeps track of any socket errors on write
wg sync.WaitGroup // signals the caller that this write is complete
}

var pendingWritePool sync.Pool

func acquirePendingWrite(buf []byte, wait bool) *pendingWrite {
func acquirePendingWrite(buf *bytebufferpool.ByteBuffer, wait bool) *pendingWrite {
v := pendingWritePool.Get()
if v == nil {
v = &pendingWrite{}
Expand Down

0 comments on commit 909501e

Please sign in to comment.