forked from valyala/gozstd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.go
128 lines (112 loc) · 3.61 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package gozstd
import (
"io"
"sync"
)
// StreamCompress compresses the data read from src and writes the result
// to dst, using DefaultCompressionLevel and no dictionary.
//
// It is not suitable for interactive network streams: for performance
// reasons the data read from src may be buffered before it is passed to dst.
// Use Writer.Flush for interactive network streams.
func StreamCompress(dst io.Writer, src io.Reader) error {
	return streamCompressDictLevel(dst, src, nil, DefaultCompressionLevel)
}
// StreamCompressLevel compresses the data read from src and writes the result
// to dst, using the given compressionLevel and no dictionary.
//
// It is not suitable for interactive network streams: for performance
// reasons the data read from src may be buffered before it is passed to dst.
// Use Writer.Flush for interactive network streams.
func StreamCompressLevel(dst io.Writer, src io.Reader, compressionLevel int) error {
	return streamCompressDictLevel(dst, src, nil, compressionLevel)
}
// StreamCompressDict compresses the data read from src and writes the result
// to dst, using the given dictionary cd.
//
// It is not suitable for interactive network streams: for performance
// reasons the data read from src may be buffered before it is passed to dst.
// Use Writer.Flush for interactive network streams.
func StreamCompressDict(dst io.Writer, src io.Reader, cd *CDict) error {
	// The compression level is passed as 0 here.
	// NOTE(review): presumably the effective level comes from cd itself;
	// confirm against Writer.Reset.
	return streamCompressDictLevel(dst, src, cd, 0)
}
// streamCompressDictLevel compresses src into dst with a pooled stream
// compressor obtained for compressionLevel.
//
// The pooled Writer is reset to dst, cd and compressionLevel, fed from src
// via ReadFrom and, only if that succeeds, closed. The compressor is handed
// back to the pool on both the success and the error path. The returned error
// is the ReadFrom error if there was one, otherwise the result of Close.
func streamCompressDictLevel(dst io.Writer, src io.Reader, cd *CDict, compressionLevel int) error {
	c := getSCompressor(compressionLevel)
	w := c.zw
	w.Reset(dst, cd, compressionLevel)

	if _, err := w.ReadFrom(src); err != nil {
		// Close is intentionally skipped when reading/compressing failed.
		putSCompressor(c)
		return err
	}

	err := w.Close()
	putSCompressor(c)
	return err
}
// sCompressor is a reusable stream compressor kept in a per-level pool.
type sCompressor struct {
	// zw is the Writer that performs the compression; it is reset before
	// and after every use.
	zw *Writer

	// compressionLevel is the level the compressor was created for. It is
	// used to pick the pool the compressor is returned to.
	compressionLevel int
}
// getSCompressor returns a stream compressor for the given compressionLevel.
//
// A compressor is taken from the pool for that level when one is available;
// otherwise a new one is created with a Writer built by NewWriterLevel that
// has no destination yet.
func getSCompressor(compressionLevel int) *sCompressor {
	pool := getSCompressorPool(compressionLevel)
	if cached := pool.Get(); cached != nil {
		return cached.(*sCompressor)
	}
	return &sCompressor{
		zw:               NewWriterLevel(nil, compressionLevel),
		compressionLevel: compressionLevel,
	}
}
// putSCompressor returns sc to the pool for its compression level.
//
// Before pooling, the Writer is reset with a nil destination and a nil
// dictionary (presumably so that an idle compressor does not keep the
// caller's writer or dictionary reachable).
func putSCompressor(sc *sCompressor) {
	level := sc.compressionLevel
	sc.zw.Reset(nil, nil, level)
	getSCompressorPool(level).Put(sc)
}
// getSCompressorPool returns the pool of stream compressors dedicated to the
// given compressionLevel, creating and registering it on first use.
//
// Pools are kept per level because Writer.Reset is expensive between
// distinct compression levels.
func getSCompressorPool(compressionLevel int) *sync.Pool {
	sCompressorPoolLock.Lock()
	defer sCompressorPoolLock.Unlock()

	if pool := sCompressorPool[compressionLevel]; pool != nil {
		return pool
	}

	pool := &sync.Pool{}
	sCompressorPool[compressionLevel] = pool
	return pool
}

var (
	// sCompressorPoolLock guards sCompressorPool.
	sCompressorPoolLock sync.Mutex

	// sCompressorPool maps a compression level to its compressor pool.
	sCompressorPool = make(map[int]*sync.Pool)
)
// StreamDecompress decompresses the data read from src and writes the result
// to dst, without a dictionary.
//
// It is not suitable for interactive network streams: for performance
// reasons the data read from src may be buffered before it is passed to dst.
// Use Reader for interactive network streams.
func StreamDecompress(dst io.Writer, src io.Reader) error {
	return StreamDecompressDict(dst, src, nil)
}
// StreamDecompressDict decompresses the data read from src and writes the
// result to dst, using the given dictionary dd.
//
// It is not suitable for interactive network streams: for performance
// reasons the data read from src may be buffered before it is passed to dst.
// Use Reader for interactive network streams.
func StreamDecompressDict(dst io.Writer, src io.Reader, dd *DDict) error {
	d := getSDecompressor()
	r := d.zr
	r.Reset(src, dd)

	// The decompressor goes back to the pool whether or not WriteTo failed.
	_, err := r.WriteTo(dst)
	putSDecompressor(d)
	return err
}
// sDecompressor is a reusable stream decompressor kept in sDecompressorPool.
type sDecompressor struct {
	// zr is the Reader that performs the decompression; it is reset before
	// and after every use.
	zr *Reader
}
// getSDecompressor returns a stream decompressor, taken from
// sDecompressorPool when one is available and otherwise newly created with
// a Reader that has no source yet.
func getSDecompressor() *sDecompressor {
	if cached := sDecompressorPool.Get(); cached != nil {
		return cached.(*sDecompressor)
	}
	return &sDecompressor{zr: NewReader(nil)}
}
// putSDecompressor returns sd to sDecompressorPool.
//
// Before pooling, the Reader is reset with a nil source and a nil dictionary
// (presumably so that an idle decompressor does not keep the caller's reader
// or dictionary reachable).
func putSDecompressor(sd *sDecompressor) {
	reader := sd.zr
	reader.Reset(nil, nil)
	sDecompressorPool.Put(sd)
}
// sDecompressorPool holds idle *sDecompressor values for reuse. It has no New
// function, so Get returns nil when the pool is empty.
var sDecompressorPool sync.Pool