-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathcreate_topics_request.go
207 lines (168 loc) · 5.06 KB
/
create_topics_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package healer
import (
"encoding/binary"
"errors"
)
type CreateTopicsRequest struct {
*RequestHeader
CreateTopicRequests []*CreateTopicRequest
Timeout uint32
}
// Length returns the length of bytes returned by Encode func
func (r *CreateTopicsRequest) Length() int {
l := r.RequestHeader.length()
l += 4
for _, r := range r.CreateTopicRequests {
l += r.length()
}
l += 4
return l
}
// Encode encodes CreateTopicsRequest to binary bytes
func (r *CreateTopicsRequest) Encode(version uint16) []byte {
requestLength := r.Length()
payload := make([]byte, requestLength+4)
offset := 0
binary.BigEndian.PutUint32(payload[offset:], uint32(requestLength))
offset += 4
offset += r.RequestHeader.EncodeTo(payload[offset:])
binary.BigEndian.PutUint32(payload[offset:], uint32(len(r.CreateTopicRequests)))
offset += 4
for _, r := range r.CreateTopicRequests {
offset += r.encode(payload[offset:])
}
binary.BigEndian.PutUint32(payload[offset:], r.Timeout)
//offset += 4
return payload
}
// CreateTopicRequest is sub struct in CreateTopicsRequest
type CreateTopicRequest struct {
Topic string
NumPartitions int32
ReplicationFactor int16
ReplicaAssignments []*ReplicaAssignment
ConfigEntries []*ConfigEntry
}
func (r *CreateTopicRequest) length() int {
l := 2 + len(r.Topic)
l += 4 + 2
l += 4
for _, ra := range r.ReplicaAssignments {
l += 4 // Partition
l += 4 //len(ra.Replicas)
l += 4 * len(ra.Replicas)
}
l += 4
for _, c := range r.ConfigEntries {
l += 2 + len(c.ConfigName) + 2 + len(c.ConfigValue)
}
return l
}
func (r *CreateTopicRequest) encode(payload []byte) (offset int) {
binary.BigEndian.PutUint16(payload[offset:], uint16(len(r.Topic)))
offset += 2
offset += copy(payload[offset:], r.Topic)
binary.BigEndian.PutUint32(payload[offset:], uint32(r.NumPartitions))
offset += 4
binary.BigEndian.PutUint16(payload[offset:], uint16(r.ReplicationFactor))
offset += 2
binary.BigEndian.PutUint32(payload[offset:], uint32(len(r.ReplicaAssignments)))
offset += 4
for _, ra := range r.ReplicaAssignments {
binary.BigEndian.PutUint32(payload[offset:], uint32(ra.Partition))
offset += 4
binary.BigEndian.PutUint32(payload[offset:], uint32(len(ra.Replicas)))
offset += 4
for _, replica := range ra.Replicas {
binary.BigEndian.PutUint32(payload[offset:], uint32(replica))
offset += 4
}
}
binary.BigEndian.PutUint32(payload[offset:], uint32(len(r.ConfigEntries)))
offset += 4
for _, c := range r.ConfigEntries {
binary.BigEndian.PutUint16(payload[offset:], uint16(len(c.ConfigName)))
offset += 2
offset += copy(payload[offset:], c.ConfigName)
binary.BigEndian.PutUint16(payload[offset:], uint16(len(c.ConfigValue)))
offset += 2
offset += copy(payload[offset:], c.ConfigValue)
}
return
}
// ReplicaAssignment is sub struct in CreateTopicRequest
type ReplicaAssignment struct {
Partition int32
Replicas []int32
}
// ConfigEntry is sub struct in CreateTopicRequest
type ConfigEntry struct {
ConfigName string
ConfigValue string
}
func NewCreateTopicsRequest(clientID string, timeout uint32) *CreateTopicsRequest {
requestHeader := &RequestHeader{
APIKey: API_CreateTopics,
APIVersion: 0,
ClientID: &clientID,
}
return &CreateTopicsRequest{
RequestHeader: requestHeader,
CreateTopicRequests: []*CreateTopicRequest{},
Timeout: timeout,
}
}
var (
errorDumplicatedTopic = errors.New("topic has been added to the create_topics request already")
)
// AddTopic gives user easy way to fill CreateTopicsRequest. it set ReplicaAssignment and ConfigEntries as nil, user can set them by AddReplicaAssignment and ADDConfigEntry
func (r *CreateTopicsRequest) AddTopic(topic string, partitions int32, replicationFactor int16) error {
for _, rr := range r.CreateTopicRequests {
if rr.Topic == topic {
return errorDumplicatedTopic
}
}
createTopicRequest := &CreateTopicRequest{
Topic: topic,
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
}
r.CreateTopicRequests = append(r.CreateTopicRequests, createTopicRequest)
return nil
}
var (
errorTopicNotFound = errors.New("topic not found in the CreateTopicRequests")
errorPidOutOfRange = errors.New("partitionID in replica-assignment is out of range")
)
// AddReplicaAssignment add replicas of certain topic & pid to CreateTopicRequests. It returns errorTopicNotFound if topic has not beed added to the request; it overwrite replicas if pid exists
func (r *CreateTopicsRequest) AddReplicaAssignment(topic string, pid int32, replicas []int32) error {
var rr *CreateTopicRequest
var found = false
for _, rr = range r.CreateTopicRequests {
if rr.Topic == topic {
found = true
break
}
}
if !found {
return errorTopicNotFound
}
found = false
for _, ra := range rr.ReplicaAssignments {
if ra.Partition >= pid {
return errorPidOutOfRange
}
if ra.Partition == pid {
found = true
ra.Replicas = replicas
return nil
}
}
if !found {
rr.ReplicaAssignments = append(rr.ReplicaAssignments, &ReplicaAssignment{
Partition: pid,
Replicas: replicas,
})
}
return nil
}