-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransport.go
165 lines (150 loc) · 4.85 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package cubequeue
import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
// TransactionTransport is responsible for publishing messages to amqp (for now)
type TransactionTransport struct {
consumer, publisher *amqp.Channel
connection *amqp.Connection
queue amqp.Queue
}
// TransactionTransportConnectionQueueSettings stores settings for declaring a listening queue
// The most important one is the queue name, other settings can be left to default
type TransactionTransportConnectionQueueSettings struct {
QueueName string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Args amqp.Table
}
// GetDefaultQueueSetting returns default settings, suitable for most cases
func GetDefaultQueueSetting(queueName string) TransactionTransportConnectionQueueSettings {
return TransactionTransportConnectionQueueSettings{
QueueName: queueName,
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
}
// TransactionTransportConnectionSetting stores connection settings for the transport
type TransactionTransportConnectionSetting struct {
URL string
Queue TransactionTransportConnectionQueueSettings
}
// RoutingTableHandler func for handling the event
type RoutingTableHandler func(amqp.Delivery) error
// RoutingTable is used to route the requests from amqp
type RoutingTable map[string]RoutingTableHandler
// GetDefaultRoutingHandler returns default - empty handler
func GetDefaultRoutingHandler() RoutingTableHandler {
return func(message amqp.Delivery) error { return nil }
}
// Important handlers the routing table
const (
NoHandlerMessage = "no_handler"
ErrorMessage = "error"
RollbackMessage = "rollback"
)
// SubscribeSettings contains settings when subscribing to the queue
type SubscribeSettings struct {
Queue string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqp.Table
}
// GetDefaultSubscribeSettings returns default settings
func GetDefaultSubscribeSettings(queue string) SubscribeSettings {
return SubscribeSettings{
Queue: queue,
Consumer: "",
AutoAck: true,
Exclusive: false,
NoLocal: false,
NoWait: false,
Args: nil,
}
}
// NewTransactionTransport create a new transport
func NewTransactionTransport(connectionSetting TransactionTransportConnectionSetting) (*TransactionTransport, error) {
transport := TransactionTransport{}
var err error
transport.connection, err = amqp.Dial(connectionSetting.URL)
if err != nil {
return nil, errors.Wrap(err, "Cannot connect to the amqp")
}
transport.consumer, err = transport.connection.Channel()
if err != nil {
return nil, errors.Wrap(err, "Cannot create a consumer channel")
}
transport.publisher, err = transport.connection.Channel()
if err != nil {
return nil, errors.Wrap(err, "Cannot create a publisher channel")
}
//Now declare the queue we will listen on for the incoming messages
transport.queue, err = transport.consumer.QueueDeclare(
connectionSetting.Queue.QueueName,
connectionSetting.Queue.Durable,
connectionSetting.Queue.AutoDelete,
connectionSetting.Queue.Exclusive,
connectionSetting.Queue.NoWait,
connectionSetting.Queue.Args,
)
if err != nil {
return nil, errors.Wrap(err, "Cannot declare the queue for messages")
}
return &transport, nil
}
// Publish a message to a given queue
func (transport *TransactionTransport) Publish(queue string, message amqp.Publishing) error {
err := transport.publisher.Publish("", queue, false, false, message)
if err != nil {
return errors.Wrap(err, "Cannot publish a message to the queue")
}
return nil
}
// Subscribe for messages in current queue
func (transport *TransactionTransport) Subscribe(routingTable RoutingTable, settings SubscribeSettings) error {
messages, err := transport.consumer.Consume(
settings.Queue,
settings.Consumer,
settings.AutoAck,
settings.Exclusive,
settings.NoLocal,
settings.NoWait,
settings.Args,
)
if err != nil {
return errors.Wrap(err, "Cannot register a consumer")
}
for message := range messages {
logrus.WithField("message", message).Debug("Received message")
if _, ok := routingTable[message.Type]; !ok {
//No handler for the message, then use the no_handler handler
err = routingTable[NoHandlerMessage](message)
if err != nil {
logrus.WithError(err).WithField("message", message).Error("Error happened during transaction execution")
}
} else {
err = routingTable[message.Type](message)
if err != nil {
logrus.WithError(err).WithField("message", message).Error("Error happened during transaction execution")
}
}
logrus.WithField("message", message).Debug("Processed message")
}
return nil
}
// Close close all connections
func (transport *TransactionTransport) Close() {
transport.consumer.Close()
transport.publisher.Close()
transport.connection.Close()
}