-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathcollapse.go
114 lines (84 loc) · 1.88 KB
/
collapse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package templar
import (
"io"
"net/http"
"sync"
)
type RunningRequest struct {
Request *http.Request
others []Responder
done chan struct{}
}
type Collapser struct {
client Client
categorizer *Categorizer
lock sync.Mutex
running map[string]*RunningRequest
}
func NewCollapser(client Client, categorizer *Categorizer) *Collapser {
return &Collapser{
client: client,
categorizer: categorizer,
running: make(map[string]*RunningRequest),
}
}
type collapseResponder struct {
collapser *Collapser
request *http.Request
running *RunningRequest
}
func (c *collapseResponder) Send(res *http.Response) io.Writer {
return c.collapser.finish(c.request, res, c.running)
}
func (c *Collapser) finish(req *http.Request, res *http.Response, rr *RunningRequest) io.Writer {
c.lock.Lock()
key := req.URL.String()
delete(c.running, key)
c.lock.Unlock()
cw := &collapsedWriter{running: rr}
for _, c := range rr.others {
cw.w = append(cw.w, c.Send(res))
}
return cw
}
type collapsedWriter struct {
w []io.Writer
running *RunningRequest
}
func (cw *collapsedWriter) Write(p []byte) (n int, err error) {
for _, w := range cw.w {
n, err = w.Write(p)
if err != nil {
return
}
if n != len(p) {
err = io.ErrShortWrite
return
}
}
return len(p), nil
}
func (cw *collapsedWriter) Finish() {
close(cw.running.done)
}
func (c *Collapser) Forward(res Responder, req *http.Request) error {
if !c.categorizer.Stateless(req) {
return c.client.Forward(res, req)
}
c.lock.Lock()
key := req.URL.String()
if running, ok := c.running[key]; ok {
running.others = append(running.others, res)
c.lock.Unlock()
<-running.done
return nil
}
rr := &RunningRequest{
Request: req,
others: []Responder{res},
done: make(chan struct{}),
}
c.running[key] = rr
c.lock.Unlock()
return c.client.Forward(&collapseResponder{c, req, rr}, req)
}