forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsul.go
208 lines (180 loc) · 5.91 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package prometheus
import (
"bytes"
"context"
"fmt"
"net/url"
"strings"
"text/template"
"time"
"github.com/hashicorp/consul/api"
"github.com/influxdata/telegraf/config"
)
type ConsulConfig struct {
// Address of the Consul agent. The address must contain a hostname or an IP address
// and optionally a port (format: "host:port").
Enabled bool `toml:"enabled"`
Agent string `toml:"agent"`
QueryInterval config.Duration `toml:"query_interval"`
Queries []*ConsulQuery `toml:"query"`
}
// One Consul service discovery query
type ConsulQuery struct {
// A name of the searched services (not ID)
ServiceName string `toml:"name"`
// A tag of the searched services
ServiceTag string `toml:"tag"`
// A DC of the searched services
ServiceDc string `toml:"dc"`
// A template URL of the Prometheus gathering interface. The hostname part
// of the URL will be replaced by discovered address and port.
ServiceURL string `toml:"url"`
// Extra tags to add to metrics found in Consul
ServiceExtraTags map[string]string `toml:"tags"`
serviceURLTemplate *template.Template
serviceExtraTagsTemplate map[string]*template.Template
// Store last error status and change log level depending on repeated occurence
lastQueryFailed bool
}
func (p *Prometheus) startConsul(ctx context.Context) error {
consulAPIConfig := api.DefaultConfig()
if p.ConsulConfig.Agent != "" {
consulAPIConfig.Address = p.ConsulConfig.Agent
}
consul, err := api.NewClient(consulAPIConfig)
if err != nil {
return fmt.Errorf("cannot connect to the Consul agent: %v", err)
}
// Parse the template for metrics URL, drop queries with template parse errors
i := 0
for _, q := range p.ConsulConfig.Queries {
serviceURLTemplate, err := template.New("URL").Parse(q.ServiceURL)
if err != nil {
p.Log.Errorf("Could not parse the Consul query URL template (%s), skipping it. Error: %s", q.ServiceURL, err)
continue
}
q.serviceURLTemplate = serviceURLTemplate
// Allow to use join function in tags
templateFunctions := template.FuncMap{"join": strings.Join}
// Parse the tag value templates
q.serviceExtraTagsTemplate = make(map[string]*template.Template)
for tagName, tagTemplateString := range q.ServiceExtraTags {
tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString)
if err != nil {
p.Log.Errorf("Could not parse the Consul query Extra Tag template (%s), skipping it. Error: %s", tagTemplateString, err)
continue
}
q.serviceExtraTagsTemplate[tagName] = tagTemplate
}
p.ConsulConfig.Queries[i] = q
i++
}
// Prevent memory leak by erasing truncated values
for j := i; j < len(p.ConsulConfig.Queries); j++ {
p.ConsulConfig.Queries[j] = nil
}
p.ConsulConfig.Queries = p.ConsulConfig.Queries[:i]
catalog := consul.Catalog()
p.wg.Add(1)
go func() {
// Store last error status and change log level depending on repeated occurence
var refreshFailed = false
defer p.wg.Done()
err := p.refreshConsulServices(catalog)
if err != nil {
refreshFailed = true
p.Log.Errorf("Unable to refreh Consul services: %v", err)
}
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(p.ConsulConfig.QueryInterval)):
err := p.refreshConsulServices(catalog)
if err != nil {
message := fmt.Sprintf("Unable to refreh Consul services: %v", err)
if refreshFailed {
p.Log.Debug(message)
} else {
p.Log.Warn(message)
}
refreshFailed = true
} else if refreshFailed {
refreshFailed = false
p.Log.Info("Successfully refreshed Consul services after previous errors")
}
}
}
}()
return nil
}
func (p *Prometheus) refreshConsulServices(c *api.Catalog) error {
consulServiceURLs := make(map[string]URLAndAddress)
p.Log.Debugf("Refreshing Consul services")
for _, q := range p.ConsulConfig.Queries {
queryOptions := api.QueryOptions{}
if q.ServiceDc != "" {
queryOptions.Datacenter = q.ServiceDc
}
// Request services from Consul
consulServices, _, err := c.Service(q.ServiceName, q.ServiceTag, &queryOptions)
if err != nil {
return err
}
if len(consulServices) == 0 {
p.Log.Debugf("Queried Consul for Service (%s, %s) but did not find any instances", q.ServiceName, q.ServiceTag)
continue
}
p.Log.Debugf("Queried Consul for Service (%s, %s) and found %d instances", q.ServiceName, q.ServiceTag, len(consulServices))
for _, consulService := range consulServices {
uaa, err := p.getConsulServiceURL(q, consulService)
if err != nil {
message := fmt.Sprintf("Unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err)
if q.lastQueryFailed {
p.Log.Debug(message)
} else {
p.Log.Warn(message)
}
q.lastQueryFailed = true
break
}
if q.lastQueryFailed {
p.Log.Infof("Created scrape URLs from Consul for Service (%s, %s)", q.ServiceName, q.ServiceTag)
}
q.lastQueryFailed = false
p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.URL.String())
consulServiceURLs[uaa.URL.String()] = *uaa
}
}
p.lock.Lock()
p.consulServices = consulServiceURLs
p.lock.Unlock()
return nil
}
func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*URLAndAddress, error) {
var buffer bytes.Buffer
buffer.Reset()
err := q.serviceURLTemplate.Execute(&buffer, s)
if err != nil {
return nil, err
}
serviceURL, err := url.Parse(buffer.String())
if err != nil {
return nil, err
}
extraTags := make(map[string]string)
for tagName, tagTemplate := range q.serviceExtraTagsTemplate {
buffer.Reset()
err = tagTemplate.Execute(&buffer, s)
if err != nil {
return nil, err
}
extraTags[tagName] = buffer.String()
}
p.Log.Debugf("Will scrape metrics from Consul Service %s", serviceURL.String())
return &URLAndAddress{
URL: serviceURL,
OriginalURL: serviceURL,
Tags: extraTags,
}, nil
}