forked from valyala/gorpc
-
Notifications
You must be signed in to change notification settings - Fork 1
/
encoding.go
118 lines (101 loc) · 2.28 KB
/
encoding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package gorpc
import (
"bufio"
"compress/flate"
"encoding/gob"
"io"
)
// RegisterType makes the concrete type of x known to the gob codec so that
// values of this type can be sent via rpc inside interface-typed fields.
//
// Registration rules:
//
//   - The client must register every response type the server may send.
//   - The server must register every request type the client may send.
//
// Base Go types such as int, string, bool, float64, etc., as well as arrays,
// slices and maps containing base Go types, do not need to be registered.
//
// Argument and return value types of functions and methods registered
// via Dispatcher do not need to be registered either.
func RegisterType(x interface{}) {
	gob.Register(x)
}
// wireRequest is the envelope for a single request: a numeric ID paired
// with an arbitrary payload.
//
// NOTE(review): presumably this is the value gob-encoded onto the connection
// and ID is used to match a response to its request — the code doing that is
// outside this file, confirm there.
type wireRequest struct {
	// ID identifies the request.
	ID uint64

	// Request is the payload. It is interface-typed, so non-base concrete
	// types must be registered with RegisterType before gob can carry them.
	Request interface{}
}
// wireResponse is the envelope for a single response: a numeric ID, an
// arbitrary payload and an error string.
//
// NOTE(review): presumably ID echoes the ID of the wireRequest being answered
// and a non-empty Error signals failure — the code that fills and reads these
// fields is outside this file, confirm there.
type wireResponse struct {
	// ID identifies the response.
	ID uint64

	// Response is the payload. It is interface-typed, so non-base concrete
	// types must be registered with RegisterType before gob can carry them.
	Response interface{}

	// Error carries an error description as plain text.
	Error string
}
// messageEncoder gob-encodes messages into a buffered, optionally
// flate-compressed, output stream. Instances are built by newMessageEncoder.
type messageEncoder struct {
	// e is the gob encoder; it writes into ww.
	e *gob.Encoder

	// bw is the buffered writer sitting directly on top of the output stream.
	bw *bufio.Writer

	// zw is the flate compressor writing into bw.
	// It is nil when compression is disabled.
	zw *flate.Writer

	// ww is the writer gob output lands in: a separate buffer in front of zw
	// when compression is enabled, otherwise the very same writer as bw.
	ww *bufio.Writer
}
// Close closes the flate compressor if compression is enabled and returns
// its error. Without compression there is nothing to close and nil is
// returned. The buffered writers are not flushed by this method.
func (e *messageEncoder) Close() error {
	if e.zw == nil {
		return nil
	}
	return e.zw.Close()
}
// Flush pushes all buffered data down to the underlying writer.
//
// With compression enabled the layers are flushed from the outermost one
// inwards: first the gob-side buffer (ww), then the flate compressor (zw),
// and finally the output buffer (bw). Without compression only bw is
// flushed. The first error encountered stops the sequence and is returned.
func (e *messageEncoder) Flush() error {
	if e.zw != nil {
		err := e.ww.Flush()
		if err == nil {
			// Only reach the compressor once its input buffer is drained.
			err = e.zw.Flush()
		}
		if err != nil {
			return err
		}
	}
	return e.bw.Flush()
}
// Encode gob-encodes msg into the encoder's buffered writer and returns the
// gob encoder's error, if any. The encoded bytes may remain buffered until
// Flush is called.
func (e *messageEncoder) Encode(msg interface{}) error {
	return e.e.Encode(msg)
}
// newMessageEncoder builds a messageEncoder writing to w.
//
// w is first wrapped by newWriterCounter together with s, then by a
// bufio.Writer of bufferSize bytes. When enableCompression is set, a flate
// compressor at flate.BestSpeed is stacked on that buffer and fronted by a
// second bufio.Writer of bufferSize bytes, which becomes the gob encoder's
// target; otherwise gob writes straight into the first buffer.
func newMessageEncoder(w io.Writer, bufferSize int, enableCompression bool, s *ConnStats) *messageEncoder {
	enc := &messageEncoder{
		bw: bufio.NewWriterSize(newWriterCounter(w, s), bufferSize),
	}
	// Without compression gob writes directly into the output buffer.
	enc.ww = enc.bw

	if enableCompression {
		// The error is deliberately ignored: flate.NewWriter fails only for
		// an invalid compression level, and flate.BestSpeed is a valid one.
		enc.zw, _ = flate.NewWriter(enc.bw, flate.BestSpeed)
		enc.ww = bufio.NewWriterSize(enc.zw, bufferSize)
	}

	enc.e = gob.NewEncoder(enc.ww)
	return enc
}
// messageDecoder gob-decodes messages from a buffered, optionally
// flate-compressed, input stream. Instances are built by newMessageDecoder.
type messageDecoder struct {
	// d is the gob decoder reading the (decompressed) stream.
	d *gob.Decoder

	// zr is the flate decompressor. It is nil when compression is disabled.
	zr io.ReadCloser
}
// Close closes the flate decompressor if compression is enabled and returns
// its error. Without compression there is nothing to close and nil is
// returned.
func (d *messageDecoder) Close() error {
	if d.zr == nil {
		return nil
	}
	return d.zr.Close()
}
// Decode reads the next gob-encoded value from the stream and stores it in
// msg, returning the gob decoder's error, if any. msg is handed to
// gob.Decoder.Decode unchanged, so it must satisfy that method's
// requirements.
func (d *messageDecoder) Decode(msg interface{}) error {
	return d.d.Decode(msg)
}
// newMessageDecoder builds a messageDecoder reading from r.
//
// r is first wrapped by newReaderCounter together with s, then by a
// bufio.Reader of bufferSize bytes. When enableCompression is set, a flate
// decompressor is stacked on that buffer and fronted by a second
// bufio.Reader of bufferSize bytes, which becomes the gob decoder's source;
// otherwise gob reads straight from the first buffer and zr stays nil.
func newMessageDecoder(r io.Reader, bufferSize int, enableCompression bool, s *ConnStats) *messageDecoder {
	src := bufio.NewReaderSize(newReaderCounter(r, s), bufferSize)
	if !enableCompression {
		return &messageDecoder{d: gob.NewDecoder(src)}
	}

	inflater := flate.NewReader(src)
	return &messageDecoder{
		d:  gob.NewDecoder(bufio.NewReaderSize(inflater, bufferSize)),
		zr: inflater,
	}
}