-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
225 lines (184 loc) · 5.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package main
import (
	"fmt"
	"io/ioutil"
	"math/rand"
	"net"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/garyburd/redigo/redis"
	"github.com/op/go-logging"
	"github.com/streadway/amqp"
	kingpin "gopkg.in/alecthomas/kingpin.v2"
	yaml "gopkg.in/yaml.v2"
)
// Globals
// Process-wide handles shared by the rest of the service. Apart from
// consoleLog, each one is nil until its init function has been called from main.
var (
// consoleLog is the go-logging logger registered under the name "console".
// Its backend, format and level are installed by initConsoleLogging.
consoleLog = logging.MustGetLogger("console")
// rmqConn is the RabbitMQ connection. Assigned in initRMQ; main defers its Close.
rmqConn *amqp.Connection
// redisPool is the Redis connection pool. Assigned in initRedis.
redisPool *redis.Pool
// quoteServerTCPAddr is the pre-resolved address of the quote server.
// Assigned in resolveTCPAddresses.
quoteServerTCPAddr *net.TCPAddr
)
const (
// Shared RMQ queues / exchanges
// Both names are declared as durable in initRMQ before they are used.
// quoteRequestQ is the queue on which quote requests are received.
quoteRequestQ = "quote_req"
// quoteBroadcastEx is the topic exchange used for broadcasting quote updates.
quoteBroadcastEx = "quote_broadcast"
)
// main wires the service together and then parks forever.
//
// Start-up order matters: the config must be loaded before logging can pick
// its level, and logging must be ready before any step that reports failures
// through failOnError.
func main() {
	seed := time.Now().Unix()
	rand.Seed(seed)

	// Configuration and logging.
	loadConfig()
	initConsoleLogging()

	// External dependencies.
	resolveTCPAddresses()
	initRMQ()
	defer rmqConn.Close()
	initRedis()

	// The request handler runs on its own goroutine.
	go handleQuoteRequest()

	// Nothing ever wakes this up: main blocks here for the life of the
	// process, so the deferred Close above only runs if main ever returns.
	select {}
}
// failOnError is the start-up error check: a nil err is a no-op, anything
// else is logged through consoleLog.Fatalf as "<msg>: <err>".
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	consoleLog.Fatalf("%s: %s", msg, err)
}
// initConsoleLogging installs the go-logging backend used by consoleLog:
// a stdout writer, wrapped in a formatter, wrapped in a level filter.
//
// The minimum level is taken from config.env.logLevel, so loadConfig must
// have run first.
func initConsoleLogging() {
	// Plain stdout sink; no prefix and no stdlib log flags, since the
	// formatter below produces the whole line.
	stdout := logging.NewLogBackend(os.Stdout, "", 0)

	// Line layout: time, coloured level, record id, message, call site.
	layout := logging.MustStringFormatter(
		`%{time:15:04:05.000} %{color}▶ %{level:8s}%{color:reset} %{id:03d} %{message} (%{shortfile})`,
	)
	formatted := logging.NewBackendFormatter(stdout, layout)

	// Translate the configured level name. On failure we only print a
	// notice and carry on with whatever level value LogLevel handed back
	// alongside the error (the notice states that this is ERROR).
	minLevel, levelErr := logging.LogLevel(config.env.logLevel)
	if levelErr != nil {
		fmt.Println("Bad log level. Using default level of ERROR")
	}

	// Apply the level with an empty module name, then make this backend
	// the active one.
	leveled := logging.AddModuleLevel(formatted)
	leveled.SetLevel(minLevel, "")
	logging.SetBackend(leveled)
}
// config holds the service settings: command-line options plus the values
// unmarshalled from <config>.yaml by loadConfig.
// 'PascalCase' values come from 'pascalcase' in x.yaml, unless a `yaml:"..."`
// tag on the field names a different key.
var config struct {
// Options from the command line, bound to kingpin flags in loadConfig.
env struct {
logLevel string // --log-level / -l; consumed by initConsoleLogging
serviceID string // --service-id / -s
configFile string // --config / -c; path of the YAML file read by loadConfig
}
// Settings from the YAML config file (default ./config/dev.yaml).
// RabbitMQ connection details; used by initRMQ to build the amqp:// URI.
Rabbit struct {
Host string
Port int
User string
Pass string
}
// Quote server endpoint, read from the "quote server" key.
// Host and Port are resolved once in resolveTCPAddresses.
QuoteServer struct {
Host string
Port int
Retry int // not used in this part of the file; presumably a retry count — TODO confirm
Backoff int // not used in this part of the file; units unknown — TODO confirm
} `yaml:"quote server"`
// Redis pool settings; used by initRedis.
Redis struct {
Host string
Port int
MaxIdle int `yaml:"max idle connections"` // passed to redis.Pool.MaxIdle
MaxActive int `yaml:"max active connections"` // passed to redis.Pool.MaxActive
IdleTimeout int `yaml:"idle timeout"` // seconds: initRedis multiplies it by time.Second
KeyPrefix string `yaml:"key prefix"` // not used in this part of the file
}
// Quote caching policy, read from the "quote policy" key.
// NOTE(review): neither TTL is used in this part of the file; units need confirming.
QuotePolicy struct {
BaseTTL int `yaml:"base ttl"`
BackoffTTL int `yaml:"backoff ttl"`
} `yaml:"quote policy"`
}
// loadConfig fills the package-level config in two steps: it parses the
// command-line flags into config.env, then reads the YAML file named by
// --config and unmarshals it into config.
//
// It runs before console logging is set up, so a file read or YAML error is
// reported with a plain panic rather than through consoleLog.
func loadConfig() {
	levels := []string{"CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"}
	levelHelp := fmt.Sprintf(
		"Minimum level for logging to the console. Must be one of: %s",
		strings.Join(levels, ", "),
	)

	cli := kingpin.New("quote_manager", "Get quotes from the legacy service")

	// -l / --log-level: restricted to the names listed above.
	cli.Flag("log-level", levelHelp).
		Default("WARNING").
		Short('l').
		EnumVar(&config.env.logLevel, levels...)

	// -s / --service-id
	cli.Flag("service-id", "Logging name for the service").
		Default("quotemgr").
		Short('s').
		StringVar(&config.env.serviceID)

	// -c / --config: bound with ExistingFileVar.
	cli.Flag("config", "YAML file with service config").
		Default("./config/dev.yaml").
		Short('c').
		ExistingFileVar(&config.env.configFile)

	kingpin.MustParse(cli.Parse(os.Args[1:]))

	// Read and decode the YAML file selected above.
	raw, err := ioutil.ReadFile(config.env.configFile)
	if err != nil {
		// No logger yet; fail the old-fashioned way.
		panic(err)
	}
	if err := yaml.Unmarshal(raw, &config); err != nil {
		panic(err)
	}
}
// resolveTCPAddresses looks up the quote server's address once at start-up
// and stores it in quoteServerTCPAddr.
//
// TCP connections need to know the specific IP for the destination, and the
// destination is fixed, so the lookup is done in advance. A resolution
// failure is fatal via failOnError.
func resolveTCPAddresses() {
	// JoinHostPort (rather than "%s:%d") brackets IPv6 literals, so a host
	// such as "::1" yields "[::1]:4444" instead of an unparsable address.
	quoteServerAddress := net.JoinHostPort(
		config.QuoteServer.Host,
		fmt.Sprint(config.QuoteServer.Port),
	)

	var err error
	quoteServerTCPAddr, err = net.ResolveTCPAddr("tcp", quoteServerAddress)
	failOnError(err, "Could not resolve TCP addr for "+quoteServerAddress)
}
// initRMQ connects to RabbitMQ and makes sure the queue and exchange this
// service relies on exist before anything starts using them.
//
// The connection is stored in the package-level rmqConn and is left open;
// main defers its Close. The channel opened here is only used for the
// declarations and is closed when the function returns. Every failure is
// fatal via failOnError.
func initRMQ() {
	// Build the URI with net/url instead of string formatting so that
	// reserved characters in the credentials (such as '/', '?', '#' or a
	// space) are percent-encoded, and an IPv6 host is bracketed correctly.
	rabbitURL := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(config.Rabbit.User, config.Rabbit.Pass),
		Host:   net.JoinHostPort(config.Rabbit.Host, fmt.Sprint(config.Rabbit.Port)),
	}

	var err error
	rmqConn, err = amqp.Dial(rabbitURL.String())
	failOnError(err, "Failed to connect to RabbitMQ")
	// closed in main()

	ch, err := rmqConn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Make sure all of the expected RabbitMQ exchanges and queues
	// exist before we start using them.

	// Receive requests
	_, err = ch.QueueDeclare(
		quoteRequestQ, // name
		true,          // durable
		false,         // delete when unused
		false,         // exclusive
		false,         // no wait
		nil,           // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// Broadcasting quote updates
	err = ch.ExchangeDeclare(
		quoteBroadcastEx,   // name
		amqp.ExchangeTopic, // type
		true,               // durable
		false,              // auto-deleted
		false,              // internal
		false,              // no-wait
		nil,                // args
	)
	failOnError(err, "Failed to declare an exchange")
}
// initRedis builds the package-level redisPool from config.Redis and checks
// that Redis is reachable by sending one PING.
//
// config.Redis.IdleTimeout is interpreted as a number of seconds. A failed
// PING is fatal via failOnError.
func initRedis() {
	// JoinHostPort (rather than "%s:%d") brackets IPv6 literals so the
	// address stays dialable whatever form the configured host takes.
	redisAddress := net.JoinHostPort(config.Redis.Host, fmt.Sprint(config.Redis.Port))

	redisPool = &redis.Pool{
		MaxIdle:     config.Redis.MaxIdle,
		MaxActive:   config.Redis.MaxActive,
		IdleTimeout: time.Second * time.Duration(config.Redis.IdleTimeout),
		Dial:        func() (redis.Conn, error) { return redis.Dial("tcp", redisAddress) },
	}

	// Test if we can talk to redis. The connection goes back to the pool
	// when the function returns.
	conn := redisPool.Get()
	defer conn.Close()

	_, err := conn.Do("PING")
	failOnError(err, "Could not establish connection with Redis")
}