forked from linkerd/linkerd2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathendpoint_profile_translator.go
172 lines (150 loc) · 5.17 KB
/
endpoint_profile_translator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package destination
import (
"fmt"
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
type endpointProfileTranslator struct {
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
defaultOpaquePorts map[uint32]struct{}
stream pb.Destination_GetProfileServer
endStream chan struct{}
updates chan *watcher.Address
stop chan struct{}
current *pb.DestinationProfile
log *logging.Entry
}
// endpointProfileUpdatesQueueOverflowCounter is a prometheus counter that is incremented
// whenever the profile updates queue overflows.
//
// We omit ip and port labels because they are high cardinality.
var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter(
prometheus.CounterOpts{
Name: "endpoint_profile_updates_queue_overflow",
Help: "A counter incremented whenever the endpoint profile updates queue overflows",
},
)
// newEndpointProfileTranslator translates pod updates and profile updates to
// DestinationProfiles for endpoints
func newEndpointProfileTranslator(
enableH2Upgrade bool,
controllerNS,
identityTrustDomain string,
defaultOpaquePorts map[uint32]struct{},
stream pb.Destination_GetProfileServer,
endStream chan struct{},
log *logging.Entry,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,
stream: stream,
endStream: endStream,
updates: make(chan *watcher.Address, updateQueueCapacity),
stop: make(chan struct{}),
log: log.WithField("component", "endpoint-profile-translator"),
}
}
// Start initiates a goroutine which processes update events off of the
// endpointProfileTranslator's internal queue and sends to the grpc stream as
// appropriate. The goroutine calls non-thread-safe Send, therefore Start must
// not be called more than once.
func (ept *endpointProfileTranslator) Start() {
go func() {
for {
select {
case update := <-ept.updates:
ept.update(update)
case <-ept.stop:
return
}
}
}()
}
// Stop terminates the goroutine started by Start.
func (ept *endpointProfileTranslator) Stop() {
close(ept.stop)
}
// Update enqueues an address update to be translated into a DestinationProfile.
// An error is returned if the update cannot be enqueued.
func (ept *endpointProfileTranslator) Update(address *watcher.Address) error {
select {
case ept.updates <- address:
// Update has been successfully enqueued.
return nil
default:
select {
case <-ept.endStream:
// The endStream channel has already been closed so no action is
// necessary.
return fmt.Errorf("profile update stream closed")
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
endpointProfileUpdatesQueueOverflowCounter.Inc()
close(ept.endStream)
return fmt.Errorf("profile update queue full; aborting stream")
}
}
}
func (ept *endpointProfileTranslator) queueLen() int {
return len(ept.updates)
}
func (ept *endpointProfileTranslator) update(address *watcher.Address) {
var opaquePorts map[uint32]struct{}
if address.Pod != nil {
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
} else {
opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, ept.defaultOpaquePorts)
}
endpoint, err := ept.createEndpoint(*address, opaquePorts)
if err != nil {
ept.log.Errorf("Failed to create endpoint for %s:%d: %s",
address.IP, address.Port, err)
return
}
_, opaqueProtocol := opaquePorts[address.Port]
profile := &pb.DestinationProfile{
RetryBudget: defaultRetryBudget(),
Endpoint: endpoint,
OpaqueProtocol: opaqueProtocol || address.OpaqueProtocol,
}
if proto.Equal(profile, ept.current) {
ept.log.Debugf("Ignoring redundant profile update: %+v", profile)
return
}
ept.log.Debugf("Sending profile update: %+v", profile)
if err := ept.stream.Send(profile); err != nil {
ept.log.Errorf("failed to send profile update: %s", err)
return
}
ept.current = profile
}
func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
var weightedAddr *pb.WeightedAddr
var err error
if address.ExternalWorkload != nil {
weightedAddr, err = createWeightedAddrForExternalWorkload(address, opaquePorts)
} else {
weightedAddr, err = createWeightedAddr(address, opaquePorts, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS)
}
if err != nil {
return nil, err
}
// `Get` doesn't include the namespace in the per-endpoint
// metadata, so it needs to be special-cased.
if address.Pod != nil {
weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
} else if address.ExternalWorkload != nil {
weightedAddr.MetricLabels["namespace"] = address.ExternalWorkload.Namespace
}
return weightedAddr, err
}